生产者基础发送
生产者负责将消息发送至指定交换机,下面梳理基础发送流程与核心 API。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
基础发送流程
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class BasicProducerExample {
private static final String EXCHANGE_NAME = "demo_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机(类型为 direct,持久化)
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
// 发送消息
String message = "Hello RabbitMQ!";
channel.basicPublish(
EXCHANGE_NAME, // 交换机名称
"routing_key", // 路由键
null, // 基础属性(null 表示默认)
message.getBytes(StandardCharsets.UTF_8) // 消息体
);
System.out.println("消息已发送: " + message);
}
}
}
核心 API 说明
basicPublish 方法签名
Java
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
| 参数 | 说明 |
|---|---|
| exchange | 目标交换机名称,空字符串表示默认交换机 |
| routingKey | 路由键,用于交换机匹配目标队列 |
| props | 消息属性,如持久化、过期时间等,null 表示默认 |
| body | 消息体字节数组 |
交换机声明
Java
channel.exchangeDeclare(String exchange, String type, boolean durable)
| 参数 | 说明 |
|---|---|
| exchange | 交换机名称 |
| type | 交换机类型:direct、fanout、topic、headers |
| durable | 是否持久化,true 表示服务器重启后保留 |
发送多条消息
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class MultiMessageProducerExample {
private static final String EXCHANGE_NAME = "multi_exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
for (int i = 1; i <= 5; i++) {
String message = "Message #" + i;
channel.basicPublish(EXCHANGE_NAME, "task_key", null,
message.getBytes(StandardCharsets.UTF_8));
System.out.println("已发送: " + message);
}
}
}
}
注意事项
发送消息前必须确保交换机已声明,否则消息将被丢弃。
basicPublish 的 routingKey 需与交换机类型匹配,direct 交换机需精确匹配路由键。
使用 try-with-resources 可自动关闭 Connection 和 Channel,避免资源泄漏。
消息体必须转换为 byte[] 数组,字符串需指定字符编码(推荐 UTF-8)。
要点总结
- 生产者发送消息需经历:创建连接 -> 创建信道 -> 声明交换机 -> 发送消息。
- basicPublish 是核心发送方法,需指定交换机、路由键、属性和消息体。
- 交换机必须在使用前声明,类型包括 direct、fanout、topic、headers。
- 消息体必须是 byte[] 格式,字符串需通过 getBytes 转换。
- 推荐使用 try-with-resources 自动管理连接资源。
📝 发现内容有误?点击此处直接编辑