全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python异步同步机制

asyncio提供了完整的同步原语,用于协调多个协程的执行顺序和资源共享。

异步锁 (Lock)

基本用法

Python
import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            old = self.value
            await asyncio.sleep(0)  # 模拟IO
            self.value = old + 1

async def race_condition():
    counter = AsyncCounter()

    # 无锁情况下可能丢失更新
    tasks = [counter.increment() for _ in range(1000)]
    await asyncio.gather(*tasks)

    print(counter.value)  # 正确:1000

asyncio.run(race_condition())

手动控制

Python
async def lock_manual():
    lock = asyncio.Lock()

    await lock.acquire()
    try:
        # 临界区代码
        await do_something()
    finally:
        lock.release()

    # 或使用上下文管理器
    async with lock:
        await do_something()

公平锁与非公平锁

Python
async def lock_ordering():
    lock = asyncio.Lock()
    order = []

    async def worker(name):
        await lock.acquire()
        order.append(name)
        await asyncio.sleep(0.1)
        lock.release()

    # asyncio.Lock是公平锁,按请求顺序获取
    tasks = [
        worker("A"),
        worker("B"),
        worker("C")
    ]
    await asyncio.gather(*tasks)
    print(order)  # ['A', 'B', 'C']

事件 (Event)

基本用法

Python
async def event_example():
    event = asyncio.Event()

    async def waiter(name):
        print(f"{name} waiting...")
        await event.wait()
        print(f"{name} received event!")

    async def setter():
        await asyncio.sleep(1)
        print("Setting event!")
        event.set()

    await asyncio.gather(
        waiter("W1"),
        waiter("W2"),
        setter()
    )

等待与清除

Python
async def event_operations():
    event = asyncio.Event()

    # 检查是否已设置
    print(event.is_set())  # False

    # 设置事件
    event.set()
    print(event.is_set())  # True

    # 清除事件
    event.clear()
    print(event.is_set())  # False

    # 等待(会阻塞直到set)
    await event.wait()

生产者-消费者模式

Python
async def producer_consumer():
    data_ready = asyncio.Event()
    data = None

    async def producer():
        nonlocal data
        await asyncio.sleep(1)
        data = "produced data"
        data_ready.set()

    async def consumer():
        await data_ready.wait()
        print(f"Got: {data}")
        data_ready.clear()

    await asyncio.gather(producer(), consumer())

条件变量 (Condition)

基本用法

Python
async def condition_example():
    condition = asyncio.Condition()
    shared_data = []

    async def producer():
        async with condition:
            shared_data.append("item")
            condition.notify()  # 通知一个等待者
            # condition.notify_all()  # 通知所有等待者

    async def consumer():
        async with condition:
            while not shared_data:
                await condition.wait()  # 释放锁并等待
            item = shared_data.pop()
            print(f"Consumed: {item}")

    await asyncio.gather(producer(), consumer())

有界缓冲区

Python
class AsyncBoundedBuffer:
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.not_full = asyncio.Condition()
        self.not_empty = asyncio.Condition()

    async def put(self, item):
        async with self.not_full:
            while len(self.buffer) >= self.capacity:
                await self.not_full.wait()
            self.buffer.append(item)
            async with self.not_empty:
                self.not_empty.notify()

    async def get(self):
        async with self.not_empty:
            while not self.buffer:
                await self.not_empty.wait()
            item = self.buffer.pop(0)
            async with self.not_full:
                self.not_full.notify()
            return item

信号量 (Semaphore)

基本用法

Python
async def semaphore_example():
    sem = asyncio.Semaphore(3)  # 最多3个并发

    async def worker(n):
        async with sem:
            print(f"Worker {n} started")
            await asyncio.sleep(1)
            print(f"Worker {n} finished")

    tasks = [worker(i) for i in range(10)]
    await asyncio.gather(*tasks)
    # 同时最多3个worker在执行

限制并发请求

Python
import aiohttp

async def limited_fetch(urls, max_concurrent=5):
    sem = asyncio.Semaphore(max_concurrent)

    async def fetch(url):
        async with sem:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    return await resp.text()

    return await asyncio.gather(*[fetch(url) for url in urls])

有界信号量

Python
async def bounded_semaphore():
    # 有界信号量:release超过初始值会抛出异常
    sem = asyncio.BoundedSemaphore(2)

    async with sem:
        pass

    # sem.release()  # 如果已释放到最大值会报ValueError

异步队列 (Queue)

基本用法

