消息队列

Message Queue

Posted by Ekko on August 19, 2020

参考资料 掘金消息队列3y三太子敖丙JavaGuide芋道源码

[TOC]


什么是消息队列

MQ 全称 Message Queue,消息队列( MQ )是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如 远程过程调用 的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求

把消息队列比作是一个存放消息的容器,当需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过 异步 处理提高系统性能和削峰降低系统耦合性。目前使用较多的消息队列有 ActiveMQ、RabbitMQ、Kafka、RocketMQ

;队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。比如生产者发送消息1,2,3…对于消费者就会按照1,2,3…的顺序来消费。但是偶尔也会出现消息被消费的顺序不对的情况,比如某个消息消费失败又或者一个 queue 多个consumer 也会导致消息被消费的顺序不对,我们一定要保证消息被消费的顺序正确

除了上面说的消息消费顺序的问题,使用消息队列,我们还要考虑如何保证消息不被重复消费?如何保证消息的可靠性传输(如何处理消息丢失的问题)?……等等问题。所以说使用消息队列也不是十全十美的,使用它也会让系统可用性降低、复杂度提高,另外需要我们保障一致性等问题

优点: 异步、削峰、解耦 缺点: 重复消费、消息丢失、顺序消费


解耦、异步、削峰

解耦:

传统模式:

消息队列之解耦传统模式.png

缺点: 系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦

中间件模式:

消息队列之解耦中间件模式.png

优点: 将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改

如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些

最常见的事件驱动架构类似生产者消费者模式,在大型网站中通常用利用消息队列实现事件驱动结构。如下图所示

消息队列实现事件驱动.png

消息队列是利用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计

消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程

另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息

不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。 另外,这两种消息模型是 JMS (JAVA Message Service,java消息服务)提供的,AMQP 协议(Advanced Message Queuing Protocol 高级消息队列协议)还提供了 5 种消息模型


异步:

传统模式:

消息队列之异步传统模式.png

缺点: 一些非必要的业务逻辑以同步的方式运行,太耗费时间

消息队列之异步中间件模式.png

优点: 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善

通过以上分析我们可以得出消息队列具有很好的削峰作用的功能——即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务

举例: 在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。平时流量很低,但是你要做秒杀活动 00 :00 的时候流量疯狂怼进来,服务器,Redis,MySQL 各自的承受能力都不一样,全部流量照单全收肯定有问题,直接就打挂了 DB

因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票


削峰:

消息队列之削峰传统模式.png

缺点: 并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

消息队列之削峰中间件模式.png

优点: 系统 A 慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的

事件驱动架构

事件驱动架构(Event Driven Architecture,EDA)一个事件驱动框架(EDA)定义了一个设计和实现一个应用系统的方法学,在这个系统里事件可传输于松散耦合的组件和服务之间

一个事件驱动系统典型地由事件消费者和事件产生者组成。事件消费者向事件管理器订阅事件,事件产生者向事件管理器发布事件。当事件管理器从事件产生者那接收到一个事件时,事件管理把这个事件转送给相应的事件消费者。如果这个事件消费者是不可用的,事件管理者将保留这个事件,一段间隔之后再次转送该事件消费者。这种事件传送方法在基于消息的系统里就是:储存(store)和转送(forward)


使用消息队列带来的一些问题

系统可用性降低: 系统可用性在某种程度上降低,在加入 MQ 之前,不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后就需要去考虑

系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题

数据一致性问题:

上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了

这个其实是分布式服务本身就存在的一个问题,不仅仅是消息队列的问题,但是放在这里说是因为用了消息队列这个问题会暴露得比较严重一点

下单的服务自己保证自己的逻辑成功处理了,成功发了消息,但是优惠券系统,积分系统等等这么多系统,他们成功还是失败不能不考虑到

所有的服务都成功才能算这一次下单是成功的,那怎么才能保证数据一致性呢?分布式事务: 把下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败


JMS 和 AMQP

JMS简介:

JMS(JAVA Message Service,java 消息服务)是 java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。JMS API是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性

