全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python异步上下文管理器

异步上下文管理器支持async with语句,用于管理异步资源的获取和释放。

异步上下文管理器基础

Python
import asyncio

class AsyncContextManager:
    "异步上下文管理器示例"

    async def __aenter__(self):
        "异步进入"
        print("获取资源...")
        await asyncio.sleep(0.1)  # 模拟异步操作
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        "异步退出"
        print("释放资源...")
        await asyncio.sleep(0.1)
        return False  # 不抑制异常

# 使用 async with
async def main():
    async with AsyncContextManager() as manager:
        print("使用资源...")

asyncio.run(main())
Python
# 对比同步上下文管理器
class SyncContextManager:
    def __enter__(self):
        print("同步获取")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("同步释放")
        return False

# 同步用 with,异步用 async with
with SyncContextManager() as sync_mgr:
    pass

async with AsyncContextManager() as async_mgr:
    pass

异步资源管理

Python
import asyncio

class AsyncDatabaseConnection:
    "异步数据库连接模拟"

    def __init__(self, host: str):
        self.host = host
        self.connected = False

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

    async def connect(self):
        print(f"连接到 {self.host}...")
        await asyncio.sleep(0.2)
        self.connected = True
        print("已连接")

    async def disconnect(self):
        print("断开连接...")
        await asyncio.sleep(0.1)
        self.connected = False
        print("已断开")

    async def execute(self, query: str):
        if not self.connected:
            raise RuntimeError("未连接")
        await asyncio.sleep(0.1)
        return f"执行: {query}"

# 使用
async def query_database():
    async with AsyncDatabaseConnection("localhost") as conn:
        result = await conn.execute("SELECT * FROM users")
        print(result)

asyncio.run(query_database())
Python
# 异步文件操作
import aiofiles

async def read_file_async(filepath: str):
    "异步读取文件"
    async with aiofiles.open(filepath, 'r') as f:
        content = await f.read()
        return content

async def write_file_async(filepath: str, content: str):
    "异步写入文件"
    async with aiofiles.open(filepath, 'w') as f:
        await f.write(content)

contextlib.asynccontextmanager

Python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource(name: str):
    "装饰器创建异步上下文管理器"
    # 进入阶段
    print(f"获取资源: {name}")
    await asyncio.sleep(0.1)
    try:
        yield name  # 返回资源
    finally:
        # 退出阶段
        print(f"释放资源: {name}")
        await asyncio.sleep(0.1)

# 使用
async def use_resource():
    async with async_resource("db_connection") as res:
        print(f"使用 {res}")

asyncio.run(use_resource())
Python
# 复杂资源管理
@asynccontextmanager
async def managed_async_session(url: str):
    "管理异步会话"
    session = await create_session(url)
    try:
        yield session
    except Exception as e:
        await session.handle_error(e)
        raise
    finally:
        await session.close()

async def create_session(url: str):
    "模拟创建会话"
    await asyncio.sleep(0.1)
    return AsyncSession(url)

class AsyncSession:
    def __init__(self, url):
        self.url = url

    async def handle_error(self, exc):
        print(f"处理错误: {exc}")
        await asyncio.sleep(0.05)

    async def close(self):
        print("关闭会话")
        await asyncio.sleep(0.05)

async def use_session():
    async with managed_async_session("https://api.example.com") as session:
        print(f"会话URL: {session.url}")

异步迭代器

Python
import asyncio

class AsyncIterator:
    "异步迭代器"

    def __init__(self, items):
        self.items = items
        self.index = 0

    def __aiter__(self):
        "返回异步迭代器"
        return self

    async def __anext__(self):
        "异步获取下一个元素"
        await asyncio.sleep(0.1)  # 模拟异步操作

        if self.index >= len(self.items):
            raise StopAsyncIteration

        item = self.items[self.index]
        self.index += 1
        return item

# 使用 async for
async def iterate_async():
    async for item in AsyncIterator([1, 2, 3, 4, 5]):
        print(f"处理: {item}")

asyncio.run(iterate_async())
Python
# 异步数据流迭代器
class AsyncDataStream:
    "模拟异步数据流"

    def __init__(self, source: str):
        self.source = source
        self.position = 0
        self.chunk_size = 10

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(0.05)  # 模拟IO延迟

        if self.position >= len(self.source):
            raise StopAsyncIteration

        chunk = self.source[self.position:self.position + self.chunk_size]
        self.position += self.chunk_size
        return chunk

async def process_stream():
    async for chunk in AsyncDataStream("这是一段异步处理的数据流示例"):
        print(f"收到块: {chunk}")

asyncio.run(process_stream())

异步生成器

Python
import asyncio

async def async_range(start: int, stop: int, step: int = 1):
    "异步生成器"
    current = start
    while current < stop:
        await asyncio.sleep(0.05)
        yield current
        current += step

