MQ 简析

概述

消息队列(Message Queue),是分布式系统中重要的组件,它的特点是异步的,即消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制时,就可以使用消息队列。

当前使用较多的消息队列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka 等,而部分数据库如 Redis、Mysql 以及 phxsql 也可实现消息队列的功能。

使用场景

消息队列在实际应用中一般包括如下四个场景:

  1. 应用解耦:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
  2. 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
  3. 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
  4. 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;

缺点

  1. 系统可用性降低:系统引入的外部依赖越多,越不稳定,万一 MQ 挂了可能导致整套系统崩溃。

  2. 系统复杂性提高:增加了 MQ,需要考虑更多的情况,比如如何保证消息没有重复消费?如何处理消息丢失的情况?如何保证消息传递的顺序性?

  3. 一致性问题:A 系统处理完了直接返回成功了,MQ 下游的 B 系统消费之后却写入失败了,数据就不一致了。

消息队列的两种模式

消息队列包括两种模式,点对点模式(point to point, queue)和发布/订阅模式(publish/subscribe,topic)。

点对点模式

点对点模式下包括三个角色:

  • 消息队列
  • 发送者 (生产者)
  • 接收者(消费者)

消息发送者生产消息发送到 queue 中,然后消息接收者从 queue 中取出并且消费消息。消息被消费以后,queue 中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

点对点模式特点:

  • 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
  • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

发布订阅模式

发布/订阅模式下包括三个角色:

  • 主题(Topic)
  • 发布者 (Publisher)
  • 订阅者 (Subscriber)

发布者将消息发送到 Topic, 系统将这些消息传递给多个订阅者。

发布/订阅模式特点:

  • 每个消息可以有多个订阅者;
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

常见 MQ 对比

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃。

RabbitMQ 优势在于低延时,但是吞吐量较低,并且 erlang 语言阻止了大量的工程师去深入研究和掌控他,对公司而言,几乎处于不可控的状态,不过是开源的,有比较稳定的支持,活跃度也高;

RocketMQ 接口简单易用,而且在阿里大规模应用过,有阿里品牌保障,日处理消息上百亿,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,可靠性和可用性都很好,还可以支撑大规模的 topic 数量,支持复杂 MQ 业务场景。

kafka 的特点其实很明显,就是提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准,社区活跃度很高

如何保证 MQ 的高可用

RabbitMQ

RabbitMQ 有 3 种部署模式:

  • 单机模式
  • 普通集群模式
  • 镜像集群模式

单机模式与高可用完全没关系,咱就不说了,直接看看这 2 种集群模式。

普通集群模式

某一个 Queue 是在集群中的某一个 Broker 上,各个 Broker 会同步元数据,但不会同步 Queue 的消息数据。

如果某一个 Broker 故障了,其中的 Queue 便无法使用。如果消息没有配置消息持久化,则消息丢失。

可以看到,这种方式并没有实现高可用,只是扩展性比较好,扩充 Broker 可以容纳更多的 Queue,提高吞吐量。

镜像集群模式

一个 Broker 中 Queue 的元数据和消息数据都会同步到其他 Broker 上,就是做了全量备份,所以称为 “镜像模式”。

实现了高可用,如果一个 Broker 故障了,没关系,可以使用其他 Broker 继续工作,消息数据不会丢失。

可用性上去了,但扩展性没有了。因为如果某个 Queue 负载很重,加机器的话,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展 Queue

一个 Queue 的数据是全量存在 Broker 中的,所以 Queue 的消息容量、消息处理能力,都受限于 Broker。

普通集群模式 没有达到高可用,扩展性较好。

镜像集群模式 实现了高可用,但扩展性差。

Kafka

Kafka 把 Topic(主题)分为了多个 Partition(分区),Topic 只是逻辑概念,Partition 才是实际的消息存储单元。

一个 Topic 的多个 Partition 分散在多个 Broker 中,每个 Partition 存放 Topic 的一部分数据。

有了 Partition 之后,Topic 就具有了极强的扩展性,可以指定 N 个 Partition。

可以为 Partition 指定多个“副本”,分散在不同的 Broker,从而实现其高可用。

当某个 Broker 故障的时候,其中存放的 Partition 不可用,但没有关系,可以使用其他 Broker 上的副本。

Partition 的多个副本分为两种角色,Leader 和 Follower。

Leader 是由 Kafka 选举出来的,负责处理消息的读写。Leader 收到新消息后,会同步给 Follower。

Follower 的作用是候选人,当 Leader 宕机之后,Kafka 会从 Follower 中选举出新的 Leader。

