RocketMQ

Rocket Message Queue

Posted by Ekko on August 20, 2020

参考资料 JavaGuide三太子敖丙RocketMQ官网

[TOC]


消息队列扫盲

回顾下消息队列能做什么:异步、解耦、削峰

《消息队列》章有提到异步、解耦、削峰

异步:

首先看下 同步通信: 比如有一个购票系统,需求是用户在购买完之后能接收到购买完成的短信

RocketMQ章节同步通信.png

省略中间的网络通信时间消耗,假如购票系统处理需要 150ms ,短信系统处理需要 200ms ,那么整个处理流程的时间消耗就是 150ms + 200ms = 350ms

用户购票在购票系统的时候其实就已经完成了购买,而现在通过 同步调用 非要让整个请求拉长时间,而短信系统又不是很有必要,它仅仅是一个辅助功能增强用户体验感而已

现在整个调用流程就有点 头重脚轻 的感觉了,购票是一个不太耗时的流程,而现在因为同步调用,非要等待发送短信这个比较耗时的操作才返回结果。那如果再加一个发送邮件呢?

RocketMQ章节同步通信2.png

这样整个系统的调用链又变长了,整个时间就变成了 550ms

为了解决这一问题,在系统中加入消息中间件(消息队列),将上面的模型进行改造后:

RocketMQ章节异步通信.png

这样,在将消息存入消息队列之后就可以直接返回了,所以整个耗时只是 150ms + 10ms = 160ms

用户体验上时间变短了,但是整个系统上的调用链的时间并没有受影响,比如发送短信,依然是 150 + 200 (大致)

解耦:

回到上面的同步调用过程,写个伪代码简单概括一下:

1
2
3
4
5
6
7
8
public void purchaseTicket(Request request){
    // 校验
    validate(request);
    // 购票
    Result result = purchase(request);
    // 发送短信
    sendMessage(result);
}

接下来,系统中又添加了一个发送邮件,这个时候我们又必须去重新修改代码同步调用;如果购买完后又需要加积分,那么还得再次修改代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void purchaseTicket(Request request){
    // 校验
    validate(request);
    // 购票
    Result result = purchase(request);
    // 发送短信
    sendMessage(result);
    // 发送邮件
    sendEmail(result);
    // 添加积分
    addPoint(result);
    // 后面可能还会有其他需求
    ...
    ...
}

如果新需求要求不要再发送邮件,那么这个时候又得改代码

此时我们就用一个消息队列在中间进行解耦。需要注意的是,后面的发送短信、发送邮件、添加积分等一些操作都依赖于上面的 result ,这东西抽象出来就是购票的处理结果 ,比如订单号,用户账号等等,也就是说我们后面的一系列服务都是需要同样的消息来进行处理。既然这样,我们是不是可以通过 “广播消息” 来实现

我上面所讲的“广播”并不是真正的广播,而是接下来的系统作为消费者去 订阅 特定的主题。比如这里的主题就可以叫做 订票 ,我们购买系统作为一个生产者去生产这条消息放入消息队列,然后消费者订阅了这个主题,会从消息队列中拉取消息并消费。就比如我们刚刚画的那张图,会发现在生产者这边只需要关注 生产消息到指定主题中 ,而 消费者只需要关注从指定主题中拉取消息 就行了

Rocket章节之解耦.png

如果没有消息队列,每当一个新的业务接入,我们都要在主系统调用新接口、或者当我们取消某些业务,我们也得在主系统删除某些接口调用。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,接下来收到消息如何处理,是下游的事情,无疑极大地减少了开发和联调的工作量

削峰:

回到一开始我们使用同步调用系统的情况,并且思考一下,如果此时有大量用户请求购票整个系统会变成什么样?

Rocket章节之削峰.png

如果,此时有一万的请求进入购票系统,我们知道运行我们主业务的服务器配置一般会比较好,所以这里我们假设购票系统能承受这一万的用户请求,那么也就意味着我们同时也会出现一万调用发短信服务的请求。而对于短信系统来说并不是我们的主要业务,所以我们配备的硬件资源并不会太高,那么你觉得现在这个短信系统能承受这一万的峰值么,且不说能不能承受,系统会不会 直接崩溃 了?

