消息属性与编码
RabbitMQ 消息可携带多种属性用于控制投递行为,下面梳理常用属性配置与消息编码方法。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
消息属性配置
BasicProperties 构建
Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class MessagePropertiesExample {
private static final String EXCHANGE_NAME = "props_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
// 构建消息属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.contentType("application/json") // 内容类型
.contentEncoding("UTF-8") // 内容编码
.deliveryMode(2) // 持久化模式:1=非持久化, 2=持久化
.priority(5) // 优先级 0-9
.expiration("60000") // 过期时间(毫秒)
.messageId("msg_001") // 消息唯一标识
.timestamp(new java.util.Date()) // 时间戳
.build();
String message = "{\"task\":\"send_email\",\"to\":\"user@example.com\"}";
channel.basicPublish(EXCHANGE_NAME, "props_key", props,
message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息已发送: " + message);
}
}
}
常用属性说明
| 属性 | 类型 | 说明 |
|---|---|---|
| contentType | String | 消息内容类型,如 application/json、text/plain |
| contentEncoding | String | 内容编码,如 UTF-8 |
| deliveryMode | Integer | 1=非持久化,2=持久化(服务器重启后保留) |
| priority | Integer | 优先级 0-9,需队列开启优先级支持 |
| expiration | String | 消息过期时间(毫秒字符串),超时后消息被丢弃 |
| messageId | String | 消息唯一标识,用于去重 |
| timestamp | Date | 消息发送时间戳 |
| headers | Map<String, Object> | 自定义头部参数,用于 headers 交换机匹配 |
多格式消息编码
JSON 消息
Java
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.HashMap;
public class JsonMessageExample {
private static final String QUEUE_NAME = "json_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
ObjectMapper mapper = new ObjectMapper();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发送 JSON 消息
Map<String, Object> data = new HashMap<>();
data.put("action", "update_user");
data.put("userId", 1001);
data.put("name", "张三");
String jsonBody = mapper.writeValueAsString(data);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.build();
channel.basicPublish("", QUEUE_NAME, props,
jsonBody.getBytes(StandardCharsets.UTF_8));
System.out.println("发送JSON: " + jsonBody);
// 消费并解析 JSON
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
Map<String, Object> receivedData = mapper.readValue(body, Map.class);
System.out.println("解析JSON: " + receivedData);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("JSON解析失败: " + e.getMessage());
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
}
序列化对象消息
Java
import com.rabbitmq.client.*;
import java.io.*;
import java.nio.charset.StandardCharsets;
public class SerializableMessageExample {
private static final String QUEUE_NAME = "serial_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 序列化对象
TaskMessage task = new TaskMessage("process_data", 100);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(task);
oos.close();
channel.basicPublish("", QUEUE_NAME, null, baos.toByteArray());
System.out.println("序列化消息已发送");
}
}
static class TaskMessage implements Serializable {
private String type;
private int priority;
public TaskMessage(String type, int priority) {
this.type = type;
this.priority = priority;
}
@Override
public String toString() {
return "TaskMessage{type='" + type + "', priority=" + priority + "}";
}
}
}
注意事项
deliveryMode 设为 2 时消息会持久化到磁盘,但需配合持久化队列和交换机使用才有效。
消息优先级功能需在声明队列时通过参数
x-max-priority开启,否则优先级设置无效。
expiration 属性值为字符串格式的毫秒数,消息超时后会被自动删除。
消息体最大限制由 RabbitMQ 配置决定(默认无限制),超大消息可能影响性能。
要点总结
- AMQP.BasicProperties.Builder 用于构建消息属性,支持 contentType、deliveryMode、priority 等配置。
- deliveryMode 为 2 时消息持久化,为 1 时不持久化。
- JSON 格式消息需设置 contentType 为 application/json,消费端按相同编码解析。
- 序列化对象消息需实现 Serializable 接口,通过 ObjectOutputStream 转换。
- 消息优先级和过期时间需在队列和消息层面配合使用才生效。
📝 发现内容有误?点击此处直接编辑