RabbitMQ 原理解析
RabbitMQ 是一种流行的开源消息代理软件,遵循 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)标准。本文将从多个方面详细解析 RabbitMQ,包括其工作原理、组成部分、各种交换机类型、使用场景、性能调优、监控管理和常见问题等。
一、应用场景
RabbitMQ 适用于多种应用场景,包括但不限于:
- 异步处理:将耗时任务放入队列,由消费者异步处理,提高系统响应速度。

- 应用解耦:通过消息队列,使生产者和消费者解耦,提高系统的灵活性和可扩展性。

- 流量削峰:在高并发场景下,通过队列缓冲请求,防止系统过载。

- 日志处理:收集分布式系统的日志,通过消息队列进行集中处理和分析。(这个场景一般用kafka来做)
- 任务分发:将任务分发到多个消费者进行并行处理,提高处理效率。比如经常在分布式数据采集系统中与分布式调度框架结合以实现高并发的数据爬取。
一、 RabbitMQ 组成
RabbitMQ 的核心组成部分包括以下几个:
- Producer(生产者):生成并发送消息的应用程序或服务。
- Exchange(交换机):接收生产者发送的消息,并根据路由规则将消息分发到相应的队列。
- Queue(队列):存储消息的缓冲区,消息会在队列中等待消费者处理。
- Consumer(消费者):接收并处理消息的应用程序或服务。
- Binding(绑定):定义了交换机与队列之间的关系及消息路由规则。
- Broker(代理服务器):实现消息的传递和存储。
- Connection(连接):生产者和消费者通过连接与 RabbitMQ 服务器进行通信。
- Channel(通道):是建立在连接之上的虚拟连接,用于发送和接收消息。

二、RabbitMQ 原理
RabbitMQ 的工作原理基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)。其核心流程如下:
- 消息发布:生产者通过交换机将消息发送到 RabbitMQ 服务器。
- 消息路由:交换机根据定义的路由规则将消息路由到相应的队列。
- 消息存储:队列存储消息,直到消费者处理它们。
- 消息消费:消费者从队列中获取消息并进行处理。

AMQP 是一个应用层协议,为消息代理和客户端之间的消息传递提供标准化的方法。它定义了消息的格式、消息通信的流程和消息代理的行为。以下是对 AMQP 及其在 RabbitMQ 中应用的详细讲解。
1. AMQP 概述
AMQP 是一个开源的标准协议,旨在确保消息在不同系统之间可靠地传递。RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。它主要包括以下几个方面:
- 消息模型:定义了消息的发送、接收、存储和路由的方式。
- 消息格式:定义了消息的结构,包括消息头和消息体。
- 协议操作:定义了消息代理和客户端之间的操作,如连接、通道、消息发布和消费等。
2. 核心组件
2.1 连接(Connection)
连接是客户端与 RabbitMQ 服务器之间的 TCP 连接。一个连接可以包含多个通道(Channel)。如下图:

2.2 通道(Channel)
通道是建立在连接之上的虚拟连接。所有的 AMQP 命令都是通过通道发送的。通道的创建和销毁比连接更轻量级,因此在同一个连接上可以创建多个通道以提高并发性。如上图:
2.3 消息(Message)
消息是 AMQP 中传递的基本单元。每条消息由消息属性和消息体组成。消息属性包括路由键、内容类型、优先级等,消息体是实际传递的数据。例如:
{
"header": {
"priority": 5, // 消息优先级,数值越大优先级越高
"deliveryMode": 2, // 消息传递模式,2表示持久化消息
"contentType": "application/json", // 消息内容类型
"contentEncoding": "UTF-8", // 消息内容编码
"expiration": "60000", // 消息过期时间,单位为毫秒
"messageId": "1234567890", // 消息唯一标识
"timestamp": "2023-07-03T12:34:56Z" // 消息发送时间戳
},
"properties": {
"appId": "myApp", // 应用ID
"correlationId": "9876543210", // 关联ID,用于跟踪请求和响应
"replyTo": "amq.direct/replyQueue", // 回复地址,用于指定回复消息的队列
"type": "order", // 消息类型
"userId": "user123" // 用户ID
},
"body": {
"orderId": "ORD12345", // 订单ID
"customerId": "CUST54321", // 客户ID
"items": [
{"productId": "PROD123", "quantity": 2}, // 产品ID和数量
{"productId": "PROD456", "quantity": 1} // 产品ID和数量
],
"totalAmount": 150.75, // 订单总金额
"status": "pending" // 订单状态
}
}
2.4 交换机(Exchange)
交换机是消息路由的核心组件。生产者将消息发送到交换机,交换机根据绑定规则将消息路由到相应的队列。交换机有不同的类型,如 direct、fanout、topic 和 headers。