短信业务又不是我们的主业务,我们能不能 折中处理 呢?如果我们把购买完成的信息发送到消息队列中,而短信系统 尽自己所能地去消息队列中取消息和消费消息 ,即使处理速度慢一点也无所谓,只要我们的系统没有崩溃就行了

消息队列的副作用:

  1. 降低了系统的可用性
  2. 系统的复杂度
  3. 重复消费消息问题
  4. 消息顺序消费问题
  5. 解决分布式事务问题
  6. 解决消息堆积的问题

这些问题后面慢慢讲到


RocketMQ简介

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点

核心模块

  • rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息
  • rocketmq-client:提供发送、接受消息的客户端 API。
  • rocketmq-namesrv:NameServer,类似于 Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。
  • rocketmq-common:通用的一些类,方法,数据结构等。
  • rocketmq-remoting:基于 Netty4 的 client/server + fastjson 序列化 + 自定义二进制协议。
  • rocketmq-store:消息、索引存储等。
  • rocketmq-filtersrv:消息过滤器 Server,需要注意的是,要实现这种过滤,需要上传代码到 MQ!(一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑 filtersrv 组件)。
  • rocketmq-tools:命令行工具。

四大核心: NameServer、Broker、Producer以及 Consumer 四部分


队列模型和主题模型

消息队列为什么要叫消息队列?

早期的消息中间件是通过 队列 这一模型来实现的,所以都习惯把消息中间件称为消息队列

但是,如今例如 RocketMQ 、Kafka 这些优秀的消息中间件不仅仅是通过一个 队列 来实现消息存储的

队列模型:

就像我们理解队列一样,消息中间件的队列模型就真的只是一个队列

消息队列之队列模型.png

在一开始提到了一个 “广播” 的概念,也就是说如果此时我们需要将一个消息发送给多个消费者(比如需要将信息发送给短信系统和邮件系统),这个时候单个队列即不能满足需求了

当然也可以让 Producer 生产消息放入多个队列中,然后每个队列去对应每一个消费者。问题是可以解决,创建多个队列并且复制多份消息是会很影响资源和性能的。而且,这样子就会导致生产者需要知道具体消费者个数然后去复制对应数量的消息队列,这就违背我们消息中间件的 解耦 这一原则

主题模型:

主题模型可以解决上面 队列模型 的问题(多个队列,达不到解耦目的)

主题模型 也可以称为 发布订阅模型

在主题模型中,消息的生产者称为 发布者(Publisher) ,消息的消费者称为 订阅者(Subscriber),存放消息的容器称为 主题(Topic)

其中,发布者将消息发送到指定主题中,订阅者需要 提前订阅主题 才能接受特定主题的消息

消息队列之主题模型.png


RocketMQ中的消息模型

RockerMQ 中的消息模型就是按照 主题模型 所实现的

其实对于主题模型的实现来说每个消息中间件的底层设计都是不一样的,就比如:

  • Kafka 中的 分区
  • RocketMQ 中的 队列
  • RabbitMQ 中的 Exchange

可以理解为 主题模型/发布订阅模型 只是一个标准,那些中间件只不过照着这个标准去实现而已

所以,RocketMQ 中的 主题模型 到底是如何实现的呢?根据下面的图,可以尝试着去理解一下

RocketMQ主题模型.png

可以看到在整个图中有 Producer Group 、Topic 、Consumer Group 三个角色

  • Producer Group 生产者组: 代表某一类的生产者,比如我们有多个秒杀系统作为生产者,这多个合在一起就是一个 Producer Group 生产者组,它们一般生产相同的消息
  • Consumer Group 消费者组: 代表某一类的消费者,比如我们有多个短信系统作为消费者,这多个合在一起就是一个 Consumer Group 消费者组,它们一般消费相同的消息
  • Topic 主题: 代表一类消息,比如订单消息,物流消息等等

可以看到图中生产者组中的生产者会向主题发送消息,而 主题中存在多个队列 ,生产者每次生产消息之后是指定主题中的某个队列发送消息的

