RabbitMQ核心概念与3.13新特性解析
很多刚接触消息队列的同学,一上来就懵:交换机、队列、绑定、路由键都是啥?换个角度看,你可以把RabbitMQ想象成一个快递站:生产者就是寄快递的人,交换机是快递分拣中心,队列是快递暂存货架,消费者就是取快递的收件人。消息从生产者发出来,先到交换机,交换机根据规则把消息扔到对应的队列里,消费者再从队列里拿消息处理。
咱们先唠唠核心组件。交换机(Exchange) 是消息路由的入口,它本身不存消息,只负责把收到的消息按照规则转发到队列。RabbitMQ原生支持四种交换机类型,这个面试也常考:
- Direct Exchange:直连交换机,消息带的路由键(Routing Key)和绑定队列时的路由键完全匹配,才会把消息发过去。比如路由键是
order.create,只有绑定了order.create的队列能收到。
- Topic Exchange:主题交换机,支持通配符匹配。
* 匹配一个单词,# 匹配零个或多个单词。比如路由键order.*能匹配order.create、order.pay,order.#能匹配order.create.pay。
- Fanout Exchange:扇形交换机,广播模式,不管路由键是啥,只要绑定到这个交换机的队列,全都能收到消息,适合群发通知场景。
- Headers Exchange:头交换机,不靠路由键,靠消息的Headers属性匹配,用的比较少,除非你有自定义头信息的特殊路由需求。
队列(Queue) 就是存消息的地方,消息在这儿等着被消费者拿走。队列可以设置持久化,这样RabbitMQ重启了,队列和里面的消息也不会丢。绑定(Binding) 就是交换机和队列的关联关系,相当于告诉交换机:你收到的消息,符合啥规则就往这个队列里扔。
核心要点,现在最新稳定版是RabbitMQ 3.13.2,2024年5月刚更新的,3.13.0是3月26日发布的。新版本里几个点特别值得关注:一是仲裁队列的性能又优化了,写入和消费吞吐量比之前高了不少,大规模集群下资源开销也降了;二是可观测性这块原生集成了OpenTelemetry,以后做分布式链路追踪不用自己瞎折腾了,指标导出直接支持;三是MQTT 5.0的支持更强了,做物联网项目的同学有福,海量设备消息通信更稳。还有云原生这块,Kubernetes Operator的部署能力也优化了,容器化弹性伸缩效率更高,现在不少公司都往K8s上迁RabbitMQ,这个特性很实用。
核心概念代码示例(Java声明组件)
下面是用RabbitMQ Java客户端声明交换机、队列、绑定的基础代码,直接能跑:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class RabbitMQCoreDemo {
private static final String DIRECT_EXCHANGE = "direct.order.exchange";
private static final String TOPIC_EXCHANGE = "topic.user.exchange";
private static final String FANOUT_EXCHANGE = "fanout.notify.exchange";
private static final String DIRECT_QUEUE = "order.create.queue";
private static final String TOPIC_QUEUE = "user.log.queue";
private static final String FANOUT_QUEUE = "notify.email.queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂,配置RabbitMQ地址
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/"); // 虚拟主机,默认是/
// 2. 建立连接
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 声明Direct交换机,durable=true表示持久化
channel.exchangeDeclare(DIRECT_EXCHANGE, "direct", true, false, null);
// 4. 声明Topic交换机
channel.exchangeDeclare(TOPIC_EXCHANGE, "topic", true, false, null);
// 5. 声明Fanout交换机
channel.exchangeDeclare(FANOUT_EXCHANGE, "fanout", true, false, null);
// 6. 声明队列,durable=true持久化,exclusive=false不排他,autoDelete=false不自动删除
channel.queueDeclare(DIRECT_QUEUE, true, false, false, null);
channel.queueDeclare(TOPIC_QUEUE, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE, true, false, false, null);
// 7. 绑定队列到交换机:Direct交换机绑定路由键order.create
channel.queueBind(DIRECT_QUEUE, DIRECT_EXCHANGE, "order.create");
// Topic交换机绑定路由键user.*,匹配user开头的单个单词路由键
channel.queueBind(TOPIC_QUEUE, TOPIC_EXCHANGE, "user.*");
// Fanout交换机绑定,不需要路由键
channel.queueBind(FANOUT_QUEUE, FANOUT_EXCHANGE, "");
System.out.println("核心组件声明完成");
}
}
}
⚡ 效率提示
新手刚开始学的时候,别一上来就搞复杂的路由规则,先从Direct交换机练手,把消息收发流程跑通。还有,测试的时候尽量用单独的VHost,别用默认的/,避免和生产环境的队列、交换机冲突,权限也更好控制。3.13版本之后建议优先用仲裁队列替代老的镜像队列,高可用和性能都更靠谱,后面进阶章节会详细讲。
---
Docker快速部署与Management插件监控实战
很多同学本地搭RabbitMQ最头疼的就是环境配置,装Erlang、配环境变量,一不小心就报错。简单来说,现在都用Docker部署,一条命令就搞定,省得踩环境坑。而且RabbitMQ自带的Management插件是真的好用,可视化看队列、交换机、消息数量,不用敲一堆命令行,新手友好度拉满。
咱们先搞Docker部署,现在最新稳定版是3.13.2,所以拉镜像的时候直接指定版本,别用latest,避免以后版本升级出问题。命令很简单:
docker run -d --name rabbitmq3.13 \
-p 5672:5672 # 客户端通信端口,Java、Spring Boot连接用这个
-p 15672:15672 # Management插件访问端口,浏览器访问用这个
-p 25672:25672 # 集群通信端口,后面搭集群会用到
-p 61613:61613 # STOMP协议端口,需要的话可以开
-p 1883:1883 # MQTT协议端口,做物联网的话开这个
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=Admin123 \
rabbitmq:3.13.2-management
注意,这里用的镜像是rabbitmq:3.13.2-management,自带Management插件,不用自己再安装。如果是普通rabbitmq:3.13.2镜像,还得进容器执行rabbitmq-plugins enable rabbitmq_management装插件,麻烦。
容器启动之后,等个10秒左右,浏览器访问http://localhost:15672,输入刚才设的用户名admin、密码Admin123,就能进管理界面了。左边的菜单栏啥都有:
- Overview:总览,能看到节点状态、端口、集群信息、消息速率这些核心指标
- Connections:当前所有客户端连接,能看到哪个IP连的、连了多久、通道数
- Channels:通道,一个连接可以有多个通道,消息收发都是走通道的
- Exchanges:所有交换机,能看到类型、是否持久化、绑定关系
- Queues and Streams:所有队列,能看到队列长度、消息总数、消费者数量,还能手动发消息、删队列
- Admin:用户、VHost、权限管理,能新建用户、给VHost分配权限
比如你想看某个队列的消息积压情况,点进Queues菜单,找到对应的队列,点名字进去,往下拉就能看到Total messages:Ready是等待消费的消息,Unacked是已经发给消费者但还没确认的消息,这俩加起来就是积压的消息数。要是Ready数一直涨,说明消费者消费速度跟不上,要么加消费者实例,要么优化消费逻辑。
管理界面操作代码示例(REST API)
除了页面操作,Management插件还提供了REST API,比如你要在代码里获取所有队列的信息,可以这样写:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException;
public class RabbitMQMgmtApiDemo {
private static final String MGMT_URL = "http://localhost:15672/api/queues";
private static final String USERNAME = "admin";
private static final String PASSWORD = "Admin123";
public static void main(String[] args) throws IOException {
OkHttpClient client = new OkHttpClient();
// 构造请求,Basic Auth认证
Request request = new Request.Builder()
.url(MGMT_URL)
.header("Authorization", okhttp3.Credentials.basic(USERNAME, PASSWORD))
.get()
.build();
try (Response response = client.newCall(request).execute()) {
if (response.isSuccessful() && response.body() != null) {
String responseBody = response.body().string();
ObjectMapper mapper = new ObjectMapper();
JsonNode queuesNode = mapper.readTree(responseBody);
System.out.println("当前所有队列信息:");
for (JsonNode queueNode : queuesNode) {
String queueName = queueNode.get("name").asText();
int messages = queueNode.get("messages").asInt();
int consumers = queueNode.get("consumers").asInt();
System.out.printf("队列名:%s,总消息数:%d,消费者数:%d%n", queueName, messages, consumers);
}
} else {
System.out.println("获取队列信息失败,状态码:" + response.code());
}
}
}
}
这个代码需要引入OkHttp和Jackson依赖,Maven坐标:
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.1</version>
</dependency>
💡 经验总结
用Docker部署的时候,记得把数据目录挂载到宿主机,不然容器删了,队列和消息全没了。加个-v /data/rabbitmq:/var/lib/rabbitmq参数就行,把RabbitMQ的数据目录映射到宿主机的/data/rabbitmq。还有,默认用户guest只能本地访问,所以刚才我们新建了admin用户,生产环境一定要改默认密码,别用弱密码,之前有同学用默认密码被挖矿的盯上,血泪教训。另外3.13版本之后Management插件的指标导出支持OpenTelemetry,要是你用了Prometheus+Grafana做监控,可以直接配,不用自己写采集脚本。
---
Spring Boot 3集成RabbitMQ:四种交换机路由实战
现在Java后端基本都用Spring Boot,Spring Boot 3.x集成RabbitMQ那是相当丝滑,不用写一堆原生客户端的模板代码,靠RabbitTemplate发消息,@RabbitListener注解就能消费消息。咱们今天就把四种交换机都跑一遍,代码都是能直接复制到项目里用的。
首先得加依赖,Spring Boot 3.x对应的RabbitMQ starter是spring-boot-starter-amqp,Maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后配application.yml,注意Spring Boot 3.x用的是spring.rabbitmq前缀,地址、端口、用户密码填你自己的:
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: Admin123
virtual-host: /
# 开启生产者确认,后面可靠性投递会用到
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
值得留意的是,,publisher-confirm-type: correlated是开启生产者确认回调,mandatory: true是消息路由不到队列的时候把消息返回给生产者,这两个配置后面保证消息不丢会用到。
接下来咱们分别实现四种交换机的路由场景,先定义常量,避免硬编码:
public class RabbitMQConstants {
// Direct交换机相关
public static final String DIRECT_EXCHANGE = "boot.direct.exchange";
public static final String DIRECT_QUEUE = "boot.direct.queue";
public static final String DIRECT_ROUTING_KEY = "order.pay";
// Topic交换机相关
public static final String TOPIC_EXCHANGE = "boot.topic.exchange";
public static final String TOPIC_QUEUE1 = "boot.topic.queue1";
public static final String TOPIC_QUEUE2 = "boot.topic.queue2";
public static final String TOPIC_ROUTING_KEY1 = "user.login";
public static final String TOPIC_ROUTING_KEY2 = "user.login.success";
// Fanout交换机相关
public static final String FANOUT_EXCHANGE = "boot.fanout.exchange";
public static final String FANOUT_QUEUE1 = "boot.fanout.queue1";
public static final String FANOUT_QUEUE2 = "boot.fanout.queue2";
// Headers交换机相关
public static final String HEADERS_EXCHANGE = "boot.headers.exchange";
public static final String HEADERS_QUEUE = "boot.headers.queue";
}
1. Direct交换机实战
Direct交换机是精确匹配路由键,咱们发送order.pay路由键的消息,只有绑定了这个路由键的队列能收到。
先写配置类声明组件:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectExchangeConfig {
// 声明Direct交换机
@Bean
public DirectExchange directExchange() {
// durable(true)持久化,autoDelete(false)不自动删除
return ExchangeBuilder.directExchange(RabbitMQConstants.DIRECT_EXCHANGE).durable(true).build();
}
// 声明队列
@Bean
public Queue directQueue() {
return QueueBuilder.durable(RabbitMQConstants.DIRECT_QUEUE).build();
}
// 绑定队列到交换机,指定路由键
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(RabbitMQConstants.DIRECT_ROUTING_KEY);
}
}
然后写生产者,用RabbitTemplate发消息:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DirectProducerController {
private final RabbitTemplate rabbitTemplate;
public DirectProducerController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/sendDirect")
public String sendDirect() {
String message = "订单支付成功,订单号:20240501001";
// 参数:交换机、路由键、消息内容
rabbitTemplate.convertAndSend(RabbitMQConstants.DIRECT_EXCHANGE, RabbitMQConstants.DIRECT_ROUTING_KEY, message);
return "Direct消息发送成功";
}
}
消费者用@RabbitListener注解监听队列:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectConsumer {
@RabbitListener(queues = RabbitMQConstants.DIRECT_QUEUE)
public void consume(String message) {
System.out.println("Direct消费者收到消息:" + message);
}
}
2. Topic交换机实战
Topic交换机支持通配符,*匹配一个单词,#匹配多个单词。咱们声明两个队列,队列1绑定user.*,队列2绑定user.#,发不同路由键的消息看效果。
配置类:
@Configuration
public class TopicExchangeConfig {
@Bean
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange(RabbitMQConstants.TOPIC_EXCHANGE).durable(true).build();
}
@Bean
public Queue topicQueue1() {
return QueueBuilder.durable(RabbitMQConstants.TOPIC_QUEUE1).build();
}
@Bean
public Queue topicQueue2() {
return QueueBuilder.durable(RabbitMQConstants.TOPIC_QUEUE2).build();
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.*");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
}
}
生产者发两个消息,一个路由键user.login,一个user.login.success:
@RestController
public class TopicProducerController {
private final RabbitTemplate rabbitTemplate;
public TopicProducerController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/sendTopic")
public String sendTopic() {
// 路由键user.login,匹配user.* 和 user.#
rabbitTemplate.convertAndSend(RabbitMQConstants.TOPIC_EXCHANGE, "user.login", "用户登录");
// 路由键user.login.success,只匹配user.#
rabbitTemplate.convertAndSend(RabbitMQConstants.TOPIC_EXCHANGE, "user.login.success", "用户登录成功");
return "Topic消息发送成功";
}
}
消费者:
@Component
public class TopicConsumer {
@RabbitListener(queues = RabbitMQConstants.TOPIC_QUEUE1)
public void consume1(String message) {
System.out.println("Topic队列1(user.*)收到消息:" + message);
}
@RabbitListener(queues = RabbitMQConstants.TOPIC_QUEUE2)
public void consume2(String message) {
System.out.println("Topic队列2(user.#)收到消息:" + message);
}
}
运行之后你会发现,user.login的消息两个队列都能收到,user.login.success只有队列2能收到,这就是通配符的作用。
3. Fanout交换机实战
Fanout是广播,不管路由键,绑定到交换机的队列全都能收到。配置类:
@Configuration
public class FanoutExchangeConfig {
@Bean
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(RabbitMQConstants.FANOUT_EXCHANGE).durable(true).build();
}
@Bean
public Queue fanoutQueue1() {
return QueueBuilder.durable(RabbitMQConstants.FANOUT_QUEUE1).build();
}
@Bean
public Queue fanoutQueue2() {
return QueueBuilder.durable(RabbitMQConstants.FANOUT_QUEUE2).build();
}
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
生产者发消息,路由键随便写都没用:
@RestController
public class FanoutProducerController {
private final RabbitTemplate rabbitTemplate;
public FanoutProducerController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/sendFanout")
public String sendFanout() {
String message = "系统更新通知,2024年5月1日停机维护";
// Fanout交换机路由键没用,随便传
rabbitTemplate.convertAndSend(RabbitMQConstants.FANOUT_EXCHANGE, "", message);
return "Fanout消息发送成功";
}
}
两个队列都能收到消息,适合发通知、更新缓存这种场景。
4. Headers交换机实战
Headers交换机靠消息的Headers属性匹配,不是路由键。比如我们发消息的时候加个Headers的type=email,只有绑定了type=email的队列能收到。
配置类:
@Configuration
public class HeadersExchangeConfig {
@Bean
public HeadersExchange headersExchange() {
return ExchangeBuilder.headersExchange(RabbitMQConstants.HEADERS_EXCHANGE).durable(true).build();
}
@Bean
public Queue headersQueue() {
return QueueBuilder.durable(RabbitMQConstants.HEADERS_QUEUE).build();
}
@Bean
public Binding headersBinding() {
// 匹配Headers里type=email的消息
return BindingBuilder.bind(headersQueue()).to(headersExchange()).where("type").matches("email");
}
}
生产者发消息的时候设置Headers:
@RestController
public class HeadersProducerController {
private final RabbitTemplate rabbitTemplate;
public HeadersProducerController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/sendHeaders")
public String sendHeaders() {
String message = "您有新的邮件通知";
// 设置消息属性,加Headers
org.springframework.amqp.core.MessageProperties properties = new org.springframework.amqp.core.MessageProperties();
properties.setHeader("type", "email");
org.springframework.amqp.core.Message msg = new org.springframework.amqp.core.Message(message.getBytes(), properties);
rabbitTemplate.send(RabbitMQConstants.HEADERS_EXCHANGE, "", msg);
return "Headers消息发送成功";
}
}
消费者就能收到这个消息,这种场景用的少,除非你有自定义头信息的特殊路由需求。
🔧 实战技巧
Spring Boot 3.x集成的时候,别忘了在启动类上加@EnableRabbit注解,开启RabbitMQ的监听功能,不然@RabbitListener不生效。还有,生产环境建议给队列加个前缀,比如prod.order.queue,避免不同环境的队列冲突。另外3.13版本的RabbitMQ和Spring Boot 3.x兼容性很好,不用额外搞版本适配,放心用。
---
进阶:仲裁队列、死信队列与延迟消息插件应用
前面讲的都是基础用法,实际项目里肯定会遇到这些问题:怎么保证队列高可用?消息消费失败了怎么办?怎么实现延迟30分钟关单?这些就得用到仲裁队列、死信队列和延迟消息插件了,这些都是面试核心,也是生产环境必用的特性。
先说仲裁队列(Quorum Queue),这个是RabbitMQ 3.8之后推出来的,用来替代老的镜像队列。老的镜像队列有脑裂问题,而且性能一般,仲裁队列基于Raft协议实现,一致性更强,自动选举主节点,挂了自动切换,吞吐量也比镜像队列高。现在3.13版本又优化了仲裁队列的写入和消费性能,大规模集群下资源开销更低,官方都建议优先用仲裁队列。
怎么声明仲裁队列?很简单,用QueueBuilder的quorum()方法就行,Spring Boot里这么写:
@Bean
public Queue quorumQueue() {
// 声明仲裁队列,x-queue-type=quorum
return QueueBuilder.durable("order.quorum.queue")
.withArgument("x-queue-type", "quorum")
.build();
}
核心要点,仲裁队列不支持消息TTL和最大长度限制,要是你需要这些特性,得用普通队列,或者结合死信队列实现。
然后是死信队列(DLQ),“死掉的消息”去的队列。消息变成死信有三种情况:
- 消息被消费者拒绝(basic.reject/ basic.nack)并且不重新入队(requeue=false)
- 消息过期(TTL到期)
- 队列达到最大长度,最早的消息会被挤出去变成死信
死信队列一般用来处理失败的消息,比如订单关单之后库存扣减失败,就把消息扔到死信队列,后面人工处理或者重试。配置死信队列需要先声明一个普通队列作为死信队列,然后声明业务队列的时候指定死信交换机和死信路由键:
@Configuration
public class DlqConfig {
public static final String BUSINESS_QUEUE = "order.business.queue";
public static final String DLQ_EXCHANGE = "order.dlq.exchange";
public static final String DLQ_QUEUE = "order.dlq.queue";
public static final String DLQ_ROUTING_KEY = "order.dlq";
// 声明死信交换机(Direct类型)
@Bean
public DirectExchange dlqExchange() {
return ExchangeBuilder.directExchange(DLQ_EXCHANGE).durable(true).build();
}
// 声明死信队列
@Bean
public Queue dlqQueue() {
return QueueBuilder.durable(DLQ_QUEUE).build();
}
// 绑定死信队列到死信交换机
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlqQueue()).to(dlqExchange()).with(DLQ_ROUTING_KEY);
}
// 声明业务队列,指定死信交换机和路由键
@Bean
public Queue businessQueue() {
return QueueBuilder.durable(BUSINESS_QUEUE)
.withArgument("x-dead-letter-exchange", DLQ_EXCHANGE) // 死信交换机
.withArgument("x-dead-letter-routing-key", DLQ_ROUTING_KEY) // 死信路由键
.withArgument("x-message-ttl", 30000) // 消息30秒过期,变成死信
.build();
}
}
生产者发消息到业务队列,30秒没被消费就会变成死信,进入死信队列,消费者监听死信队列就能处理这些失败消息:
@Component
public class DlqConsumer {
// 监听死信队列
@RabbitListener(queues = DlqConfig.DLQ_QUEUE)
public void consumeDlq(String message) {
System.out.println("收到死信消息,需要处理:" + message);
// 这里可以写重试逻辑或者告警通知
}
}
接下来是延迟消息,RabbitMQ本身不支持延迟消息,官方出了个延迟消息插件rabbitmq-delayed-message-exchange,3.13版本也兼容这个插件。怎么用?首先得装插件,Docker部署的话直接进容器执行:
# 进RabbitMQ容器
docker exec -it rabbitmq3.13 bash
# 下载延迟插件,对应3.13版本
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
# 把插件放到plugins目录
mv rabbitmq_delayed_message_exchange-3.13.0.ez /opt/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出容器重启RabbitMQ
rabbitmqctl stop_app && rabbitmqctl start_app
装完之后,声明延迟交换机的时候类型是x-delayed-message,还要指定原来的交换机类型,比如topic:
@Configuration
public class DelayConfig {
public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";
public static final String DELAY_ROUTING_KEY = "order.close";
// 声明延迟交换机,类型是x-delayed-message,参数x-delayed-type指定原来的交换机类型
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "topic");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE).build();
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();
}
}
发送延迟消息的时候,要设置x-delay头,单位是毫秒,比如延迟30分钟关单,就是30*60*1000=1800000毫秒:
@RestController
public class DelayProducerController {
private final RabbitTemplate rabbitTemplate;
public DelayProducerController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@GetMapping("/sendDelay")
public String sendDelay() {
String message = "订单号:20240501001,30分钟后关单";
// 设置延迟时间,单位毫秒
org.springframework.amqp.core.MessageProperties properties = new org.springframework.amqp.core.MessageProperties();
properties.setHeader("x-delay", 1800000); // 30分钟
org.springframework.amqp.core.Message msg = new org.springframework.amqp.core.Message(message.getBytes(), properties);
rabbitTemplate.send(DELAY_EXCHANGE, DELAY_ROUTING_KEY, msg);
return "延迟消息发送成功,30分钟后消费";
}
}
消费者监听延迟队列,30分钟后就能收到消息,实现关单逻辑。
📌 要点提醒
现在社区里很多人纠结用延迟插件还是死信队列+TTL实现延迟,打个比方,延迟插件的精度更高,支持动态延迟时间,但是需要装插件;死信+TTL不用装插件,但是延迟时间固定,而且如果队列里前面的消息TTL长,后面的消息TTL短,后面的消息会等前面的消息过期才能被消费,有延迟偏差。生产环境如果延迟时间不固定,优先用延迟插件,3.13版本的延迟插件稳定性已经很好了,放心用。还有,仲裁队列现在已经是主流,新项目别再用镜像队列了,老的镜像队列可以慢慢迁移到仲裁队列,避免脑裂问题。消息消费的时候一定要做幂等性设计,比如用订单号做唯一键,避免重复消费的问题,这个面试也常考。
5. 高可用架构:集群搭建、脑裂问题与性能调优
搞分布式系统,最怕的就是单点故障。你辛辛苦苦搭了个MQ,结果服务器一挂,整个业务链路都断了,这锅你背不动。所以,生产环境必须上集群。换个角度看,就是多搞几台机器,数据互备,一台挂了另一台顶上。
现在的RabbitMQ(咱们以最新的 3.13.2 稳定版为例)在高可用这块已经玩得很溜了。以前大家用镜像队列(Mirrored Queue),虽然能同步数据,但性能开销大,还容易脑裂。现在官方主推的是仲裁队列(Quorum Queue)。这玩意儿基于Raft协议,一致性更强,数据可靠性更高,虽然吞吐量比不上普通队列,但对于核心业务数据,绝对是首选。
集群搭建实战
搭建集群其实没那么玄乎。假设你有两台机器,node1 和 node2。
- 保证Cookie一致:Erlang节点间通信靠一个
.erlang.cookie文件,必须保证两台机器的内容一模一样。
- 修改hosts:确保两台机器能通过域名ping通。
- 加入集群:在
node2 上执行命令,让它加入 node1。
这里给个Docker Compose的示例,方便你在本地快速起一个集群环境测试,这比手动装环境快多了:
version: '3.8'
services:
rabbitmq1:
image: rabbitmq:3.13.2-management
hostname: rabbitmq1
ports:
- "15672:15672"
- "5672:5672"
environment:
RABBITMQ_ERLANG_COOKIE: "SECRET_CLUSTER_COOKIE"
RABBITMQ_NODENAME: "rabbit@rabbitmq1"
volumes:
- ./data1:/var/lib/rabbitmq
rabbitmq2:
image: rabbitmq:3.13.2-management
hostname: rabbitmq2
ports:
- "15673:15672"
- "5673:5672"
environment:
RABBITMQ_ERLANG_COOKIE: "SECRET_CLUSTER_COOKIE"
RABBITMQ_NODENAME: "rabbit@rabbitmq2"
volumes:
- ./data2:/var/lib/rabbitmq
depends_on:
- rabbitmq1
启动后,进入 rabbitmq2 的容器内部,执行以下命令加入集群:
# 停止RabbitMQ应用
rabbitmqctl stop_app
# 重置节点(如果是新节点可以不执行,但加了保险)
rabbitmqctl reset
# 加入 rabbitmq1 集群
rabbitmqctl join_cluster rabbit@rabbitmq1
# 启动应用
rabbitmqctl start_app
搞定后,访问 http://localhost:15672,如果能在Nodes里看到两个节点,恭喜你,集群搭好了。
脑裂问题与仲裁队列
以前用镜像队列最头疼的就是脑裂(Split-Brain)。比如网络抖动,两个节点觉得对方挂了,都把自己当老大,结果数据冲突,恢复后还得手动合并,贼麻烦。
现在的仲裁队列就是为了解决这个问题的。它要求大多数节点(Quorum)存活才能工作。比如3个节点的集群,挂了1个还能玩,挂了2个就只读不写。这虽然牺牲了点可用性,但保证了数据一致性。
在代码里声明仲裁队列很简单,用Spring Boot的话,加个参数就行:
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class QuorumQueueConfig {
// 关键点:x-queue-type 设置为 quorum
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
// 设置死信交换机,增强健壮性
args.put("x-dead-letter-exchange", "dlx.exchange");
return new Queue("order.pay.queue", true, false, false, args);
}
}
性能调优建议
- 别把集群当存储:RabbitMQ毕竟是队列,消息进进出出,别积压太多。如果消息堆积百万级,性能会直线下降。
- 调整帧大小(Frame Size):如果是大消息传输,可以适当调大
amqp-frame-max,减少网络交互次数。
- 使用合适的交换机:如果是广播,用Fanout,别用Topic,因为Topic要匹配路由键,有性能损耗。
⚡ 效率提示:生产环境建议至少部署3个节点组成仲裁队列集群。别搞那种“伪集群”,比如把三个实例都部署在同一台物理机上,那机器一挂全完蛋,高可用就成笑话了。
---
6. :消息丢失、重复消费与积压解决方案
做消息队列,最经典的三个问题就是:消息丢了怎么办?消息重复消费怎么办?消息堆满了怎么办?这章咱们就聊聊这些“坑”,以及怎么填。
消息丢失:三重防护
消息丢了,那是生产事故。要保证不丢,得从三个维度入手:生产者、队列、消费者。
- 生产者确认(Publisher Confirm):别发了就不管。开启Confirm机制,MQ收到消息后会回调告诉你“收到啦”。
- 队列持久化(Durability):队列和消息都要设置持久化。不然MQ重启,内存里的数据全没了。
- 消费者手动ACK:千万别用自动ACK。自动ACK意味着消息一发出去就认为成功了,要是你业务还没处理完消费者就挂了,消息就丢了。得等业务处理完,手动发ACK。
看个Spring Boot里保证可靠投递的代码示例,这里用了RabbitTemplate的ConfirmCallback:
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
@Service
public class ReliableMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 设置生产者确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
// 其实,这里就是消息没到MQ,得记录日志或者重发
System.err.println("消息发送失败,ID: " + correlationData.getId() + ", 原因: " + cause);
// 这里可以写重发逻辑,或者存入数据库定时扫描重发
} else {
System.out.println("消息发送成功,ID: " + correlationData.getId());
}
}
});
}
public void sendImportantMessage(String msg) {
// 构造CorrelationData,用于追踪消息
CorrelationData correlationData = new CorrelationData("msg_" + System.currentTimeMillis());
// 发送消息,这里指定了持久化模式(MessageProperties.PERSISTENT_TEXT_PLAIN)
rabbitTemplate.convertAndSend("order.exchange", "order.key", msg, m -> {
// 设置消息持久化
m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return m;
}, correlationData);
}
}
重复消费:幂等性是王道
网络抖动或者消费者重启,都可能导致消息重复投递。解决重复消费,核心就是幂等性设计。可以这么理解,就是无论你同一个请求执行多少次,结果都一样。
比如你处理支付回调,可以根据bizId(业务唯一ID)去数据库查,如果已经处理过了,直接返回成功,别再扣一次钱。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
public class OrderConsumer {
@Autowired
private OrderService orderService;
@RabbitListener(queues = "order.pay.queue")
@Transactional
public void processOrder(String orderId) {
// 1. 先查数据库,看这个订单是不是已经处理过了
Order order = orderService.getById(orderId);
if (order != null && "PAID".equals(order.getStatus())) {
// 已经是支付状态了,说明之前处理过,直接ACK掉,别再重复处理了
System.out.println("订单已处理,跳过重复消息: " + orderId);
return;
}
// 2. 如果没处理,执行核心业务逻辑
boolean success = orderService.handlePayment(orderId);
if (success) {
System.out.println("支付处理成功: " + orderId);
} else {
// 失败可以抛异常,让MQ重试,或者进入死信队列
throw new RuntimeException("处理失败,等待重试");
}
}
}
消息积压:削峰填谷
秒杀场景下,瞬间涌入几万请求,消费者处理不过来,消息就积压了。这时候怎么办?
- 增加消费者:这是最直接的,多开几个服务实例,水平扩容。
- 限流(Prefetch):在消费者端设置
prefetch=1,意思是MQ一次只给我发一条,我处理完再要下一条。别一下子把几万条消息都塞给一个消费者,直接把内存撑爆。
- 优化消费逻辑:看看消费逻辑里有没有慢SQL或者外部接口调用,优化代码。
📌 要点提醒:一定要监控队列长度。RabbitMQ 3.13.x 版本的管理界面里可以很直观地看到队列积压情况。如果积压严重,且消费者已经扩容到极限了,那就得考虑是不是业务逻辑太重,能不能把非核心逻辑拆出去,或者是不是该优化数据库索引了。千万别等到MQ磁盘写满,那时候整个服务就不可用了。
---
7. 总结:云原生趋势下的RabbitMQ最佳实践
技术一直在迭代,RabbitMQ也在不断进化。咱们不能只盯着那几个基础功能用,得看看现在的趋势。根据2024年的技术风向,RabbitMQ正在往云原生和可观测性方向大步迈进。
拥抱云原生:Kubernetes Operator
以前部署MQ还得自己写一堆YAML,现在有了 RabbitMQ Kubernetes Operator。这玩意儿简直是运维福音,它能帮你自动管理集群的生命周期。你想扩容?改个配置文件里的 replicas 数字就行,Operator自动帮你搞定。
现在的版本对容器化支持非常友好,3.13.2 版本在资源占用和启动速度上都有优化。如果你在跑K8s,强烈建议用Operator部署,别再手动搞StatefulSet了,太容易出岔子。
可观测性:拥抱OpenTelemetry
以前排查问题,只能去Management界面看队列数,或者去翻日志,效率低得要命。现在的趋势是集成 OpenTelemetry。RabbitMQ正在增强这方面的能力,以后你可以直接看到消息从生产者发出,经过MQ,再到消费者的完整链路追踪。
这意味着什么?意味着哪个环节慢了,哪个消息丢了,在APM(比如Jaeger、SkyWalking)里一目了然。这对于分布式系统排错简直是降维打击。
物联网(IoT)与MQTT 5.0
如果你在做物联网项目,RabbitMQ的 MQTT 支持绝对是个大杀器。它现在不仅支持MQTT 3.1.1,还在强化 MQTT 5.0 的支持。这意味着你可以用它来对接海量的边缘设备,统一做消息接入。不用再为了IoT单独搞一套EMQX,除非你的并发量真的到了百万级。
最佳实践清单
结合我这5年的实际案例经验,给你总结几条在云原生时代使用RabbitMQ的最佳实践:
- 首选仲裁队列(Quorum Queue):除非你对性能要求极高且能容忍数据丢失,否则核心业务一律用仲裁队列。镜像队列已经是过去式了,官方也不推荐新项目用。
- 善用延迟插件:虽然可以用死信+TTL实现延迟,但那个不直观且TTL管理麻烦。直接用官方的
rabbitmq-delayed-message-exchange 插件,省心。
- VHost隔离:别把所有业务都扔到一个
/ 根目录下。用VHost做逻辑隔离,比如 order-vhost、user-vhost,权限也好控制。
- 连接池管理:在代码里,别每次发消息都建个连接。用连接池(比如Spring Boot默认管理的就是单例的
CachingConnectionFactory),减少TCP握手开销。
最后,给个简单的Spring Boot配置示例,展示一下如何配置连接池和监听容器,这是生产环境的标准姿势:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /order-vhost
# 开启生产者确认
publisher-confirm-type: CORRELATED
publisher-returns: true
template:
mandatory: true
listener:
simple:
# 手动ACK,防止消息丢失
acknowledge-mode: manual
# 预取数量,根据消费能力调整,别太大
prefetch: 10
# 并发消费者数量,动态调整
concurrency: 5
max-concurrency: 20
可以这么理解,RabbitMQ虽然老了点,但依然很能打。只要用对了姿势,它依然是你架构里最稳健的那块基石。别盲目追新,也别固步自封,根据业务场景选最合适的特性,这才是老司机的做法。