2.5 队列(Queue)
队列是存储消息的缓冲区。消息在队列中等待消费者消费。队列可以绑定到一个或多个交换机,并且可以设置不同的属性,如持久化、自动删除等。

2.6 绑定(Binding)
绑定定义了交换机和队列之间的关系。它指定了消息如何从交换机路由到队列。绑定可以包含路由键,用于精确匹配或模式匹配。
3. 工作流程

3.1 连接建立
客户端与 RabbitMQ 服务器之间首先建立 TCP 连接。然后,在连接上创建通道。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
3.2 声明交换机和队列
在通道上声明交换机和队列。如果交换机或队列已经存在,声明操作是幂等的,不会重复创建。
channel.exchangeDeclare("myExchange", "direct");
channel.queueDeclare("myQueue", true, false, false, null);
3.3 绑定交换机和队列
通过绑定将交换机和队列关联起来,定义消息的路由规则。
channel.queueBind("myQueue", "myExchange", "myRoutingKey");
3.4 消息发布
生产者将消息发送到交换机,交换机根据绑定规则将消息路由到相应的队列。
String message = "Hello, RabbitMQ!";
channel.basicPublish("myExchange", "myRoutingKey", null, message.getBytes());
3.5 消息消费
消费者从队列中获取消息并进行处理。消费者可以设置自动确认或手动确认消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
// 手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("myQueue", false, deliverCallback, consumerTag -> {});
3.6 消息确认
消息确认机制确保消息可靠传递。生产者可以启用发布确认,确保消息成功发送到交换机。消费者可以启用消息确认,确保消息成功处理。
- 生产者确认(Publisher Confirms):生产者发送消息后,RabbitMQ 会确认消息已被正确接收。
- 消费者确认(Consumer Acknowledgements):消费者处理完消息后,向 RabbitMQ 发送确认消息,RabbitMQ 会将该消息从队列中删除。
// 发布确认
channel.confirmSelect();
channel.basicPublish("myExchange", "myRoutingKey", null, message.getBytes());
channel.waitForConfirmsOrDie();
// 消费确认
channel.basicAck(deliveryTag, false);
3.7 消息持久化
为了防止消息丢失,RabbitMQ 支持消息持久化。将消息标记为持久化后,即使 在 RabbitMQ 服务器重启或出现故障时消息也不会丢失。通过将队列和消息标记为持久化,可以实现这一目标。
1. 持久化队列
持久化队列是指在队列声明时将其标记为持久化,这样即使 RabbitMQ 服务器重启,队列依然存在。要声明持久化队列,只需将 durable 参数设置为 true。
channel.queueDeclare("durableQueue", true, false, false, null);
2. 持久化消息
持久化消息是指在消息发布时将其标记为持久化,这样即使 RabbitMQ 服务器重启,消息也不会丢失。要声明持久化消息,需要在消息属性中将 deliveryMode 设置为 2。
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2 表示持久化
.build();
channel.basicPublish("exchangeName", "routingKey", props, message.getBytes());
3. 完整示例
以下是一个完整的示例,展示如何声明持久化队列和持久化消息:
import com.rabbitmq.client.*;
public class PersistentExample {
private final static String QUEUE_NAME = "durableQueue";
private final static String EXCHANGE_NAME = "durableExchange";
public static void main(String[] argv) throws Exception {
// 创建连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
// 声明持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 声明持久化交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
// 将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingKey");
// 创建持久化消息
String message = "Persistent Hello World!";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2 表示持久化
.build();
// 发布消息
channel.basicPublish(EXCHANGE_NAME, "routingKey", props, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
4. 其他注意事项
- 性能影响:启用消息持久化会降低消息吞吐量,因为每条消息都需要写入磁盘。为了优化性能,RabbitMQ 采用了批量写入和延迟写入等技术,但仍需要权衡性能和可靠性。
- 持久化策略:并非所有消息都需要持久化。根据业务需求,可以选择性地对重要消息进行持久化,而对临时性或不重要的消息则不持久化。
- 持久化与高可用性:持久化保证了单节点故障时消息不丢失,但不能完全解决多节点集群中的高可用性问题。为了实现高可用性,可以结合 RabbitMQ 的镜像队列(Mirrored Queues)功能,将消息复制到集群中的多个节点。
5. 持久化和镜像队列
RabbitMQ 的镜像队列允许将队列及其消息复制到集群中的多个节点,进一步提高消息的可用性和可靠性。结合消息持久化和镜像队列,可以实现更高的容灾能力。
Map<String, Object> args = new HashMap<>();
args.put("x-ha-policy", "all"); // 将队列镜像到所有节点
channel.queueDeclare("mirroredQueue", true, false, false, args);
通过以上方式,我们可以确保消息在 RabbitMQ 服务器重启或节点故障时不会丢失,从而提高系统的可靠性和容灾能力。
4. 消息模型
4.1 消息属性
消息属性是对消息的附加信息,包括:
- 内容类型(Content-Type):消息体的 MIME 类型。
- 消息ID(Message ID):消息的唯一标识符。
- 时间戳(Timestamp):消息的生成时间。
- 优先级(Priority):消息的优先级。
- 持久性(Delivery Mode):消息是否持久化。
4.2 路由键(Routing Key)
路由键是消息在交换机和队列之间路由的关键。不同类型的交换机对路由键的处理方式不同:
- Direct Exchange:完全匹配路由键。
- Topic Exchange:模式匹配路由键,支持通配符。
- Fanout Exchange:忽略路由键,广播消息。
- Headers Exchange:根据消息头属性匹配。
5. RabbitMQ 的 AMQP 扩展
RabbitMQ 在 AMQP 标准之上做了一些扩展,以增强其功能和灵活性。例如:
- 插件系统:RabbitMQ 支持丰富的插件系统,可以扩展其功能,如延迟消息、Shovel 插件等。
- 管理接口:RabbitMQ 提供了管理插件,可以通过 HTTP API 和 Web UI 管理和监控 RabbitMQ。
- 多协议支持:除了 AMQP,RabbitMQ 还支持 STOMP、MQTT 等协议,适应不同的应用场景。
6. AMQP 版本
AMQP 有多个版本,RabbitMQ 主要支持的是 AMQP 0-9-1 版本。这个版本广泛应用于消息队列系统,并且有着成熟的生态系统和文档支持。
7. 总结
AMQP 是 RabbitMQ 的核心协议,定义了消息的传输、路由和处理方式。通过 AMQP,RabbitMQ 能够实现可靠的消息传递和灵活的消息路由,适用于各种分布式系统和应用场景。掌握 AMQP 的工作原理和使用方法,可以帮助你更好地理解和使用 RabbitMQ,提升系统的可靠性和性能。
三、RabbitMQ 工作方式

1. 交换机类型
RabbitMQ 支持多种交换机类型,主要包括:
- Direct Exchange:根据消息的路由键将消息发送到绑定键完全匹配的队列。
- Fanout Exchange:将消息广播到所有绑定到该交换机的队列,不考虑路由键。
- Topic Exchange:根据消息的路由键和绑定键的模式匹配将消息发送到队列,支持模糊匹配。
- Headers Exchange:根据消息头属性进行匹配,而不是路由键。
1.1 Direct Exchange
Direct Exchange 根据路由键精确匹配队列绑定键。适用于精确路由的场景。

1.2 Fanout Exchange
Fanout Exchange 将消息广播到所有绑定的队列,不考虑路由键。适用于广播消息的场景。
1.3 Topic Exchange
Topic Exchange 支持基于模式匹配的路由。绑定键可以使用通配符(如 * 和 #)进行匹配。适用于需要按主题进行消息路由的场景。

1.4 Headers Exchange(不推荐)
Headers Exchange 根据消息头属性(键值对)进行匹配,而不是路由键。适用于需要基于消息头属性进行路由的场景。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
确实,解释 RabbitMQ 的时候,重要的一环是它的工作模式。这些模式决定了消息如何在交换机、队列、生产者和消费者之间流转。下面是对 RabbitMQ 的主要工作模式的详细介绍。
2. 主要的工作模式
RabbitMQ 支持多种消息传递模式,主要包括以下几种:
- 简单模式(Simple Queue)
- 工作队列模式(Work Queues)
- 发布/订阅模式(Publish/Subscribe)
- 路由模式(Routing)
- 主题模式(Topics)
- RPC 模式(Remote Procedure Call)
1. 简单模式(Simple Queue)
简单模式是最基本的消息传递模式,包含一个生产者、一个队列和一个消费者。
适用场景:适用于简单的任务分发,例如日志处理、邮件发送等,其中每个任务只需要一个消费者处理。
- 生产者:发送消息到队列。
- 队列:存储消息。
- 消费者:从队列中接收消息并进行处理。
// ---------------------生产者代码 start-------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try (
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel()
) {
// 声明一个队列,队列名为 "simpleQueue",不持久化,不独占,不自动删除,无额外参数
channel.queueDeclare("simpleQueue", false, false, false, null);
// 要发送的消息内容
String message = "Hello World!";
// 发布消息到默认交换机(空字符串表示默认交换机),路由键为 "simpleQueue"
channel.basicPublish("", "simpleQueue", null, message.getBytes());
// 打印发送的消息
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ---------------------生产者代码 end-------------------
// ---------------------消费者代码 start-------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try {
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列,队列名为 "simpleQueue"
channel.queueDeclare("simpleQueue", false, false, false, null);
// 定义消息传递回调接口
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将消息体转换为字符串
String message = new String(delivery.getBody(), "UTF-8");
// 打印接收到的消息
System.out.println(" [x] Received '" + message + "'");
};
// 消费消息,队列名为 "simpleQueue",自动确认,使用定义的回调接口,无取消回调
channel.basicConsume("simpleQueue", true, deliverCallback, consumerTag -> {});
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ---------------------消费者代码 end--------------------
2. 工作队列模式(Work Queues)
工作队列模式用于将耗时任务分发给多个消费者,均衡负载。
适用场景:适用于需要并行处理的任务,例如图像处理、文件转换等,可以通过多个消费者来提高处理速度。
- 生产者:发送多个任务消息到队列。
- 队列:存储任务消息。
- 多个消费者:从队列中接收任务消息并处理。
// ----------------------生产者代码 start---------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try (
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel()
) {
// 声明一个队列,队列名为 "workQueue",持久化,不独占,不自动删除,无额外参数
channel.queueDeclare("workQueue", true, false, false, null);
// 要发送的消息内容
String message = "Task message";
// 发布消息到默认交换机(空字符串表示默认交换机),路由键为 "workQueue",消息持久化
channel.basicPublish("", "workQueue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 打印发送的消息
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ----------------------生产者代码 end---------------------
// ----------------------消费者代码 start---------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try {
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列,队列名为 "workQueue",持久化,不独占,不自动删除,无额外参数
channel.queueDeclare("workQueue", true, false, false, null);
// 设置每次只接收一条消息,实现公平分发
channel.basicQos(1);
// 定义消息传递回调接口
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将消息体转换为字符串
String message = new String(delivery.getBody(), "UTF-8");
// 打印接收到的消息
System.out.println(" [x] Received '" + message + "'");
try {
// 处理消息
doWork(message);
} finally {
// 打印处理完成的消息
System.out.println(" [x] Done");
// 手动确认消息处理完成
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 消费消息,队列名为 "workQueue",不自动确认,使用定义的回调接口,无取消回调
channel.basicConsume("workQueue", false, deliverCallback, consumerTag -> {});
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ----------------------消费者代码 end---------------------
3. 发布/订阅模式(Publish/Subscribe)
发布/订阅模式通过 fanout 交换机将消息广播到所有绑定的队列,实现一条消息被多个消费者接收。
适用场景:适用于需要广播消息的场景,例如新闻推送、实时更新等,多个消费者可以订阅感兴趣的消息。
- 生产者:发送消息到
fanout交换机。 - 交换机:将消息广播到所有绑定的队列。
- 多个队列和消费者:每个队列都有一个消费者接收消息。
// ----------------------生产者代码 start---------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try (
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel()
) {
// 声明一个交换机,交换机名为 "logs",类型为 "fanout"
channel.exchangeDeclare("logs", "fanout");
// 要发送的消息内容
String message = "log message";
// 发布消息到交换机 "logs",路由键为空
channel.basicPublish("logs", "", null, message.getBytes());
// 打印发送的消息
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ----------------------生产者代码 end---------------------
// ----------------------消费者代码 start---------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try {
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 声明一个交换机,交换机名为 "logs",类型为 "fanout"
channel.exchangeDeclare("logs", "fanout");
// 声明一个临时队列,获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到交换机 "logs",路由键为空
channel.queueBind(queueName, "logs", "");
// 定义消息传递回调接口
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将消息体转换为字符串
String message = new String(delivery.getBody(), "UTF-8");
// 打印接收到的消息
System.out.println(" [x] Received '" + message + "'");
};
// 消费消息,队列名为临时队列名称,自动确认,使用定义的回调接口,无取消回调
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ----------------------消费者代码 end---------------------
4. 路由模式(Routing)
路由模式通过 direct 交换机根据路由键将消息发送到指定的队列,实现有选择地接收消息。
适用场景:适用于需要根据特定条件过滤消息的场景,例如日志处理,可以根据日志级别(如错误、警告、信息)将日志发送到不同的队列。
- 生产者:发送带有路由键的消息到
direct交换机。 - 交换机:根据路由键将消息发送到匹配的队列。
- 队列和消费者:接收特定路由键的消息。
// ----------------------生产者代码 start---------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try (
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel()
) {
// 声明一个交换机,交换机名为 "direct_logs",类型为 "direct"
channel.exchangeDeclare("direct_logs", "direct");
// 定义消息的严重性(路由键)
String severity = "error";
// 要发送的消息内容
String message = "Error message";
// 发布消息到交换机 "direct_logs",路由键为 "error"
channel.basicPublish("direct_logs", severity, null, message.getBytes());
// 打印发送的消息
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ----------------------生产者代码 end---------------------
// ----------------------消费者代码 start---------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try {
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 声明一个交换机,交换机名为 "direct_logs",类型为 "direct"
channel.exchangeDeclare("direct_logs", "direct");
// 声明一个临时队列,获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到交换机 "direct_logs",路由键为 "error"
channel.queueBind(queueName, "direct_logs", "error");
// 定义消息传递回调接口
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将消息体转换为字符串
String message = new String(delivery.getBody(), "UTF-8");
// 打印接收到的消息
System.out.println(" [x] Received '" + message + "'");
};
// 消费消息,队列名为临时队列名称,自动确认,使用定义的回调接口,无取消回调
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ----------------------消费者代码 end---------------------
5. 主题模式(Topics)
主题模式通过 topic 交换机根据通配符路由键将消息发送到匹配的队列,实现复杂的路由规则。
适用场景:适用于需要更灵活的消息过滤的场景,例如股票市场数据,可以根据股票代码或市场类型将数据发送到不同的队列。
- 生产者:发送带有通配符路由键的消息到
topic交换机。 - 交换机:根据通配符路由键将消息发送到匹配的队列。
- 队列和消费者:接收匹配通配符路由键的消息。
// ----------------------生产者代码 start---------------------
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try (
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel()
) {
// 声明一个交换机,交换机名为 "topic_logs",类型为 "topic"
channel.exchangeDeclare("topic_logs", "topic");
// 定义消息的路由键
String routingKey = "kern.critical";
// 要发送的消息内容
String message = "Critical kernel error";
// 发布消息到交换机 "topic_logs",路由键为 "kern.critical"
channel.basicPublish("topic_logs", routingKey, null, message.getBytes());
// 打印发送的消息
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ----------------------生产者代码 end---------------------
// ----------------------消费者代码 start---------------------
// 创建连接工厂
factory = new ConnectionFactory();
// 设置连接的主机地址,这里设置为本地主机
factory.setHost("localhost");
try {
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 声明一个交换机,交换机名为 "topic_logs",类型为 "topic"
channel.exchangeDeclare("topic_logs", "topic");
// 声明一个临时队列,获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到交换机 "topic_logs",路由键为 "kern.*"
channel.queueBind(queueName, "topic_logs", "kern.*");
// 定义消息传递回调接口
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 将消息体转换为字符串
String message = new String(delivery.getBody(), "UTF-8");
// 打印接收到的消息
System.out.println(" [x] Received '" + message + "'");
};
// 消费消息,队列名为临时队列名称,自动确认,使用定义的回调接口,无取消回调
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
// ----------------------消费者代码 end---------------------
6. RPC 模式(Remote Procedure Call)
RPC 模式允许客户端通过 RabbitMQ 调用远程服务,并等待结果返回。
适用场景:适用于需要远程调用的场景,例如微服务架构中的服务间调用,客户端可以调用远程服务并等待响应。
- 客户端:发送 RPC 请求消息到队列,并等待响应。
- 服务端:从队列中接收请求消息,处理后发送响应消息。
// ----------------------RPC 客户端代码 start---------------------
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue"; // 请求队列名称
private String replyQueueName; // 回复队列名称
private QueueingConsumer consumer; // 消费者
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 设置连接的主机地址
connection = factory.newConnection(); // 创建一个新的连接
channel = connection.createChannel(); // 创建一个通道
replyQueueName = channel.queueDeclare().getQueue(); // 声明一个临时队列,获取队列名称
consumer = new QueueingConsumer(channel); // 创建一个消费者
channel.basicConsume(replyQueueName, true, consumer); // 消费消息,队列名为临时队列名称,自动确认
}
public String call(String message) throws Exception {
String corrId = UUID.randomUUID().toString(); // 生成一个唯一的correlation ID
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(corrId) // 设置correlation ID
.replyTo(replyQueueName) // 设置回复队列名称
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes()); // 发布消息到请求队列
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // 获取下一条消息
if (delivery.getProperties().getCorrelationId().equals(corrId)) { // 检查correlation ID是否匹配
return new String(delivery.getBody(), "UTF-8"); // 返回消息内容
}
}
}
@Override
public void close() throws Exception {
connection.close(); // 关闭连接
}
public static void main(String[] argv) throws Exception {
try (RPCClient rpcClient = new RPCClient()) { // 创建并自动关闭RPCClient实例
String response = rpcClient.call("10"); // 调用远程方法,传递参数"10"
System.out.println(" [.] Got '" + response + "'"); // 打印响应
}
}
}
// ----------------------RPC 客户端代码 end---------------------
// ----------------------RPC 服务端代码 start---------------------
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue"; // RPC队列名称
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2); // 计算斐波那契数列
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // 设置连接的主机地址
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); // 声明RPC队列
channel.basicQos(1); // 设置QoS,每次只处理一条消息
System.out.println(" [x] Awaiting RPC requests"); // 打印等待RPC请求的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(delivery.getProperties().getCorrelationId()) // 设置correlation ID
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8"); // 获取消息内容
int n = Integer.parseInt(message); // 将消息内容转换为整数
System.out.println(" [.] fib(" + message + ")"); // 打印正在处理的斐波那契数列请求
response += fib(n); // 计算斐波那契数列并设置响应
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString()); // 打印异常信息
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); // 发布响应消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 确认消息处理完成
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> {}); // 消费消息,队列名为RPC队列名称,手动确认
}
}
}
// ----------------------RPC 服务端代码 end---------------------
通过以上示例代码,我们可以看到 RabbitMQ 不同工作模式的实现方式。选择合适的模式能够提高系统的效率和健壮性。
四、基本使用
要在 Java 项目中使用 RabbitMQ,通常需要以下步骤:
1. 添加依赖
在 Maven 或 Gradle 项目中添加 RabbitMQ 客户端库依赖。
<!-- Maven -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
2. 配置连接
创建连接工厂并配置连接参数。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
3. 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
4. 声明交换机和队列
channel.exchangeDeclare("myExchange", "direct");
channel.queueDeclare("myQueue", false, false, false, null);
channel.queueBind("myQueue", "myExchange", "myRoutingKey");
5. 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("myExchange", "myRoutingKey", null, message.getBytes());
6. 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
};
channel.basicConsume("myQueue", true, deliverCallback, consumerTag -> {});
7. 关闭连接
channel.close();
connection.close();