消息持久化配置
消息持久化是保障 RabbitMQ 在服务重启或崩溃时不丢失数据的核心机制。
定义
持久化包含两个层面:队列持久化(声明 durable 队列)与消息持久化(设置消息 deliveryMode 为 2)。两者缺一不可,仅持久化队列不持久化消息,消息仍会丢失。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
配置示例
声明持久化队列
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeExchangeType;
public class DurableQueueProducer {
private static final String QUEUE_NAME = "persistent_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// durable=true 声明持久化队列
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);
System.out.println("持久化队列已声明: " + QUEUE_NAME);
}
}
}
发送持久化消息
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class DurableMessageProducer {
private static final String QUEUE_NAME = "persistent_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 先声明持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "This is a persistent message";
// MessageProperties.PERSISTENT_TEXT_PLAIN 设置 deliveryMode=2
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println("持久化消息已发送: " + message);
}
}
}
自定义消息属性持久化
Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class CustomPersistentMessage {
private static final String QUEUE_NAME = "persistent_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Custom persistent message";
// 手动构建持久化属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2=持久化, 1=非持久化
.contentType("text/plain")
.contentEncoding("UTF-8")
.build();
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
System.out.println("自定义持久化消息已发送");
}
}
}
注意事项
仅声明 durable 队列不足以实现持久化,必须同时设置消息的 deliveryMode 为 2。
持久化会将数据写入磁盘,对性能有一定影响。高吞吐场景需权衡持久化与性能的关系。
RabbitMQ 不会等待持久化消息真正落盘后再返回 publish 确认,如需强保证,需配合 publisher confirms 机制。
非持久化消息在队列重启或服务重启后会丢失,但性能更高。
要点总结
- 队列持久化:
channel.queueDeclare(queue, true, false, false, null)中第二个参数设为true - 消息持久化:使用
MessageProperties.PERSISTENT_TEXT_PLAIN或手动设置deliveryMode=2 - 两者必须同时配置,缺一不可
- 持久化会降低吞吐,需根据业务场景权衡使用
📝 发现内容有误?点击此处直接编辑