ActiveMQ 就是基于 JMS 规范实现的

JMS两种消息模型:

  1. 点到点(P2P)模型

JMS点到点模型.png

使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用, 未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费)

  1. 发布/订阅(Pub/Sub)模型

JMS发布订阅模型.png

发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息 广播之后 才订阅的用户是收不到该条消息的

JMS 五种不同的消息正文格式:

JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性

  • StreamMessage – Java原始值的数据流
  • MapMessage–一套名称-值对
  • TextMessage–一个字符串对象
  • ObjectMessage–一个序列化的 Java对象
  • BytesMessage–一个字节的数据流

AMQP简介

Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制

RabbitMQ 就是基于 AMQP 协议实现的


JMS vs AMQP

对比方向 JMS AMQP
定义 Java API 协议
跨语言
跨平台
支持消息类型 提供两种消息模式 1、peer-2-peer 2、pub/sub 提供了五种消息模型1、direct exchange;2、fanout exchange;3、topic change;4、header exchange;5、system exchange。本之来讲,后四种和 JMS 的pub/sub模型没有太大区别,仅是在路由机制上做了更详细的划分;
支持消息类型 支持多种消息类型1、StreamMessage – Java原始值的数据流2、MapMessage–一套名称-值对3、TextMessage–一个字符串对象4、ObjectMessage–一个序列化的 Java对象5、BytesMessage–一个字节的数据流 byte[] 二进制

总结

  • AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性
  • JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)
  • 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种

消息队列比较

消息队列比较.png


消息重复消费

消息重复消费是使用消息队列之后,必须考虑的一个问题,也是比较严重和常见的问题

就比如有这样的一个场景,用户下单成功后需要去一个活动页面给他加 GMV(销售总额),最后根据他的 GMV 去给他发奖励,这是电商活动很常见的玩法

下单了马上去看一些活动页面,有时候马上就有了,有时候缺延迟有很久,这个速度取决于消息队列的消费速度,消费慢堵塞了就迟点看到

下个单支付成功就发个消息出去,上面那个活动的开发人员就监听你的支付成功消息,监听到这个订单成功支付的消息,那就去活动 GMV 表里加上去

普通下单图解.png

但是一般消息队列的使用,都是有重试机制的,就是说下游的业务发生异常了,会抛出异常并且要求重新发一次消息

问题在于,监听消息的服务不止一个,其他服务也会有失败要求重发,但是加钱的服务是成功的,重发后,价钱的操作重复

下单异常重发.png

积分系统处理失败了,这个系统肯定要求重新发送一次这个消息,积分的系统重新接收并且处理成功了,但是别人的活动,优惠券等等服务也监听了这个消息,就可能出现活动系统给他 GMV 加两次,优惠券扣两次这种情况

真实的情况其实重试是很正常的,服务的网络抖动,开发人员代码 Bug,还有数据问题等都可能处理失败要求重发的


重复消费解决方法(接口幂等 ——— 强校验、弱校验)

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同

幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。例如,“setTrue()”函数就是一个幂等函数,无论多次执行,其结果都是一样的.更复杂的操作幂等保证是利用唯一交易号(流水号)实现

通俗了讲就是同样的参数调用这个接口,调用多少次结果都是一个,你加 GMV 同一个订单号加一次是多少钱,加 N 次都还是多少钱。但是如果不做幂等,一个订单调用多次钱就会出现加多次,同理退款调用多次钱也就减多次

幂等流程.png

一般幂等,会分场景考虑,是强校验还是弱校验,比如跟金钱相关的场景就很关键,就做强校验,不是很重要的场景做弱校验

强校验:

比如监听到用户支付成功的消息,监听到了去加 GMV 需要调用加钱的接口,那加钱接口下面再调用一个加流水的接口,两个放在一个事务,成功一起成功,失败一起失败

每次消息过来都要拿着 订单号 + 业务场景这样的唯一标识 (比是天猫双十一活动)去流水表查,看看有没有这条流水,有就直接 return 不要走下面的流程了,没有就执行后面的逻辑

