全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

死信消息处理

死信队列中积累了无法被正常消费的消息,需要建立监控、分析和补偿机制。

监控死信队列

通过 HTTP API 或管理插件监控死信队列消息堆积情况:

Bash
# 查看死信队列消息数量
rabbitmqctl list_queues name messages | grep dlx

# 或通过 Management API
curl -u guest:guest http://localhost:15672/api/queues/%2F/dlx.queue

分析死信原因

死信消息的 headers 中携带 x-death 信息,可用于排查原因。

Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DlxAnalysisExample {

    private static final String DLX_QUEUE = "dlx.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.basicConsume(DLX_QUEUE, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);

                    // 获取 x-death 信息
                    Map<String, Object> headers = properties.getHeaders();
                    if (headers != null && headers.containsKey("x-death")) {
                        List<Map<String, Object>> xDeaths =
                                (List<Map<String, Object>>) headers.get("x-death");

                        for (Map<String, Object> xDeath : xDeaths) {
                            String reason = (String) xDeath.get("reason");
                            String queue = (String) xDeath.get("queue");
                            Long count = (Long) xDeath.get("count");

                            System.out.println("死信分析:");
                            System.out.println("  原因: " + reason);
                            System.out.println("  原队列: " + queue);
                            System.out.println("  死信次数: " + count);
                            System.out.println("  消息内容: " + message);
                        }
                    } else {
                        System.out.println("无 x-death 信息: " + message);
                    }

                    // 手动确认,避免消息再次进入死信循环
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });

            System.out.println("死信监控消费者已启动");
        }
    }
}

死信补偿处理

根据不同死因实现补偿策略:

Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DlxCompensationExample {

    private static final String DLX_QUEUE = "dlx.queue";
    private static final String RETRY_EXCHANGE = "retry.exchange";
    private static final String MANUAL_EXCHANGE = "manual.review.exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明重试交换机和人工审核交换机
            channel.exchangeDeclare(RETRY_EXCHANGE, BuiltinExchangeType.DIRECT, true);
            channel.exchangeDeclare(MANUAL_EXCHANGE, BuiltinExchangeType.DIRECT, true);

            // 消费死信队列,根据原因路由到不同处理队列
            channel.basicConsume(DLX_QUEUE, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    Map<String, Object> headers = properties.getHeaders();
                    String reason = extractReason(headers);

                    if ("rejected".equals(reason)) {
                        // 消费者主动拒绝:可能是业务逻辑异常,尝试重试
                        channel.basicPublish(RETRY_EXCHANGE, "retry.key", properties, body);
                        System.out.println("消息已转入重试队列");
                    } else if ("expired".equals(reason)) {
                        // TTL 过期:可能是延迟业务,正常处理
                        processMessage(body);
                    } else if ("overflow".equals(reason)) {
                        // 队列满:发送人工审核告警
                        channel.basicPublish(MANUAL_EXCHANGE, "review.key", properties, body);
                        System.out.println("队列溢出,消息已转入人工审核");
                    }

                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });

            System.out.println("死信补偿处理器已启动");
        }
    }

    private static String extractReason(Map<String, Object> headers) {
        if (headers == null || !headers.containsKey("x-death")) {
            return "unknown";
        }
        List<Map<String, Object>> xDeaths =
                (List<Map<String, Object>>) headers.get("x-death");
        if (xDeaths.isEmpty()) {
            return "unknown";
        }
        return (String) xDeaths.get(0).get("reason");
    }

    private static void processMessage(byte[] body) {
        System.out.println("处理消息: " + new String(body));
    }
}

补偿策略汇总

死信原因建议策略
rejected转入重试队列,限制最大重试次数
expired正常消费(延迟业务场景)
overflow转入人工审核队列,发送告警

死信队列消费必须调用 basicAck,否则消息会再次进入死信循环。

要点总结

  • 通过 x-death headers 分析死信原因、原队列、死信次数
  • 按死信原因分类补偿:rejected 重试、expired 正常消费、overflow 人工审核
  • 死信消费者必须 basicAck,避免死信循环
  • 建议配合监控告警,及时发现死信堆积问题
  • 生产环境建议记录死信日志,便于后续排查

📝 发现内容有误?点击此处直接编辑

← 上一篇 死信交换机概念
下一篇 → 死信队列配置
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库