全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python分布式任务队列

Celery是Python最流行的分布式任务队列框架,支持异步任务执行、定时调度和任务监控。

核心架构

Celery架构由三部分组成:

  • Producer:任务生产者,通常是Web应用
  • Broker:消息代理,推荐Redis或RabbitMQ
  • Worker:任务消费者,执行具体任务

基础配置

安装依赖

Bash
pip install celery redis

创建Celery应用

Python
# tasks.py
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
)

任务定义

普通任务

Python
@app.task
def add(x, y):
    return x + y

@app.task(bind=True)
def process_data(self, data):
    # bind=True可访问self.request获取任务上下文
    task_id = self.request.id
    return f"Processed: {data}, task_id: {task_id}"

重试机制

Python
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def unreliable_task(self):
    try:
        # 可能失败的操作
        call_external_api()
    except Exception as exc:
        raise self.retry(exc=exc)

任务调用

异步调用

Python
# 发送任务到队列
result = add.delay(4, 5)

# 应用异步调用
result = add.apply_async(args=[4, 5])

# 延迟执行
result = add.apply_async(args=[4, 5], countdown=10)

# 指定ETA执行
from datetime import datetime, timedelta
result = add.apply_async(
    args=[4, 5],
    eta=datetime.utcnow() + timedelta(minutes=5)
)

获取结果

Python
result = add.delay(4, 5)

# 阻塞等待结果(带超时)
value = result.get(timeout=10)

# 检查状态
if result.ready():
    print(result.result)
    print(result.status)  # SUCCESS, FAILURE, PENDING

任务签名与链式调用

签名(Signature)

Python
from celery import signature

# 创建任务签名
s = add.s(2, 3)  # 偏参数
s = add.s(2)     # 部分参数

# 执行签名
result = s.delay()

链式任务

Python
# 链式调用:前一个任务的输出作为后一个任务的输入
from celery import chain

workflow = chain(add.s(2, 3), add.s(4))
result = workflow.apply_async()  # (2+3)+4 = 9

# 简写形式
result = (add.s(2, 3) | add.s(4)).apply_async()

任务组

Python
from celery import group

# 并行执行多个任务
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()

# 获取所有结果
values = result.get()  # [0, 2, 4, 6, 8, ...]

任务和弦(Chord)

Python
from celery import chord

# 先并行执行,再汇总
callback = sum_results.s()
header = [add.s(i, i) for i in range(10)]
result = chord(header)(callback)

定时任务

Celery Beat配置

Python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-minute': {
        'task': 'tasks.add',
        'schedule': 60.0,  # 每60秒
        'args': (4, 5)
    },
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (10, 20),
    },
}

启动Beat进程

Bash
celery -A tasks beat --loglevel=info

Worker管理

启动Worker

Bash
# 基本启动
celery -A tasks worker --loglevel=info

# 指定并发数
celery -A tasks worker --concurrency=4

# 指定队列
celery -A tasks worker -Q queue1,queue2

# 后台运行
celery -A tasks worker --detach --loglevel=info

任务路由

Python
app.conf.task_routes = {
    'tasks.compute_*': {'queue': 'compute'},
    'tasks.io_*': {'queue': 'io'},
}

# 启动特定队列的Worker
celery -A tasks worker -Q compute --concurrency=2
celery -A tasks worker -Q io --concurrency=10

监控与管理

Flower监控面板

Bash
pip install flower
celery -A tasks flower --port=5555

访问 http://localhost:5555 查看任务状态、Worker状态、任务历史等。

命令行管理

Bash
# 查看活跃任务
celery -A tasks inspect active

# 查看注册任务
celery -A tasks inspect registered

# 撤销任务
celery -A tasks revoke <task_id>

# 终止正在执行的任务
celery -A tasks revoke <task_id> --terminate

最佳实践

任务设计原则

Python
# 任务要幂等
@app.task
def process_order(order_id):
    order = Order.get(order_id)
    if order.processed:
        return "Already processed"
    # 处理订单
    order.processed = True
    order.save()

# 避免大任务
@app.task
def bad_task():
    # 不要这样
    process_million_records()

@app.task
def good_task():
    # 拆分小任务
    for batch in get_batches():
        process_batch.delay(batch)

错误处理

Python
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
def task_with_auto_retry(self):
    # 自动重试所有异常
    pass

@app.task(bind=True)
def task_with_fallback(self):
    try:
        return risky_operation()
    except SpecificError:
        return fallback_value()

注意:生产环境必须配置结果后端的过期时间,避免Redis内存溢出。

要点总结

  • Celery架构:Producer → Broker → Worker,Broker推荐Redis/RabbitMQ
  • 任务定义使用@app.task装饰器,支持重试、绑定、超时配置
  • 异步调用使用delay()apply_async(),链式任务使用|操作符
  • 定时任务使用Celery Beat,通过beat_schedule配置调度规则
  • 生产环境使用Flower监控,任务设计要幂等、细粒度、可重试

存放路径articles/PYTHON/专家/并发与异步高级/分布式任务队列.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python asyncio协程原理
下一篇 → Python异步IO实现
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库