每个主题中都有多个队列(这里还不涉及到 Broker),集群消费模式下,一个消费者集群多台机器共同消费一个 topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 Consumer1 和 Consumer2 分别对应着两个队列,而 Consuer3 是没有队列对应的,所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同

当然也可以消费者个数小于队列个数,只不过不太建议。如下图:

rocket消费组个数小于队列个数.png

每个消费组在每个队列上维护一个消费位置 ,为什么呢?

因为刚刚画的仅仅是一个消费者组,在发布订阅模式中一般会涉及到多个消费者组,而每个消费者组在每个队列中的消费位置都是不同的。如果此时有多个消费者组,那么消息被一个消费者组消费完之后是不会删除的(因为其它消费者组也需要呀),它仅仅是为每个消费者组维护一个 消费位移(offset) ,每次消费者组消费完会返回一个成功的响应,然后队列再把维护的消费位移加一,这样就不会出现刚刚消费过的消息再一次被消费了

RocketMQ消费位移.png

为什么一个主题中需要维护多个队列 ?

答案是 提高并发能力 (类似 kafka 的 topic 下的分区)。的确,每个主题中只存在一个队列也是可行的。如果每个主题中只存在一个队列,这个队列中也维护着每个消费者组的消费位置,这样也可以做到 发布订阅模式 。如下图:

RocketMQ主题单一队列情况.png

但是,这样生产者是不是只能向一个队列发送消息?又因为需要维护消费位置所以一个队列只能对应一个消费者组中的消费者,这样是不是其他的 Consumer 就没有用武之地了?从这两个角度来讲,并发度一下子就小了很多

总结来说,RocketMQ 通过使用在一个 Topic 中配置多个队列并且每个队列维护每个消费者组的消费位置实现了主题模式/发布订阅模式


RocketMQ的架构图(NameServer\Broker\Producer\Consumer)

RocketMQ 技术架构中有四大角色 NameServer 、Broker 、Producer 、Consumer

Broker:

主要负责消息的存储、投递和查询以及服务高可用保证。说白了就是消息队列服务器,生产者生产消息到 Broker ,消费者从 Broker 拉取消息并消费

Broker是具体提供业务的服务器,单个 Broker 节点与所有的 NameServer 节点保持长连接及心跳,并会定时将 Topic 信息注册到 NameServer,顺带一提底层的通信和连接都是基于 Netty 实现的

普及一下关于 Broker 、Topic 和 队列的关系。上面讲解了 Topic 和队列的关系,一个 Topic 中存在多个队列,那么这个 Topic 和队列存放在哪呢?

一个 Topic 分布在多个 Broker 上,一个 Broker 可以配置多个 Topic ,它们是多对多的关系

如果某个 Topic 消息量很大,应该给它多配置几个队列(上文中提到了提高并发能力),并且 尽量多分布在不同 Broker 上,以减轻某个 Broker 的压力

Topic 消息量都比较均匀的情况下,如果某个 broker 上的队列越多,则该 broker 压力越大

RocketMQ中的Topic和Broker.png

NameServer:

类似 ZooKeeper 和 Spring Cloud 中的 Eureka ,它其实也是一个 注册中心 ,主要提供两个功能:

  • Broker管理
  • 路由信息管理

说白了就是 Broker 会将自己的信息注册到 NameServer 中,此时 NameServer 就存放了很多 Broker 的信息(Broker 的路由表),消费者和生产者就从 NameServer 中获取路由表然后照着路由表的信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)

NameServer 与 Zookeeper 相比更轻量。主要是因为每个NameServer 节点互相之间是独立的,没有任何信息交互

NameServer压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据

但有一点需要注意,Broker 向 NameServer 发心跳时, 会带上当前自己所负责的所有 Topic 信息,如果 Topic 个数太多(万级别),会导致一次心跳中,就 Topic 的数据就几十 M,网络情况差的话,网络传输失败,心跳失败,导致 NameServer 误认为 Broker 心跳失败

NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。

每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。

所以从功能上看 NameServer 应该是和 ZooKeeper 差不多,据说 RocketMQ 的早期版本确实是使用的 ZooKeeper ,后来改为了自己实现的 NameServer

Producer:

消息发布的角色,支持分布式集群方式部署。说白了就是生产者

Producer 由用户进行分布式部署,消息由 Producer 通过多种负载均衡模式发送到 Broker 集群,发送低延时,支持快速失败

RocketMQ 提供了三种方式发送消息:同步、异步和单向

  • 同步发送: 同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信
  • 异步发送: 异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务
  • 单向发送: 单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集

Consumer:

消息消费的角色,支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。说白了就是消费者

Pull: 拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型

Push: 推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息

RocketMQ的架构图.png

为什么需要 **NameServer ,为什么不直接 Producer、Consumer 和 Broker 直接进行生产消息、消费消息?**

上文提到过 Broker 是需要保证高可用的,如果整个系统仅仅靠着一个 Broker 来维持的话,那么这个 Broker 的压力会很大,所以需要使用多个 Broker 来保证 负载均衡

如果说,消费者和生产者直接和多个 Broker 相连,那么当 Broker 修改的时候必定会牵连着每个生产者和消费者,这样就会产生耦合问题,而 NameServer 注册中心就是用来解决这个问题的

和 SpringCloud 中的 Eureka 注册中心类似

当然,RocketMQ 中的技术架构肯定不止前面那么简单,因为上面图中的四个角色都是需要做集群的。官网的架构图:

RocketMQ的架构图2.png

和上面架构图的大致没有区别,不过是做了集群而已,另外还有一些细节上的差别:

  1. Broker 做了集群并且还进行了主从部署 ,由于消息分布在各个 Broker 上,一旦某个 Broker 宕机,则该 Broker 上的消息读写都会受到影响。所以 Rocketmq 提供了 master/slave 的结构, salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,则 slave 提供消费服务,但是不能写入消息 (后面还会提到)

  2. 为了保证 HA(高可用),NameServer 也做了集群部署,但是请注意它是 去中心化 的。也就意味着它没有主节点,你可以很明显地看出 NameServer 的所有节点是没有进行 Info Replicate 的,在 RocketMQ 中是通过 单个 Broker 和所有 NameServer 保持长连接 ,并且在每隔 30 秒 Broker 会向所有 Nameserver 发送心跳,心跳包含了自身的 Topic 配置信息,这个步骤就对应这上面的 Routing Info

  3. 在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过 轮询 的方法去向每个队列中生产数据以达到 负载均衡 的效果

  4. 消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 请求来获取消息数据。Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给 同一个消费组中的所有消费者 ,集群模式下消息只会发送给一个消费者


消息领域模型

RocketMQ消息领域模型.png

Message: Message(消息)就是要传输的信息。一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址

一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题

Topic: Topic(主题)可以看做消息的分类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic

Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息

一个 Topic 也可以被 0个、1个、多个消费者订阅

Tag: Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag

标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助

Group: 分组,一个组可以订阅多个Topic

分为 ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为 Group,同一个 Group 一般来说发送和消费的消息都是一样的

Queue: 在 Kafka 中叫 Partition,每个 Queue 内部是有序的,在 RocketMQ 中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题

Message Queue: Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列

一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去

消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力

Offset:

在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限

也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标

消息消费模式:

消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)

默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费

而广播消费消息会发给消费者组中的每一个消费者进行消费

Message Order:

Message Order(消息顺序)有两种:Orderly(顺序消费)和Concurrently(并行消费)

顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列

并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制


如何解决 顺序消费、重复消费

这些问题不仅仅是 RocketMQ ,而是应该每个消息中间件都需要去解决的

其实 Kafka 的架构基本和 RocketMQ 类似,只是它注册中心使用了 Zookeeper 、它的 分区 就相当于 RocketMQ 中的 队列 。还有一些小细节不同会在后面提到

顺序消费:

RocketMQ 在 主题上是无序的、它只有在 队列层面才是保证有序

这又扯到两个概念:普通顺序 和 严格顺序

普通顺序: 消费者通过 同一个消费队列收到的消息是有顺序的 ,不同消息队列收到的消息则可能是无顺序的。普通顺序消息在 Broker 重启情况下不会保证消息顺序性 (短暂时间)