之所以用流水表,是因为涉及到金钱这样的活动,有什么问题后面也可以去流水表对账,还有就是帮助开发人员定位问题。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 强校验幂等伪代码演示大致逻辑
*/
public void process(String orderId){
    try{
        // 查询订单是否存在这个活动加GMV的流水
        Object gmvFlow = getFlowByOrderId("addGmv" + orderId);
        if (Objects.isNull(gmvFlow)) {
            // 不存在流水,做加GMV和加流水操作
            addGmvAndFlow(orderId);
        } else {
            // 存在说明处理过,直接返回
            return ;
        }
    } catch (Exception e) {
        // 发送异常 触发消息队列框架重试机制
    }
}

弱校验:

一些不重要的场景,比如给谁发短信,就把这个 id + 场景唯一标识 作为 Redis 的 key,放到缓存里面失效时间看你场景,一定时间内的这个消息就去 Redis 判断

用 KV 就算消息丢了可能这样的场景也没关系


消息队列的顺序消费

一般是同个业务场景下不同几个操作的消息同时过去,本身顺序是对的,但是发出去的时候同时发出去了,消费的时候却乱掉了,这样就会造成乱序问题

电商活动也是有这样的例子,数据量大的时候数据同步压力很大,有时候数据量大的表需要同步几个亿的数据。(并不是主从同步,主从延迟大会有问题,可能是从数据库或者主数据库同步到备库)

这种情况怼到队列里面去,然后慢慢消费,但问题是,在数据库同时对一个 Id 的数据进行了增、改、删三个操作,但是你消息发过去消费的时候变成了改,删、增,这样数据就不对,两者的结果完全不一样


顺序消费 —— RocketMQ解决案例

生产者消费者一般需要保证顺序消息的话,可能就是一个业务场景下的,比如订单的创建、支付、发货、收货

而这些东西都是同一个订单号。一个 topic 下有多个队列,为了保证发送有序,RocketMQ 提供了 MessageQueueSelector 队列选择机制,该机制有三种实现

MessageQueueSelector实现.png

可使用 Hash 取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样就保证了发送有序

RocketMQ 的 topic 内的队列机制,可以保证存储满足 FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可

RocketMQ仅保证顺序发送,顺序消费由消费者业务保证

同一批需要做到顺序消费的根据 hash 会投递到同一个 queue,同一个 queue 肯定会投递到同一个消费实例,同一个消费实例肯定是顺序拉取并顺序提交线程池的,只要保证消费端顺序消费即可

但是消费者是多线程的,消息是有序取出,但是不能保证有序处理。如果是使用 MessageListenerOrderly 则自带有序处理,如果是使用 MessageListenerConcurrently ,则需要把线程池改为单线程模式


消息队列的分布式事务

下单流程可能涉及到 10 多个环节,下单付钱都成功,但是优惠券扣减失败了,积分新增失败了,需要用分布式事务解决该问题

分布式事务大概分为:

  • 2pc(两段式提交)
  • 3pc(三段式提交)
  • TCC(Try、Confirm、Cancel)
  • 最大努力通知
  • XA
  • 本地消息表(ebay研发出的)
  • 半消息/最终一致性(RocketMQ)

2pc(两段式):

事务两段式提交.png

2pc(两段式提交)可以说是分布式事务的最开始的样子,就是通过消息中间件协调多个系统,在两个系统操作事务的时候都锁定资源但是不提交事务,等两者都准备好了,告诉消息中间件,然后再分别提交事务

如果 A 系统事务提交成功了,但是 B 系统在提交的时候网络波动或者各种原因提交失败了,其实还是会失败的

最终一致性:

事务最终一致性.png

能保证:

  • 业务主动方本地事务提交失败,业务被动方不会收到消息的投递
  • 只要业务主动方本地事务执行成功,那么消息服务一定会投递消息给下游的业务被动方,并最终保证业务被动方一定能成功消费该消息(消费成功或失败,即最终一定会有一个最终态)