全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

并发消费者模型

并发消费者模型通过多线程方式并行处理消息,显著提升消费吞吐量,适用于高吞吐业务场景。

核心思路

RabbitMQ Java Client 默认单线程消费,每个 Channel 绑定一个消费者。实现并发消费的方式:

  1. 多 Channel + 多消费者:每个线程创建独立 Channel 和消费者
  2. 线程池 + 单 Channel:单 Channel 接收消息,线程池异步处理
  3. 多 Channel + 线程池:多个 Channel 各自绑定消费者,配合线程池

推荐方案一:多 Channel + 多消费者,每个消费者独立 Channel,天然支持并发,且隔离性好。

Java 示例:多 Channel 并发消费

Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentConsumerExample {

    private static final String QUEUE = "concurrent.queue";
    private static final int CONSUMER_THREAD_COUNT = 5;
    private static final int PREFETCH_COUNT = 10;

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        // 创建固定线程池管理消费者线程
        ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_THREAD_COUNT);

        AtomicLong messageCounter = new AtomicLong(0);

        // 启动多个消费者线程
        for (int i = 0; i < CONSUMER_THREAD_COUNT; i++) {
            final int threadId = i;
            executor.submit(() -> {
                try {
                    // 每个线程创建独立 Connection 和 Channel
                    Connection conn = factory.newConnection();
                    Channel channel = conn.createChannel();

                    // 设置预取数量
                    channel.basicQos(PREFETCH_COUNT);

                    // 注册消费者
                    channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope,
                                                   AMQP.BasicProperties properties, byte[] body)
                                throws IOException {
                            long count = messageCounter.incrementAndGet();
                            String message = new String(body, StandardCharsets.UTF_8);
                            System.out.printf("[线程-%d] 处理消息 #%d: %s%n",
                                    threadId, count, message);

                            // 模拟业务处理
                            processMessage(message);

                            // 手动确认
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    });

                    System.out.println("[线程-" + threadId + "] 消费者已启动");
                    // 保持线程运行
                    Thread.currentThread().join();
                } catch (IOException | TimeoutException | InterruptedException e) {
                    System.err.println("[线程-" + threadId + "] 消费者异常: " + e.getMessage());
                }
            });
        }

        System.out.println("并发消费者已启动,线程数: " + CONSUMER_THREAD_COUNT);
    }

    private static void processMessage(String message) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

并发度配置建议

场景线程数prefetch count
低吞吐(<100 msg/s)1-21-5
中吞吐(100-1000 msg/s)3-510-20
高吞吐(>1000 msg/s)5-1020-50

注意事项

  • 每个消费者应使用独立 Channel,Channel 不是线程安全的
  • 线程数不应超过队列分区数,否则竞争同一队列反而降低性能
  • 并发消费者需要配合 basicQos 使用,避免单线程积压消息
  • Connection 可以复用,但 Channel 必须线程隔离
  • 优雅关闭时需要等待所有线程处理完当前消息后再关闭 Connection

线程数不是越多越好,建议通过压测找到最优值。过多线程会导致上下文切换开销大于收益。

要点总结

  • 推荐多 Channel + 多消费者方案,每个线程独立 Channel
  • Channel 不是线程安全的,必须线程隔离
  • 配合 basicQos 预取数量使用,实现负载均衡
  • 线程数建议 3-10 个,根据吞吐量压测调优
  • Connection 可复用,Channel 必须独立创建

📝 发现内容有误?点击此处直接编辑

← 上一篇 死信队列配置
下一篇 → 手动确认最佳实践
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库