严格顺序: 消费者 收到的所有消息均是有顺序的。严格顺序消息 即使在异常情况下也会保证消息的顺序性

严格顺序看起来虽好,实现它可会付出巨大的代价。如果你使用严格顺序模式,Broker 集群中只要有一台机器不可用,则整个集群都不可用。现在主要场景也就在 binlog 同步

一般而言,我们的 MQ 都是能容忍短暂的乱序,所以推荐使用普通顺序模式

那么,我们现在使用了 普通顺序模式 ,我们从上面学习知道了在 Producer 生产消息的时候会进行轮询(取决你的负载均衡策略)来向同一主题的不同消息队列发送消息。那么如果此时我有几个消息分别是同一个订单的创建、支付、发货,在轮询的策略下这 三个消息会被发送到不同队列 ,因为在不同的队列此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了

Rocket普通消费顺序不同队列情况.png

其实很简单,我们需要处理的仅仅是将同一语义下的消息放入同一个队列(比如这里是同一个订单),那我们就可以使用 Hash取模法 来保证同一个订单在同一个队列中就行了

情况和 kafka 的分区类似,解决顺序消费的问题,不过只能保证生产者放入队列的顺序,客户端的消费顺序需要其他手段来保证


重复消费:

重复消费的问题无非就是 幂等, 在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。比如说,这个时候我们有一个订单的处理积分的系统,每当来一个消息的时候它就负责为创建这个订单的用户的积分加上相应的数值。可是有一次,消息队列发送给订单系统 FrancisQ 的订单信息,其要求是给 FrancisQ 的积分加上 500。但是积分系统在收到 FrancisQ 的订单信息处理完成之后返回给消息队列处理成功的信息的时候出现了网络波动(当然还有很多种情况,比如 Broker 意外重启等等),这条回应没有发送成功

那么,消息队列没收到积分系统的回应会不会尝试重发这个消息?问题就来了,我再发这个消息,万一它又给 FrancisQ 的账户加上 500 积分怎么办呢?

所以我们需要给我们的消费者实现幂等,也就是对同一个消息的处理结果,执行多少次都不变

那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。你可以使用 写入 Redis 来保证,因为 Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条

不过最主要的还是需要 根据特定场景使用特定的解决方案 ,你要知道你的消息消费是否是完全不可重复消费还是可以忍受重复消费的,然后再选择强校验和弱校验的方式。毕竟在 CS 领域还是很少有技术银弹的说法

而在整个互联网领域,幂等不仅仅适用于消息队列的重复消费问题,这些实现幂等的方法,也同样适用于,在其他场景中来解决重复请求或者重复调用的问题 。比如将 HTTP 服务设计成幂等的,解决前端或者 APP 重复提交表单数据的问题 ,也可以将一个微服务设计成幂等的,解决 RPC 框架自动重试导致的重复调用问题


分布式事务

如何解释分布式事务呢?在同一个系统中可以轻松地实现事务,但是在分布式架构中,有很多服务是部署在不同系统之间的,而不同服务之间又需要进行调用。比如此时我下订单然后增加积分,如果保证不了分布式事务的话,就会出现 A 系统下了订单,但是 B 系统增加积分失败或者 A 系统没有下订单,B 系统却增加了积分。前者对用户不友好,后者对运营商不利,这是我们都不愿意见到的

如今比较常见的分布式事务实现有 2PC、TCC 和事务消息( half 半消息机制)。每一种实现都有其特定的使用场景,但是也有各自的问题,都不是完美的解决方案

在 RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的

RocketMQ事务反查机制.png

在第一步发送的 half 消息 ,它的意思是 在事务提交之前,对于消费者来说,这个消息是不可见的

那么,如何做到写入消息但是对用户不可见呢?RocketMQ 事务消息的做法是:如果消息是 half 消息,将备份原消息的主题与消息消费队列,然后 改变主题 为 RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费 half类型 的消息,然后 RocketMQ 会开启一个定时任务,从 Topic 为RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息

可以试想一下,如果没有从第5步开始的 事务反查机制 ,如果出现网路波动第4步没有发送成功,这样就会产生 MQ 不知道是不是需要给消费者消费的问题,他就像一个无头苍蝇一样。在 RocketMQ 中就是使用的上述的事务反查来解决的,而在 Kafka 中通常是直接抛出一个异常让用户来自行解决

