全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

生产者与消费者模型

消息队列的核心交互模型由两方组成:生产者(发送消息)和消费者(处理消息)。

角色定义

  • 生产者(Producer):创建消息并发送到消息队列的组件
  • 消费者(Consumer):从消息队列中拉取消息并执行处理的组件
  • 消息队列(Broker):中转站,负责消息存储与路由

交互流程

XML
生产者                    消息队列                    消费者
  │                        │                          │
  ├─ 发送消息 ───────────>│                          │
  │                        ├─ 消息入队                │
  │                        ├────────── 推送/拉取 ───>│
  │                        │                          ├─ 处理消息
  │                        │<───────── 确认(Ack) ────┤
  │                        ├─ 移除消息                │
  └─ 无需等待返回          │                          │

关键步骤:

  1. 生产者建立连接,声明队列,发送消息
  2. 消息队列接收并持久化消息
  3. 消费者监听队列,消息到达后触发消费
  4. 消费者处理完成后发送 Ack 确认
  5. 消息队列收到 Ack 后移除消息

Java 完整示例

Maven 依赖:

Java
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

生产者:

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class TaskProducer {
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明持久化队列,重启后消息不丢失
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            for (int i = 1; i <= 5; i++) {
                String message = "Task-" + i;
                // 设置消息持久化
                channel.basicPublish("", QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println("发送: " + message);
            }
            System.out.println("所有任务已发送完毕");
        }
    }
}

消费者:

text
import com.rabbitmq.client.*;

public class TaskConsumer {
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 队列已由生产者声明,此处直接消费
        channel.basicQos(1); // 公平分发:未确认不推送新消息

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("收到: " + message);

            // 模拟处理耗时
            doWork(message.length());

            // 手动确认
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            System.out.println("处理完成: " + message);
        };

        // autoAck=false 表示手动确认
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        System.out.println("消费者已启动,等待任务...");
    }

    private static void doWork(int dots) throws InterruptedException {
        for (int i = 0; i < dots; i++) {
            Thread.sleep(500);
        }
    }
}

关键参数说明

参数生产者侧消费者侧
queueDeclare声明队列,控制持久化(durable)可不声明,直接消费
basicPublish指定 Exchange、RoutingKey、消息属性
basicQos控制预取数量(prefetchCount)
basicAck手动确认消息

basicQos(1) 实现公平分发,避免单个消费者堆积大量未处理消息。

消息持久化需同时设置:队列 durable=true + 消息 deliveryMode=2。

消费者未发送 Ack 时,消息在连接断开后会重新入队。

要点总结

  • 生产者负责创建和发送消息,消费者负责拉取和处理消息
  • 标准流程:发送 -> 入队 -> 消费 -> Ack确认 -> 移除
  • 手动确认(autoAck=false)是可靠消费的基础
  • basicQos 控制预取数量,实现公平分发与负载平衡
  • 消息可靠性依赖队列持久化和消息持久化的双重保障

📝 发现内容有误?点击此处直接编辑

← 上一篇 消息队列概念
下一篇 → Docker 快速部署
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库