Python异步上下文管理器
异步上下文管理器支持async with语句,用于管理异步资源的获取和释放。
异步上下文管理器基础
Python
import asyncio
class AsyncContextManager:
    """Minimal example of an asynchronous context manager.

    Entering and leaving the ``async with`` block each print a message and
    await a short sleep that stands in for real asynchronous work.
    """

    async def __aenter__(self) -> "AsyncContextManager":
        """Announce acquisition, wait briefly, and return this instance."""
        print("获取资源...")
        # Stand-in for an awaitable setup step.
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Announce release, wait briefly, and let any exception propagate."""
        print("释放资源...")
        await asyncio.sleep(0.1)
        # A falsy result tells ``async with`` not to swallow the exception.
        suppress_exception = False
        return suppress_exception
# 使用 async with
async def main() -> None:
    """Enter the example manager with ``async with`` and use it once."""
    async with AsyncContextManager() as resource:
        print("使用资源...")


asyncio.run(main())
Python
# 对比同步上下文管理器
class SyncContextManager:
    """Synchronous counterpart used to contrast ``with`` and ``async with``."""

    def __enter__(self) -> "SyncContextManager":
        """Print the acquire message and return this instance."""
        print("同步获取")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Print the release message; never suppresses an exception."""
        print("同步释放")
        suppress_exception = False
        return suppress_exception
# Synchronous managers use ``with``; asynchronous ones use ``async with``.
with SyncContextManager() as sync_mgr:
    pass


async def compare_async_usage():
    """Enter the asynchronous manager the way the ``with`` block above does.

    ``async with`` is only legal inside a coroutine function, so the
    asynchronous half of the comparison has to live in one.
    """
    async with AsyncContextManager() as async_mgr:
        pass


asyncio.run(compare_async_usage())
异步资源管理
Python
import asyncio
class AsyncDatabaseConnection:
    """Simulated asynchronous database connection.

    Usable with ``async with``: the connection is opened on entry and closed
    on exit. All latency is imitated with ``asyncio.sleep``.
    """

    def __init__(self, host: str):
        self.connected = False  # flipped by connect() / disconnect()
        self.host = host

    async def __aenter__(self) -> "AsyncDatabaseConnection":
        """Open the connection and return this instance."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the connection; exceptions from the block keep propagating."""
        await self.disconnect()

    async def connect(self) -> None:
        """Print progress, wait, and mark the connection as open."""
        print(f"连接到 {self.host}...")
        await asyncio.sleep(0.2)
        self.connected = True
        print("已连接")

    async def disconnect(self) -> None:
        """Print progress, wait, and mark the connection as closed."""
        print("断开连接...")
        await asyncio.sleep(0.1)
        self.connected = False
        print("已断开")

    async def execute(self, query: str) -> str:
        """Pretend to run ``query`` and return a string echoing it.

        Raises:
            RuntimeError: If the connection is not currently open.
        """
        if self.connected:
            await asyncio.sleep(0.1)
            return f"执行: {query}"
        raise RuntimeError("未连接")
# 使用
async def query_database() -> None:
    """Open a connection to ``localhost``, run one query, print the result."""
    async with AsyncDatabaseConnection("localhost") as connection:
        print(await connection.execute("SELECT * FROM users"))


asyncio.run(query_database())
Python
# 异步文件操作
import aiofiles
async def read_file_async(filepath: str):
    """Read the file at ``filepath`` in text mode via ``aiofiles``.

    Returns whatever the ``aiofiles`` handle's ``read()`` yields.
    """
    async with aiofiles.open(filepath, 'r') as handle:
        return await handle.read()
async def write_file_async(filepath: str, content: str):
    """Write ``content`` to ``filepath`` via ``aiofiles``, opened in 'w' mode."""
    async with aiofiles.open(filepath, 'w') as handle:
        await handle.write(content)
contextlib.asynccontextmanager
Python
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_resource(name: str):
    """Async context manager built from a generator via the decorator.

    Code before ``yield`` runs on entry, and the ``finally`` clause runs on
    exit, including when the ``async with`` body raises.

    Yields:
        The ``name`` that was passed in, bound by ``as`` in ``async with``.
    """
    # Entry phase.
    print(f"获取资源: {name}")
    await asyncio.sleep(0.1)
    try:
        yield name
    finally:
        # Exit phase.
        print(f"释放资源: {name}")
        await asyncio.sleep(0.1)