还需要注意的是,在 MQ Server 指向系统 B 的操作已经和系统 A 不相关了,也就是说在消息队列中的分布式事务是——本地事务和存储消息到消息队列才是同一个事务。这样也就产生了事务的最终一致性,因为整个过程是异步的,每个系统只要保证它自己那一部分的事务就行了

第二个版本解释:

  1. A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。

  2. 当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。

  3. 执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)

  4. 如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。

  5. 如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。

  6. 如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查


消息堆积问题

在上面我们提到了消息队列一个很重要的功能——削峰 。那么如果这个峰值太大了导致消息堆积在队列中怎么办呢?

其实这个问题可以将它广义化,因为产生消息堆积的根源其实就只有两个——生产者生产太快或者消费者消费太慢

我们可以从多个角度去思考解决这个问题,当流量到峰值的时候是因为生产者生产太快,我们可以使用一些 限流降级 的方法,当然你也可以 增加多个消费者实例去水平扩展增加消费能力 来匹配生产的激增。如果消费者消费过慢的话,我们可以先检查是否是消费者出现了大量的消费错误 ,或者打印一下日志查看是否是哪一个线程卡死,出现了锁资源不释放等等的问题

当然,最快速解决消息堆积问题的方法还是增加消费者实例,不过同时你还需要增加每个主题的队列数量

别忘了在 RocketMQ 中,一个队列只会被一个消费者消费,如果你仅仅是增加消费者实例就会出现最原始的架构图的那种情况(消费者空闲)

RocketMQ消费者空闲.png


回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,在 RocketMQ 中, Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒


RocketMQ 的刷盘机制

在 Topic 中的 队列是以什么样的形式存在的?

队列中的消息又是如何进行存储持久化的呢?

上文中提到的 同步刷盘 和 异步刷盘 又是什么呢?

同步刷盘和异步刷盘:

RocketMQ同步刷盘和异步刷盘.png

如上图所示,在同步刷盘中需要等待一个刷盘成功的 ACK ,同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是 性能上会有较大影响 ,一般地适用于金融等特定业务场景

而异步刷盘往往是开启一个线程去异步地执行刷盘操作。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量,一般适用于如发验证码等对于消息保证要求不太高的业务场景

一般地,异步刷盘只有在 Broker 意外宕机的时候会丢失部分数据,可以设置 Broker 的参数 FlushDiskType 来调整你的刷盘策略(ASYNC_FLUSH 或者 SYNC_FLUSH)

Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中

刷盘的最终实现都是使用 NIO 中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是 同步刷盘 的话,在 Broker 把消息写到 CommitLog 映射区后,就会等待写入完成

异步而言 只是唤醒对应的线程,不保证执行的时机,流程如图所示

RocketMQ异步刷盘.png


存储机制

队列是以什么样的形式存在的?队列中的消息又是如何进行存储持久化的呢?

这里涉及到了 RocketMQ 是如何设计它的存储结构。首先介绍 RocketMQ 消息存储架构中的三大角色 —— CommitLog 、ConsumeQueue 和 IndexFile

  • CommitLog: 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件
  • ConsumeQueue: 消息消费队列,引入的目的主要是提高消息消费的性能(我们再前面也讲了),由于RocketMQ 是基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 Topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset ,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共20个字节,分别为8字节的 commitlog 物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue文件大小约5.72M
  • IndexFile: IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。这里只做科普不做详细介绍

整个消息存储的结构,最主要的就是 CommitLoq 和 ConsumeQueue 。而 ConsumeQueue 可以大概理解为 Topic 中的队列

RocketMQ之commitLog.png

RocketMQ 采用的是 混合型的存储结构 ,即为 Broker 单个实例下所有的队列共用一个日志数据文件来存储消息。有意思的是在同样高并发的 Kafka 中会为每个 Topic 分配一个存储文件。这就有点类似于我们有一大堆书需要装上书架,RockeMQ 是不分书的种类直接成批的塞上去的,而 Kafka 是将书本放入指定的分类区域的

