作者 Ethan 发布的文章

Redis高可用架构设计

在生产环境中,Redis的高可用性至关重要。本文将介绍Redis的主从复制、哨兵模式和集群模式。

哨兵模式

哨兵提供自动故障转移功能:

性能监控

使用redis-cli监控性能:

最佳实践

  1. 至少部署3个节点(1主2从)
  2. 配置合适的持久化策略
  3. 设置内存淘汰策略
  4. 定期备份数据

MySQL索引优化实战

索引是数据库性能的基石。本文将深入讲解MySQL索引的工作原理、优化技巧和实战案例。

索引类型

  1. B+树索引:最常用的索引类型
  2. 哈希索引:Memory引擎使用
  3. 全文索引:文本搜索优化
  4. 空间索引:地理数据查询

优化原则

  • 选择性高的列优先建索引
  • 遵循最左前缀原则
  • 避免在索引列上使用函数
  • 合理使用覆盖索引

实战案例

通过合理索引设计,查询性能提升10倍以上。

MySQL索引优化与分库分表实战

索引是数据库性能优化的核心。本文将分享MySQL索引优化和分库分表的实践经验。

最佳实践

  1. 索引设计:选择区分度高的列
  2. 查询优化:避免SELECT *,使用LIMIT
  3. 分片策略:根据业务选择分片键
  4. 监控维护:定期分析慢查询

通过系统优化,可以构建高性能的数据库系统。

Vue3组合式API实战:大型项目管理经验

Vue3的组合式API(Composition API)为大型前端项目带来了革命性的改进。通过更灵活的逻辑组织和更好的TypeScript支持,组合式API让我们能够构建更可维护、可测试的前端应用。本文将分享在大型项目中应用组合式API的实践经验。

组合式API基础

1. Setup函数与响应式数据

<template>
  <div>
    <p>Count: {{ count }}</p>
    <button @click="increment">Increment</button>
    <p>Doubled: {{ doubledCount }}</p>
  </div>
</template>

<script setup>
import { ref, computed } from 'vue'

// 响应式数据
const count = ref(0)

// 计算属性
const doubledCount = computed(() => count.value * 2)

// 方法
function increment() {
  count.value++
}
</script>

2. 组合式函数(Composables)

组合式函数是组合式API的核心,用于逻辑复用:

// useCounter.js
import { ref, computed } from 'vue'

export function useCounter(initialValue = 0) {
  const count = ref(initialValue)
  
  const doubled = computed(() => count.value * 2)
  const tripled = computed(() => count.value * 3)
  
  function increment() {
    count.value++
  }
  
  function decrement() {
    count.value--
  }
  
  function reset() {
    count.value = initialValue
  }
  
  return {
    count,
    doubled,
    tripled,
    increment,
    decrement,
    reset
  }
}

使用组合式函数:

<template>
  <div>
    <p>Count: {{ count }}</p>
    <p>Doubled: {{ doubled }}</p>
    <p>Tripled: {{ tripled }}</p>
    <button @click="increment">+</button>
    <button @click="decrement">-</button>
    <button @click="reset">Reset</button>
  </div>
</template>

<script setup>
import { useCounter } from './useCounter.js'

const { count, doubled, tripled, increment, decrement, reset } = useCounter(10)
</script>

大型项目实践

1. 状态管理策略

在大型项目中,合理组织状态至关重要:

// store/userStore.js
import { ref, computed } from 'vue'
import { defineStore } from 'pinia'

