全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

Protobuf 序列化

Protocol Buffers(Protobuf)是Google推出的二进制序列化方案,在RabbitMQ中可显著降低消息体积并提升编解码性能。

定义

Protobuf通过.proto文件定义消息结构,编译器生成对应语言的序列化代码。相比JSON/XML,Protobuf具备体积更小、编解码更快、强类型约束的特点,适合高性能消息传输。

Maven 依赖

XML
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.24.3</version>
</dependency>

定义 .proto 文件

创建 order.proto

protobuf
syntax = "proto3";

package rabbitmq.example;

message OrderMessage {
  string order_id = 1;
  string product_name = 2;
  int32 quantity = 3;
  double price = 4;
  string buyer_email = 5;
}

编译生成 Java 类

Bash
protoc --java_out=src/main/java src/main/proto/order.proto

生成 OrderMessage 类后,即可在代码中使用。

发送端示例

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.example.OrderProto.OrderMessage;

public class ProtobufProducer {
    private static final String QUEUE_NAME = "protobuf_queue";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 构建Protobuf消息
            OrderMessage order = OrderMessage.newBuilder()
                    .setOrderId("ORD-001")
                    .setProductName("iPhone 15")
                    .setQuantity(1)
                    .setPrice(7999.0)
                    .setBuyerEmail("user@example.com")
                    .build();
            
            // 序列化为字节数组
            byte[] body = order.toByteArray();
            
            // 设置content_type
            BasicProperties props = new BasicProperties.Builder()
                    .contentType("application/x-protobuf")
                    .build();
            
            channel.basicPublish("", QUEUE_NAME, props, body);
            System.out.println("已发送Protobuf消息, 大小: " + body.length + " bytes");
        }
    }
}

接收端示例

Java
import com.rabbitmq.client.*;
import com.rabbitmq.example.OrderProto.OrderMessage;
import java.io.IOException;

public class ProtobufConsumer {
    private static final String QUEUE_NAME = "protobuf_queue";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                byte[] body = delivery.getBody();
                String contentType = delivery.getProperties().getContentType();
                
                try {
                    // 验证消息类型
                    if (!"application/x-protobuf".equals(contentType)) {
                        System.err.println("消息类型不匹配: " + contentType);
                        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                        return;
                    }
                    
                    // 反序列化Protobuf消息
                    OrderMessage order = OrderMessage.parseFrom(body);
                    System.out.println("收到订单: " + order.getOrderId() + 
                            ", 商品: " + order.getProductName() + 
                            ", 数量: " + order.getQuantity());
                    
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    System.err.println("Protobuf解析失败: " + e.getMessage());
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                }
            };
            
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }
}

JSON vs Protobuf 对比

Java
public class SerializationComparison {
    public static void main(String[] args) throws Exception {
        OrderJson orderJson = new OrderJson("ORD-001", "iPhone 15", 1, 7999.0);
        OrderMessage orderProto = OrderMessage.newBuilder()
                .setOrderId("ORD-001")
                .setProductName("iPhone 15")
                .setQuantity(1)
                .setPrice(7999.0)
                .build();
        
        // JSON序列化
        ObjectMapper mapper = new ObjectMapper();
        byte[] jsonBytes = mapper.writeValueAsBytes(orderJson);
        System.out.println("JSON大小: " + jsonBytes.length + " bytes");
        
        // Protobuf序列化
        byte[] protoBytes = orderProto.toByteArray();
        System.out.println("Protobuf大小: " + protoBytes.length + " bytes");
    }
}

// 输出示例:
// JSON大小: ~85 bytes
// Protobuf大小: ~45 bytes

注意事项

Protobuf消息不具备自描述性,接收方必须持有对应的.proto定义(或生成的代码),否则无法解析。

字段编号(field number)一旦使用不应修改,删除字段可保留编号但标记为reserved,避免后续复用。

application/x-protobuf是Protobuf的MIME类型,需在消息属性中设置。

Protobuf不支持Map的复杂嵌套作为key,建议仅使用基本类型或消息类型作为Map的value。

Protobuf 版本兼容

protobuf
syntax = "proto3";

message OrderMessage {
  string order_id = 1;
  string product_name = 2;
  int32 quantity = 3;
  double price = 4;
  
  // 新增字段:向后兼容
  string buyer_email = 5;
  
  // 删除的字段标记为reserved,防止编号复用
  reserved 6;
  reserved "old_discount_field";
}

Protobuf 3中新增字段向后兼容(旧解析器忽略未知字段),但删除字段需用reserved声明。

要点总结

  • Protobuf是二进制序列化方案,体积更小、编解码更快
  • 通过.proto文件定义消息结构,编译器生成对应语言代码
  • 发送端调用toByteArray()序列化,接收端调用parseFrom()反序列化
  • 必须设置contentType = "application/x-protobuf"
  • 接收方必须持有.proto定义,Protobuf消息不具备自描述性
  • 字段编号不可修改,删除字段需用reserved标记
  • 适合对性能和带宽敏感的高吞吐量场景

📝 发现内容有误?点击此处直接编辑

← 上一篇 JSON 序列化
下一篇 → 大消息处理策略
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库