Return 消息机制
当消息成功到达交换机但无法匹配到任何队列时,Return 机制可将消息返回给生产者,避免消息静默丢失。
Return 机制原理
触发条件
| 条件 | 说明 |
|---|---|
| 交换机存在 | 消息成功投递到已声明的交换机 |
| 无匹配队列 | 没有队列绑定该交换机或路由键不匹配任何绑定 |
| mandatory=true | 生产者在发送消息时设置 mandatory 为 true |
与 Confirm 的区别
| 机制 | 触发时机 | 作用 |
|---|---|---|
| Confirm | 消息到达交换机后立即触发 | 确认消息是否到达交换机 |
| Return | 消息无法路由到队列时触发 | 返回未路由的消息给生产者 |
注意:Confirm 和 Return 可同时使用,Confirm 先触发,Return 后触发。
配置 ReturnCallback
基础示例
Java
// Maven 依赖
// <dependency>
// <groupId>com.rabbitmq</groupId>
// <artifactId>amqp-client</artifactId>
// <version>5.20.0</version>
// </dependency>
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class ReturnCallbackExample {
private static final String EXCHANGE_NAME = "return_exchange";
private static final String QUEUE_NAME = "return_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机和队列(故意不绑定)
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 不执行 queueBind,模拟路由失败
// 注册 Return 回调
channel.addReturnListener((replyCode, replyText, exchange,
routingKey, properties, body) -> {
String message = new String(body, StandardCharsets.UTF_8);
System.err.println("Return 触发:");
System.err.println(" 返回码: " + replyCode); // 312: NO_ROUTE
System.err.println(" 返回文本: " + replyText); // NO_ROUTE
System.err.println(" 交换机: " + exchange);
System.err.println(" 路由键: " + routingKey);
System.err.println(" 消息: " + message);
// 处理逻辑:记录日志、重发、转存死信队列等
});
String message = "Unroutable Message";
// mandatory = true 启用 Return 机制
channel.basicPublish(EXCHANGE_NAME, "nonexistent_key", true, null,
message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息已发送: " + message);
// 等待 Return 回调执行
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Return 响应码说明
| 响应码 | 常量 | 含义 |
|---|---|---|
| 312 | NO_ROUTE | 消息无匹配队列,无法路由 |
| 313 | NO_CONSUMERS | 队列存在但无消费者(部分场景触发) |
完整可靠性方案:Confirm + Return
Java
public class ConfirmAndReturnExample {
private static final String EXCHANGE_NAME = "reliable_exchange";
private static final String QUEUE_NAME = "reliable_queue";
private static final String ROUTING_KEY = "reliable.key";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 开启 Confirm 模式
channel.confirmSelect();
// 注册 Return 回调
channel.addReturnListener((replyCode, replyText, exchange,
routingKey, properties, body) -> {
System.err.println("[Return] 消息未路由: " + routingKey);
// 记录未路由消息,后续可重发或告警
});
// 注册 Confirm 回调
channel.addConfirmListener(
(deliveryTag, multiple) ->
System.out.println("[Confirm] 消息已到达交换机, tag: " + deliveryTag),
(deliveryTag, multiple) ->
System.err.println("[Confirm] 消息未到达交换机, tag: " + deliveryTag)
);
String message = "Reliable Message";
// mandatory = true 同时启用 Confirm 和 Return
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, null,
message.getBytes(StandardCharsets.UTF_8));
// 等待 Confirm
if (channel.waitForConfirms()) {
System.out.println("消息投递成功");
}
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
注意事项
- 必须设置
mandatory = true,否则 Return 回调不会触发,消息直接丢弃。- Return 回调是异步的,需确保连接和 Channel 在回调执行前不被关闭。
- 若消息既通过 Confirm 又触发 Return,Confirm 先于 Return 返回。
- Return 机制仅处理路由失败场景,不处理交换机不存在或队列异常的情况。
- 生产环境中,Return 回调应配合日志记录和告警机制,及时发现路由异常。
要点总结
- Return 机制处理消息到达交换机但无法路由到队列的场景
- 必须设置
mandatory = true并注册addReturnListener才会触发返回 - 常见返回码 312(NO_ROUTE)表示无匹配队列
- Confirm 与 Return 可同时使用,Confirm 保证到达交换机,Return 处理路由失败
- Return 回调异步执行,需保持连接存活并配合日志监控
📝 发现内容有误?点击此处直接编辑