可以配置消息写入完成的标准:

  • 写入 Leader 既可 – 速度快,但可能会有消息丢失,例如在同步到 Follower 之前 Broker 故障了,则消息丢失。
  • Follower 同步完成之后才算写入成功 – 消息可靠性极高,但影响写入速度。

    RocketMQ

这是 RocketMQ 的官方结构图,左右是 Producer 和 Consumer,中间是 RocketMQ,分为两个部分:

  • NameServer 集群 – 存放元数据
  • Broker 集群 – 存放队列数据

这两部分都需要保证高可用。

NameServer 是独立运行的,保存着集群完整的集群元数据,例如路由信息、Broker 信息、数据信息。

为了保证其高可用,可以运行多个 NameServer,之间完整的同步数据即可。

这样只要有一个 NameServer 是可用的,就不会影响集群的正常工作。

Broker 集群的部署方式可以分为 3 种。

多 master

部署多个 Broker,角色都是 Master,Topic 的数据会分散存储在这些 Broker 中。

单个 Master 故障会导致其中数据无法使用,需要等待修复。类似于 RabbitMQ 的普通集群模式。

如果想保障数据的可靠性,可以使用【RAID10 + 同步刷盘】机制。

多 master 多 slave

为 Master 配置了 Slave,Master 会把数据同步到 Slave。

当 Master 故障之后,可以用 Slave 顶上去,数据和服务都不影响,但会有短暂的停顿,需要修改配置并重启才能完成切换动作。

数据同步的方式分为:

  • 异步 – Master 写入完成即可,异步同步给 Slave。写入速度快,但同步会有延迟,可能会丢数据。
  • 同步 – Master 与 Slave 都写入之后才算成功。不会丢消息,但写入速度降低。

Dledger Group

Dledger 模式要求为 Master 配置 2 个 Slave,3 者组成一个 Dledger Group。

Dledger 也是 Master-Slave 同步的方式,好处在于可以实现自动选举 Master,自动切换。

当 Master 故障的时候,RocketMQ 可以从组内选出一个新的 Master,完成自动切换,这样更进一步提高了集群的可用性。

如何处理消息丢失的问题

一条消息从生产到被消费,将会经历三个阶段:

  • 生产阶段,Producer 新建消息,然后通过网络将消息投递给 MQ Broker
  • 存储阶段,消息将会存储在 Broker 端磁盘中
  • 消费阶段, Consumer 将会从 Broker 拉取消息

以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。所有的 MQ 处理消息丢失的方法都是类似的。

生产阶段

生产者 (Producer) 调用 send 方法发送消息之后,消息可能因为网络问题并没有发送过去。

所以,我们不能默认在调用 send 方法发送消息之后消息消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。

生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。如果消息发送失败的话,我们检查失败的原因之后重新发送即可。

发送失败的话,Producer 需要进行重试, Producer 的 retries(重试次数)可以设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动的话 3 次一下就重试完了。

存储阶段

默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。

这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。

若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

若 Broker 未在同步刷盘时间内完成刷盘,返回相应的超时状态给生产者。

集群部署

为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。

默认方式下,消息写入 master 成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。

但是此时若 master 突然宕机且不可恢复,那么还未复制到 slave 的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

异步复制与同步复制区别如下图:

注: 大家不要被上图误导,broker master 只能配置一种复制方式,上图只为解释同步复制的与异步复制的概念。

如果 slave 节点未在指定时间内同步返回响应,将会返回相应的超时状态。

虽然上述配置提高消息的高可靠性,但是会降低性能,生产实践中需要综合选择。

消费阶段

消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回确认相应信息给 Broker。

如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

这种方式虽然提高消息可靠性,但是可能导致消息重发和重复消费。所以对于消费客户端,需要注意保证幂等性

如何保证消息消费时的幂等性

什么是消息幂等

任意多次执行所产生的影响均与一次执行的影响相同就可以称为幂等

消息幂等就是当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响

为什么需要保证消息幂等

这个问题其实没法准确回答。回答这个问题的根源得从业务场景上进行分析。比如正常业务情况下,我们是不允许同个订单重复支付,这种业务场景我们就需要确保幂等性。再比如日志记录,这种业务场景,我们可能就不需要做幂等判断。

因此是否要保证幂等性,得基于业务进行考量

消息队列可以保证幂等吗?

rabbitmq、rocketmq、kafka 都有可能会出现消费重复消费的问题,这是正常情况,因为由于网络原因闪断,ACK 返回失败等等故障,确认信息没有传送到消息队列,导致消息队列不知道该消息已经被消费了,再次将该消息分发给其他的消费者。所以这类问题 MQ 本身是无法保证的,而是需要我们写业务的时候自己保证的。

