全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消息去重方案

在消息重试、网络重传或生产者重发等场景下,消费者可能收到重复消息。消息去重确保同一消息仅被处理一次,保障业务幂等性。

为什么需要消息去重

重复消息来源

来源说明
生产者重发网络超时未收到 Confirm,生产者重新发送
消息重新入队消费者超时未确认,消息重新入队被再次消费
RabbitMQ 重启持久化消息在极端情况下可能重复投递

注意:RabbitMQ 本身不提供去重机制,需在业务层实现幂等性校验。

基于消息 ID 去重

发送端设置消息 ID

Java
// Maven 依赖
// <dependency>
//     <groupId>com.rabbitmq</groupId>
//     <artifactId>amqp-client</artifactId>
//     <version>5.20.0</version>
// </dependency>

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class MessageIdPublisher {
    
    private static final String EXCHANGE_NAME = "dedup_exchange";
    private static final String ROUTING_KEY = "dedup.key";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            
            String message = "Order-Payment-12345";
            String messageId = UUID.randomUUID().toString();
            
            // 设置 message_id
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .messageId(messageId)
                    .build();
            
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props,
                    message.getBytes(StandardCharsets.UTF_8));
            
            System.out.println("消息已发送, messageId: " + messageId);
        }
    }
}

消费端去重逻辑

Java
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;

public class DedupConsumer {
    
    // 本地缓存已处理的消息 ID(生产环境应使用 Redis/DB)
    private static final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    
    public static void consume(Channel channel) throws IOException {
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String messageId = delivery.getProperties().getMessageId();
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            
            // 检查是否已处理
            if (messageId != null && processedIds.contains(messageId)) {
                System.out.println("重复消息,已跳过: " + messageId);
                channel.basicAck(deliveryTag, false);
                return;
            }
            
            try {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                processMessage(message);
                
                // 处理成功后记录 messageId
                if (messageId != null) {
                    processedIds.add(messageId);
                }
                
                channel.basicAck(deliveryTag, false);
                System.out.println("消息处理成功: " + messageId);
                
            } catch (Exception e) {
                System.err.println("消息处理失败: " + e.getMessage());
                channel.basicNack(deliveryTag, false, true);
            }
        };
        
        channel.basicConsume("dedup_queue", false, deliverCallback, consumerTag -> {});
    }
    
    private static void processMessage(String message) {
        // 业务处理逻辑
        System.out.println("处理消息: " + message);
    }
}

基于业务唯一键去重

适用场景

当消息本身没有 messageId 时,可从消息内容中提取业务唯一键(如订单号、流水号)进行去重。

Java
public class BusinessKeyDedup {
    
    public static void consume(Channel channel) throws IOException {
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            
            // 从消息内容提取业务唯一键
            String businessKey = extractBusinessKey(message);
            
            if (isProcessed(businessKey)) {
                System.out.println("业务键重复,已跳过: " + businessKey);
                channel.basicAck(deliveryTag, false);
                return;
            }
            
            try {
                processMessage(message);
                markProcessed(businessKey);
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                channel.basicNack(deliveryTag, false, true);
            }
        };
        
        channel.basicConsume("biz_queue", false, deliverCallback, consumerTag -> {});
    }
    
    private static String extractBusinessKey(String message) {
        // 解析消息提取业务唯一键,如订单号
        // 示例:{"orderId":"10001","amount":100}
        return message.split("\"orderId\":\"")[1].split("\"")[0];
    }
    
    private static boolean isProcessed(String businessKey) {
        // 查询数据库或缓存判断是否已处理
        return false;
    }
    
    private static void markProcessed(String businessKey) {
        // 标记为已处理
    }
    
    private static void processMessage(String message) {
        // 业务处理
    }
}

基于数据库唯一约束去重

方案说明

利用数据库唯一索引实现幂等性,是最可靠的去重方式。

Java
import java.sql.*;

public class DatabaseDedup {
    
    private static final String INSERT_SQL = 
            "INSERT INTO message_log (message_id, content, status) VALUES (?, ?, ?)";
    
    public static boolean tryInsert(Connection conn, String messageId, String content) {
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            ps.setString(1, messageId);
            ps.setString(2, content);
            ps.setString(3, "PROCESSED");
            ps.executeUpdate();
            return true;
        } catch (SQLException e) {
            // 唯一约束冲突,说明已处理过
            if (e.getSQLState().equals("23505")) {
                System.out.println("消息已处理,跳过: " + messageId);
                return false;
            }
            throw new RuntimeException(e);
        }
    }
}

注意:数据库唯一约束需提前在 message_id 字段上创建 UNIQUE INDEX。

注意事项

  1. 本地内存缓存(如 ConcurrentHashMap)在服务重启后失效,仅适用于单机短期场景。
  2. 生产环境推荐使用 Redis(SETNX)或数据库唯一约束实现分布式去重。
  3. 去重逻辑应在消息确认之前执行,避免确认后再处理失败导致重复消费。
  4. 消息 ID 由生产者设置,若未设置则需从业务内容中提取唯一键。
  5. 去重缓存需设置过期时间,避免内存无限增长,过期时间应大于消息重试窗口。

要点总结

  • RabbitMQ 本身不提供去重机制,需在消费端实现幂等性校验
  • 基于 messageId 去重需生产者在发送时设置唯一消息 ID
  • 无 messageId 时可从消息内容提取业务唯一键(如订单号)进行去重
  • 数据库唯一约束是最可靠的去重方式,适用于分布式场景
  • 去重状态需设置过期时间,避免内存持续增长

📝 发现内容有误?点击此处直接编辑

← 上一篇 Return 消息机制
下一篇 → 消费端可靠性
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库