# 使用
async def use_resource() -> None:
    """Acquire the ``db_connection`` resource and print the yielded value."""
    async with async_resource("db_connection") as resource_name:
        print(f"使用 {resource_name}")


asyncio.run(use_resource())
Python
# 复杂资源管理
@asynccontextmanager
async def managed_async_session(url: str):
    """Create a session for ``url``, yield it, and always close it afterwards.

    If the ``async with`` body raises an ``Exception``, the session's
    ``handle_error`` is awaited with it and the exception is re-raised.
    ``close`` is awaited on every exit path.
    """
    session = await create_session(url)
    try:
        yield session
    except Exception as error:
        # Let the session observe the failure, then keep propagating it.
        await session.handle_error(error)
        raise
    finally:
        await session.close()
async def create_session(url: str):
    """Simulate session setup: wait briefly, then return an ``AsyncSession``."""
    await asyncio.sleep(0.1)
    new_session = AsyncSession(url)
    return new_session
class AsyncSession:
    """Stand-in session object that only records its URL and prints events."""

    def __init__(self, url):
        self.url = url

    async def handle_error(self, exc) -> None:
        """Print the exception that was passed in, then wait briefly."""
        print(f"处理错误: {exc}")
        await asyncio.sleep(0.05)

    async def close(self) -> None:
        """Print the close message, then wait briefly."""
        print("关闭会话")
        await asyncio.sleep(0.05)
async def use_session() -> None:
    """Open a managed session for the example API and print its URL."""
    endpoint = "https://api.example.com"
    async with managed_async_session(endpoint) as session:
        print(f"会话URL: {session.url}")
异步迭代器
Python
import asyncio
class AsyncIterator:
    """Asynchronous iterator over an indexable collection.

    The object is its own iterator, so it can be consumed only once: the
    position is never reset.
    """

    def __init__(self, items):
        self.index = 0  # position of the next item to hand out
        self.items = items

    def __aiter__(self) -> "AsyncIterator":
        """Return this object as the asynchronous iterator."""
        return self

    async def __anext__(self):
        """Wait briefly, then return the next item.

        Raises:
            StopAsyncIteration: When every item has been returned.
        """
        # Stand-in for an asynchronous fetch; it runs before the end check,
        # so the final, terminating call waits as well.
        await asyncio.sleep(0.1)
        position = self.index
        if position >= len(self.items):
            raise StopAsyncIteration
        item = self.items[position]
        self.index = position + 1
        return item
# 使用 async for
async def iterate_async() -> None:
    """Consume an ``AsyncIterator`` over 1..5 with ``async for``."""
    numbers = AsyncIterator([1, 2, 3, 4, 5])
    async for number in numbers:
        print(f"处理: {number}")


asyncio.run(iterate_async())
Python
# 异步数据流迭代器
class AsyncDataStream:
    """Simulated asynchronous stream that hands out ``source`` in chunks.

    Each chunk holds up to ``chunk_size`` (10) characters; the last one may
    be shorter.
    """

    def __init__(self, source: str):
        self.source = source
        self.chunk_size = 10
        self.position = 0  # offset of the next unread character

    def __aiter__(self) -> "AsyncDataStream":
        """Return this object as the asynchronous iterator."""
        return self

    async def __anext__(self) -> str:
        """Wait briefly, then return the next chunk.

        Raises:
            StopAsyncIteration: When the whole source has been consumed.
        """
        await asyncio.sleep(0.05)  # simulated I/O latency
        start = self.position
        if start >= len(self.source):
            raise StopAsyncIteration
        end = start + self.chunk_size
        chunk = self.source[start:end]
        self.position = end
        return chunk
async def process_stream() -> None:
    """Print every chunk produced by a sample ``AsyncDataStream``."""
    stream = AsyncDataStream("这是一段异步处理的数据流示例")
    async for piece in stream:
        print(f"收到块: {piece}")


asyncio.run(process_stream())
异步生成器
Python
import asyncio
async def async_range(start: int, stop: int, step: int = 1):
    """Asynchronous generator that mirrors ``range(start, stop, step)``.

    A short sleep before each value stands in for asynchronous work.

    Args:
        start: First value to yield.
        stop: Exclusive bound; iteration ends before reaching it.
        step: Increment between values. May be negative to count down.

    Yields:
        Successive integers from ``start`` towards ``stop``.

    Raises:
        ValueError: If ``step`` is zero. Raised on first iteration, because
            a generator body does not run until it is consumed.
    """
    if step == 0:
        # Zero would never advance, so the loop below could not terminate.
        raise ValueError("async_range() step must not be zero")
    current = start
    # The bound check has to follow the direction of travel.
    while (current < stop) if step > 0 else (current > stop):
        await asyncio.sleep(0.05)
        yield current
        current += step
