[TOC]
Kafka 简介
Kafka 创建背景
Kafka 是一个消息系统,原本开发自 LinkedIn,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用
活动流数据 是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析
运营数据 指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多
近年来,活动流数据和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个按照分布式事务日志架构的大规模发布/订阅消息队列,这使它作为企业级基础设施来处理流式数据非常有价值。一般情况下,kafka 会作为多种类型的数据管道和消息系统使用
Kafka 是一种分布式的,基于发布 / 订阅的消息系统
主要设计目标如下:
- 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输
- 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输
- 同时支持离线数据处理和实时数据处理
- Scale out:支持在线水平扩展
- 完全的分布式系统,Broker、Producer、Consumer 都原生自动支持分布式
- 自动实现负载均衡
- 支持 Hadoop 数据并行加载,对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 通过 Hadoop 的并行加载机制,同时支持离线数据处理和实时数据处理
基础架构
生产者(producer)和 消费者(consumer)
对于 Kafka 来说客户端有两种基本类型:
- 生产者(Producer)
- 消费者(Consumer)
除此之外,还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者API,它们只不过是在上层做了封装
Producer(生产者)顾名思义,生产者就是负责向 Kafka 发送消息的应用程序
Consumer(消费者)与生产者对应的是消费者,应用程序可以通过 Kafka consumer 来订阅 topic 并且从订阅的 topic 中 pull 消息。Kafka consumer 和我们平时所见的一些消息中间件不同的是: Kafka 的消费理念中还有一个叫 Consumer Group (消费组)的概念。每个 Consumer 都会有一个与之对应的 Consumer Group。后续我们再来讨论消费者的设计
Broker 和集群(Cluster)
Broker: Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-1、broker-2等
一台 Kafka 服务器就是一个 Broker。一个集群由多个 Broker 组成。一个 broker 可以容纳多个 topic。Producer 通过 Broker 写入数据到相应的 topic。 Consumer 通过 Broker 从 Kafka topic 拉取数据
一台Kafka服务器叫做Broker,Kafka集群就是多台Kafka服务器
Kafka cluster: 中的 Broker 都是通过 Zookeeper 来管理的。一个集群中可能有多个 Zookeeper,实际上建议是 3 到 5 个,并保持一个满足使用的最小奇数,以节约资源
kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性
Kafka 的一个关键性质是日志保留(retention),我们可以配置主题的消息保留策略,譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些限制时,老的消息会被删除。我们也可以针对某个主题单独设置消息过期策略,这样对于不同应用可以实现个性化
主题(Topic)与分区(Partition)
Topic: 消息的主题,是一个逻辑的概念,可以理解为消息的分类,每一条发送到 Kafka 的消息都有一个类别,这就是 topic,producer 负责将消息发送到特定的 topic(发送到 Kafka 集群中的每一条消息都必须指定一个 topic),而 consumer 负责订阅 topic 并进行消费
kafka 的数据就保存在 topic。在每个 broker(kafka实例)上都可以创建多个 topic
Partition: Topic 的分区,是物理的概念,一个分区只属于一个主题
那既然用户生产和消费都只需要知道该消息数据所在的 topic,为什么 Kafka 还要引入 Partition 的概念呢?
其实他是对用户透明的,假设所有消息都只能存在一个 Broker 上面,日后这里必将成为性能瓶颈。所以把 topic 的数据分布到整个 Kafka 集群是一个非常合理的设计。Partition 的产生,就是为了为了可以解决这个水平扩展的问题
物理上,每个 partition 对应于一个文件夹。一个 Broker 上可以存放多个 Partition。这样,producer 可以将数据发送给多个 Broker 上的多个 Partition,consumer 也可以并行从多个 broker 上的不同 paritition 上读数据,从而实现了水平扩展
每个 topic 可以有多个分区,分区的作用是做负载,提高 kafka 的吞吐量。同一个 topic 在不同的分区的数据是不重复的,partition 的表现形式就是一个一个的文件夹
在 Kafka 中,消息以主题(Topic) 来分类,每一个主题都对应一个消息队列,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储
我们使用一个生活中的例子来说明:现在 A 城市生产的某商品需要运输到 B 城市,走的是公路,那么单通道的高速公路会出现吞吐量不足的问题。所以我们现在引入分区(Partition) 的概念,类似“允许多修几条道”的方式对我们的主题完成了水平扩展
消费者(Consumer)和消费者组(Consumer Group)
Consumer: 消费者,即消息的消费方,是消息的出口。
Consumer Group: 我们可以将多个消费组组成一个消费者组,在 kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic 的不同分区的数据,这也是为了提高 kafka 的吞吐量
Replication 副本
每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在 kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)
数据存在不同的 partition 上,kafka 把这些 partition 做备份。比如,现在有三个 partition,分别存在三台 broker 上。每个 partition 都会备份,这些备份散落在不同的 broker 上
备份分区仅仅用作于备份,不做读写。如果某个Broker挂了,那就会选举出其他 Broker 的 partition 来作为主分区,这就实现了高可用
Kafka 存储在文件系统上
首先应该知道 Kafka 的消息是存在于文件系统之上的。Kafka 高度依赖文件系统来存储和缓存消息,一般的人认为 “磁盘是缓慢的”,所以对这样的设计持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。
现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读会提前将一个比较大的磁盘快读入内存。后写会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接 I/O 会绕过磁盘缓存)。综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几
上述的 Topic 其实是逻辑上的概念,面相消费者和生产者,物理上存储的其实是 Partition,每一个 Partition 最终对应一个目录,里面存储所有的消息和索引文件。默认情况下,每一个 Topic 在创建时如果不指定 Partition 数量时只会创建 1 个 Partition。比如,我创建了一个 Topic 名字为 test ,没有指定 Partition 的数量,那么会默认创建一个 test-0 的文件夹,这里的命名规则是:<topic_name>-<partition_id>
任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部,这样的顺序写磁盘操作让 Kafka 的效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)
每一条消息被发送到 Broker 中,会根据 Partition 规则选择被存储到哪一个 Partition。如果 Partition 规则设置的合理,所有消息可以均匀分布到不同的 Partition中
Kafka 中的底层存储设计(Partition 以及如何查找消息)
假设我们现在 Kafka 集群只有一个 Broker,我们创建 2 个 Topic 名称分别为:「topic1」和「topic2」,Partition 数量分别为 1、2,那么我们的根目录下就会创建如下三个文件夹:
1
2
3
| --topic1-0
| --topic2-0
| --topic2-1
在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition ,每个 Partition 都为一个目录,而每一个目录又被平均分配成多个大小相等的 Segment File 中,Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 “.index” 和 “.log” 分表表示 Segment 索引文件和数据文件
现在假设我们设置每个 Segment 大小为 500 MB,并启动生产者向 topic1 中写入大量数据,topic1-0 文件夹中就会产生类似如下的一些文件:
1
2
3
4
5
6
7
8
9
10
11
| --topic1-0
| --00000000000000000000.index
| --00000000000000000000.log
| --00000000000000368769.index
| --00000000000000368769.log
| --00000000000000737337.index
| --00000000000000737337.log
| --00000000000001105814.index
| --00000000000001105814.log
| --topic2-0
| --topic2-1
Segment 是 Kafka 文件存储的最小单位 Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用0填充。如 00000000000000368769.index 和 00000000000000368769.log
以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:
其中以索引文件中元数据 <3, 497>
为例,依次在数据文件中表示第 3 个 message(在全局 Partition 表示第 368769 + 3 = 368772 个 message)以及该消息的物理偏移地址为 497
注意该 index 文件并不是从0开始,也不是每次递增1的,这是因为 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,使得能够把 index 映射到内存,降低了查询时的磁盘 IO 开销,同时也并没有给查询带来太多的时间消耗
因为其文件名为上一个 Segment 最后一条消息的 offset ,所以当需要查找一个指定 offset 的 message 时,通过在所有 segment 的文件名中进行二分查找就能找到它归属的 segment ,再在其 index 文件中找到其对应到文件上的物理位置,就能拿出该 message
由于消息在 Partition 的 Segment 数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的 Segment 文件),这种顺序磁盘 IO 存储设计是 Kafka 高性能很重要的原因
Partition 结构
partition 是比较具体的东西,Partition 在服务器上的表现形式就是一个一个的文件夹,每个 partition 的文件夹下面会有多组 segment 文件,每组 segment 文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息
如上图,这个 partition 有三组 segment 文件,每个 log 文件的大小是一样的,但是存储的 message 数量是不一定相等的(每条的 message 大小不一致)。文件的命名是以该segment 最小 offset 来命名的,如 000.index 存储 offset 为 0~368795 的消息,kafka 就是利用 分段+索引 的方式来解决查找效率的问题
Kafka 是如何准确的知道 message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 message 都包含了以下三个属性:
Message 结构
- offset: 一个占 8 byte 的有序 id 号,表示 message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 message,可以简单的认为是一个 id
- MessageSize: 消息大小占用 4 byte,表示 message 内容 data 的大小
- data: message 的具体内容,消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样
查找消息:
假如现在需要查找一个offset 为 368801 的 message 是什么样的过程呢?我们先看看下面的图:
- 先找到 offset 的 368801 message 所在的 segment 文件(利用二分法查找),这里找到的就是在第二个 segment 文件
- 打开找到的 segment 中的 .index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1,我们要查找的 offset 为 368801 的 message 在该 index 内的偏移量为 368796 + 5= 368801,所以这里要查找的相对 offset 为 5)。由于该文件采用的是稀疏索引的方式存储着相对 offset 及对应 message 物理偏移量的关系,所以直接找相对 offset 为5的索引找不到,这里同样利用二分法查找相对 offset 小于或者等于指定的相对 offset 的索引条目中最大的那个相对 offset,所以找到的是相对 offset 为 4 的这个索引
- 根据找到的相对 offset 为 4 的索引确定 message 存储的物理偏移位置为 256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条 Message
这套机制是建立在 offset 为有序的基础上,利用 segment + 有序offset + 稀疏索引 + 二分查找 + 顺序查找 等多种手段来高效的查找数据。至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的 offset 维护 zookeeper 中,consumer 每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的 offset 已经直接维护在 kafka 集群的__consumer_offsets 这个 topic 中
存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
- 基于时间: 默认配置是 168 小时(7天)
- 基于大小: 默认配置是 1073741824
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能
生产者设计概要
当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到 Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过 Kafka 返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点
再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定
不同的业务需要使用不同的写入方式和配置。具体的方式我们在这里不做讨论,现在先看下生产者写消息的基本流程:
流程如下:
- 首先,我们需要创建一个 ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)
- 发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)
- 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回
- 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的 Kafka broker
- 当broker接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的 RecordMetadata 对象,否则返回异常
- 生产者接收到结果后,对于异常可能会进行重试
producer 在写入数据的时候永远的找 leader,不会直接将数据写入 follower!那 leader 怎么找呢?写入的流程又是什么样的呢?我们看下图:
发送的流程就在图中已经说明了,需要注意的一点是,消息写入 leader 后,follower 是主动的去 leader 进行同步的!producer 采用 push 模式将数据发布到 broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:
上面说到数据会写入到不同的分区,那 kafka 为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
-
方便扩展。因为一个 topic 可以有多个 partition ,所以我们可以通过扩展机器去轻松的应对日益增长的数据量
-
提高并发。以 partition 为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率
熟悉负载均衡的应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在 kafka 中,如果某个 topic 有多个 partition , producer 又怎么知道该将数据发往哪个 partition 呢? kafka 中有几个原则:
- partition 在写入的时候可以指定需要写入的 partition ,如果有指定,则写入对应的 partition
- 如果没有指定 partition ,但是设置了数据的 key,则会根据 key 的值 hash 出一个 partition
- 如果既没指定 partition,又没有设置 key,则会轮询选出一个 partition
保证消息不丢失是一个消息队列中间件的基本保证,那 producer 在向 kafka 写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过 ACK 应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认 kafka 接收到数据,这个参数可设置的值为 0、1、all
-
0 代表 producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
-
1 代表 producer 往集群发送数据只要 leader 应答就可以发送下一条,只确保leader 发送成功
-
all 代表 producer 往集群发送数据需要所有的 follower 都完成从 leader 的同步才会发送下一条,确保 leader 发送成功和所有的副本都完成备份。安全性最高,但是效率最低
最后要注意的是,如果往不存在的 topic 写数据,能不能写入成功呢? kafka 会自动创建 topic ,分区和副本的数量根据默认配置都是 1
消费者设计概要
消费者与消费组
假设这么个场景:我们从 Kafka 中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展
Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:
如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,如下所示:
如果增加到4个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助
Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息 换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么会是这样的:
在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用
最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者
消费者和消费者组,有这两个重要的特性:
- 如果所有的 consumer 都属于一个 consumer group,那么所有的消息会被均匀地投递给每一个消费者。就是每一个消息都只会被一个消费者处理(有点像点对点的应用,或者说单播放)
- 如果多个 consumer 分别属于多个 consumer group,那么消息会被发送(或者直接说广播)给所有的消费者组,消费者组内只会被一个消费者处理,这样子可以组成一个广播模式的应用,其实就是相当于 pub/sub 模型
消费组与分区重平衡
可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)
重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。 而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何 安全的进行重平衡 以及如何尽可能避免
消费者通过定期发送心跳(heartbeat)到一个作为组协调者(group coordinator)的 broker 来保持在消费组内存活。这个 broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳
如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期
在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费
Partition 与消费模型
上面提到,Kafka 中一个 topic 中的消息是被打散分配在多个 Partition(分区) 中存储的, Consumer Group 在消费时需要从不同的 Partition 获取消息,那最终如何重建出 Topic 中消息的顺序呢?
答案是:没有办法。Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。
下一个问题是:Partition 中的消息可以被(不同的 Consumer Group)多次消费,那 Partition 中被消费的消息是何时删除的? Partition 又是如何知道一个 Consumer Group 当前消费的位置呢?
无论消息是否被消费,除非消息到期 Partition 从不删除消息。例如设置保留时间为 2 天,则消息发布 2 天内任何 Group 都可以消费,2 天后,消息自动被删除。 Partition 会为每个 Consumer Group 保存一个偏移量,记录 Group 消费到的位置。 如下图:
为什么 Kafka 是 pull 模型
消费者应该向 Broker 要数据(pull)还是 Broker 向消费者推送数据(push)?作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。一些 logging-centric system,比如 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。事实上,push 模式和 pull 模式各有优劣
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。 push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息
对于 Kafka 而言,pull 模式更合适。 pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义
Kafka 如何保证可靠性
当我们讨论可靠性的时候,我们总会提到 保证 这个词语。可靠性保证是基础,我们基于这些基础之上构建我们的应用。比如关系型数据库的可靠性保证是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)
Kafka 中的可靠性保证有如下四点:
- 对于一个分区来说,它的消息是有序的。如果一个生产者向一个分区先写入消息 A,然后写入消息 B,那么消费者会先读取消息 A 再读取消息 B
- 当消息写入所有 in-sync 状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不一定刷新到磁盘。生产者可以等待不同时机的确认,比如等待分区主副本写入即返回,后者等待所有 in-sync 状态副本写入才返回
- 一旦消息已提交,那么只要有一个副本存活,数据不会丢失
- 消费者只能读取到已提交的消息
使用这些基础保证,我们构建一个可靠的系统,这时候需要考虑一个问题:究竟我们的应用需要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。因此,我们往往需要做权衡,一味的追求可靠性并不实际
Kafka 如何实现简单延时队列
所谓的延时队列,就是指消息被发送出去以后,并不想让消费者立即获取来消费,而是等待到了特定的时间,如果到了 30 分钟后,消费者才能获取这个消息进行消费,有时候也叫做“延迟队列”
延时队列在我们日常业务中很常见:
- 在购物 app 中,用户下单后有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单就会进入异常处理流程。这就有了延迟 30 分钟的需求了
- 用户在收到网购的商品 1 天后,发短信通知用户进行购物体验的评价
- 智能家具场景,在我离开家后一小时,关闭新风机。这时候其实是用户发送了一个指令到延时队列,当指令执行的时间到达以后,在将此指令推送到新风机中,关闭新风机
下面简单说一下通过延时等级划分的方式实现简单的延时队列
实现延时队列有一个及其简单的方案,就是在消费者 pull 到消息数据的时候,查看当前消息是否到达了消费的时间,否则回写该消费到 topic 中,等待着下一次消费 It sounds pretty good。但是这个问题就像为什么 setTimeout 不准确一样。当消息大量堆积的时候,原本消息被拉出来只需要等待 10s 就能消费了,但是重新入队以后,再次出来,可能是 1 个小时之后的事情了。显然这个方案不适宜投产,延迟太高
假设我们现在要实现一个支持 2 小时内的延时队列,精确到秒级别,也就是 7200s。如果按照秒级来区分消费的分类,那我们我们需要 7200 个 topic,topic 背后又有 n 多分区,每个分区还有 n 多副本,每个副本背后还有多个日志段…等等。是一笔巨大的资源浪费,整个集群的吞吐会大幅度降低
此时你就会产品经理商量,是否允许有一定的误差呢?产品经理同意了。ok,这个问题突然就变得简单起来了
我们可以按照消息的预计发送时间,划分出来一些延时等级。例如我们这里是 2h,那么我们可以划分出来,5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1h、2h。延时的消息按照不同的延时等级,被放入不同的 topic 当中,如果是和等级不一致的消息,会被强制转为和等级一致的延时时间,这样延时的误差可以控制在两个延时等级的时间差范围内。例如,现在有个消息要在 19s 之后被消费,按照我们定下来的规则,他会被投递到 30s 的延时等级中,也就是说这个消息将会在 30s 后被消费,比起秒级延时队列,这条消息的误差晚了 11s 被消费。虽然存在着一定的误差,但是误差算是可控,并且这样只需要增加少量的 topic,就能实现延时队列的功能,而且非常简单
如图所示, Producer 写入数据的时候,发送到不同的 deplay*topic
中,而消费者消费的 real_topic*_
中的数据。这里为 Producer 写一个拦截器,根据约定好 header,timestamp,key 之类的信息进行拦截,可以实现 Producer 看着像直接写入 real*topic*_
中,但是内部被拦截了写入到 deplay*topic
中,这样的话,开发者不必关心内部延时队列的实现。例如 Producer 要写入一个 8s 的延时消息到 real_topic_*
给 consumer 消费,那么他写了以后会被拦截,放入 10s 的 deplay_topic 中
Delay Service 简单来说内部实现其实是一个 consumer 进程 + DelayQueue + Producer 进程。消息进入到不同等级的 deplay*topic
后,会被 Delay Service 的 consumer 进程消费背景进入相应的 DelayQueue,时间到了以后就会被 Producer 进程发送到 real_topic_*
被真实的 consumer 消费。Delay Service 应保证和 Broker 一对一部署,以保证服务的可用
这种延时的实现方案会有一定的误差,无法做到秒级的精确延时。但是对于精度要求不高的业务,只要延时等级设计合理,还是有一定的应用价值的
kafka 原理总结
kafka 和传统的 MQ 一样,由 producer,broker 和 consumer 组成
kafka 对外使用 topic 的概念,生产者往 topic 里写消息,消费者从 topic 读消息。为了做到水平扩展,一个 topic 实际是由多个 partition 组成的,遇到瓶颈时,可以通过增加 partition 的数量来进行横向扩容。单个 parition 内是保证消息有序。 每新写一条消息,kafka 就是在对应的文件 append 写,所以性能非常高。
consumerGroup: 每个 Consumer 会组成一个 Group。kafka 中一个 parititon 只能被每个 Group 中的一个 consumer 线程去消费,也就是说,如果一个 partition 被 group1 中的 consumer1 去消费了那么其中的 consumer2 就不能去消费。如果你想多个 consumer 去消费,只能再去开一个 group
通常来讲,每个 Group 之间往往是处理不同的业务逻辑。如果处理相同的业务逻辑,那么可能会导致处理重复的情况,因为每个 consumer 线程默认会从头去消费这个 partition,并且一定会把这个 partition 消费完。这里说一下,为什么一个 Group 内只有一个 Consumer 线程可以去消费。首先,我们要明白一点,每个 Group 处理的是一个业务逻辑,那么它下面的 consumer 也是一样的。如果,我们现在让两个 consumer 线程去同时消费一个 partiiton,那么就会出现冲突问题,如果 consumer1,消费到了 offset 为 5 的地方,那么 consumer2 要去消费这个 partition 就要从 5 的地方开始。这里明显就会有竞争的。因为 partition 的消费是串行的,所以高效。(这里的高效意思是按照 offset 顺序去消费,所以时间复杂度是 O(1))那么如果要保证这一点,就需要给它加锁。这样就会导致 consume 的性能下降,吞吐量不够。为了保证吞吐量,kafka 只允许同一个 consumer group 下的一个 consumer 线程去访问一个 partition。如果这时候还觉得效率不高的时候,可以加 partition 的数量来横向扩展,同时扩展 consumer 线程的数量
一个 group 下的 consumer 的数量往往是取决于这个 group 订阅的 topic 下的 partition 数量。上面也说了,一个 partition 只能被一个 group 里的一个 consumer 去消费。这里还有一个很重要的地方,一个 group 一旦订阅了一个 topic,就会消费完所有的消息。这里就会三种情况,consumer 数量与 partition 数量刚刚好,那么每个 consumer 线程就会去消费一个 partition 刚刚好。如果 consumer 数量小于 partition,那么,其中某个/某些 consumer 就会去消费多个 partition。对,一个 consumer 可以消费多个 partition,但是一个 partition 只能被一个 consumer 消费。也就是说,第三种情况,consumer 数量多于 partition 数量的时候,会有 consumer 空闲出来,浪费资源。也就是说,通常来讲,consumer 数量最好与 group 订阅的 topic 下的 partition 数量相同
这里可能会有两个问题,一是如果 consumer 数量少于 partition,那么 partition 是怎么均衡的分配给 consumer 的?第二个问题,上面我也说了,感觉效率不高的话,可以横向扩展,那么新扩展的 partition 要怎么分配。这里其实用到的是一个 ConsumerRebalance 的方法(这个方法放后面说明),ConsumerRebalance 发生在两种情况下,一是 consumer 的增减,二是 partition 的增减
consumer: group 说的差不多了,来说说里面的 consumer。之前也说过了,consumer 处理 message 是顺序读的,所以才高效。那么这个 offset(每次读到的位置)其实并不是由 consumer 去管理的,以前是存在 zookeeper 中的,由 zk 统一管理(包括 producer 也是由 zk 管理的)。由于 zk 的写性能不好,以前的解决方法都是 consumer 每隔一分钟上报一次。这里 zk 的性能严重影响了消费的速度,而且很容易出现重复消费。在 0.10 版本后(好像是这个版本),kafka 把这个 offset 的保存,从 zk 中剥离出来,保存在一个名叫consumeroffsets topic 的 topic 中。写进消息的 key 由 groupid、topic、partition 组成,value 是偏移量 offset(这里注意一点,这个消息是往consumeroffsets topic 中写的消息,并非是 producer 生产的消息,不要误解)。topic 配置的清理策略是 compact。总是保留最新的 key,其余删掉
Kafka 提供了两套 consumer api,有 high level api,替我们把很多事情都干了,offset,路由啥都替我们干了,用以来很简单。还有 simple api,offset 啥的都是要我们自己记录。所以,事实上,consumer 是可以任意访问 offset 对应的消息的,因为这些数据是不会消失的,kafka 中的 message 并不是消费掉就没了,而是会保存一个你设置的失效时间。一般情况下,每个 key 的 offset 都是缓存在内存中,查询的时候不用遍历 partition,如果没有缓存,第一次就会遍历 partition 建立缓存,然后查询返回
consumer 有个 enable.auto.commit 的属性,是读自动提交,默认 true。还有 auto.commit.interval.ms,默认是 5s。因为就是说每 5 秒,使用者将向 kafka 提交其偏移量,或者每次从指定主题中提取数据时,都将提交最新的偏移量。这里有个问题就是拿到了这个 offset 的数据,还没来得及消费,然后这个 consumer 挂了,但是 offset 已经提交上去了。这时,这个 message 基本就已经废了,所以如果想避免这种情况,可以直接把 enable.auto.commit 设置为 false,那么就是消费完这个 message 才会去 commit offset。但是,这样的话 consumer 响应就会比较慢,因为需要有个消费过程
Topic 和 Partition: topic 相当于传统意义上的 MQ,producer 端发送的 message 必须指定是发送到哪个 topic,但是不需要指定 topic 下的哪个 partition,因为 kafka 会把收到的 message 进行 load balance,均匀的分布在这个 topic 下的不同的 partition 上( hash(message) % [partition数量]
)。每个 Topic 是有多个 Partition 组成的每个 Partiton 里的位置就是一个 Message 每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为 offset(偏移量)。消费者消费的时候根据 offset 去消费,同时消息被消费并不会消失,需要保留一段时间后(根据 broker 中的配置)才会被删除。设计 partition 的原因应该是 kafka 是基于文件存储的,通过分区,可以将日志内容分散到多个 broker 上,避免文件尺寸达到磁盘上限
每个 partition 都会有 partition replica,也就是 partition 备份,kafka 可以配置 partition 需要的 replica 个数。当存在多副本的情况下,会尽量把多个副本,分配到不同的 broker 上。kafka 会为 partition 选出一个 leader,leader 会去处理所有对 partition 的请求,不论读写,同时也只有 leader 会去处理,然后再同步到其他的 follower。当 broker 挂了后,所有 leader 在该 broker 上的 partition 都会重新选举,选出一个 leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)
这里就衍生出来两个问题
- leader 怎么选?
- partition 怎么分配。leader 怎么选我们后面说(因为现在说了,等会说容灾就没话了)
partition 分配方式,主要有三步:
- 将所有的 Broker 和待分配的 Partition 排序。
- 将 i 个 Partition 分配到第(I mod n)个 broker 上。
- 将第 i 个 Partition 的第 j 个 Replica 分配到((i+j) mod n)个 Broker 上
Producer: Producer 创建一条记录,记录中一个要指定对应的 topic 和 value,key 和 partition 可选。先序列化,然后按照 topic 和 partition,放进对应的发送队列中。同时它有三种发送的方式,同步,异步和批量
同步发送: 等接收方响应后才会去发送下一个
异步发送: 则是不等对方,直接发送
批量发送: 其实是在异步发送模式下,将消息缓存到内存里,然后一次性发送出去。为了避免引起误会,我再次说明一下,批量发送是在异步发生模式下,它其实是异步模式的一种,有两个可选的配置参数。batch.size 通过这个参数来设置批量提交的数据大小,默认是 16k,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息) 。linger.ms 这个设置是为发送设置一定是延迟来收集更多的消息,默认大小是 0 ms(就是有消息就立即发送)。当这两个参数同时设置的时候,只要两个条件中满足一个就会发送
这其中有个非常重要的参数“acks”,这个参数决定了 producer 要求 leader partition 收到确认的副本个数,如果 acks 设置数量为 0,表示 producer 不会等待 broker 的响应,所以,producer 无法知道消息是否发送成功,这样有可能会导致数据丢失,但同时,acks 值为 0 会得到最大的系统吞吐量。若 acks 设置为 1,表示 producer 会在 leader partition 收到消息时得到 broker 的一个确认,这样会有更好的可靠性,因为客户端会等待直到 broker 确认收到消息。若设置为 all(以前的版本是设置为-1,后来是 all),producer 会在所有备份的 partition 收到消息时得到 broker 的确认,这个设置可以得到最高的可靠性保证
kafka 容灾/一致性
Kafka 容灾主要依赖于 Replication 机制,Replication 可以在 server 的配置文件中去配置。通常来讲,broker 的个数是至少大于 Replication 的。其实在一个集群中,broker 的个数一般是和最大的 Replication 个数齐平的。
理论上来讲,follower 上的日志和 leader 上的基本一致。再具体说明之前,需要先说 kafka 中的一个东西,ISR,它是一个“in sync”的 node list(in-sync replica 已同步的副本)。存在于 ISR 里的 follower 是会变化的,如果一个 follower 宕机,或者落后 leader 太多,leader 将把它从 ISR 中踢出。当它活过来/再次跟上时会再拉进来。“落后太多”是根据配置文件中的设定( 之后好像这个已经被删除了,因为这个值很难确定,而且高峰期会出现进进出出的情况)。Kafka 的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。对于 ISR 中的同步副本它通常要满足几个条件:
- leader 副本是同步副本。(关于副本的内容之后会再说,首先知道有这么个要求)
- 与 zookeeper 之间有一个活跃的会话,也即在过去 6S(可配置)内向 zookeeper 发送过心跳。
- 在过去的 10S 内(可配置)从 leader 副本那里获得过信息。
- 在过去 10S 内从 leader 副本那里获取过最新的信息。(光从 leader 那里获取信息是不够的,还必须是几乎零延迟的)
如果 leader 挂了,需要在 followers 中选择一个新的 leader。但是 followers 本身是可能有问题的,follower 可能“落后”或者 follower 也挂了,所以必须选择高质量的 follower 作为 leader。必须保证,一旦一个消息被提交了,但是 leader 挂掉了,新选出的 leader 必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的 leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为 leader。Kafka 并不是使用这种方法。前面也说了,kafka 维护这一个 ISR 队列,在这个队列中,follower 是和 leader 保持高度一致的,强同步情况下,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。这里就是由 request.required.acks 来决定的(这个 acks 和 producer 中的 acks 是同一个)。因此这个集合中的节点是最优先考虑容灾的。想要一个数据都不丢失,acks 必须设置为 all。ISR 中有 n+1 个节点,就可以允许在 n 个节点挂掉的情况下不会丢失消息并正常提供服务。
这里就有一个问题,如果所有的节点都挂了怎么办?主要策略有两种:
- 等待 ISR 中的任何一个节点恢复并担任 leader。
- 选择所有节点中(不只是 ISR)第一个恢复的节点作为 leader。
但是,对于 1 而言,ISR 中的节点如果长时间不恢复,那么 kafka 就会长时间不能使用。对于 2 而言,第一个醒的节点如果不是 ISR 中的,那么它的数据可能丢失的很多。总之这是个权衡问题,基于具体的项目具体对待
副本的复制方式
- Follower 根据自身拥有多少个需要同步的 topic Partition 来创建相对应的 partitionFetchState,记录了从 leader 的哪个 offset 开始获取数据
- follower 会根据 leader 的 brokeId 和 topicPartition 经过 hash 计算的 partitionId(这里其实就是前面怎么把 partiton 去分到 broker 上)来创建复制线程 ReplicationFetchThread
- ReplicaFetchThread 会根据 partitionFetchState 提供的信息不停地从 leader 获取数据,每次成功复制后,都会更新 partitionFetchState 的 fetchOffset
- 如果 fetchOffset 越界,则会对 followerPartitionLog 进行 truncate,然后冲去拉取 offset
- 每次 log 的增加,都可能会触发一个新的 logSegement 的产生,原因可以是 log index or time index 容量满了,或者上次 append log 太久,超过了阈值
分区副本与 leader 不同步的情况
- 慢副本:在一定周期内 follower 不能追赶上 leader。最常见的原因之一是 I/O 瓶颈导致 follower 复制速度慢于 leader 拉取速度
- 卡主副本:在一定周期时间内 follower 停止从 leader 拉取请求/可能由于 GC 暂停或 follower 失效死亡
- 新启动副本:当用户给主题增加副本因子时,新的 follower 不在同步副本列表中,直到他们完全赶上了 leader 日志
数据的一致性:在 kafka 中,数据一致性的含义是若某条消息对 Consumer 可见,那么即使 Leader 宕机了,在新 Leader 上数据依然可以被读到。HighWaterMark 简称 HW: Partition 的高水位,取一个 partition 对应的 ISR 中最小的 LEO(日志末端位移(log end offset),它指向的是下一条消息的位移)作为 HW,消费者最多只能消费到 HW 所在的位置,另外每个 replica 都有 highWatermark,leader 和 follower 各自负责更新自己的 highWatermark 状态,highWatermark <= leader. LogEndOffset。对于 Leader 新写入的 message,Consumer 不能立刻消费,Leader 会等待该消息被所有 ISR 中的 replica 同步后,更新 HW,此时该消息才能被 Consumer 消费,即 Consumer 最多只能消费到 HW 位置。这样就保证了如果 Leader Broker 失效,该消息仍然可以从新选举的 Leader 中获取。对于来自内部 Broker 的读取请求,没有 HW 的限制。同时,Follower 也会维护一份自己的 HW,Folloer.HW = min(Leader.HW,Follower.LEO)
高水位截止:(补充一下高水位截止的含义,因为 leader 是取最小的 HW,那么就可能存在 follower 比 leader 高的情况,这时 follower 需要把多出来的部分 truncate,称为高水位截止)之所以把这里再次拿出来说的原因是,我在看到这个地方的时候就产生一种疑问:高水位截止会不会导致数据丢失?这里我们就需要明白几点,leader 和 follower 的 LEO 和 HW 是什么时候更新的,在此之前,我们需要重新来理解一下 kafka 的 HW 和 LEO 机制(上述的数据一致性部分,请参考起来看,因为 kafka 的一致性确实依赖于 HW 和 LEO 机制)
LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO=10,那么表示该副本保存了 10 条消息,位移值范围是[0, 9]
。另外,leader LEO 和 follower LEO 的更新是有区别的。
HW:即上面提到的水位值。对于同一个副本对象而言,其 HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(replicated)
如图:HW 的位置是 7,那么 0-7 是都是已经备份好的,LEO 在 15,那么 8-14 都是还没开始备份的,15 则是下一条消息的位置
Kafka 分区下会有很多个副本用于实现冗余来进一步实现高可用。副本根据角色的不同可大致分为 3 类:
- leader 副本:响应 clients 端读写请求的副本
- follower 副本:被动地备份 leader 副本中的数据,不能响应 clients 端读写请求。
- appendISR 副本:包含了 leader 副本和所有与 leader 副本保持同步的 follower 副本(同步副本的判断之前已经提过了)
这里需要大家明白一个很重要的点:Kafka 有两套 follower 副本 LEO:1. 一套 LEO 保存在 follower 副本所在 broker 的副本管理机中;2. 另一套 LEO 保存在 leader 副本所在 broker 的副本管理机中——换句话说,leader 副本机器上保存了所有的 follower 副本的 LEO(其实这里也不难理解,follower 肯定自己有一套 LEO,leader 需要知道每个 follower 的 LEO 来帮助自己去判断是否可以更新 HW,所以 leader 也会存一套)。
看到这里,我们就需要知道,leader 副本的 LEO 和 HW,follower 副本的 LEO 和 HW 以及 leader 上存的 follower 的 LEO 什么时候更新,只有清楚了这些更新时间才能去判断是否会产生数据丢失或者数据不一致性问题。(为了避免误会,说明一下,下面的顺序并没有任何意义,并非每个更新的顺序,请一切参考内容)
follower 副本更新 LEO:写入时就会更新,follower 发送 FETCH 请求后,leader 将数据返回给 follower,此时 follower 开始向底层 log 写数据,从而自动地更新 LEO 值。
leader 上的 follower 副本更新 LEO:leader 接收到 follower 发送的 FETCH 请求,它首先会从自己的 log 中读取相应的数据,但是在给 follower 返回数据之前它先去更新 follower 的 LEO。
leader 的 LEO 更新:和 follower 一样,写入时就会更新。
follower 更新 HW:在更新 LEO 之后,follower 向 log 写完数据时会尝试更新它自己的 HW 值。具体做法就是比较当前 LEO 值与 FETCH 响应中 leader 的 HW 值,取两者的小者作为新的 HW 值。这里就会明白一点,follower HW 永远不可能超过 leader,因为 leader 的 HW 是整个分区的 HW,所以 leader 的 HW 一定是最高的。
leader 更新 HW:这个地方是最重要的,上面也说了,leader 的 HW 是整个分区的 HW,那么就意味着,这个 HW 直接决定了 consumer 可不可见。所以它的更新就显得很关键了。大概有 4 种情况下 leader 会尝试去更新分区 HW——切记是尝试,有可能因为不满足条件而不做任何更新:
- 副本成为 leader 副本时:当某个副本成为了分区的 leader 副本,Kafka 会尝试去更新分区 HW。这是显而易见的道理,毕竟分区 leader 发生了变更,这个副本的状态是一定要检查的。
- broker 出现崩溃导致副本被踢出 ISR 时:若有 broker 崩溃则必须查看下是否会波及此分区,因此检查下分区 HW 值是否需要更新是有必要的。
- producer 向 leader 副本写入消息时:因为写入消息会更新 leader 的 LEO,故有必要再查看下 HW 值是否也需要修改
- leader 处理 follower FETCH 请求时:当 leader 处理 follower 的 FETCH 请求时首先会从底层的 log 读取数据,之后会尝试更新分区 HW 值
当 kafka 正常工作的时候,leader 尝试更新 HW 是有 producer 请求和 follower 的 FETCH 请求时。这时 leader 会考虑所有符合条件的副本 LEO,当然也包括 leader 自身的 LEO。两个条件分别是:
- 处于 ISR 中
- 副本 LEO 落后于 leader LEO 的时长不大于 replica.lag.time.max.ms 参数值(默认是 10s)
细心的同学可能会发现,第二个条件是满足同步副本的条件之一,而 ISR 中的就是同步副本。只满足第二个条件肯定是不行的,因为如果只满足第二个条件,可能会产生某个 follower 已经“追上”来了,但不在 ISR 中,这个肯定是不能参考的。换句话说,HW 取的是 ISR 中 LEO 的最小值。那么只有条件一可以不?在 ISR 中会不会产生时长大于 replica.lag.time.max.ms 的 follower?因为官方给出的 ISR 会自动踢出不满足同步副本的 follower。所以我个人认为只有条件一是可以的(官方给出的两个条件必须同时满足,一切还是以官方为主)。
现在我们来讨论一下,是否有可能丢数据或数据不一致的问题
看了上述解释,我想大家对流程应该理的差不多了。那么,我们就能得到一个事实:kafka 使用 HW 值来决定副本备份的进度,而 follower 的 HW 值的更新通常需要额外一轮 FETCH 请求才能完成(参考 followerHW 更新时间),那么会不会产生数据丢失或者不一致,答案是会的
数据丢失
假设我们现在只有一主一从两副本。一个 leader 一个 follower,某个时刻时,leader 收到了 follower 的 FETCH 请求,这时 leader 去更新自己的 HW,这个时候 follower 重启了,重启后需要重新去拉 HW。再去发一个 FETCH 请求,这个时候恰好 leader 挂了。那么就要重新选主,follower 就会成为新主,这个时候 follower 里的 HW 并没有更新,但是对于 consumer 而言,这个 HW 已经更新过了,也就是说,这个数据就不存在了。
数据不一致
我们还是只有一主一从两副本。一个 leader 一个 follower。假设此时 leader 收到了一条消息 m2,情况同上时,也就是 leader 刚更新完 HW,follower 还没来得及更新的那一刻,leader 和 follower 全挂了。这个时候要等最先复活的,之前的 follower 先复活了,并且这个时候成为了新的 leader,它的接收了新消息 m3,这个时候新 leader 的 HW 会+1。注意这个时候新 leader 的 HW 和原来 follower 的 HW 位置是一样的。然后旧的 leader 活了,成为了 follower,此时,两者的 HW 相同,但是这个位置的数据却不一致。
综上其实说明一个问题,kafka 使用 HW 值来决定副本备份的进度,而 follower 的 HW 值的更新通常需要额外一轮 FETCH 请求才能完成(参考 followerHW 更新时间),而这种设计是有问题的。
事实上用高水位截止来标识备份进度只用于 kafka0.11 之前的版本,在 0.11 之后是采用 leader epoch 来标识备份进度。
现在我们来说,leader epoch 是怎么工作的,先了解几个基础概念:
Leader Epoch: 32 位,单调递增的数字。代表单个分区所处的 leader 时代。每发生一次 leader 转换,就+1。例如 leader epoch =2,说明处于第二 leader 时代。
Leader Epoch Start Offset:该 epoch 版本的 leader 写入第一条消息的位移。
Leader Epoch Sequence File: 存储 Leader Epoch 和 Leader Epoch Start Offset 对(epoch,offset),类似如下形式:
(0,100) , (1,200) , (3,500)
然后具体分析之前,我们先得知道 leader epoch 的具体流程,我把官方给出的流程列到这。(略)
通俗来讲,就是 Leader 端多开辟一段内存区域专门保存 leader 的 epoch 信息。leader broker 中会保存这样的一个缓存,并定期地写入到一个 Leader Epoch Sequence File 文件中。当 leader 写底层 log 时它会尝试更新整个缓存——如果这个 leader 首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为 leader 时会查询这部分缓存,获取出对应 leader 版本的位移,这就不会发生数据不一致和丢失的情况
更通俗来说就是每个副本都引入了新的状态来保存自己当 leader 时开始写入的第一条消息的 offset 以及 leader 版本。这样在恢复的时候完全使用这些信息而非水位来判断是否需要截断日志
现在我们去尝试着解决上面那两个问题
解决数据丢失:A 重启后,向 B 发送 LeaderEpochRequest 请求,此时由于 Leader Epoch 相等,所以 B 返回给 A 的是 B 的 LEO。此时 B 挂掉,A 接收到 LEO=2,与自己相同,所以不会截断,A 成为 leader,在文件中添加新的 Leader epoch 和开始偏移即可
这是官方给出的解释,我在这里有个问题,如果 A 刚重启完,还没来得及发 LeaderEpochRequest,B 就挂了,这个时候怎么办?因为这里不是按 HW 进行截止的,而是主要依赖于 LEO。如果此时 B 直接挂了,A 就会被选为 leader,那么就要依赖于 A 的 LEO,这个时候 A 的 LEO 是处于 2 的位置的,HW 是否会直接变成 2 而不进行截止。(此处是我的推论,未经证实,只是我看到的时候产生了疑问)
解决数据不一致:A、B 宕机后,B 先重启成为 leader,此时 Leader Epoch 由 0 变为 1,并且 LE 的开始偏移为 1。此时 A 重启,向 B 发送自己宕机时所处的 Leader Epoch,也就是 0。此时 B 返回 LE1 的开始偏移 1。A 发现自己的数据大于此值,便会截断自己的日志。从而保证数据的一致性
这个比较好理解,就不做过多的解释了,其实我觉得这个(epoch,sffset)主要是用来解决数据不一致问题的。。。
补充一下,这里其实还有问题,在极端情况下,也就是这样的情况下,你会发现消息 m2 没了。因为其实新复活的 B 并不能提供给 Am2,但这种就是 ISR 全挂了的情况,也算极端情况的一种
kafka 的存储
Kafka 中的 message 是以 topic 为基本单位进行组织的,不同的 topic 之间是相互独立的。每个 topic 又可以分为不同的 partition,每个 partition 存储一部分的 message 信息。这些基础的结构希望大家牢记
那么,我们需要分析些什么?由外往里走:
- partition 是怎么在 Topic 中存储分布的
- partiton 中文件存储方式
- partiton 中 segment 文件存储结构
- 在 partition 中通过 offset 查找 message
partition 在 Topic 中存储分布:
在 Kafka 文件存储中,同一个 topic 下有多个不同 partition,每个 partition 为一个目录,partiton 命名规则为 topic 名称+有序序号,第一个 partiton 序号从 0 开始,序号最大值为 partitions 数量减 1。消息发送时都被发送到一个 topic,其本质就是一个目录。Partition 是一个 Queue 的结构,每个 Partition 中的消息都是有序的,生产的消息被不断追加到 Partition 上,其中的每一个消息都被赋予了一个唯一的 offset 值。Kafka 只维护在 Partition 中的 offset 值,因为这个 offset 标识着这个 partition 的 message 消费到哪条了。Consumer 每消费一个消息,offset 就会加 1。其实消息的状态完全是由 Consumer 控制的,Consumer 可以跟踪和重设这个 offset 值,这样的话 Consumer 就可以读取任意位置的消息。
把消息日志以 Partition 的形式存放有多重考虑,第一,方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;第二就是可以提高并发,因为可以以 Partition 为单位读写了。
现在我们可以知道,kafka 中的数据是持久化的并且能够容错的。Kafka 允许用户为每个 topic 设置副本数量,副本数量决定了有几个 broker 来存放写入的数据。一般这个数量是取决与你对数据持久化的要求。Kafka 中的 topic 是以 partition 的形式存放的,每一个 topic 都可以设置它的 partition 数量,Partition 的数量决定了组成 topic 的 message 的数量。Producer 在生产数据时,会按照一定规则(这个规则是可以自定义的)把消息发布到 topic 的各个 partition 中,俗称分区,不指定“key”(这里涉及代码部分,我就告诉大家有这两个可以调的参数,暂时在这里不要去深究这个 key 是什么,它是在创建 ProducerRecord 时用的,姑且先知道有可以调的部分就行了)就是轮询,如果“key”不为空,且“使用默认分区器”,那就会使用 kafka 自带的散列算法。上面说的副本都是以 partition 为单位的,不过只有一个 partition 的副本会被选举成 leader 作为读写用。
关于如何设置 partition 的数量需要考虑的因素。一个 partition 只能被一个消费者消费(一个消费者可以同时消费多个 partition),因此,如果设置的 partition 的数量小于 consumer 的数量,就会有消费者消费不到数据。所以,推荐 partition 的数量一定要大于同时运行的 consumer 的数量。另外一方面,建议 partition 的数量大于集群 broker 的数量,这样 leader partition 就可以均匀的分布在各个 broker 中,最终使得集群负载均衡。在 Cloudera,每个 topic 都有上百个 partition。需要注意的是,kafka 需要为每个 partition 分配一些内存来缓存消息数据,如果 partition 数量越大,就要为 kafka 分配更大的 heap space。
partiton 中文件存储方式:借图说话,如下
每个 partion(目录)相当于一个巨型文件被平均分配到多个大小相等 segment(段)数据文件中。但每个段 segment file 消息数量不一定相等,这种特性方便 old segment file 快速被删除。每个 partiton 只需要支持顺序读写就行了,segment 文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
partiton 中 segment 文件存储结构:
producer 发 message 到某个 topic,message 会被均匀的分布到多个 partition 上(随机或根据用户指定的回调函数进行分布),kafka broker 收到 message 往对应 partition 的最后一个 segment 上添加该消息,说白了就是尾添加,当某个 segment 上的消息条数达到配置值或时间超过阈值时,segment 上的消息会被 flush 到磁盘,segment 达到一定的大小后将不会再往该 segment 写数据,broker 会创建新的 segment。(刷新部分参考 零拷贝)
每个 part 在内存中对应一个 index,记录每个 segment 中的第一条消息偏移。
segment file 组成:由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为 segment 索引文件、数据文件.
segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个全局 partion 的最大 offset(偏移 message 数)。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
每个 segment 中存储很多条消息,消息 id 由其逻辑位置决定,即从消息 id 可直接定位到消息的存储位置,避免 id 到位置的额外映射
在 partition 中通过 offset 查找 message:
根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个 offset 进行命名的,所以,使用二分查找算法能够根据 offset 快速定位到指定的索引文件
找到索引文件后,根据 offset 进行定位,找到索引文件中的匹配范围的偏移量 position。(kafka 采用稀疏索引的方式来提高查找性能)得到 position 以后,再到对应的 log 文件中,从 position 处开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息
kafka 的存储是基于磁盘而不是内存的。这里可能就会有疑问?kafka 不是高性能吗,用磁盘会比内存快?用磁盘确实快,但这依赖于 kafka 自身的一些机制(前面有提到)