export const useUserStore = defineStore('user', () => {
  // 状态
  const user = ref(null)
  const permissions = ref([])
  const loading = ref(false)
  const error = ref(null)
  
  // getters
  const isLoggedIn = computed(() => !!user.value)
  const hasPermission = (permission) => 
    computed(() => permissions.value.includes(permission))
  
  // actions
  async function login(credentials) {
    loading.value = true
    error.value = null
    
    try {
      const response = await fetch('/api/login', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(credentials)
      })
      
      if (!response.ok) {
        throw new Error('Login failed')
      }
      
      const data = await response.json()
      user.value = data.user
      permissions.value = data.permissions
      
      return data
    } catch (err) {
      error.value = err.message
      throw err
    } finally {
      loading.value = false
    }
  }
  
  function logout() {
    user.value = null
    permissions.value = []
  }
  
  return {
    user,
    permissions,
    loading,
    error,
    isLoggedIn,
    hasPermission,
    login,
    logout
  }
})

2. API请求封装

// composables/useApi.js
import { ref } from 'vue'

export function useApi(baseURL = '') {
  const loading = ref(false)
  const error = ref(null)
  const data = ref(null)
  
  async function fetchData(endpoint, options = {}) {
    loading.value = true
    error.value = null
    
    try {
      const url = `${baseURL}${endpoint}`
      const response = await fetch(url, {
        headers: {
          'Content-Type': 'application/json',
          ...options.headers
        },
        ...options
      })
      
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`)
      }
      
      data.value = await response.json()
      return data.value
    } catch (err) {
      error.value = err.message
      throw err
    } finally {
      loading.value = false
    }
  }
  
  async function postData(endpoint, body, options = {}) {
    return fetchData(endpoint, {
      method: 'POST',
      body: JSON.stringify(body),
      ...options
    })
  }
  
  async function putData(endpoint, body, options = {}) {
    return fetchData(endpoint, {
      method: 'PUT',
      body: JSON.stringify(body),
      ...options
    })
  }
  
  async function deleteData(endpoint, options = {}) {
    return fetchData(endpoint, {
      method: 'DELETE',
      ...options
    })
  }
  
  return {
    loading,
    error,
    data,
    fetchData,
    postData,
    putData,
    deleteData
  }
}

3. 表单处理组合式函数

// composables/useForm.js
import { ref, computed } from 'vue'

export function useForm(initialValues = {}, validationRules = {}) {
  const formValues = ref({ ...initialValues })
  const errors = ref({})
  const touched = ref({})
  const isSubmitting = ref(false)
  
  // 验证表单
  const validate = () => {
    errors.value = {}
    
    Object.keys(validationRules).forEach(field => {
      const rules = validationRules[field]
      const value = formValues.value[field]
      
      rules.forEach(rule => {
        if (!rule.validator(value, formValues.value)) {
          if (!errors.value[field]) {
            errors.value[field] = []
          }
          errors.value[field].push(rule.message)
        }
      })
    })
    
    return Object.keys(errors.value).length === 0
  }
  
  // 字段验证
  const validateField = (field) => {
    if (!validationRules[field]) return true
    
    errors.value[field] = []
    const rules = validationRules[field]
    const value = formValues.value[field]
    
    rules.forEach(rule => {
      if (!rule.validator(value, formValues.value)) {
        errors.value[field].push(rule.message)
      }
    })
    
    return errors.value[field].length === 0
  }
  
  // 重置表单
  const reset = () => {
    formValues.value = { ...initialValues }
    errors.value = {}
    touched.value = {}
  }
  
  // 表单状态
  const isValid = computed(() => validate())
  const isDirty = computed(() => {
    return Object.keys(formValues.value).some(key => 
      formValues.value[key] !== initialValues[key]
    )
  })
  
  // 字段处理
  const handleChange = (field, value) => {
    formValues.value[field] = value
    touched.value[field] = true
    validateField(field)
  }
  
  const handleBlur = (field) => {
    touched.value[field] = true
    validateField(field)
  }
  
  return {
    formValues,
    errors,
    touched,
    isSubmitting,
    isValid,
    isDirty,
    validate,
    validateField,
    reset,
    handleChange,
    handleBlur
  }
}

// 使用示例
const validationRules = {
  email: [
    {
      validator: (value) => value && value.includes('@'),
      message: '请输入有效的邮箱地址'
    },
    {
      validator: (value) => value && value.length > 0,
      message: '邮箱不能为空'
    }
  ],
  password: [
    {
      validator: (value) => value && value.length >= 8,
      message: '密码至少8个字符'
    }
  ]
}

性能优化技巧

1. 组件懒加载

// 路由配置中的懒加载
import { defineAsyncComponent } from 'vue'

const routes = [
  {
    path: '/dashboard',
    component: defineAsyncComponent(() =>
      import('./views/Dashboard.vue')
    )
  },
  {
    path: '/settings',
    component: defineAsyncComponent(() =>
      import('./views/Settings.vue')
    )
  }
]

// 条件懒加载组件
const AsyncModal = defineAsyncComponent({
  loader: () => import('./components/Modal.vue'),
  loadingComponent: LoadingSpinner,
  errorComponent: ErrorComponent,
  delay: 200,
  timeout: 3000
})

2. 列表虚拟滚动

<template>
  <div ref="containerRef" class="virtual-list-container" @scroll="handleScroll">
    <div :style="{ height: totalHeight + 'px' }">
      <div 
        v-for="item in visibleItems" 
        :key="item.id"
        :style="{ transform: `translateY(${item.offset}px)` }"
        class="virtual-list-item"
      >
        {{ item.content }}
      </div>
    </div>
  </div>
</template>

<script setup>
import { ref, computed, onMounted } from 'vue'

const props = defineProps({
  items: {
    type: Array,
    required: true
  },
  itemHeight: {
    type: Number,
    default: 50
  },
  buffer: {
    type: Number,
    default: 5
  }
})

const containerRef = ref(null)
const scrollTop = ref(0)
const containerHeight = ref(0)

// 总高度
const totalHeight = computed(() => props.items.length * props.itemHeight)

// 可见项范围
const startIndex = computed(() => {
  return Math.max(0, Math.floor(scrollTop.value / props.itemHeight) - props.buffer)
})

const endIndex = computed(() => {
  const visibleCount = Math.ceil(containerHeight.value / props.itemHeight)
  return Math.min(
    props.items.length,
    Math.ceil((scrollTop.value + containerHeight.value) / props.itemHeight) + props.buffer
  )
})

// 可见项
const visibleItems = computed(() => {
  return props.items.slice(startIndex.value, endIndex.value).map((item, index) => ({
    ...item,
    offset: (startIndex.value + index) * props.itemHeight
  }))
})

function handleScroll() {
  if (containerRef.value) {
    scrollTop.value = containerRef.value.scrollTop
  }
}

onMounted(() => {
  if (containerRef.value) {
    containerHeight.value = containerRef.value.clientHeight
  }
})
</script>

<style scoped>
.virtual-list-container {
  height: 500px;
  overflow-y: auto;
  position: relative;
}

.virtual-list-item {
  position: absolute;
  width: 100%;
  height: 50px;
  box-sizing: border-box;
  border-bottom: 1px solid #eee;
  display: flex;
  align-items: center;
  padding: 0 16px;
}
</style>

3. 防抖与节流组合式函数

// composables/useDebounce.js
import { ref, watch } from 'vue'

export function useDebounce(value, delay = 500) {
  const debouncedValue = ref(value.value)
  let timeoutId
  
  watch(value, (newValue) => {
    clearTimeout(timeoutId)
    timeoutId = setTimeout(() => {
      debouncedValue.value = newValue
    }, delay)
  }, { immediate: true })
  
  const cancel = () => {
    clearTimeout(timeoutId)
  }
  
  return {
    debouncedValue,
    cancel
  }
}

// composables/useThrottle.js
import { ref, watch } from 'vue'

export function useThrottle(value, limit = 500) {
  const throttledValue = ref(value.value)
  let lastCall = 0
  let timeoutId
  
  watch(value, (newValue) => {
    const now = Date.now()
    const remaining = limit - (now - lastCall)
    
    if (remaining <= 0) {
      // 立即执行
      lastCall = now
      throttledValue.value = newValue
    } else if (!timeoutId) {
      // 延迟执行
      timeoutId = setTimeout(() => {
        lastCall = Date.now()
        throttledValue.value = newValue
        timeoutId = null
      }, remaining)
    }
  }, { immediate: true })
  
  const cancel = () => {
    if (timeoutId) {
      clearTimeout(timeoutId)
      timeoutId = null
    }
  }
  
  return {
    throttledValue,
    cancel
  }
}

// 使用示例
const searchQuery = ref('')
const { debouncedValue: debouncedQuery } = useDebounce(searchQuery, 300)

watch(debouncedQuery, (newQuery) => {
  // 只有在用户停止输入300ms后才会触发搜索
  if (newQuery) {
    performSearch(newQuery)
  }
})

测试策略

1. 组合式函数测试

// __tests__/useCounter.spec.js
import { renderHook, act } from '@testing-library/vue'
import { useCounter } from '../useCounter'

describe('useCounter', () => {
  it('should initialize with default value', () => {
    const { result } = renderHook(() => useCounter())
    expect(result.current.count.value).toBe(0)
  })
  
  it('should initialize with custom value', () => {
    const { result } = renderHook(() => useCounter(10))
    expect(result.current.count.value).toBe(10)
  })
  
  it('should increment counter', () => {
    const { result } = renderHook(() => useCounter(5))
    
    act(() => {
      result.current.increment()
    })
    
    expect(result.current.count.value).toBe(6)
    expect(result.current.doubled.value).toBe(12)
  })
  
  it('should decrement counter', () => {
    const { result } = renderHook(() => useCounter(5))
    
    act(() => {
      result.current.decrement()
    })
    
    expect(result.current.count.value).toBe(4)
  })
  
  it('should reset counter', () => {
    const { result } = renderHook(() => useCounter(5))
    
    act(() => {
      result.current.increment()
      result.current.increment()
      result.current.reset()
    })
    
    expect(result.current.count.value).toBe(5)
  })
})

2. 组件测试

// __tests__/UserProfile.spec.js
import { mount } from '@vue/test-utils'
import UserProfile from '../UserProfile.vue'
import { createPinia, setActivePinia } from 'pinia'

describe('UserProfile', () => {
  let pinia
  
  beforeEach(() => {
    pinia = createPinia()
    setActivePinia(pinia)
  })
  
  it('should render user information', async () => {
    const wrapper = mount(UserProfile, {
      global: {
        plugins: [pinia]
      },
      props: {
        userId: 1
      }
    })
    
    // 等待异步数据加载
    await wrapper.vm.()
    
    expect(wrapper.find('.user-name').text()).toBe('John Doe')
    expect(wrapper.find('.user-email').text()).toBe('john@example.com')
  })
  
  it('should show loading state', () => {
    const wrapper = mount(UserProfile, {
      global: {
        plugins: [pinia]
      },
      props: {
        userId: 1
      }
    })
    
    expect(wrapper.find('.loading-spinner').exists()).toBe(true)
  })
  
  it('should show error message', async () => {
    // Mock API失败
    jest.spyOn(global, 'fetch').mockRejectedValue(new Error('Network error'))
    
    const wrapper = mount(UserProfile, {
      global: {
        plugins: [pinia]
      },
      props: {
        userId: 1
      }
    })
    
    await wrapper.vm.()
    
    expect(wrapper.find('.error-message').text()).toContain('Network error')
  })
})

项目结构组织

推荐的项目结构

src/
├── components/           # 通用组件
│   ├── common/          # 全局通用组件
│   ├── layout/          # 布局组件
│   └── ui/              # UI基础组件
├── composables/         # 组合式函数
│   ├── useApi.js        # API请求
│   ├── useForm.js       # 表单处理
│   ├── useAuth.js       # 认证逻辑
│   └── index.js         # 统一导出
├── stores/              # Pinia状态管理
│   ├── userStore.js
│   ├── appStore.js
│   └── index.js
├── views/               # 页面组件
│   ├── Home.vue
│   ├── Dashboard.vue
│   └── Settings.vue
├── router/              # 路由配置
│   ├── index.js
│   └── guards.js
├── utils/               # 工具函数
│   ├── validators.js
│   ├── formatters.js
│   └── constants.js
├── assets/              # 静态资源
├── styles/              # 全局样式
├── types/               # TypeScript类型定义
└── main.js              # 应用入口

统一导出模式

// composables/index.js
export { useApi } from './useApi'
export { useForm } from './useForm'
export { useAuth } from './useAuth'
export { useDebounce } from './useDebounce'
export { useThrottle } from './useThrottle'

// 使用示例
import { useApi, useForm } from '@/composables'

最佳实践总结

  1. 逻辑复用优先:使用组合式函数提取可复用逻辑
  2. 状态管理清晰:合理划分store,避免全局状态滥用
  3. 性能意识:使用懒加载、虚拟滚动等技术优化性能
  4. 类型安全:充分利用TypeScript的类型系统
  5. 测试覆盖:为组合式函数和复杂组件编写测试
  6. 代码分割:按路由和功能模块进行代码分割
  7. 错误处理:统一的错误处理和用户反馈
  8. 响应式设计:考虑不同设备和屏幕尺寸

常见问题与解决方案

1. 组合式函数之间的依赖

// 正确的依赖管理
import { useAuth } from './useAuth'
import { useApi } from './useApi'

export function useUserProfile() {
  const { user } = useAuth()
  const { fetchData, loading, error } = useApi()
  
  const userProfile = ref(null)
  
  async function loadProfile() {
    if (!user.value) return
    
    try {
      userProfile.value = await fetchData(`/users/${user.value.id}/profile`)
    } catch (err) {
      console.error('Failed to load profile:', err)
    }
  }
  
  watch(user, (newUser) => {
    if (newUser) {
      loadProfile()
    } else {
      userProfile.value = null
    }
  }, { immediate: true })
  
  return {
    userProfile,
    loading,
    error,
    loadProfile
  }
}

2. 内存泄漏预防

import { onUnmounted } from 'vue'

export function useEventListener(target, event, handler) {
  // 添加事件监听
  target.addEventListener(event, handler)
  
  // 组件卸载时移除
  onUnmounted(() => {
    target.removeEventListener(event, handler)
  })
}

export function useInterval(callback, delay) {
  let intervalId
  
  const start = () => {
    intervalId = setInterval(callback, delay)
  }
  
  const stop = () => {
    if (intervalId) {
      clearInterval(intervalId)
      intervalId = null
    }
  }
  
  // 自动清理
  onUnmounted(stop)
  
  return {
    start,
    stop
  }
}

结语

Vue3的组合式API为大型前端项目开发带来了全新的可能性。通过合理的架构设计和最佳实践,我们可以构建出更可维护、可测试、高性能的前端应用。在实践中不断总结经验,持续优化代码结构,才能真正发挥组合式API的优势。

提示:在迁移现有Vue2项目到Vue3时,建议逐步采用组合式API,先从新功能开始,逐步重构旧代码。请输入代码

Python异步编程:asyncio在Web开发中的应用

Python的异步编程模型通过asyncio库为我们提供了强大的并发处理能力。在现代Web开发中,异步编程可以显著提高应用的性能和响应速度。本文将深入探讨asyncio在Web开发中的实际应用。

asyncio基础概念

协程(Coroutines)

协程是异步编程的基础,使用定义:

import asyncio

# 定义一个协程
async def hello_world():
    print("Hello")
    await asyncio.sleep(1)  # 异步等待
    print("World")

# 运行协程
async def main():
    await hello_world()

# Python 3.7+
asyncio.run(main())

异步上下文管理器

使用管理异步资源:

import asyncio

class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.5)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)
    
    async def query(self, sql):
        print(f"Executing query: {sql}")
        await asyncio.sleep(0.3)
        return [{"id": 1, "name": "John"}]

async def main():
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM users")
        print(f"Result: {result}")

asyncio.run(main())

Web开发中的应用

1. 异步Web框架:FastAPI

FastAPI是基于asyncio的现代Web框架:

from fastapi import FastAPI, HTTPException
from typing import Optional
import asyncio

app = FastAPI()

# 异步端点
@app.get("/users/{user_id}")
async def read_user(user_id: int):
    # 模拟异步数据库查询
    await asyncio.sleep(0.1)
    
    users = {
        1: {"name": "John", "email": "john@example.com"},
        2: {"name": "Jane", "email": "jane@example.com"}
    }
    
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")
    
    return users[user_id]

# 批量数据处理
@app.post("/process-batch")
async def process_batch(items: list[int]):
    results = []
    
    async def process_item(item):
        # 模拟异步处理
        await asyncio.sleep(0.05)
        return {"item": item, "processed": True, "result": item * 2}
    
    # 并发处理所有项目
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks)
    
    return {"results": results}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

2. 异步数据库操作

使用异步数据库驱动提高并发性能:

import asyncio
import aiomysql
from typing import List, Dict

class AsyncMySQL:
    def __init__(self):
        self.pool = None
    
    async def connect(self, **kwargs):
        self.pool = await aiomysql.create_pool(**kwargs)
    
    async def close(self):
        self.pool.close()
        await self.pool.wait_closed()
    
    async def fetch_all(self, query: str, params=None) -> List[Dict]:
        async with self.pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute(query, params or ())
                return await cursor.fetchall()
    
    async def fetch_one(self, query: str, params=None) -> Dict:
        async with self.pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute(query, params or ())
                return await cursor.fetchone()
    
    async def execute(self, query: str, params=None) -> int:
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(query, params or ())
                await conn.commit()
                return cursor.rowcount

async def main():
    db = AsyncMySQL()
    await db.connect(
        host="localhost",
        port=3306,
        user="root",
        password="password",
        db="testdb",
        charset="utf8mb4"
    )
    
    try:
        # 并发查询多个表
        tasks = [
            db.fetch_all("SELECT * FROM users LIMIT 10"),
            db.fetch_all("SELECT * FROM orders LIMIT 5"),
            db.fetch_all("SELECT COUNT(*) as count FROM products")
        ]
        
        users, orders, count_result = await asyncio.gather(*tasks)
        
        print(f"Users: {len(users)} records")
        print(f"Orders: {len(orders)} records")
        print(f"Product count: {count_result[0]['count']}")
        
    finally:
        await db.close()

asyncio.run(main())

3. 异步文件处理

处理大量文件时的异步优化:

import asyncio
import aiofiles
from pathlib import Path
import json
from typing import List

class AsyncFileProcessor:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_file(self, file_path: Path):
        async with self.semaphore:
            try:
                async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
                    content = await f.read()
                
                # 模拟处理逻辑
                await asyncio.sleep(0.01)
                
                result = {
                    "file": str(file_path),
                    "size": len(content),
                    "lines": len(content.split("
")),
                    "processed": True
                }
                
                return result
                
            except Exception as e:
                return {"file": str(file_path), "error": str(e), "processed": False}
    
    async def process_directory(self, directory: Path) -> List[dict]:
        if not directory.is_dir():
            raise ValueError(f"{directory} is not a directory")
        
        tasks = []
        for file_path in directory.glob("*.txt"):
            tasks.append(self.process_file(file_path))
        
        results = await asyncio.gather(*tasks)
        return results

async def main():
    processor = AsyncFileProcessor(max_concurrent=5)
    
    # 创建一个测试目录
    test_dir = Path("./test_files")
    test_dir.mkdir(exist_ok=True)
    
    # 创建一些测试文件
    for i in range(20):
        file_path = test_dir / f"file_{i:03d}.txt"
        file_path.write_text(f"Test content for file {i}
" * 100)
    
    # 处理目录中的所有文件
    results = await processor.process_directory(test_dir)
    
    successful = sum(1 for r in results if r.get("processed"))
    print(f"Processed {successful}/{len(results)} files successfully")
    
    # 保存结果
    async with aiofiles.open("results.json", "w", encoding="utf-8") as f:
        await f.write(json.dumps(results, indent=2, ensure_ascii=False))
    
    # 清理测试文件
    for file_path in test_dir.glob("*.txt"):
        file_path.unlink()
    test_dir.rmdir()

asyncio.run(main())

性能优化技巧

1. 限制并发数

使用信号量(Semaphore)控制并发数量:

import asyncio
from typing import List

class RateLimitedProcessor:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_item(self, item):
        async with self.semaphore:
            # 模拟处理逻辑
            await asyncio.sleep(0.1)
            return {"item": item, "processed": True}
    
    async def process_batch(self, items: List):
        tasks = [self.process_item(item) for item in items]
        return await asyncio.gather(*tasks)

async def main():
    processor = RateLimitedProcessor(max_concurrent=5)
    
    # 处理100个项目,但最多同时处理5个
    items = list(range(100))
    results = await processor.process_batch(items)
    
    print(f"Processed {len(results)} items")
    print(f"Successful: {sum(1 for r in results if r['processed'])}")

asyncio.run(main())

2. 异步缓存策略

import asyncio
from functools import wraps
from typing import Any, Callable
import time

def async_cache(ttl: int = 300):
    """异步缓存装饰器"""
    cache = {}
    
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 生成缓存键
            cache_key = (args, tuple(kwargs.items()))
            
            # 检查缓存
            if cache_key in cache:
                cached_value, timestamp = cache[cache_key]
                if time.time() - timestamp < ttl:
                    return cached_value
            
            # 执行函数
            result = await func(*args, **kwargs)
            
            # 更新缓存
            cache[cache_key] = (result, time.time())
            
            return result
        
        return wrapper
    
    return decorator

@async_cache(ttl=60)  # 缓存60秒
async def expensive_operation(user_id: int):
    print(f"Performing expensive operation for user {user_id}")
    await asyncio.sleep(2)  # 模拟耗时操作
    return {"user_id": user_id, "data": "expensive_result"}

async def main():
    # 第一次调用,会执行操作
    result1 = await expensive_operation(1)
    print(f"Result 1: {result1}")
    
    # 第二次调用,从缓存获取
    result2 = await expensive_operation(1)
    print(f"Result 2: {result2}")
    
    # 不同的参数,会重新执行
    result3 = await expensive_operation(2)
    print(f"Result 3: {result3}")

asyncio.run(main())

3. 异步任务队列

import asyncio
from typing import List, Callable, Any
import time

class AsyncTaskQueue:
    def __init__(self, max_workers: int = 5):
        self.max_workers = max_workers
        self.queue = asyncio.Queue()
        self.workers = []
        self.results = []
    
    async def worker(self, worker_id: int):
        while True:
            try:
                task = await self.queue.get()
                if task is None:  # 停止信号
                    break
                
                func, args, kwargs = task
                try:
                    result = await func(*args, **kwargs)
                    self.results.append((worker_id, "success", result))
                except Exception as e:
                    self.results.append((worker_id, "error", str(e)))
                
                self.queue.task_done()
                
            except asyncio.CancelledError:
                break
    
    async def add_task(self, func: Callable, *args, **kwargs):
        await self.queue.put((func, args, kwargs))
    
    async def run(self):
        # 启动worker
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]
        
        # 等待所有任务完成
        await self.queue.join()
        
        # 停止worker
        for _ in range(self.max_workers):
            await self.queue.put(None)
        
        # 等待worker结束
        await asyncio.gather(*self.workers)
        
        return self.results

async def sample_task(task_id: int, duration: float):
    await asyncio.sleep(duration)
    return {"task_id": task_id, "duration": duration, "completed": True}

async def main():
    queue = AsyncTaskQueue(max_workers=3)
    
    # 添加任务
    for i in range(10):
        duration = 0.1 * (i % 3 + 1)  # 0.1, 0.2, 0.3秒
        await queue.add_task(sample_task, i, duration)
    
    # 运行队列
    start_time = time.time()
    results = await queue.run()
    end_time = time.time()
    
    print(f"Total time: {end_time - start_time:.2f} seconds")
    print(f"Processed {len(results)} tasks")
    
    successful = sum(1 for _, status, _ in results if status == "success")
    print(f"Successful tasks: {successful}")

asyncio.run(main())

错误处理与调试

1. 异步异常处理

import asyncio

async def risky_operation():
    await asyncio.sleep(0.1)
    raise ValueError("Something went wrong!")

async def handle_exceptions():
    try:
        await risky_operation()
    except ValueError as e:
        print(f"Caught exception: {e}")
        return "fallback_value"

async def gather_with_exceptions():
    tasks = [
        risky_operation(),
        asyncio.sleep(0.5),
        risky_operation()
    ]
    
    # gather会等待所有任务完成,返回结果列表
    # 如果任何任务抛出异常,gather也会抛出异常
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
            else:
                print(f"Task {i} succeeded: {result}")
    except Exception as e:
        print(f"Gather failed: {e}")

async def main():
    print("=== Exception handling ===")
    result = await handle_exceptions()
    print(f"Result: {result}")
    
    print("
=== Gather with exceptions ===")
    await gather_with_exceptions()

asyncio.run(main())

2. 异步调试技巧

import asyncio
import logging
from functools import wraps

# 配置日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def async_logging(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        logger.debug(f"Starting {func.__name__} with args={args}, kwargs={kwargs}")
        try:
            result = await func(*args, **kwargs)
            logger.debug(f"Completed {func.__name__} with result={result}")
            return result
        except Exception as e:
            logger.error(f"Failed {func.__name__}: {e}", exc_info=True)
            raise
    return wrapper

@async_logging
async def monitored_operation(duration: float):
    await asyncio.sleep(duration)
    return {"duration": duration, "status": "completed"}

async def main():
    tasks = [
        monitored_operation(0.1),
        monitored_operation(0.2),
        monitored_operation(0.3)
    ]
    
    results = await asyncio.gather(*tasks)
    logger.info(f"All tasks completed: {results}")

asyncio.run(main())

最佳实践总结

  1. 合理使用async/await:只在真正的I/O操作处使用await
  2. 控制并发数量:使用Semaphore防止资源耗尽
  3. 错误处理:为每个异步操作添加适当的异常处理
  4. 性能监控:使用异步友好的监控工具
  5. 测试策略:编写针对异步代码的测试
  6. 资源管理:使用异步上下文管理器管理资源
  7. 超时设置:为所有异步操作设置合理的超时

实际项目建议

Web API开发

  • 使用FastAPI或aiohttp框架
  • 异步数据库驱动(aiomysql, asyncpg等)
  • 异步缓存(aioredis)
  • 异步任务队列(arq, dramatiq)

数据处理管道

  • 异步文件处理(aiofiles)
  • 异步网络请求(aiohttp)
  • 异步消息队列(aio-pika)
  • 并行处理(asyncio.gather, asyncio.as_completed)

监控与调试

  • 结构化日志记录
  • 异步性能分析
  • 错误追踪集成
  • 指标收集

通过合理应用Python的异步编程特性,可以显著提升Web应用的性能和响应能力,特别是在高并发场景下。

提示:在生产环境中使用异步代码时,确保充分测试并监控性能指标。