# 使用 async for
async def consume_async_generator():
    async for num in async_range(0, 5):
        print(f"值: {num}")

asyncio.run(consume_async_generator())
Python
# 实际应用:异步分页数据
async def async_paginate(api_url: str, page_size: int = 10):
    "异步分页获取数据"
    page = 1
    while True:
        await asyncio.sleep(0.1)
        # 模拟API调用
        data = await fetch_page(api_url, page, page_size)

        if not data:
            break

        for item in data:
            yield item

        page += 1

async def fetch_page(url: str, page: int, size: int):
    "模拟获取页数据"
    if page > 3:
        return None
    return [{"id": i, "page": page} for i in range(size)]

async def process_pages():
    async for item in async_paginate("https://api.example.com/data"):
        print(f"处理: {item}")

asyncio.run(process_pages())

组合使用

Python
import asyncio

class AsyncProcessor:
    "组合异步上下文管理器和迭代器"

    def __init__(self, items):
        self.items = items

    async def __aenter__(self):
        print("启动处理器")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("关闭处理器")
        await asyncio.sleep(0.1)

    def __aiter__(self):
        self.index = 0
        return self

    async def __anext__(self):
        await asyncio.sleep(0.05)

        if self.index >= len(self.items):
            raise StopAsyncIteration

        item = self.items[self.index]
        self.index += 1
        return await self.process(item)

    async def process(self, item):
        await asyncio.sleep(0.02)
        return f"processed_{item}"

async def combined_usage():
    async with AsyncProcessor(['a', 'b', 'c']) as processor:
        async for result in processor:
            print(result)

asyncio.run(combined_usage())

异步资源池

Python
import asyncio
from collections import deque

class AsyncResourcePool:
    "异步资源池"

    def __init__(self, max_size: int = 5):
        self.max_size = max_size
        self.available = deque()
        self.in_use = set()
        self.semaphore = asyncio.Semaphore(max_size)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close_all()

    async def acquire(self):
        "获取资源"
        await self.semaphore.acquire()

        if self.available:
            resource = self.available.pop()
        else:
            resource = await self.create_resource()

        self.in_use.add(resource)
        return resource

    async def release(self, resource):
        "释放资源"
        self.in_use.remove(resource)
        self.available.append(resource)
        self.semaphore.release()

    async def create_resource(self):
        await asyncio.sleep(0.1)
        return f"resource_{len(self.in_use)}"

    async def close_all(self):
        for resource in list(self.in_use):
            await self.release(resource)

async def use_pool():
    async with AsyncResourcePool(max_size=3) as pool:
        res1 = await pool.acquire()
        print(f"获取: {res1}")
        res2 = await pool.acquire()
        print(f"获取: {res2}")

        await pool.release(res1)
        print(f"释放: {res1}")

        res3 = await pool.acquire()
        print(f"获取: {res3}")

asyncio.run(use_pool())

异步锁与上下文管理器

Python
import asyncio

class AsyncLockContext:
    "异步锁上下文管理器"

    def __init__(self):
        self.lock = asyncio.Lock()

    async def __aenter__(self):
        await self.lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.lock.release()

async def protected_operation():
    lock_ctx = AsyncLockContext()

    async with lock_ctx:
        print("执行受保护操作")
        await asyncio.sleep(0.1)
        print("操作完成")

async def concurrent_tasks():
    tasks = [
        protected_operation(),
        protected_operation(),
        protected_operation()
    ]
    await asyncio.gather(*tasks)

asyncio.run(concurrent_tasks())

异步上下文管理器嵌套

Python
import asyncio

async def nested_contexts():
    "嵌套异步上下文"
    async with AsyncContextManager() as outer:
        async with AsyncContextManager() as inner:
            print("在嵌套上下文中")

asyncio.run(nested_contexts())

# 使用 AsyncExitStack 管理多个上下文
from contextlib import AsyncExitStack

async def managed_multiple_contexts():
    "动态管理多个异步上下文"
    async with AsyncExitStack() as stack:
        # 动态添加上下文
        conn1 = await stack.enter_async_context(AsyncDatabaseConnection("db1"))
        conn2 = await stack.enter_async_context(AsyncDatabaseConnection("db2"))

        # 使用资源
        result1 = await conn1.execute("SELECT 1")
        result2 = await conn2.execute("SELECT 2")

        print(result1, result2)

asyncio.run(managed_multiple_contexts())

要点总结

  1. **__aenter____aexit__**定义异步上下文管理器
  2. **@asynccontextmanager**装饰器简化创建
  3. **__aiter____anext__**定义异步迭代器
  4. 异步生成器使用async defyield
  5. AsyncExitStack动态管理多个异步上下文

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python多进程并发模式
下一篇 → Python异步生成器
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库