# 使用 async for
async def consume_async_generator() -> None:
    """Print each value produced by ``async_range(0, 5)``."""
    async for value in async_range(0, 5):
        print(f"值: {value}")


asyncio.run(consume_async_generator())
Python
# 实际应用:异步分页数据
async def async_paginate(api_url: str, page_size: int = 10):
    """Asynchronous generator that walks a paginated source item by item.

    Pages are requested from ``fetch_page`` starting at page 1, and
    iteration ends at the first page that comes back falsy (``None`` or
    empty).

    Yields:
        Each item of each fetched page, in order.
    """
    page = 0
    while True:
        page += 1
        await asyncio.sleep(0.1)
        # Simulated API call.
        batch = await fetch_page(api_url, page, page_size)
        if not batch:
            return
        for entry in batch:
            yield entry
async def fetch_page(url: str, page: int, size: int):
    """Simulate fetching one page of ``size`` records.

    ``url`` is accepted but not used by the simulation.

    Returns:
        ``None`` for any page past the third; otherwise a list of ``size``
        dicts with the keys ``"id"`` and ``"page"``.
    """
    last_page = 3
    if page > last_page:
        return None
    records = [{"id": record_id, "page": page} for record_id in range(size)]
    return records
async def process_pages() -> None:
    """Print every item yielded by ``async_paginate`` for the example URL."""
    endpoint = "https://api.example.com/data"
    async for record in async_paginate(endpoint):
        print(f"处理: {record}")


asyncio.run(process_pages())
组合使用
Python
import asyncio
class AsyncProcessor:
    """Object that is both an async context manager and an async iterator.

    ``async with`` brackets the processor's lifetime; ``async for`` yields
    each item after passing it through ``process``.
    """

    def __init__(self, items):
        self.items = items

    async def __aenter__(self) -> "AsyncProcessor":
        """Print the start-up message, wait, and return this instance."""
        print("启动处理器")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Print the shutdown message and wait; exceptions are not suppressed."""
        print("关闭处理器")
        await asyncio.sleep(0.1)

    def __aiter__(self) -> "AsyncProcessor":
        """Rewind to the first item and return this object as the iterator."""
        # The position is (re)set here, so every ``async for`` starts over.
        self.index = 0
        return self

    async def __anext__(self) -> str:
        """Wait briefly, then return the processed form of the next item.

        Raises:
            StopAsyncIteration: When all items have been handed out.
        """
        await asyncio.sleep(0.05)
        position = self.index
        if position >= len(self.items):
            raise StopAsyncIteration
        raw_item = self.items[position]
        self.index = position + 1
        return await self.process(raw_item)

    async def process(self, item) -> str:
        """Wait briefly and return ``item`` prefixed with ``processed_``."""
        await asyncio.sleep(0.02)
        return f"processed_{item}"
async def combined_usage() -> None:
    """Run an ``AsyncProcessor`` as a context manager and iterate over it."""
    letters = ['a', 'b', 'c']
    async with AsyncProcessor(letters) as processor:
        async for processed in processor:
            print(processed)


asyncio.run(combined_usage())
异步资源池
Python
import asyncio
from collections import deque
class AsyncResourcePool:
    """Bounded pool of reusable resources for asyncio code.

    At most ``max_size`` resources may be checked out at once; further
    ``acquire`` calls wait on a semaphore until one is released. Released
    resources are kept and handed out again before new ones are created.
    Used with ``async with``, the pool takes back every checked-out resource
    on exit.
    """

    def __init__(self, max_size: int = 5):
        self.max_size = max_size
        self.available = deque()  # idle resources, ready for reuse
        self.in_use = set()  # resources currently checked out
        self.semaphore = asyncio.Semaphore(max_size)
        # Monotonic counter for resource names. Deriving the name from
        # len(self.in_use) gave identical names to creations that overlapped.
        self._created = 0

    async def __aenter__(self):
        """Return the pool itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Return all checked-out resources; exceptions are not suppressed."""
        await self.close_all()

    async def acquire(self):
        """Check out a resource, waiting while the pool is at capacity.

        Returns:
            An idle resource when one exists, otherwise a newly created one.
        """
        await self.semaphore.acquire()
        try:
            if self.available:
                resource = self.available.pop()
            else:
                resource = await self.create_resource()
        except BaseException:
            # Creation failed or was cancelled: give the slot back so the
            # pool's capacity does not shrink permanently.
            self.semaphore.release()
            raise
        self.in_use.add(resource)
        return resource

    async def release(self, resource):
        """Return ``resource`` to the pool and free its slot.

        Raises:
            KeyError: If ``resource`` is not currently checked out.
        """
        self.in_use.remove(resource)
        self.available.append(resource)
        self.semaphore.release()

    async def create_resource(self):
        """Simulate creating a resource and return its unique name."""
        # Reserve the number before awaiting so that overlapping creations
        # cannot observe the same value.
        resource_id = self._created
        self._created += 1
        await asyncio.sleep(0.1)
        return f"resource_{resource_id}"

    async def close_all(self):
        """Release every resource that is still checked out."""
        # Iterate over a snapshot: release() mutates ``in_use``.
        for resource in list(self.in_use):
            await self.release(resource)
async def use_pool() -> None:
    """Walk through acquiring, releasing, and re-acquiring pooled resources."""
    async with AsyncResourcePool(max_size=3) as pool:
        first = await pool.acquire()
        print(f"获取: {first}")

        second = await pool.acquire()
        print(f"获取: {second}")

        await pool.release(first)
        print(f"释放: {first}")

        # An idle resource exists now, so the pool reuses it here.
        third = await pool.acquire()
        print(f"获取: {third}")


asyncio.run(use_pool())
异步锁与上下文管理器
Python
import asyncio
class AsyncLockContext:
    """Async context manager that holds an ``asyncio.Lock`` for the block."""

    def __init__(self):
        self.lock = asyncio.Lock()

    async def __aenter__(self) -> "AsyncLockContext":
        """Wait for the lock, take it, and return this instance."""
        await self.lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Release the lock; exceptions from the block are not suppressed."""
        self.lock.release()
