什么是消息队列
用一个最简单的生活类比:去餐厅吃饭。
- 没有 MQ
(同步通信):你(客户端)点完菜,必须站在厨房门口盯着厨师(服务端)把菜做好,端走后才能去干别的事。如果厨师动作慢,你就被“卡”住了。
- 有 MQ
(异步通信):你把点菜单交给服务员(MQ)。服务员把单子贴在后厨的墙上(队列)。你就可以回座位玩手机了。厨师做完一道菜,就从墙上撕下一个单子继续做。
技术上的定义:
消息队列是一个存放消息的容器。
- 生产者
(Producer):发送消息的程序(比如:点餐系统)。
- 消费者
(Consumer):从队列中读取并处理消息的程序(比如:后厨系统)。
- Broker:消息队列的服务端本身,负责接收、存储和转发消息。
消息队列(Message Queue, MQ)
是一种进程间通信(IPC)*或*服务间通信的中间件机制。它通过提供异步通信协议,允许发送者(Producer)和接收者(Consumer)在不同的时间、不同的进程甚至不同的网络环境下进行数据交换。
为什么要用 MQ?
MQ 主要是为了解决三个问题:
1. 解耦 (Decoupling)
- 场景:系统 A 下单后,需要通知系统 B(库存)、系统
C(积分)、系统 D(短信)。
- 问题:如果不用 MQ,A 必须调用 B、C、D 的接口。如果
D 挂了,A 也会报错;如果后面加个系统 E,A 又要改代码。
- MQ 方案:A 下单后,往 MQ
扔一条消息“有人下单了”,然后就不管了。B、C、D 自己去 MQ
里监听这条消息。哪怕 D 挂了,A 也不受影响。
image-20251217163458817
2. 异步 (Asynchronous)
- 场景:用户注册,需要写数据库(50ms) + 发邮件(50ms) +
发短信(50ms)。总共耗时 150ms。
- MQ 方案:写完数据库(50ms)后,往 MQ
发个消息(5ms)就直接告诉用户“注册成功”。邮件和短信服务自己在后台慢慢消费消息去发送。响应时间从
150ms 降到了 55ms。
image-20251217163433856
3. 削峰 (Peak Shaving / Load
Leveling)
- 场景:秒杀活动,平时每秒 10 个请求,秒杀时每秒 5000
个请求。数据库只能抗 2000 个,直接崩了。
- MQ 方案:把 5000 个请求全部打入 MQ(MQ
的写入性能通常极高)。后台系统按照自己的能力(比如每秒处理 2000
个)慢慢从 MQ 里拉取处理。就像水库蓄水一样,保护下游系统不被冲垮。
image-20251217163517525
市面上主流的 MQ 选型
| 特性 |
RabbitMQ |
RocketMQ |
Kafka |
| 主要特点 |
稳定、功能全 |
金融级可靠、高吞吐 |
极高吞吐、大数据 |
| 开发语言 |
Erlang |
Java |
Scala/Java |
| 单机吞吐量 |
万级 |
十万级 |
百万级 |
| 消息延迟 |
微秒级 (极快) |
毫秒级 |
毫秒级 |
| 适用场景 |
中小型公司,对实时性要求高,数据量没那么大。 |
阿里出品,适合复杂的业务系统(如电商交易),高可靠。 |
日志收集、大数据实时计算、用户行为追踪。 |
| 缺点 |
Erlang 语言难维护,吞吐量相对低。 |
社区主要在国内。 |
某些配置下可能丢数据,不适合极其严苛的金融交易。 |
Exchange(交换器)
在消息队列(特别是基于 AMQP 协议 的实现,如
RabbitMQ)中,交换器 (Exchange)
是核心组件之一。
如果说 Queue(队列)是存储消息的“仓库”,那么
Exchange(交换器)就是负责分拣和投递的“路由器”。
在专业的 AMQP 架构中,生产者 (Producer)
绝不会直接把消息发送到队列中,而是发送给交换器。交换器根据既定的路由规则
(Routing Key),将消息分发到一个或多个队列中。
核心机制:Binding 与 Routing
Key
理解交换器,必须先理解两个概念:
- Binding (绑定):这是连接 Exchange 和 Queue
的纽带。它告诉交换器:“如果你收到了消息,请把它按这条路径转给这个队列。”
- Routing Key
(路由键):生产者发送消息时带的一个“标签”。交换器会拿着这个标签,去和
Binding 规则做匹配。
数据流向:
Producer –> Message + RoutingKey –>
Exchange –> (匹配逻辑) –> Queue
–> Consumer
RabbitMQ 的工作模式
第一类:基础队列模式 (点对点)
这两种模式主要利用队列“存储转发”的特性,通常不需要显式配置复杂的
Exchange(交换机)。
1. 简单模式 (Simple / Hello
World)
- 架构:
P (生产者) ->
Queue (队列) -> C (消费者)
- 机制:最原始的模式。一个生产者对应一个消费者。
- 场景:简单的“短信发送”任务。程序 A 产生内容,程序 B
发送,两者不需要同时在线。
2. 工作队列模式 (Work Queues)
- 架构:
P -> Queue ->
C1, C2…
- 机制:竞争消费。一个队列对应多个消费者,但一条消息只能被一个消费者抢到。
- 核心逻辑:
- 轮询 (Round-robin):默认情况下,RabbitMQ
会依次把消息分给每个消费者(你一条,我一条)。
- 公平分发 (Fair Dispatch):通过设置
prefetch=1,让“忙碌”的消费者不接新单,把消息给“空闲”的消费者(即:谁处理得快谁多干活)。
- 场景:集群削峰。比如大促期间的订单处理,启动
100 个订单处理服务(Worker)去消费同一个订单队列,加快处理速度。
第二类:高级发布订阅模式
(Publish/Subscribe)
这类模式引入了 Exchange (交换机)
的概念,实现了“一次发送,多处接收”。区别在于路由规则的不同。
3. 发布/订阅模式
(Publish/Subscribe - Fanout)
- 架构:
P ->
Exchange (Fanout) -> Queue A,
Queue B -> C1, C2
- 机制:广播。生产者把消息发给交换机,交换机把它复制给所有绑定到它身上的队列。
- 特点:速度最快,因为它完全忽略 Routing
Key,闭着眼转发。
- 场景:数据同步或日志广播。比如“修改密码”事件,既要发给“短信队列”通知用户,又要发给“审计队列”记录日志。
4. 路由模式 (Routing - Direct)
- 架构:
P ->
Exchange (Direct) -> Queue A (error),
Queue B (info)
- 机制:精准匹配。发送消息时携带
Routing Key(比如 “error”),交换机只把消息投递给绑定了
“error” Key 的队列。
- 场景:日志分级存储。
- 消费者 A 只想接收
error 级别的日志写磁盘(绑定
key=“error”)。
- 消费者 B 想接收所有级别的日志打印控制台(绑定 key=“info”, “warning”,
“error”)
image-20251217182529481
5. 主题模式 (Topics - Topic)
- 架构:
P ->
Exchange (Topic) -> Queue
- 机制:通配符匹配。这是最灵活的模式。
#:匹配 0 个或多个单词。
*:匹配 1 个单词。
- 例子:
- 发送 Key:
usa.news
- 队列 A 绑定:
usa.# (接收美国的所有消息)
- 队列 B 绑定:
#.news (接收全世界的新闻)
- 场景:复杂业务路由。比如外卖系统,按区域(北京.海淀)、按品类(食品.奶茶)进行多维度的消息分发。
image-20251217182552430
Quorum 队列
1. 为什么要发明 Quorum
队列?(历史背景)
在 Quorum 队列出现之前,RabbitMQ
想要实现“一台机器挂了数据不丢”,用的是 镜像队列 (Mirrored
Queues)。
老镜像队列的致命痛点:
- 同步风暴:当一个新节点加入集群时,它需要从老节点把所有数据复制过来。这个过程会导致整个集群卡顿(Stop-the-world),甚至导致集群崩溃。
- 效率低下:它采用的是“链式复制”或者简单的广播,一条消息要在所有节点间转圈圈,性能随着节点数增加而剧烈下降。
- 即将被废弃:RabbitMQ
官方已经宣布,在未来的版本(4.0)中将彻底删除镜像队列。
所以,Quorum 队列就是为了“接班”而来的。
2. Quorum 队列的核心原理:Raft
算法
“Quorum”这个词的本意是“法定人数”(也就是多数派)。
它的核心逻辑不再是“所有人都必须收到消息”,而是“只要大多数人收到消息,这事儿就成了”。它基于著名的分布式一致性算法
Raft。
工作机制图解:
假设你的集群有 3 个节点(Node A, Node B, Node C)。
- Leader 选举:三个节点通过投票,选出 Node A 作为
Leader,B 和 C 是 Follower。
- 写消息:
- 生产者把消息发给 Leader (A)。
- A 把消息写入自己的日志,并同时发给 B 和 C。
- 关键点:只要 B 或者 C 其中有一个
回复“我收到了”(加上 A 自己,就是 2 票,满足 3 票中的多数派),A
就认为这条消息写入成功。
- A 返回 ACK 给生产者。
- 故障切换:
- 如果 Leader (A) 挂了。
- B 和 C 发现老大不在了,迅速发起新一轮投票。
- 因为 B 和 C 都是活着的(2 > 3/2),它们能立刻选出新的
Leader,继续工作。
image-20251217190353627
常见问题
如果重启rabbitmq,出现消息丢失问题如何解决
核心原因是默认情况下 RabbitMQ
是将数据存储在内存中的。一旦进程关闭或服务器重启,内存数据就会被清空。
要解决这个问题,必须配置 “持久化”
(Persistence)。
但这不仅仅是改一个配置那么简单。要保证消息绝对不丢,你需要同时满足
三个层面的持久化(缺一不可):
1. 交换器的持久化 (Exchange
Durability)
如果你只持久化了队列和消息,但交换器没持久化。重启后,交换器没了,生产者发消息时找不到交换器,消息就会直接报错或丢弃。
2. 队列的持久化 (Queue
Durability)
如果队列不持久化,重启后队列元数据会消失,依附于该队列的消息(无论消息本身是否持久化)都会一起消失。
3. 消息的持久化 (Message
Persistence)
这是最容易被遗忘的一步。即便队列还在,如果消息本身是“瞬态”的,重启后队列是空的。
进阶:这样就 100% 安全了吗?
不是的。
即便你做到了以上三点,依然存在两个极端情况会导致丢失:
- 漏洞 1:消息刚到内存,还没来得及刷盘 RabbitMQ
为了性能,不会每收到一条消息就立马写硬盘(fsync),而是先存缓存区。如果这时候断电了,缓存区里的几条消息就丢了。
- 解决方案:发布确认机制 (Publisher
Confirms)。 生产者开启 Confirm 模式。只有当 RabbitMQ
明确告诉你“我已经把这条消息存入硬盘了”(Handle
Ack),你才算发送成功。如果超时未收到 Ack,生产者需要重发。
- 漏洞 2:磁盘坏了 / 物理机报废
如果单台机器硬盘物理损坏,持久化也没用。
- 解决方案:镜像队列 (Mirrored
Queues) 或 仲裁队列 (Quorum Queues)。
这是集群层面的高可用。将消息复制到 3
台不同的机器上。挂掉一台,另外两台还有数据。
如何解决同一个消息被消费多次的问题
这是一个非常经典且必须解决的分布式系统问题。在专业术语中,解决这个问题的方法叫做实现接口的“幂等性”
(Idempotency)。
简单来说,幂等性意味着:无论我对同一个消息处理多少次,最终的结果都和处理一次是一样的。
在 RabbitMQ(以及大多数 MQ)的设计中,为了保证消息不丢,默认采用的是
“至少投递一次” (At-Least-Once) 策略。
- 场景还原:消费者把钱扣了,正准备告诉 MQ
“我办完了(ACK)”,结果网线断了或进程崩了。
- 后果:MQ 没收到
ACK,以为你没办完,于是把消息重新发给另一个消费者。结果:扣了两次钱。
要解决这个问题,不能依赖
MQ,必须由消费者(Consumer)在业务逻辑层面来保证。以下是三种最主流的工程实现方案:
方案一:利用数据库的唯一约束
(最强硬方案)
这是最简单、最可靠的方法,适用于新增数据(Insert)的场景。
- 原理:利用数据库(MySQL/Oracle)的主键(Primary
Key)或唯一索引(Unique Key)约束。
- 做法:
- 每条消息必须携带一个全局唯一的 ID(比如
message_id 或者业务上的 order_id)。
- 消费者尝试向数据库插入数据。
- 如果插入成功 -> 处理结束,发送 ACK。
- 如果插入失败(报
DuplicateKeyException) ->
说明已经处理过了,直接忽略,发送 ACK。
方案二:利用 SQL
的条件更新 (状态机方案)
适用于更新数据(Update)的场景,比如更新订单状态。
方案三:Redis 去重表
(最高性能方案)
如果你的业务不涉及数据库,或者并发量极高,可以用 Redis
做“去重记录表”。
如何处理消息乱序的问题
在 RabbitMQ
中,单个队列由单个消费者消费时,是严格保证先进先出(FIFO)的。
但是,为了提升性能,我们通常会开启多个消费者(Competing
Consumers
Pattern)同时消费同一个队列,或者发生消息重试(Nack/Requeue)。这时候,顺序就乱了。
场景举例: 生产者依次发了三条关于“订单
A”的消息:
INSERT (创建订单)
UPDATE (支付订单)
DELETE (删除订单)
如果有两个消费者 C1 和 C2。 C1 拿到了 INSERT,C2 拿到了
UPDATE。 C2 的网速很快,先处理完
UPDATE。结果数据库报错“找不到订单”,操作失败。然后 C1 才把
INSERT 做完。
结果:数据不一致,业务崩盘。
解决这个问题的核心思路是:我们不需要“全局有序”,只需要“局部有序”(即:保证同一个
ID 的消息是有序的即可,不同 ID 之间的顺序无所谓)。
对于 Python
开发者以及大多数分布式系统来说,解决消息乱序最稳健、最通用的方案就是:拆分
Queue + 一致性 Hash (Queue Sharding)。
核心方案:拆分 Queue + 一致性
Hash
这个方案的核心逻辑是:我们不需要“全局有序”,只需要保证“同一业务
ID 的消息有序”。
只要保证同一个订单(例如
Order_1001)的所有操作(下单、支付、发货)都严格进入同一个队列,并且被同一个消费者处理,那么顺序就绝对不会乱。
1. 架构设计图解
我们要把原来的“一个大队列”拆分成 N 个“小队列”。
- 原来的模型(会乱序):
Producer ->
Queue -> Consumer A,
Consumer B (并发抢单,顺序错乱)
- 现在的模型(保证有序):
Producer
-> Exchange -> Queue_1 ->
Consumer A (只负责 Queue_1) Producer ->
Exchange -> Queue_2 ->
Consumer B (只负责 Queue_2) Producer ->
Exchange -> Queue_3 ->
Consumer C (只负责 Queue_3)
2. 具体实现步骤
这个方案分为三个关键环节:
第一步:生产者负责“路由分发”
在发送消息时,生产者必须根据业务 ID(如
order_id)决定这条消息发往哪个队列。通常使用 Hash
取模 算法。
- 逻辑:
index = hash(order_id) % N (N
是队列的总数量)。
- 例子:假设有 3 个队列。
Order_1001 的 Hash 模 3 结果是 0 -> 发往
Queue_0
Order_1002 的 Hash 模 3 结果是 1 -> 发往
Queue_1
Order_1001 的后续状态(如支付)Hash
结果肯定还是 0 -> 依然发往 Queue_0
第二步:RabbitMQ 队列配置 你需要创建 N 个队列(如
order_sub_queue_0, order_sub_queue_1…)。
- 进阶技巧:RabbitMQ 有一个官方插件叫
rabbitmq_consistent_hash_exchange。你只需要把消息发给这个交换机,带上
routing_key(设为 order_id),交换机会自动帮你根据 Hash
值均匀分发到绑定的队列中,连生产者的代码都不用改太复杂。
第三步:消费者“独占”队列 (关键)
这是最重要的一点:每个小队列,同一时刻只能有一个消费者在监听。
- Consumer A 专门监听
Queue_0。
- Consumer B 专门监听
Queue_1。
因为 RabbitMQ 的单个队列是先进先出 (FIFO) 的,而 Consumer A
是单线程顺序处理 Queue_0 的,所以 Order_1001
的“下单”一定比“支付”先被处理。
如何处理消息处理失败的情况
在分布式系统中,消息处理失败是常态(比如数据库挂了、网络抖动、代码
bug)。
如果处理失败,绝不能简单地忽略,否则会导致数据丢失;也不能死板地无限重试,否则会死循环拖垮系统。
处理失败通常有三道防线,层层递进:
第一步:判断异常类型(是“病”还是“命”?)
当 try...except
捕获到异常时,不能盲目重试,先看是什么错:
- 致命错误(Fatal Error):
- 例如:
JsonDecodeError(格式不对)、KeyError(缺字段)、NullPointerException(空指针)。
- 决策:这种错误重试一万次也没用。跳过重试,直接进死信队列。
- 临时错误(Transient Error):
- 例如:
Timeout(连接超时)、Deadlock(数据库死锁)、503 Service Unavailable。
- 决策:这种病能治。进入重试流程。
第二步:带策略的重试(Retry)——
关键缓冲
既然决定要救,也不能瞎救(比如立即原地无限重试,那是“毒药”)。我们需要“有节制、有延迟”的重试。
- 检查重试次数: 从消息 Header 中读取
retry_count。
- 逻辑:
- 如果 count < 3(假设最大重试3次):
count + 1。
- 等待一会儿(Backoff):不要立即重试,而是把消息发到一个
“延迟队列”(或者用代码
sleep 一会儿,但
Python 中不建议阻塞主线程,推荐用延迟插件
rabbitmq_delayed_message_exchange)。
- 重新发布这条消息(Publish)。
- 对当前失败的这条消息进行
ACK(因为它已经生成了新的替身去排队了)。
- 如果 count >= 3:
第三步:死信队列(DLQ)——
最终兜底
这是最后一道防线。当重试次数耗尽,或者遇到致命错误时,才轮到它出场。
- 操作:调用
basic_nack(delivery_tag, requeue=False)。
- 结果:
- RabbitMQ 会根据配置,自动把这条消息“踢”到死信交换机。
- 死信交换机把它路由到 死信队列。
- 后续:
- 开发/运维人员配置报警脚本,监听死信队列。
- 一旦有消息进来,发钉钉/邮件报警。
- 人工排查原因(比如发现是数据库挂了),修复后,手动把死信队列里的消息取出来再发回业务队列(或者写脚本批量重发)。
Kafka
消息队列Kafka是什么?架构是怎么样的?5分钟快速入门_哔哩哔哩_bilibili
1. 核心思维转变:从“队列”到“日志”
这是理解 Kafka 最重要的一步。
- RabbitMQ
(队列模型):就像“收件箱”。你把信拿出来,信就没了(Delete)。它的目标是让消息越快被处理完越好,堆积消息是异常状态。
- Kafka
(日志模型):就像“船长的航海日志”。
- 消息是追加写入 (Append-only) 的。
- 消费者读消息,不会删除消息,只是在自己的笔记本上记一下:“我读到了第
100 行”。
- 这意味着:消息可以被多个不同的消费者重复读取,甚至可以“倒带”回去重读历史数据。
2. 为什么 Kafka
快得离谱?(架构设计)
Kafka 单机可以轻松抗住 每秒几十万甚至上百万
的写入,它是怎么做到的?
A. 顺序写磁盘 (Sequential
Write)
RabbitMQ 尽量用内存,而 Kafka 直接写磁盘。
你可能会问:“写磁盘不是慢吗?”
随机写确实慢,但顺序写极快。Kafka
强制所有数据只能追加到文件末尾。在现代操作系统中,顺序写磁盘的速度(600MB/s+)甚至可以超过随机写内存的速度。
B. 零拷贝 (Zero-Copy)
还记得你之前感兴趣的底层原理吗?Kafka 是利用 OS sendfile
系统调用的教科书级案例。
- 传统方式:磁盘 -> 内核 Buffer -> 用户态
Buffer (Application) -> 内核 Socket Buffer -> 网卡。
- Kafka 方式:磁盘 -> 内核 Buffer ->
直接传给网卡。
- 数据完全不经过应用程序(Kafka JVM),CPU 也就不用瞎忙活。
C. 分区 (Partitioning) ——
扩展性的核心
Kafka 将一个 Topic (主题) 拆分成了多个
Partition (分区)。
- 每个 Partition 是一个独立的物理日志文件。
- 不同的 Partition 可以分布在不同的服务器上。
- 结果:并发读写能力随着机器数量线性扩展。
3. Kafka 的核心组件
1. Broker
Kafka 的服务器节点。
2. Topic & Partition
- Topic 是逻辑分类(比如
logs)。
- Partition 是物理存储。Topic A 可以分为 Partition 0, 1, 2。
- 注意:Kafka 只保证 Partition
内部的消息有序,不保证整个 Topic 全局有序。
3. Producer (生产者)
生产者决定把消息发给哪个 Partition(通常轮询或 Hash)。
4. Consumer Group
(消费者组) —— Kafka 的神来之笔
这是 Kafka 区别于 RabbitMQ 的最大特色。
- 机制:一个 Topic 可以被多个 Group 消费。
- 组内 (Queue 模式):同一个 Group
里的消费者,互相竞争。Partition 0 给消费者 A,Partition 1 给消费者
B。一个 Partition
只能被组内的一个消费者消费(防止乱序)。
- 组间 (Pub/Sub 模式):Group A 消费了一遍数据,Group
B 可以再消费一遍同样的数据,互不干扰。
5. Offset (偏移量)
消费者读到哪了?RabbitMQ 是 Server 记,Kafka 是
消费者自己记(或者提交给 Kafka 的内部 Topic
__consumer_offsets)。
- 你可以随时修改 Offset,让消费者从昨天的数据开始重新跑一遍(用于修复
Bug 后重算数据)。
image-20251217193104605
RocketMQ
消息队列RocketMQ是什么?和Kafka有什么区别?架构是怎么样的?7分钟快速入门_哔哩哔哩_bilibili
参考资料
什么是消息队列?不就是排个队么?_哔哩哔哩_bilibili
RabbitMQ是什么?架构是怎么样的?_哔哩哔哩_bilibili