全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消息内容类型

RabbitMQ通过消息属性中的content_type字段标识消息体格式,消费者据此选择正确的解析方式。

定义

content_type是RabbitMQ消息属性(BasicProperties)的一部分,采用MIME类型格式标识消息体的数据格式。常见的值包括application/jsontext/plainapplication/octet-stream等。

content_type 常用值

content_type说明典型场景
application/jsonJSON格式跨语言服务间通信
text/plain纯文本简单字符串消息
text/xmlXML格式遗留系统对接
application/octet-stream二进制流文件传输、Protobuf
application/x-java-serialized-objectJava序列化Java系统内部通信

发送端设置

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;

public class ContentTypeProducer {
    private static final String QUEUE_NAME = "content_type_queue";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 示例1: JSON消息
            String jsonMessage = "{\"orderId\":\"ORD-001\",\"amount\":100}";
            BasicProperties jsonProps = new BasicProperties.Builder()
                    .contentType("application/json")
                    .contentEncoding("UTF-8")
                    .build();
            channel.basicPublish("", QUEUE_NAME, jsonProps, jsonMessage.getBytes("UTF-8"));
            
            // 示例2: 纯文本消息
            String textMessage = "Hello RabbitMQ";
            BasicProperties textProps = new BasicProperties.Builder()
                    .contentType("text/plain")
                    .contentEncoding("UTF-8")
                    .build();
            channel.basicPublish("", QUEUE_NAME, textProps, textMessage.getBytes("UTF-8"));
            
            // 示例3: 二进制消息
            byte[] binaryMessage = new byte[]{0x01, 0x02, 0x03, 0x04};
            BasicProperties binaryProps = new BasicProperties.Builder()
                    .contentType("application/octet-stream")
                    .build();
            channel.basicPublish("", QUEUE_NAME, binaryProps, binaryMessage);
            
            System.out.println("已发送3种不同content_type的消息");
        }
    }
}

接收端解析

Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ContentTypeConsumer {
    private static final String QUEUE_NAME = "content_type_queue";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                BasicProperties props = delivery.getProperties();
                byte[] body = delivery.getBody();
                String contentType = props.getContentType();
                
                System.out.println("收到消息, content_type: " + contentType);
                
                switch (contentType) {
                    case "application/json":
                        handleJsonMessage(body);
                        break;
                    case "text/plain":
                        handleTextMessage(body);
                        break;
                    case "application/octet-stream":
                        handleBinaryMessage(body);
                        break;
                    default:
                        System.err.println("未知消息类型: " + contentType);
                        // 根据业务决定是否拒绝
                        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                        return;
                }
                
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }
    
    private static void handleJsonMessage(byte[] body) {
        String json = new String(body, StandardCharsets.UTF_8);
        System.out.println("JSON消息: " + json);
        // 使用Jackson/Gson解析
    }
    
    private static void handleTextMessage(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        System.out.println("文本消息: " + text);
    }
    
    private static void handleBinaryMessage(byte[] body) {
        System.out.println("二进制消息, 长度: " + body.length + " bytes");
        // 按业务协议解析
    }
}

content_type 与 content_encoding

Java
// 同时设置 content_type 和 content_encoding
BasicProperties props = new BasicProperties.Builder()
        .contentType("application/json")        // 数据格式
        .contentEncoding("UTF-8")               // 字符编码
        .build();
  • content_type:标识数据类型(MIME类型)
  • content_encoding:标识字符编码(仅对文本类数据有效)

注意事项

content_type是可选属性,不设置时默认为null。但强烈建议始终设置,否则消费者无法可靠识别消息格式。

RabbitMQ本身不验证content_type值的有效性,它仅透传该属性。格式校验由消费者完成。

混合格式消息队列中,必须根据content_type路由到不同的处理器,避免解析错误。

使用Spring AMQP时,可通过MessageConverter自动设置和识别content_type

Spring AMQP 自动识别示例

Java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringAutoTypeConsumer {
    
    @RabbitListener(queues = "content_type_queue")
    public void handleMessage(Message message) {
        MessageProperties props = message.getMessageProperties();
        String contentType = props.getContentType();
        
        System.out.println("Spring自动识别content_type: " + contentType);
        
        if ("application/json".equals(contentType)) {
            // Spring会根据contentType自动反序列化
            byte[] body = message.getBody();
            System.out.println("JSON消息体: " + new String(body));
        }
    }
}

要点总结

  • content_type使用MIME类型格式标识消息体数据格式
  • 常见值:application/jsontext/plainapplication/octet-stream
  • 发送端必须设置content_type,接收端根据该值选择解析方式
  • 可配合content_encoding指定字符编码
  • RabbitMQ不验证content_type有效性,校验由消费者完成
  • 混合格式队列中,必须根据content_type路由到对应处理器
  • Spring AMQP支持基于content_type的自动消息转换

📝 发现内容有误?点击此处直接编辑

← 上一篇 大消息处理策略
下一篇 → Channel 复用策略
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库