常见的保证幂等的方法

  1. 利用数据库的唯一约束实现幂等
    比如将订单表中的订单编号设置为唯一索引,创建订单时,根据订单编号就可以保证幂等

  2. 去重表
    这个方案本质也是根据数据库的唯一性约束来实现。其实现大体思路是:首先在去重表上建唯一索引,其次操作时把业务表和去重表放在同个本地事务中,如果出现重现重复消费,数据库会抛唯一约束异常,操作就会回滚

  3. 利用 redis 的原子性
    每次操作都直接 set 到 redis 里面,然后将 redis 数据定时同步到数据库中

  4. 多版本(乐观锁)控制
    此方案多用于更新的场景下。其实现的大体思路是:给业务数据增加一个版本号属性,每次更新数据前,比较当前数据的版本号是否和消息中的版本一致,如果不一致则拒绝更新数据,更新数据的同时将版本号+1

  5. token 机制
    生产者发送每条数据的时候,增加一个全局唯一的 id,这个 id 通常是业务的唯一标识,比如订单编号。在消费端消费时,则验证该 id 是否被消费过,如果还没消费过,则进行业务处理。处理结束后,在把该 id 存入 redis,同时设置状态为已消费。如果已经消费过了,则不进行处理。

如何保证消息的顺序性

为什么需要保证消息的顺序性

在生产中经常会有一些类似报表系统这样的系统,需要做 MySQL 的 binlog 同步。比如订单系统要同步订单表的数据到大数据部门的 MySQL 库中用于报表统计分析,通常的做法是基于 Canal 这样的中间件去监听订单数据库的 binlog,然后把这些 binlog 发送到 MQ 中,再由消费者从 MQ 中获取 binlog 落地到大数据部门的 MySQL 中。

在这个过程中,可能会有对某个订单的增删改操作,比如有三条 binlog 执行顺序是增加、修改、删除;如果消费者不按照这个顺序执行结果肯定就错误了,所以针对此类场景需要保证消息的发送和消费都是有序的。

乱序原因和保证顺序性的方法

RabbitMQ

对于 RabbitMQ 来说,导致上面顺序错乱的原因通常是消费者是集群部署,不同的消费者消费到了同一订单的不同的消息,如消费者 A 执行了增加,消费者 B 执行了修改,消费者 C 执行了删除,但是消费者 C 执行比消费者 B 快,消费者 B 又比消费者 A 快,就会导致消费 binlog 执行到数据库的时候顺序错乱,本该顺序是增加、修改、删除,变成了删除、修改、增加。

如下图是 RabbitMQ 可能出现顺序错乱的问题示意图:

RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息,生产者发送消息的时候,同一个订单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。

如下图是 RabbitMQ 保证消息顺序性的方案:

Kafka

对于 Kafka 来说,一个 topic 下同一个 partition 中的消息肯定是有序的,生产者在写的时候可以指定一个 key,通过订单号作为 key,这个 key 对应的消息都会发送到同一个 partition 中,所以消费者消费到的消息也一定是有序的。

那么为什么 Kafka 还会存在消息错乱的问题呢?问题就出在消费者身上。通常消费到同一个 key 的多条消息后,会使用多线程技术去并发处理来提高消息处理速度,否则一条消息的处理需要耗时几十毫秒,一秒只能处理几十条消息,吞吐量太低了。而多线程并发处理的话,binlog 执行到数据库的时候就不一定还是原来的顺序了。

如下图是 Kafka 可能出现乱序现象的示意图:

Kafka 从生产者到消费者消费消息这一整个过程其实都是可以保证有序的,导致最终乱序是由于消费者端需要使用多线程并发处理消息来提高吞吐量,比如消费者消费到了消息以后,开启 32 个线程处理消息,每个线程线程处理消息的快慢是不一致的,所以才会导致最终消息有可能不一致。

所以对于 Kafka 的消息顺序性保证,其实我们只需要保证同一个订单号的消息只被同一个线程处理的就可以了。由此我们可以在线程处理前增加个内存队列,每个线程只负责处理其中一个内存队列的消息,同一个订单号的消息发送到同一个内存队列中即可。

如下图是 Kafka 保证消息顺序性的方案:

RocketMQ