而 RocketMQ 为什么要这么做呢?原因是 提高数据的写入效率 ,不分 Topic 意味着我们有更大的几率获取 成批 的消息进行数据写入,但也会带来一个麻烦就是读取消息的时候需要遍历整个大文件,这是非常耗时的

所以,在 RocketMQ 中又使用了 ConsumeQueue 作为每个队列的索引文件来 提升读取消息的效率。我们可以直接根据队列的消息序号,计算出索引的全局位置(索引序号*索引固定⻓度20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息

RockeMQ存储架构.png

首先,在最上面的那一块就是我刚刚讲的你现在可以直接 把 ConsumerQueue 理解为 Queue。

在图中最左边说明了 红色方块代表被写入的消息虚线方块代表等待被写入的。左边的生产者发送消息会指定 Topic 、QueueId 和具体消息内容,而在 Broker 中管你是哪门子消息,他直接 全部顺序存储到了 CommitLog 。而根据生产者指定的 Topic 和 QueueId 将这条消息本身在 CommitLog 的偏移(offset),消息本身大小,和 tag 的 hash 值存入对应的 ConsumeQueue 索引文件中。而在每个队列中都保存了 ConsumeOffset 即每个消费者组的消费位置,而消费者拉取消息进行消费的时候只需要根据 ConsumeOffset 获取下一个未被消费的消息就行了

CommitLog 文件要设计成固定大小的长度呢?内存映射机制


同步复制和异步复制

上面的同步刷盘和异步刷盘是在单个结点层面的,而同步复制和异步复制主要是指的 Borker 主从模式下,主节点返回消息给客户端的时候是否需要同步从节点

  • 同步复制: 也叫 “同步双写”,也就是说,只有消息同步双写到主从结点上时才返回写入成功
  • 异步复制: 消息写入主节点之后就直接返回写入成功

然而,很多事情是没有完美的方案的,就比如我们进行消息写入的节点越多就更能保证消息的可靠性,但是随之的性能也会下降,所以需要程序员根据特定业务场景去选择适应的主从复制方案

那么,异步复制会不会也像异步刷盘那样影响消息的可靠性呢?

答案是不会的,因为两者就是不同的概念,对于消息可靠性是通过不同的刷盘策略保证的,而像异步同步复制策略仅仅是影响到了 可用性 。为什么呢?其主要原因是 RocketMQ 是不支持自动主从切换的,当主节点挂掉之后,生产者就不能再给这个主节点生产消息了

比如这个时候采用异步复制的方式,在主节点还未发送完需要同步的消息的时候主节点挂掉了,这个时候从节点就少了一部分消息。但是此时生产者无法再给主节点生产消息了,消费者可以自动切换到从节点进行消费(仅仅是消费),所以在主节点挂掉的时间只会产生主从结点短暂的消息不一致的情况,降低了可用性,而当主节点重启之后,从节点那部分未来得及复制的消息还会继续复制

在单主从架构中,如果一个主节点挂掉了,那么也就意味着整个系统不能再生产了。那么这个可用性的问题能否解决呢?一个主从不行那就多个主从的呗,别忘了在我们最初的架构图中,每个 Topic 是分布在不同 Broker 中的

但是这种复制方式同样也会带来一个问题,那就是无法保证 严格顺序 。在上文中我们提到了如何保证的消息顺序性是通过将一个语义的消息发送在同一个队列中,使用 Topic 下的队列来保证顺序性的。如果此时我们主节点A负责的是订单A的一系列语义消息,然后它挂了,这样其他节点是无法代替主节点A的,如果我们任意节点都可以存入任何消息,那就没有顺序性可言了

而在 RocketMQ 中采用了 Dledger 解决这个问题。他要求在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客⼾端返回写⼊成功,并且它是⽀持通过选举来动态切换主节点的。这里我就不展开说明了,读者可以自己去了解

也不是说 Dledger 是个完美的方案,至少在 Dledger 选举过程中是无法提供服务的,而且他必须要使用三个节点或以上,如果多数节点同时挂掉他也是无法保证可用性的,而且要求消息复制板书以上节点的效率和直接异步复制还是有一定的差距的


定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等