在生产环境中,Redis的高可用性至关重要。本文将介绍Redis的主从复制、哨兵模式和集群模式。
哨兵模式
哨兵提供自动故障转移功能:
性能监控
使用redis-cli监控性能:
最佳实践
- 至少部署3个节点(1主2从)
- 配置合适的持久化策略
- 设置内存淘汰策略
- 定期备份数据
在生产环境中,Redis的高可用性至关重要。本文将介绍Redis的主从复制、哨兵模式和集群模式。
哨兵提供自动故障转移功能:
使用redis-cli监控性能:
索引是数据库性能的基石。本文将深入讲解MySQL索引的工作原理、优化技巧和实战案例。
通过合理索引设计,查询性能提升10倍以上。
索引是数据库性能优化的核心。本文将分享MySQL索引优化和分库分表的实践经验。
通过系统优化,可以构建高性能的数据库系统。
Vue3的组合式API(Composition API)为大型前端项目带来了革命性的改进。通过更灵活的逻辑组织和更好的TypeScript支持,组合式API让我们能够构建更可维护、可测试的前端应用。本文将分享在大型项目中应用组合式API的实践经验。
<template>
<div>
<p>Count: {{ count }}</p>
<button @click="increment">Increment</button>
<p>Doubled: {{ doubledCount }}</p>
</div>
</template>
<script setup>
import { ref, computed } from 'vue'
// 响应式数据
const count = ref(0)
// 计算属性
const doubledCount = computed(() => count.value * 2)
// 方法
function increment() {
count.value++
}
</script>组合式函数是组合式API的核心,用于逻辑复用:
// useCounter.js
import { ref, computed } from 'vue'
export function useCounter(initialValue = 0) {
const count = ref(initialValue)
const doubled = computed(() => count.value * 2)
const tripled = computed(() => count.value * 3)
function increment() {
count.value++
}
function decrement() {
count.value--
}
function reset() {
count.value = initialValue
}
return {
count,
doubled,
tripled,
increment,
decrement,
reset
}
}使用组合式函数:
<template>
<div>
<p>Count: {{ count }}</p>
<p>Doubled: {{ doubled }}</p>
<p>Tripled: {{ tripled }}</p>
<button @click="increment">+</button>
<button @click="decrement">-</button>
<button @click="reset">Reset</button>
</div>
</template>
<script setup>
import { useCounter } from './useCounter.js'
const { count, doubled, tripled, increment, decrement, reset } = useCounter(10)
</script>在大型项目中,合理组织状态至关重要:
// store/userStore.js
import { ref, computed } from 'vue'
import { defineStore } from 'pinia'
export const useUserStore = defineStore('user', () => {
// 状态
const user = ref(null)
const permissions = ref([])
const loading = ref(false)
const error = ref(null)
// getters
const isLoggedIn = computed(() => !!user.value)
const hasPermission = (permission) =>
computed(() => permissions.value.includes(permission))
// actions
async function login(credentials) {
loading.value = true
error.value = null
try {
const response = await fetch('/api/login', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(credentials)
})
if (!response.ok) {
throw new Error('Login failed')
}
const data = await response.json()
user.value = data.user
permissions.value = data.permissions
return data
} catch (err) {
error.value = err.message
throw err
} finally {
loading.value = false
}
}
function logout() {
user.value = null
permissions.value = []
}
return {
user,
permissions,
loading,
error,
isLoggedIn,
hasPermission,
login,
logout
}
})// composables/useApi.js
import { ref } from 'vue'
export function useApi(baseURL = '') {
const loading = ref(false)
const error = ref(null)
const data = ref(null)
async function fetchData(endpoint, options = {}) {
loading.value = true
error.value = null
try {
const url = `${baseURL}${endpoint}`
const response = await fetch(url, {
headers: {
'Content-Type': 'application/json',
...options.headers
},
...options
})
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`)
}
data.value = await response.json()
return data.value
} catch (err) {
error.value = err.message
throw err
} finally {
loading.value = false
}
}
async function postData(endpoint, body, options = {}) {
return fetchData(endpoint, {
method: 'POST',
body: JSON.stringify(body),
...options
})
}
async function putData(endpoint, body, options = {}) {
return fetchData(endpoint, {
method: 'PUT',
body: JSON.stringify(body),
...options
})
}
async function deleteData(endpoint, options = {}) {
return fetchData(endpoint, {
method: 'DELETE',
...options
})
}
return {
loading,
error,
data,
fetchData,
postData,
putData,
deleteData
}
}// composables/useForm.js
import { ref, computed } from 'vue'
export function useForm(initialValues = {}, validationRules = {}) {
const formValues = ref({ ...initialValues })
const errors = ref({})
const touched = ref({})
const isSubmitting = ref(false)
// 验证表单
const validate = () => {
errors.value = {}
Object.keys(validationRules).forEach(field => {
const rules = validationRules[field]
const value = formValues.value[field]
rules.forEach(rule => {
if (!rule.validator(value, formValues.value)) {
if (!errors.value[field]) {
errors.value[field] = []
}
errors.value[field].push(rule.message)
}
})
})
return Object.keys(errors.value).length === 0
}
// 字段验证
const validateField = (field) => {
if (!validationRules[field]) return true
errors.value[field] = []
const rules = validationRules[field]
const value = formValues.value[field]
rules.forEach(rule => {
if (!rule.validator(value, formValues.value)) {
errors.value[field].push(rule.message)
}
})
return errors.value[field].length === 0
}
// 重置表单
const reset = () => {
formValues.value = { ...initialValues }
errors.value = {}
touched.value = {}
}
// 表单状态
const isValid = computed(() => validate())
const isDirty = computed(() => {
return Object.keys(formValues.value).some(key =>
formValues.value[key] !== initialValues[key]
)
})
// 字段处理
const handleChange = (field, value) => {
formValues.value[field] = value
touched.value[field] = true
validateField(field)
}
const handleBlur = (field) => {
touched.value[field] = true
validateField(field)
}
return {
formValues,
errors,
touched,
isSubmitting,
isValid,
isDirty,
validate,
validateField,
reset,
handleChange,
handleBlur
}
}
// 使用示例
const validationRules = {
email: [
{
validator: (value) => value && value.includes('@'),
message: '请输入有效的邮箱地址'
},
{
validator: (value) => value && value.length > 0,
message: '邮箱不能为空'
}
],
password: [
{
validator: (value) => value && value.length >= 8,
message: '密码至少8个字符'
}
]
}// 路由配置中的懒加载
import { defineAsyncComponent } from 'vue'
const routes = [
{
path: '/dashboard',
component: defineAsyncComponent(() =>
import('./views/Dashboard.vue')
)
},
{
path: '/settings',
component: defineAsyncComponent(() =>
import('./views/Settings.vue')
)
}
]
// 条件懒加载组件
const AsyncModal = defineAsyncComponent({
loader: () => import('./components/Modal.vue'),
loadingComponent: LoadingSpinner,
errorComponent: ErrorComponent,
delay: 200,
timeout: 3000
})<template>
<div ref="containerRef" class="virtual-list-container" @scroll="handleScroll">
<div :style="{ height: totalHeight + 'px' }">
<div
v-for="item in visibleItems"
:key="item.id"
:style="{ transform: `translateY(${item.offset}px)` }"
class="virtual-list-item"
>
{{ item.content }}
</div>
</div>
</div>
</template>
<script setup>
import { ref, computed, onMounted } from 'vue'
const props = defineProps({
items: {
type: Array,
required: true
},
itemHeight: {
type: Number,
default: 50
},
buffer: {
type: Number,
default: 5
}
})
const containerRef = ref(null)
const scrollTop = ref(0)
const containerHeight = ref(0)
// 总高度
const totalHeight = computed(() => props.items.length * props.itemHeight)
// 可见项范围
const startIndex = computed(() => {
return Math.max(0, Math.floor(scrollTop.value / props.itemHeight) - props.buffer)
})
const endIndex = computed(() => {
const visibleCount = Math.ceil(containerHeight.value / props.itemHeight)
return Math.min(
props.items.length,
Math.ceil((scrollTop.value + containerHeight.value) / props.itemHeight) + props.buffer
)
})
// 可见项
const visibleItems = computed(() => {
return props.items.slice(startIndex.value, endIndex.value).map((item, index) => ({
...item,
offset: (startIndex.value + index) * props.itemHeight
}))
})
function handleScroll() {
if (containerRef.value) {
scrollTop.value = containerRef.value.scrollTop
}
}
onMounted(() => {
if (containerRef.value) {
containerHeight.value = containerRef.value.clientHeight
}
})
</script>
<style scoped>
.virtual-list-container {
height: 500px;
overflow-y: auto;
position: relative;
}
.virtual-list-item {
position: absolute;
width: 100%;
height: 50px;
box-sizing: border-box;
border-bottom: 1px solid #eee;
display: flex;
align-items: center;
padding: 0 16px;
}
</style>// composables/useDebounce.js
import { ref, watch } from 'vue'
export function useDebounce(value, delay = 500) {
const debouncedValue = ref(value.value)
let timeoutId
watch(value, (newValue) => {
clearTimeout(timeoutId)
timeoutId = setTimeout(() => {
debouncedValue.value = newValue
}, delay)
}, { immediate: true })
const cancel = () => {
clearTimeout(timeoutId)
}
return {
debouncedValue,
cancel
}
}
// composables/useThrottle.js
import { ref, watch } from 'vue'
export function useThrottle(value, limit = 500) {
const throttledValue = ref(value.value)
let lastCall = 0
let timeoutId
watch(value, (newValue) => {
const now = Date.now()
const remaining = limit - (now - lastCall)
if (remaining <= 0) {
// 立即执行
lastCall = now
throttledValue.value = newValue
} else if (!timeoutId) {
// 延迟执行
timeoutId = setTimeout(() => {
lastCall = Date.now()
throttledValue.value = newValue
timeoutId = null
}, remaining)
}
}, { immediate: true })
const cancel = () => {
if (timeoutId) {
clearTimeout(timeoutId)
timeoutId = null
}
}
return {
throttledValue,
cancel
}
}
// 使用示例
const searchQuery = ref('')
const { debouncedValue: debouncedQuery } = useDebounce(searchQuery, 300)
watch(debouncedQuery, (newQuery) => {
// 只有在用户停止输入300ms后才会触发搜索
if (newQuery) {
performSearch(newQuery)
}
})// __tests__/useCounter.spec.js
import { renderHook, act } from '@testing-library/vue'
import { useCounter } from '../useCounter'
describe('useCounter', () => {
it('should initialize with default value', () => {
const { result } = renderHook(() => useCounter())
expect(result.current.count.value).toBe(0)
})
it('should initialize with custom value', () => {
const { result } = renderHook(() => useCounter(10))
expect(result.current.count.value).toBe(10)
})
it('should increment counter', () => {
const { result } = renderHook(() => useCounter(5))
act(() => {
result.current.increment()
})
expect(result.current.count.value).toBe(6)
expect(result.current.doubled.value).toBe(12)
})
it('should decrement counter', () => {
const { result } = renderHook(() => useCounter(5))
act(() => {
result.current.decrement()
})
expect(result.current.count.value).toBe(4)
})
it('should reset counter', () => {
const { result } = renderHook(() => useCounter(5))
act(() => {
result.current.increment()
result.current.increment()
result.current.reset()
})
expect(result.current.count.value).toBe(5)
})
})// __tests__/UserProfile.spec.js
import { mount } from '@vue/test-utils'
import UserProfile from '../UserProfile.vue'
import { createPinia, setActivePinia } from 'pinia'
describe('UserProfile', () => {
let pinia
beforeEach(() => {
pinia = createPinia()
setActivePinia(pinia)
})
it('should render user information', async () => {
const wrapper = mount(UserProfile, {
global: {
plugins: [pinia]
},
props: {
userId: 1
}
})
// 等待异步数据加载
await wrapper.vm.()
expect(wrapper.find('.user-name').text()).toBe('John Doe')
expect(wrapper.find('.user-email').text()).toBe('john@example.com')
})
it('should show loading state', () => {
const wrapper = mount(UserProfile, {
global: {
plugins: [pinia]
},
props: {
userId: 1
}
})
expect(wrapper.find('.loading-spinner').exists()).toBe(true)
})
it('should show error message', async () => {
// Mock API失败
jest.spyOn(global, 'fetch').mockRejectedValue(new Error('Network error'))
const wrapper = mount(UserProfile, {
global: {
plugins: [pinia]
},
props: {
userId: 1
}
})
await wrapper.vm.()
expect(wrapper.find('.error-message').text()).toContain('Network error')
})
})src/
├── components/ # 通用组件
│ ├── common/ # 全局通用组件
│ ├── layout/ # 布局组件
│ └── ui/ # UI基础组件
├── composables/ # 组合式函数
│ ├── useApi.js # API请求
│ ├── useForm.js # 表单处理
│ ├── useAuth.js # 认证逻辑
│ └── index.js # 统一导出
├── stores/ # Pinia状态管理
│ ├── userStore.js
│ ├── appStore.js
│ └── index.js
├── views/ # 页面组件
│ ├── Home.vue
│ ├── Dashboard.vue
│ └── Settings.vue
├── router/ # 路由配置
│ ├── index.js
│ └── guards.js
├── utils/ # 工具函数
│ ├── validators.js
│ ├── formatters.js
│ └── constants.js
├── assets/ # 静态资源
├── styles/ # 全局样式
├── types/ # TypeScript类型定义
└── main.js # 应用入口// composables/index.js
export { useApi } from './useApi'
export { useForm } from './useForm'
export { useAuth } from './useAuth'
export { useDebounce } from './useDebounce'
export { useThrottle } from './useThrottle'
// 使用示例
import { useApi, useForm } from '@/composables'// 正确的依赖管理
import { useAuth } from './useAuth'
import { useApi } from './useApi'
export function useUserProfile() {
const { user } = useAuth()
const { fetchData, loading, error } = useApi()
const userProfile = ref(null)
async function loadProfile() {
if (!user.value) return
try {
userProfile.value = await fetchData(`/users/${user.value.id}/profile`)
} catch (err) {
console.error('Failed to load profile:', err)
}
}
watch(user, (newUser) => {
if (newUser) {
loadProfile()
} else {
userProfile.value = null
}
}, { immediate: true })
return {
userProfile,
loading,
error,
loadProfile
}
}import { onUnmounted } from 'vue'
export function useEventListener(target, event, handler) {
// 添加事件监听
target.addEventListener(event, handler)
// 组件卸载时移除
onUnmounted(() => {
target.removeEventListener(event, handler)
})
}
export function useInterval(callback, delay) {
let intervalId
const start = () => {
intervalId = setInterval(callback, delay)
}
const stop = () => {
if (intervalId) {
clearInterval(intervalId)
intervalId = null
}
}
// 自动清理
onUnmounted(stop)
return {
start,
stop
}
}Vue3的组合式API为大型前端项目开发带来了全新的可能性。通过合理的架构设计和最佳实践,我们可以构建出更可维护、可测试、高性能的前端应用。在实践中不断总结经验,持续优化代码结构,才能真正发挥组合式API的优势。
提示:在迁移现有Vue2项目到Vue3时,建议逐步采用组合式API,先从新功能开始,逐步重构旧代码。请输入代码 Python的异步编程模型通过asyncio库为我们提供了强大的并发处理能力。在现代Web开发中,异步编程可以显著提高应用的性能和响应速度。本文将深入探讨asyncio在Web开发中的实际应用。
协程是异步编程的基础,使用定义:
import asyncio
# 定义一个协程
async def hello_world():
print("Hello")
await asyncio.sleep(1) # 异步等待
print("World")
# 运行协程
async def main():
await hello_world()
# Python 3.7+
asyncio.run(main())使用管理异步资源:
import asyncio
class AsyncDatabaseConnection:
async def __aenter__(self):
print("Connecting to database...")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing database connection...")
await asyncio.sleep(0.5)
async def query(self, sql):
print(f"Executing query: {sql}")
await asyncio.sleep(0.3)
return [{"id": 1, "name": "John"}]
async def main():
async with AsyncDatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
print(f"Result: {result}")
asyncio.run(main())FastAPI是基于asyncio的现代Web框架:
from fastapi import FastAPI, HTTPException
from typing import Optional
import asyncio
app = FastAPI()
# 异步端点
@app.get("/users/{user_id}")
async def read_user(user_id: int):
# 模拟异步数据库查询
await asyncio.sleep(0.1)
users = {
1: {"name": "John", "email": "john@example.com"},
2: {"name": "Jane", "email": "jane@example.com"}
}
if user_id not in users:
raise HTTPException(status_code=404, detail="User not found")
return users[user_id]
# 批量数据处理
@app.post("/process-batch")
async def process_batch(items: list[int]):
results = []
async def process_item(item):
# 模拟异步处理
await asyncio.sleep(0.05)
return {"item": item, "processed": True, "result": item * 2}
# 并发处理所有项目
tasks = [process_item(item) for item in items]
results = await asyncio.gather(*tasks)
return {"results": results}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)使用异步数据库驱动提高并发性能:
import asyncio
import aiomysql
from typing import List, Dict
class AsyncMySQL:
def __init__(self):
self.pool = None
async def connect(self, **kwargs):
self.pool = await aiomysql.create_pool(**kwargs)
async def close(self):
self.pool.close()
await self.pool.wait_closed()
async def fetch_all(self, query: str, params=None) -> List[Dict]:
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(query, params or ())
return await cursor.fetchall()
async def fetch_one(self, query: str, params=None) -> Dict:
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(query, params or ())
return await cursor.fetchone()
async def execute(self, query: str, params=None) -> int:
async with self.pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(query, params or ())
await conn.commit()
return cursor.rowcount
async def main():
db = AsyncMySQL()
await db.connect(
host="localhost",
port=3306,
user="root",
password="password",
db="testdb",
charset="utf8mb4"
)
try:
# 并发查询多个表
tasks = [
db.fetch_all("SELECT * FROM users LIMIT 10"),
db.fetch_all("SELECT * FROM orders LIMIT 5"),
db.fetch_all("SELECT COUNT(*) as count FROM products")
]
users, orders, count_result = await asyncio.gather(*tasks)
print(f"Users: {len(users)} records")
print(f"Orders: {len(orders)} records")
print(f"Product count: {count_result[0]['count']}")
finally:
await db.close()
asyncio.run(main())处理大量文件时的异步优化:
import asyncio
import aiofiles
from pathlib import Path
import json
from typing import List
class AsyncFileProcessor:
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process_file(self, file_path: Path):
async with self.semaphore:
try:
async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
# 模拟处理逻辑
await asyncio.sleep(0.01)
result = {
"file": str(file_path),
"size": len(content),
"lines": len(content.split("
")),
"processed": True
}
return result
except Exception as e:
return {"file": str(file_path), "error": str(e), "processed": False}
async def process_directory(self, directory: Path) -> List[dict]:
if not directory.is_dir():
raise ValueError(f"{directory} is not a directory")
tasks = []
for file_path in directory.glob("*.txt"):
tasks.append(self.process_file(file_path))
results = await asyncio.gather(*tasks)
return results
async def main():
processor = AsyncFileProcessor(max_concurrent=5)
# 创建一个测试目录
test_dir = Path("./test_files")
test_dir.mkdir(exist_ok=True)
# 创建一些测试文件
for i in range(20):
file_path = test_dir / f"file_{i:03d}.txt"
file_path.write_text(f"Test content for file {i}
" * 100)
# 处理目录中的所有文件
results = await processor.process_directory(test_dir)
successful = sum(1 for r in results if r.get("processed"))
print(f"Processed {successful}/{len(results)} files successfully")
# 保存结果
async with aiofiles.open("results.json", "w", encoding="utf-8") as f:
await f.write(json.dumps(results, indent=2, ensure_ascii=False))
# 清理测试文件
for file_path in test_dir.glob("*.txt"):
file_path.unlink()
test_dir.rmdir()
asyncio.run(main())使用信号量(Semaphore)控制并发数量:
import asyncio
from typing import List
class RateLimitedProcessor:
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process_item(self, item):
async with self.semaphore:
# 模拟处理逻辑
await asyncio.sleep(0.1)
return {"item": item, "processed": True}
async def process_batch(self, items: List):
tasks = [self.process_item(item) for item in items]
return await asyncio.gather(*tasks)
async def main():
processor = RateLimitedProcessor(max_concurrent=5)
# 处理100个项目,但最多同时处理5个
items = list(range(100))
results = await processor.process_batch(items)
print(f"Processed {len(results)} items")
print(f"Successful: {sum(1 for r in results if r['processed'])}")
asyncio.run(main())import asyncio
from functools import wraps
from typing import Any, Callable
import time
def async_cache(ttl: int = 300):
"""异步缓存装饰器"""
cache = {}
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = (args, tuple(kwargs.items()))
# 检查缓存
if cache_key in cache:
cached_value, timestamp = cache[cache_key]
if time.time() - timestamp < ttl:
return cached_value
# 执行函数
result = await func(*args, **kwargs)
# 更新缓存
cache[cache_key] = (result, time.time())
return result
return wrapper
return decorator
@async_cache(ttl=60) # 缓存60秒
async def expensive_operation(user_id: int):
print(f"Performing expensive operation for user {user_id}")
await asyncio.sleep(2) # 模拟耗时操作
return {"user_id": user_id, "data": "expensive_result"}
async def main():
# 第一次调用,会执行操作
result1 = await expensive_operation(1)
print(f"Result 1: {result1}")
# 第二次调用,从缓存获取
result2 = await expensive_operation(1)
print(f"Result 2: {result2}")
# 不同的参数,会重新执行
result3 = await expensive_operation(2)
print(f"Result 3: {result3}")
asyncio.run(main())import asyncio
from typing import List, Callable, Any
import time
class AsyncTaskQueue:
def __init__(self, max_workers: int = 5):
self.max_workers = max_workers
self.queue = asyncio.Queue()
self.workers = []
self.results = []
async def worker(self, worker_id: int):
while True:
try:
task = await self.queue.get()
if task is None: # 停止信号
break
func, args, kwargs = task
try:
result = await func(*args, **kwargs)
self.results.append((worker_id, "success", result))
except Exception as e:
self.results.append((worker_id, "error", str(e)))
self.queue.task_done()
except asyncio.CancelledError:
break
async def add_task(self, func: Callable, *args, **kwargs):
await self.queue.put((func, args, kwargs))
async def run(self):
# 启动worker
self.workers = [
asyncio.create_task(self.worker(i))
for i in range(self.max_workers)
]
# 等待所有任务完成
await self.queue.join()
# 停止worker
for _ in range(self.max_workers):
await self.queue.put(None)
# 等待worker结束
await asyncio.gather(*self.workers)
return self.results
async def sample_task(task_id: int, duration: float):
await asyncio.sleep(duration)
return {"task_id": task_id, "duration": duration, "completed": True}
async def main():
queue = AsyncTaskQueue(max_workers=3)
# 添加任务
for i in range(10):
duration = 0.1 * (i % 3 + 1) # 0.1, 0.2, 0.3秒
await queue.add_task(sample_task, i, duration)
# 运行队列
start_time = time.time()
results = await queue.run()
end_time = time.time()
print(f"Total time: {end_time - start_time:.2f} seconds")
print(f"Processed {len(results)} tasks")
successful = sum(1 for _, status, _ in results if status == "success")
print(f"Successful tasks: {successful}")
asyncio.run(main())import asyncio
async def risky_operation():
await asyncio.sleep(0.1)
raise ValueError("Something went wrong!")
async def handle_exceptions():
try:
await risky_operation()
except ValueError as e:
print(f"Caught exception: {e}")
return "fallback_value"
async def gather_with_exceptions():
tasks = [
risky_operation(),
asyncio.sleep(0.5),
risky_operation()
]
# gather会等待所有任务完成,返回结果列表
# 如果任何任务抛出异常,gather也会抛出异常
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
except Exception as e:
print(f"Gather failed: {e}")
async def main():
print("=== Exception handling ===")
result = await handle_exceptions()
print(f"Result: {result}")
print("
=== Gather with exceptions ===")
await gather_with_exceptions()
asyncio.run(main())import asyncio
import logging
from functools import wraps
# 配置日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def async_logging(func):
@wraps(func)
async def wrapper(*args, **kwargs):
logger.debug(f"Starting {func.__name__} with args={args}, kwargs={kwargs}")
try:
result = await func(*args, **kwargs)
logger.debug(f"Completed {func.__name__} with result={result}")
return result
except Exception as e:
logger.error(f"Failed {func.__name__}: {e}", exc_info=True)
raise
return wrapper
@async_logging
async def monitored_operation(duration: float):
await asyncio.sleep(duration)
return {"duration": duration, "status": "completed"}
async def main():
tasks = [
monitored_operation(0.1),
monitored_operation(0.2),
monitored_operation(0.3)
]
results = await asyncio.gather(*tasks)
logger.info(f"All tasks completed: {results}")
asyncio.run(main())通过合理应用Python的异步编程特性,可以显著提升Web应用的性能和响应能力,特别是在高并发场景下。
提示:在生产环境中使用异步代码时,确保充分测试并监控性能指标。