对于 RocketMQ 来说,每个 Topic 可以指定多个 MessageQueue,当我们写入消息的时候,会把消息均匀地分发到不同的 MessageQueue 中,比如同一个订单号的消息,增加 binlog 写入到 MessageQueue1 中,修改 binlog 写入到 MessageQueue2 中,删除 binlog 写入到 MessageQueue3 中。

但是当消费者有多台机器的时候,会组成一个 Consumer Group,Consumer Group 中的每台机器都会负责消费一部分 MessageQueue 的消息,所以可能消费者 A 消费了 MessageQueue1 的消息执行增加操作,消费者 B 消费了 MessageQueue2 的消息执行修改操作,消费者 C 消费了 MessageQueue3 的消息执行删除操作,但是此时消费 binlog 执行到数据库的时候就不一定是消费者 A 先执行了,有可能消费者 C 先执行删除操作,因为几台消费者是并行执行,是不能够保证他们之间的执行顺序的。

如下图是 RocketMQ 可能出现乱序现象的示意图:

RocketMQ 的消息乱序是由于同一个订单号的 binlog 进入了不同的 MessageQueue,进而导致一个订单的 binlog 被不同机器上的 Consumer 处理。

要解决 RocketMQ 的乱序问题,我们只需要想办法让同一个订单的 binlog 进入到同一个 MessageQueue 中就可以了。因为同一个 MessageQueue 内的消息是一定有序的,一个 MessageQueue 中的消息只能交给一个 Consumer 来进行处理,所以 Consumer 消费的时候就一定会是有序的。

如下图是 RocketMQ 保证消息顺序性的方案:

如何解决消息队列的大量 lag

消息堆积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等功能正是得益于消息中间件的消息堆积能力。然而消息堆积其实是一把亦正亦邪的双刃剑,如果应用场合不恰当反而会对上下游的业务造成不必要的麻烦,比如消息堆积势必会影响上下游整个调用链的时效性,影响上下游的业务,堆积过多有可能会造成磁盘爆满,或者触发日志清除策略而造成消息丢失的情况。

lag 一直堆积,消费速度跟不上生产速度

首先,我们需要有一个比较及时的监控,在 lag 达到一个风险数量的时候把情况暴露出来,lag 堆积的可能性有以下几种:

  1. consumer 出 bug 导致消费速度降低
    这种情况一般在报警足够及时的情况下,可以在 lag 达到风险量级之前修复好 bug 就可以解决了,否则就会成为情况 4
  2. consumer quota 不够用导致性能下降
    这种情况可能是由于当前的消费方式使 CPU 处理速度已经达到极限,无法更快速的处理消息了,此时可以对消费者现有逻辑进行梳理,看是否有代码优化的空间,比如批量消费,跳过非重要信息,减少处理流程等方式。如果已经无法对消费者进行优化了,可以采用扩容的形式增加消费者的处理能力。
  3. producer 由于业务发展提升了消息产生速度过快
    这种情况是好现象,说明业务在向前发展,如果消费端已经优化到极致了,并且需要考虑扩容成本的情况下,可以考虑对消息进行前期聚合之后再发送,或者跳过非重要信息的发送等
  4. 报警不及时导致堆积了大量消息
  • 如果不是时效性要求很高的业务,修复 bug 之后,等待一段时间是可以处理完堆积的消息的
  • 如果业务需要快速消费完这些消息,可以有两种做法。
    • 丢弃所有堆积的消息,通过查表将缺失的信息重新导入 mq 或直接写入消费方的数据库
    • 紧急扩容,不过这种方式成本比较高
      • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 暂时停掉。
      • 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
      • 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
      • 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
      • 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

mq 磁盘写满了

消息队列的磁盘写满之后,再进入的消息会被直接丢弃,这种情况下是无论如何也没办法通过 mq 找回了,只能丢弃所有堆积的消息,通过查表将缺失的信息重新导入 mq 了

如何设计一个消息队列的架构

其实在这篇文章上面讨论的内容就是设计一个消息队列架构需要考虑的点了,这里再总结一下。

  1. 可扩展性
    即可以在需要的时候快速扩容,参照 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个 broker,只存一部分数据。如果现在资源不够了,只需给 topic 增加 partition,然后做数据迁移,增加机器,就可以存放更多数据,提供更高的吞吐量。
  2. 数据落盘
    mq 接收到的消息需要落盘,这样才能保证宕机时数据不会丢失,可以参考 kafka 采用磁盘顺序读写的方式,提升性能
  3. 可用性
    可以参考 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
  4. 消息和数据不丢失
    参考之前提到的生产阶段,存储阶段,消费阶段三个阶段的保证方式

附录

Kafka 如何保证高吞吐