全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python asyncio协程原理

协程是Python异步编程的核心,理解其原理对于编写高效并发程序至关重要。

协程基础概念

什么是协程

协程(Coroutine)是一种比线程更轻量的并发单元:

  • 用户态调度:由程序自身控制切换,无需内核参与
  • 协作式调度:协程主动让出控制权,而非抢占式
  • 低开销:创建成本极低,可轻松创建百万级协程

生成器与协程的关系

协程源于生成器,通过yield实现暂停与恢复:

Python
# 生成器函数
def simple_generator():
    yield 1
    yield 2
    yield 3

gen = simple_generator()
print(next(gen))  # 1
print(next(gen))  # 2

# 协程通过yield实现双向通信
def coroutine():
    value = yield
    print(f"Received: {value}")

coro = coroutine()
next(coro)        # 预激协程
coro.send(10)     # 发送值,输出: Received: 10

async/await语法

async def定义协程

Python
import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 协程函数调用返回协程对象,不会立即执行
coro = hello()

# 运行协程
asyncio.run(hello())

await挂起协程

Python
async def fetch_data():
    await asyncio.sleep(1)  # 模拟IO操作
    return "data"

async def main():
    # await会挂起当前协程,让出控制权
    result = await fetch_data()
    print(result)

asyncio.run(main())

可等待对象

await只能用于可等待对象(Awaitable):

Python
import asyncio
from collections.abc import Awaitable

# 1. 协程
async def coro():
    return "coroutine"

# 2. Task
async def task_example():
    task = asyncio.create_task(coro())
    result = await task

# 3. Future(低层API)
async def future_example():
    future = asyncio.Future()
    future.set_result("future result")
    result = await future

事件循环机制

事件循环核心

事件循环是异步执行的引擎:

Python
import asyncio

# 获取当前事件循环
loop = asyncio.get_event_loop()

# 事件循环生命周期
async def main():
    print("Running in event loop")

# asyncio.run做了以下事情:
# 1. 创建新事件循环
# 2. 运行main()直到完成
# 3. 关闭事件循环
asyncio.run(main())

事件循环内部流程

Python
# 伪代码展示事件循环核心逻辑
def event_loop():
    ready_queue = deque()
    sleeping_tasks = []  # 最小堆,按唤醒时间排序
    readers = {}  # {fd: callback}
    writers = {}  # {fd: callback}

    while True:
        # 1. 处理就绪队列
        while ready_queue:
            task = ready_queue.popleft()
            run_task(task)

        # 2. 处理IO事件(select/poll/epoll)
        io_events = selector.select(timeout=next_sleep_time)
        for fd, event in io_events:
            if event & READ:
                readers[fd]()
            if event & WRITE:
                writers[fd]()

        # 3. 唤醒到期任务
        now = time.time()
        while sleeping_tasks and sleeping_tasks[0].wake_time <= now:
            task = heapq.heappop(sleeping_tasks)
            ready_queue.append(task)

任务调度示例

Python
async def task_a():
    print("A start")
    await asyncio.sleep(1)
    print("A end")

async def task_b():
    print("B start")
    await asyncio.sleep(0.5)
    print("B end")

async def main():
    # 并发执行两个任务
    await asyncio.gather(task_a(), task_b())

# 输出顺序:
# A start  (立即)
# B start  (立即,因为A让出了控制权)
# B end    (0.5秒后)
# A end    (1秒后)

asyncio.run(main())

协程调度原理

协程状态机

每个协程都有状态转换:

Python
CREATED → RUNNING → SUSPENDED → RUNNING → FINISHED
Python
import asyncio
import inspect

async def demo():
    await asyncio.sleep(1)
    return "done"

coro = demo()
print(inspect.getcoroutinestate(coro))  # CORO_CREATED

# 预激
next(coro.__await__())  # 或 send(None)
print(inspect.getcoroutinestate(coro))  # CORO_SUSPENDED

# 完成后
# CORO_CLOSED

Task封装

Python
async def coro():
    await asyncio.sleep(1)
    return "result"

# 直接创建Task
task = asyncio.create_task(coro())

# Task内部维护协程的状态
print(task.done())    # False
print(task.cancelled())  # False
print(task.get_name())  # Task-1

# 等待结果
result = await task
print(task.done())    # True
print(task.result())  # "result"

调度器实现

Python
async def scheduler_example():
    # 简化的调度器示例
    tasks = []

    async def worker(n):
        for i in range(3):
            print(f"Worker {n}: step {i}")
            await asyncio.sleep(0)  # 让出控制权

    # 创建多个任务
    for i in range(3):
        tasks.append(asyncio.create_task(worker(i)))

    # 等待所有任务完成
    await asyncio.gather(*tasks)

asyncio.run(scheduler_example())

底层实现原理

协程栈帧

Python
# 协程使用生成器实现
async def foo():
    x = await bar()
    return x

# 等价于生成器
def foo_gen():
    # 保存栈帧状态
    x = None
    while True:
        try:
            if x is None:
                x = yield from bar_gen()
            return x
        except Exception as e:
            # 处理异常
            raise

awaitable协议

Python
class MyAwaitable:
    def __await__(self):
        return self._async_method()

    async def _async_method(self):
        await asyncio.sleep(0.1)
        return "custom awaitable"

async def main():
    result = await MyAwaitable()
    print(result)  # custom awaitable

协程上下文切换

Python
import asyncio
import contextvars

# 协程上下文变量
request_id = contextvars.ContextVar('request_id')

async def handler():
    print(f"Request ID: {request_id.get()}")

async def main():
    # 设置上下文
    request_id.set("req-001")

    # 协程继承上下文
    await handler()

asyncio.run(main())

性能分析

协程vs线程对比

text
import asyncio
import threading
import time

# 线程版本
def thread_worker(n):
    time.sleep(0.001)  # 模拟IO
    return n

def thread_version():
    threads = []
    for i in range(10000):
        t = threading.Thread(target=thread_worker, args=(i,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

# 协程版本
async def coro_worker(n):
    await asyncio.sleep(0.001)
    return n

async def coro_version():
    tasks = [coro_worker(i) for i in range(10000)]
    await asyncio.gather(*tasks)

# 协程版本内存占用约几十KB
# 线程版本内存占用约8MB/线程

要点总结

  • 协程是用户态轻量级并发单元,通过yield/await实现暂停与恢复
  • async/await是语法糖,底层基于生成器实现
  • 事件循环通过IO多路复用实现高效调度,核心是任务队列+事件监听
  • Task是协程的调度封装,提供取消、状态查询等功能
  • 协程适用于IO密集型场景,可轻松创建百万级并发

存放路径articles/PYTHON/专家/并发与异步高级/asyncio协程原理.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python异步生成器
下一篇 → Python分布式任务队列
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库