async def protected_operation(lock_ctx=None):
    """Run the demo critical section while holding ``lock_ctx``.

    Args:
        lock_ctx: Async context manager guarding the section. Pass the same
            instance to every coroutine that must be mutually exclusive.
            When omitted, a private ``AsyncLockContext`` is created, which
            excludes no other caller.
    """
    if lock_ctx is None:
        lock_ctx = AsyncLockContext()
    async with lock_ctx:
        print("执行受保护操作")
        await asyncio.sleep(0.1)
        print("操作完成")


async def concurrent_tasks():
    """Run three protected operations concurrently under one shared lock."""
    # A lock only serialises coroutines that share it; one lock per call
    # never contends. It is created here, inside the running event loop.
    shared_lock = AsyncLockContext()
    tasks = [protected_operation(shared_lock) for _ in range(3)]
    await asyncio.gather(*tasks)


asyncio.run(concurrent_tasks())
异步上下文管理器嵌套
Python
import asyncio
async def nested_contexts() -> None:
    """Enter two example managers, the second nested inside the first."""
    # Listing both managers in one ``async with`` is equivalent to nesting
    # two statements: ``inner`` is entered last and exited first.
    async with AsyncContextManager() as outer, AsyncContextManager() as inner:
        print("在嵌套上下文中")


asyncio.run(nested_contexts())
# 使用 AsyncExitStack 管理多个上下文
from contextlib import AsyncExitStack
async def managed_multiple_contexts() -> None:
    """Manage two database connections through a single ``AsyncExitStack``."""
    async with AsyncExitStack() as stack:
        # Contexts are registered at run time; the stack exits them all when
        # the block ends.
        primary = await stack.enter_async_context(AsyncDatabaseConnection("db1"))
        secondary = await stack.enter_async_context(AsyncDatabaseConnection("db2"))

        # Use both resources while the stack keeps them open.
        first_result = await primary.execute("SELECT 1")
        second_result = await secondary.execute("SELECT 2")
        print(first_result, second_result)


asyncio.run(managed_multiple_contexts())
要点总结
- **`__aenter__` 和 `__aexit__`** 定义异步上下文管理器
- **`@asynccontextmanager`** 装饰器简化创建
- **`__aiter__` 和 `__anext__`** 定义异步迭代器
- 异步生成器使用 `async def` 和 `yield`
- `AsyncExitStack` 动态管理多个异步上下文
📝 发现内容有误?点击此处直接编辑