Python
async def queue_example():
    queue = asyncio.Queue()

    async def producer():
        for i in range(5):
            await queue.put(f"item-{i}")
            print(f"Produced: item-{i}")
            await asyncio.sleep(0.1)

    async def consumer():
        for _ in range(5):
            item = await queue.get()
            print(f"Consumed: {item}")
            queue.task_done()

    await asyncio.gather(producer(), consumer())

优先级队列

Python
async def priority_queue():
    queue = asyncio.PriorityQueue()

    await queue.put((2, "low priority"))
    await queue.put((1, "high priority"))
    await queue.put((3, "lowest priority"))

    while not queue.empty():
        priority, item = await queue.get()
        print(f"{priority}: {item}")
    # 输出: 1: high priority, 2: low priority, 3: lowest priority

LIFO队列

Python
async def lifo_queue():
    queue = asyncio.LifoQueue()

    await queue.put("first")
    await queue.put("second")
    await queue.put("third")

    while not queue.empty():
        item = await queue.get()
        print(item)  # third, second, first

工作队列模式

Python
async def worker_pool():
    queue = asyncio.Queue()
    num_workers = 3

    async def worker(name):
        while True:
            item = await queue.get()
            try:
                await process_item(item)
            finally:
                queue.task_done()

    # 启动workers
    workers = [
        asyncio.create_task(worker(f"Worker-{i}"))
        for i in range(num_workers)
    ]

    # 添加任务
    for i in range(10):
        await queue.put(f"Task-{i}")

    # 等待队列清空
    await queue.join()

    # 取消workers
    for w in workers:
        w.cancel()

队列超时操作

Python
async def queue_timeout():
    queue = asyncio.Queue()

    try:
        # 等待获取,带超时
        item = await asyncio.wait_for(queue.get(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Queue get timeout")

    # 非阻塞获取
    try:
        item = queue.get_nowait()
    except asyncio.QueueEmpty:
        print("Queue is empty")

    # 非阻塞放入
    try:
        queue.put_nowait("item")
    except asyncio.QueueFull:
        print("Queue is full")

屏障 (Barrier)

Python 3.11+

Python
async def barrier_example():
    barrier = asyncio.Barrier(3)  # 需要3个协程同时到达

    async def worker(name):
        print(f"{name} phase 1")
        await barrier.wait()  # 等待所有worker完成phase 1

        print(f"{name} phase 2")
        await barrier.wait()  # 等待所有worker完成phase 2

        print(f"{name} phase 3")

    await asyncio.gather(
        worker("A"),
        worker("B"),
        worker("C")
    )

同步原语对比

原语用途特点
Lock互斥访问只有一个协程可持有
Event事件通知一次通知多个等待者
Condition条件等待需配合Lock使用
Semaphore并发限制可多个协程同时持有
Queue任务分发生产者-消费者模式
Barrier阶段同步等待所有协程到达

死锁避免

死锁场景

Python
async def deadlock_example():
    lock_a = asyncio.Lock()
    lock_b = asyncio.Lock()

    async def task1():
        async with lock_a:
            await asyncio.sleep(0.1)
            async with lock_b:  # 死锁!
                pass

    async def task2():
        async with lock_b:
            await asyncio.sleep(0.1)
            async with lock_a:  # 死锁!
                pass

    # 死锁发生
    await asyncio.gather(task1(), task2())

避免死锁

Python
async def avoid_deadlock():
    lock_a = asyncio.Lock()
    lock_b = asyncio.Lock()

    async def task1():
        # 方案1:统一锁顺序
        async with lock_a:
            async with lock_b:
                pass

    async def task2():
        async with lock_a:  # 保持相同顺序
            async with lock_b:
                pass

    # 方案2:使用超时
    async def with_timeout():
        try:
            await asyncio.wait_for(task1(), timeout=1.0)
        except asyncio.TimeoutError:
            print("Potential deadlock detected")

注意:同步原语必须使用async withawait,不能混用同步版本(threading.Lock等)。

要点总结

  • Lock保护临界区,确保同一时刻只有一个协程访问共享资源
  • Event用于事件通知,一次set唤醒所有等待者
  • Condition用于复杂条件等待,需配合Lock使用
  • Semaphore限制并发数量,适用于连接池、限流场景
  • Queue实现生产者-消费者模式,支持PriorityQueue和LifoQueue
  • 避免死锁:统一锁顺序、设置超时、减少锁持有时间

存放路径articles/PYTHON/专家/并发与异步高级/异步同步机制.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python异步任务调度
下一篇 → Python C API
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库