[TOC]
Kafka 简介
Kafka 最早由 LinkedIn 开发,用来承载活动流(Activity Stream)和运营数据管道(Pipeline)等场景。随着实时数据处理需求不断增长,它逐渐从一个“高性能消息系统”演化为一套兼顾消息传递、日志存储和流式处理的数据基础设施。
如果把一个网站或业务系统比作一座城市,那么用户点击、订单变更、服务调用、监控指标、应用日志,都会不断产生大量事件。这些事件既要被快速写入,也要被多个下游系统稳定消费,例如:
- 实时计算系统要统计 PV、UV、转化率
- 风控系统要分析交易行为
- 搜索与推荐系统要订阅行为数据
- 运维平台要采集服务日志和监控指标
在这种背景下,Kafka 的价值就体现出来了:它把“海量事件如何可靠写入、如何高效保存、如何被多个系统并行消费”这几个问题统一起来。
可以把 Kafka 先理解为一种分布式、可持久化、基于发布/订阅模型的消息系统。不过它又不只是传统消息队列,因为它还天然具备日志存储和流式处理平台的能力。
Kafka 的核心设计目标大致可以概括为:
- 高吞吐:在普通硬件上也能承受很大的写入和消费压力
- 可扩展:通过增加 Broker 和 Partition 实现水平扩展
- 可持久化:消息默认写入磁盘,而不是只停留在内存中
- 有序性:保证同一个 Partition 内的消息顺序
- 高可用:通过副本机制降低单机故障带来的风险
- 统一数据通道:既适合实时处理,也适合离线处理
基础架构

生产者(producer)和 消费者(consumer)

对于 Kafka 来说,最基础的两类客户端分别是:
- 生产者(Producer)
- 消费者(Consumer)
除此之外,Kafka 还提供了 Kafka Connect、Kafka Streams 等更高层的能力,但它们底层仍然建立在 Producer 和 Consumer 之上。
Producer(生产者) 负责把业务数据写入 Kafka。它可以是下单服务、日志采集程序,也可以是埋点 SDK 或数据同步组件。
Consumer(消费者) 负责从 Kafka 读取消息并处理。和很多传统消息队列不同,Kafka 的消费模型非常强调 Consumer Group(消费者组) 的概念,也就是“多个消费者以组为单位协作消费同一份数据”。后面会专门展开说明。
Broker 和集群(Cluster)

Broker 可以理解为 Kafka 集群中的一台服务节点。通常我们会把一台 Kafka 服务器视为一个 Broker。每个 Broker 都有唯一编号,并负责存储一部分 Topic 数据、处理客户端请求以及参与副本同步。
一个 Kafka 集群由多个 Broker 组成。Producer 把消息写入 Broker,Consumer 再从 Broker 拉取消息。多个 Broker 协同工作,才让 Kafka 同时具备了高吞吐、可扩展和高可用这些能力。
一台 Kafka 服务器可以看作一个 Broker;多个 Broker 共同组成 Kafka 集群。
在早期版本中,Kafka 依赖 ZooKeeper 保存集群元信息并协调 Broker 状态;新版本中则逐步转向 KRaft 模式,不再强依赖 ZooKeeper。对于入门理解来说,可以先把它视为“Kafka 需要一套集群元数据管理机制”。
Kafka 还有一个很重要的特性叫 日志保留(retention)。消息写入后不会因为被消费就立刻删除,而是按照时间或空间策略统一清理。这也是 Kafka 更像“分布式提交日志”而不只是“消息转发器”的关键原因。
主题(Topic)与分区(Partition)

Topic(主题) 是逻辑概念,可以理解为消息的分类或频道。生产者把消息写入某个 Topic,消费者按 Topic 订阅消息。
但 Kafka 真正做水平扩展,不是靠 Topic,而是靠 Partition(分区)。
Partition 是 Topic 在物理层面的拆分单元。 一个 Topic 可以包含多个 Partition,而每个 Partition 都只属于一个 Topic。这样做的直接好处有两个:
- 可以把同一个 Topic 的数据分散到多个 Broker 上,突破单机瓶颈
- 可以让多个消费者并行消费不同 Partition,提升整体吞吐
如果把 Topic 想成一条高速公路,那么 Partition 就像把单车道扩成多车道。数据不再只能走一条路,而是可以分散到多条“并行通道”中处理。
从存储层面看,一个 Partition 通常会对应日志目录下的一个子目录。一个 Broker 上可以承载很多个 Partition,因此 Producer 可以把消息写入不同 Broker 上的不同 Partition,Consumer 也可以从这些 Partition 并行拉取数据。
这里顺带记住一个非常重要的结论:Kafka 只保证同一个 Partition 内的消息顺序,不保证多个 Partition 之间的全局顺序。
消费者(Consumer)和消费者组(Consumer Group)
Consumer 是消息的消费方,负责从 Kafka 读取数据并执行业务处理。
Consumer Group 则是 Kafka 消费模型里的核心概念。多个 Consumer 组成一个组后,会协作消费同一个 Topic 的多个 Partition。对同一个消费者组来说,一个 Partition 在同一时刻只能分配给组内一个 Consumer,这既保证了分区内顺序,也让消费能力可以按组水平扩展。
用订单场景把整条链路串起来
前面这些概念放在一起时,很多人都会产生几个非常典型的疑问:
- Producer 把消息发到 Topic 后,究竟是谁决定它进入哪个 Partition?
- 如果一个 Topic 有多个 Partition,消费者拉取时是不是还要“合并”这些消息?
- 业务如果要求顺序消费,Kafka 到底保证的是哪一层顺序?
下面我们用一个很具体的订单场景,把这条链路完整串起来。
假设现在有一个订单事件 Topic,名字叫做 order-events,它有 3 个 Partition:
order-events-0order-events-1order-events-2
订单系统会持续往这个 Topic 写入事件,例如:
- 订单创建:
OrderCreated - 订单支付:
OrderPaid - 订单发货:
OrderShipped
为了保证同一个订单的事件顺序,我们约定所有消息都使用 orderId 作为 key。例如:
1
2
3
topic = order-events
key = orderId
value = OrderCreated / OrderPaid / OrderShipped
这时消息流转链路可以简化理解为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Producer(订单服务)
|
| send(topic=order-events, key=orderId, value=订单事件)
v
Partitioner(分区器)
|
| 根据 key = orderId 计算分区
v
Topic: order-events
|---- Partition-0 -> Broker-A
|---- Partition-1 -> Broker-B
|---- Partition-2 -> Broker-C
|
v
Consumer Group: order-fulfillment-group
|---- Consumer-1 负责 Partition-0
|---- Consumer-2 负责 Partition-1
|---- Consumer-3 负责 Partition-2
这里最关键的一点是:不是 Topic 在收到消息后再“把消息分发到不同 Partition”,而是 Producer 在发送阶段就已经通过分区器决定了目标 Partition。也就是说,Producer 发给 Kafka 的其实不是一个抽象的 Topic,而是“某个 Topic 下的某个具体 Partition”。
假设有下面这几条消息:
1
2
3
4
5
1. key=order_1001, value=OrderCreated
2. key=order_1001, value=OrderPaid
3. key=order_1002, value=OrderCreated
4. key=order_1001, value=OrderShipped
5. key=order_1002, value=OrderPaid
如果分区器计算后得到:
order_1001 -> Partition-1order_1002 -> Partition-0
那么最终效果大致会是这样:
1
2
3
4
5
6
7
8
9
10
11
Partition-0:
order_1002 -> OrderCreated
order_1002 -> OrderPaid
Partition-1:
order_1001 -> OrderCreated
order_1001 -> OrderPaid
order_1001 -> OrderShipped
Partition-2:
暂无相关消息
从这里你就能看出 Kafka 的顺序保证边界了。
第一,同一个 orderId 的消息只要始终落在同一个 Partition,就能保持顺序。例如 order_1001 的创建、支付、发货都进入 Partition-1,那么消费者读取到的顺序就是:
1
OrderCreated -> OrderPaid -> OrderShipped
第二,不同订单之间并没有全局顺序。例如 order_1001 在 Partition-1,order_1002 在 Partition-0,那么 Kafka 不会去保证“两个订单谁先被消费完”。它只保证每个 Partition 内部有序,不保证多个 Partition 之间的全局有序。
第三,Consumer Group 不负责做全局顺序合并。消费者组做的事情是“分工消费”:
Consumer-1拉取自己负责的Partition-0Consumer-2拉取自己负责的Partition-1Consumer-3拉取自己负责的Partition-2
如果某个 Consumer 同时负责多个 Partition,它也只是分别从多个 Partition 拉取消息,而不是把这些 Partition 自动合并成一条全局有序流。因此,业务层如果想得到“全局唯一顺序”,不能指望 Kafka 替你完成这件事。
所以,当业务说“我要顺序消费”时,通常需要先问清楚,究竟要的是哪一种顺序:
- 同一业务主键有序:例如同一个
orderId、userId、accountId的事件顺序。这种需求最适合 Kafka,做法是用业务主键作为key,把同一主键的消息稳定路由到同一个 Partition。 - 整个 Topic 全局有序:例如所有订单事件严格按产生顺序串行处理。这种需求 Kafka 原生并不擅长,最直接的做法通常是只使用 1 个 Partition,但代价是吞吐和扩展性都会明显下降。
最后还要补一句:即使消息已经进入同一个 Partition,业务代码也不一定天然有序。如果消费端把同一个 Partition 的消息拿到后又交给多个线程并发处理,或者在消息尚未真正处理完成前就提前提交了 offset,那么业务层面仍然可能出现“看起来像乱序”的结果。
因此,Kafka 对顺序消费更准确的承诺应该表述为:
- Kafka 只保证单个 Partition 内的消息顺序
- 业务想保证同一对象有序,就要让同一对象的消息进入同一个 Partition
- 业务想保证真正的处理顺序,还要避免消费端自己把顺序打乱
Replication 副本
为了保证高可用,Kafka 会为 Partition 配置多个副本。一个 Partition 的多个副本会分布在不同 Broker 上,其中负责对外提供读写服务的叫 Leader,其余副本叫 Follower。
Producer 和 Consumer 默认都只与 Leader 打交道,Follower 的主要职责是从 Leader 同步数据。当 Leader 所在 Broker 故障时,Kafka 会从可用副本中选出新的 Leader,继续对外提供服务。
可以把副本机制理解成“同一份分区日志在多台机器上的冗余拷贝”。这样即使单台 Broker 出问题,集群仍然有机会继续工作,而不是直接丢失整个分区的数据。
Kafka 存储在文件系统上
很多人第一次接触 Kafka 时,都会有一个疑问:既然它主打高吞吐,为什么核心数据却落在文件系统上,而不是只放在内存里?
答案是:磁盘本身并不天然慢,真正慢的是低效的随机访问。 Kafka 的设计恰恰是尽量把写入和读取都组织成顺序 I/O,从而吃满操作系统和磁盘的吞吐能力。
现代操作系统已经为文件读写做了很多优化,例如:
- 预读(read-ahead):提前把后续可能访问的数据读入内存
- 延迟写/合并写:把多个小写操作合并成更大的顺序写
- 页缓存(Page Cache):把空闲内存尽量用于缓存热点文件页
这也是为什么 Kafka 虽然把消息写到磁盘,却依然能维持很高吞吐的重要原因之一。
上面提到的 Topic 更偏向逻辑概念,面向的是生产者和消费者;真正落到磁盘上的存储单位其实是 Partition。通常情况下,一个 Partition 会对应日志目录下的一个子目录,目录中保存该分区的消息数据以及索引文件。
这里还要再往前走一步理解:目录里通常不只有一个数据文件,而是由多个 Segment 组成。 Kafka 不会把一个 Partition 永远写进同一个无限增长的大文件里,而是把它切分成多个日志段。这样做的好处是:
- 新消息只需要持续追加到当前活跃 Segment 末尾
- 老 Segment 可以按时间或大小批量删除,便于日志保留管理
- 查找消息时可以先定位 Segment,再在 Segment 内查索引
- 单个文件不会无限膨胀,管理和恢复成本更可控
这里有一个容易被忽略的细节:创建 Topic 时如果没有显式指定 Partition 数量,Kafka 会使用 Broker 端 num.partitions 的配置值。Kafka 的默认配置通常是 1,但线上集群往往会按业务吞吐量提前修改这个值。因此,如果我创建一个名为 test 的 Topic,且最终分配到 1 个 Partition,那么磁盘上通常会看到一个名为 test-0 的目录,其命名规则为:<topic_name>-<partition_id>

任何发布到 Partition 的消息,最终都会被追加到当前活跃 Segment 的尾部。这个“顺序追加写”的特性,是 Kafka 高吞吐的重要基础,但并不是全部原因。
Kafka 的高吞吐通常来自多个机制的叠加:
- 顺序写磁盘:避免随机写带来的磁头寻址和频繁定位开销
- 页缓存:大量读写先命中操作系统缓存,减少真实磁盘访问
- 批量发送:Producer 会尽量把同一分区的多条消息聚合后再发送
- 批量拉取:Consumer 可以按批次拉取消息,减少网络往返
- 零拷贝:Broker 在网络发送阶段可借助
sendfile等机制减少用户态和内核态之间的数据拷贝 - 分区并行:不同 Partition 可以分散到不同 Broker 和线程上并行处理
也正因为如此,把 Kafka 的性能只归因于“顺序写磁盘”是不完整的。更准确地说,Kafka 是把顺序 I/O、操作系统缓存、批处理、零拷贝和分区并行组合在了一起。
每一条消息被发送到 Broker 后,会根据分区规则选择落入哪个 Partition。如果分区策略设计得合理,数据就能比较均衡地分布到不同 Partition 中,从而把整个集群的吞吐能力真正利用起来。
Kafka 中的底层存储设计(Partition 以及如何查找消息)
为了把上面的“目录、Segment、索引”串起来,我们来看一个更具体的例子。
假设现在 Kafka 集群里只有一个 Broker,我们创建两个 Topic,名称分别为 topic1 和 topic2,它们的 Partition 数量分别是 1 和 2,那么日志根目录下通常会出现如下三个目录:
1
2
3
| --topic1-0
| --topic2-0
| --topic2-1
在 Kafka 的文件存储模型中,同一个 Topic 可以拆成多个 Partition,而每个 Partition 又会继续拆成多个 Segment。可以把它理解成这样一层层关系:
Topic -> Partition -> Segment -> 索引文件 / 数据文件
其中:
Topic是逻辑分类Partition是物理分区目录Segment是 Partition 内进一步切分出的日志段- 每个 Segment 至少会有一组与之配套的
.log和.index文件,很多版本中还会看到.timeindex文件
这样设计的关键在于:Kafka 只需要不断往“当前活跃 Segment”尾部追加消息;当 Segment 达到阈值后,就滚动出新的 Segment,继续顺序写入。
假设我们把每个 Segment 的大小设置为 500 MB,并持续向 topic1 写入大量数据,那么 topic1-0 目录中就会逐渐出现类似下面的一组文件:
1
2
3
4
5
6
7
8
9
10
11
| --topic1-0
| --00000000000000000000.index
| --00000000000000000000.log
| --00000000000000368769.index
| --00000000000000368769.log
| --00000000000000737337.index
| --00000000000000737337.log
| --00000000000001105814.index
| --00000000000001105814.log
| --topic2-0
| --topic2-1
Segment 是 Kafka 在日志管理层面的基本单位。 一个 Segment 的文件名,通常取该日志段起始消息的 offset,并统一补齐为固定长度的数字字符串。例如:
00000000000000000000.log00000000000000368769.log
这意味着:当某个 Segment 的起始 offset 是 368769 时,与它配套的索引文件和数据文件通常会以这个偏移量命名。通过这种方式,Kafka 可以先根据 offset 在多个 Segment 之间做快速定位。
以上面的一组 Segment 为例,我们再看索引文件和数据文件之间的对应关系:

假设索引文件里有一条元数据 <3, 497>,它表示的是:在当前 Segment 内,相对 offset 为 3 的消息,位于 .log 数据文件的物理偏移 497 处。由于这个 Segment 的起始 offset 是 368769,所以这条消息对应的全局 offset 就是 368769 + 3 = 368772。
需要注意的是,.index 文件并不会为每一条消息都建立一条完整索引。Kafka 使用的是稀疏索引:每隔一定字节或一定间隔记录一条索引项。这样做虽然让索引不是“命中即得”,却显著减小了索引文件体积,使得索引更容易被加载到内存中,从而降低查询时的磁盘 I/O 开销。
因此,当我们要查找某个指定 offset 的消息时,整体过程通常是:
- 先在多个 Segment 文件名之间做二分查找,定位消息属于哪个 Segment
- 再在该 Segment 的
.index文件中查找不大于目标 offset 的最近一条索引项 - 根据这条索引项给出的物理位置,回到
.log文件中顺序扫描,直到找到目标消息
这个过程的关键点在于:Kafka 把“全局查找”拆成了“先找 Segment,再查稀疏索引,最后做小范围顺序扫描”。因此它既没有为每条消息维护昂贵的全量索引,又能在海量日志中保持较高的查询效率。
另外,Kafka 的消息消费后不会立刻删除,删除通常是以 Segment 为单位、按保留策略统一进行。这也是 Segment 设计特别重要的原因之一:批量删除旧日志段,远比在一个超大文件中逐条删除消息高效得多。
Partition 结构
partition 是比较具体的东西,Partition 在服务器上的表现形式就是一个一个的文件夹,每个 partition 的文件夹下面会有多组 segment 文件,每组 segment 文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息

如上图,这个 Partition 有三组 Segment 文件。每个 .log 文件的大小通常一致,但由于单条消息大小不同,每个 Segment 中保存的消息条数不一定相同。文件命名通常以该 Segment 的最小 offset 为基准,例如 000.index 存储的是 offset 为 0~368795 的消息。Kafka 正是利用 分段 + 索引 的方式来解决查找效率问题。
Kafka 是如何准确的知道 message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 message 都包含了以下三个属性:
Message 结构
- offset: 一个占 8 byte 的有序 id 号,表示 message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 message,可以简单的认为是一个 id
- MessageSize: 消息大小占用 4 byte,表示 message 内容 data 的大小
- data: message 的具体内容,消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样
查找消息:
假如现在需要查找一个offset 为 368801 的 message 是什么样的过程呢?我们先看看下面的图:

- 先找到 offset 的 368801 message 所在的 segment 文件(利用二分法查找),这里找到的就是在第二个 segment 文件
- 打开找到的 segment 中的 .index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1,我们要查找的 offset 为 368801 的 message 在该 index 内的偏移量为 368796 + 5= 368801,所以这里要查找的相对 offset 为 5)。由于该文件采用的是稀疏索引的方式存储着相对 offset 及对应 message 物理偏移量的关系,所以直接找相对 offset 为5的索引找不到,这里同样利用二分法查找相对 offset 小于或者等于指定的相对 offset 的索引条目中最大的那个相对 offset,所以找到的是相对 offset 为 4 的这个索引
- 根据找到的相对 offset 为 4 的索引确定 message 存储的物理偏移位置为 256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条 Message
这套机制建立在 offset 有序的基础上,结合了 Segment + 有序 offset + 稀疏索引 + 二分查找 + 顺序扫描 等多种手段,因此能够高效定位消息。至此,消费者就能拿到需要处理的数据了。那消费者又是如何记录自己消费位置的呢?在早期版本中,消费位点会维护在 ZooKeeper 中,消费者定期上报,既容易产生重复消费,也会带来性能压力;在后续版本中,这些 offset 被直接维护在 Kafka 集群内部的 __consumer_offsets Topic 中。
存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
- 基于时间: 默认配置是 168 小时(7天)
- 基于大小: 默认配置是 1073741824
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能
生产者设计概要
当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到 Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过 Kafka 返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点
再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定
不同的业务需要使用不同的写入方式和配置。具体的方式我们在这里不做讨论,现在先看下生产者写消息的基本流程:

流程如下:
- 首先,我们需要创建一个 ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)
- 发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)
- 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回
- 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的 Kafka broker
- 当broker接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的 RecordMetadata 对象,否则返回异常
- 生产者接收到结果后,对于异常可能会进行重试
producer 在写入数据的时候永远的找 leader,不会直接将数据写入 follower!那 leader 怎么找呢?写入的流程又是什么样的呢?我们看下图:

发送的流程就在图中已经说明了,需要注意的一点是,消息写入 leader 后,follower 是主动的去 leader 进行同步的!producer 采用 push 模式将数据发布到 broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:

上面说到数据会写入到不同的分区,那 kafka 为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
-
方便扩展。因为一个 topic 可以有多个 partition ,所以我们可以通过扩展机器去轻松的应对日益增长的数据量
-
提高并发。以 partition 为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率
熟悉负载均衡的应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在 kafka 中,如果某个 topic 有多个 partition , producer 又怎么知道该将数据发往哪个 partition 呢? kafka 中有几个原则:
- partition 在写入的时候可以指定需要写入的 partition ,如果有指定,则写入对应的 partition
- 如果没有指定 partition ,但是设置了数据的 key,则会根据 key 的值 hash 出一个 partition
- 如果既没指定 partition,又没有设置 key,则会轮询选出一个 partition
保证消息不丢失是一个消息队列中间件的基本保证,那 producer 在向 kafka 写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过 ACK 应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认 kafka 接收到数据,这个参数可设置的值为 0、1、all
-
0 代表 producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
-
1 代表 producer 往集群发送数据只要 leader 应答就可以发送下一条,只确保leader 发送成功
-
all 代表 producer 往集群发送数据需要所有的 follower 都完成从 leader 的同步才会发送下一条,确保 leader 发送成功和所有的副本都完成备份。安全性最高,但是效率最低
最后补充一点:如果往一个不存在的 Topic 写数据,是否能够写入成功,取决于 Broker 是否开启了 auto.create.topics.enable。如果允许自动创建,那么新 Topic 的 Partition 数量通常取自 num.partitions,副本因子通常取自 default.replication.factor;它们不一定都是 1,而是由集群配置决定。
消费者设计概要
消费者与消费组
假设这么个场景:我们从 Kafka 中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展
Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:

如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,如下所示:

如果增加到4个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:

但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:

总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助
Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息 换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么会是这样的:

在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用
最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者
消费者和消费者组,有这两个重要的特性:
- 如果所有的 consumer 都属于一个 consumer group,那么所有的消息会被均匀地投递给每一个消费者。就是每一个消息都只会被一个消费者处理(有点像点对点的应用,或者说单播放)
- 如果多个 consumer 分别属于多个 consumer group,那么消息会被发送(或者直接说广播)给所有的消费者组,消费者组内只会被一个消费者处理,这样子可以组成一个广播模式的应用,其实就是相当于 pub/sub 模型
消费组与分区重平衡
可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)
重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。 而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何 安全的进行重平衡 以及如何尽可能避免
消费者通过定期发送心跳(heartbeat)到一个作为组协调者(group coordinator)的 broker 来保持在消费组内存活。这个 broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳
如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期
在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费
Partition 与消费模型
上面提到,Kafka 中一个 topic 中的消息是被打散分配在多个 Partition(分区) 中存储的, Consumer Group 在消费时需要从不同的 Partition 获取消息,那最终如何重建出 Topic 中消息的顺序呢?
答案是:没有办法。Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。
下一个问题是:Partition 中的消息可以被(不同的 Consumer Group)多次消费,那 Partition 中被消费的消息是何时删除的? Partition 又是如何知道一个 Consumer Group 当前消费的位置呢?
无论消息是否被消费,除非消息到期 Partition 从不删除消息。例如设置保留时间为 2 天,则消息发布 2 天内任何 Group 都可以消费,2 天后,消息自动被删除。 Partition 会为每个 Consumer Group 保存一个偏移量,记录 Group 消费到的位置。 如下图:

为什么 Kafka 是 pull 模型
消费者应该向 Broker 要数据(pull)还是 Broker 向消费者推送数据(push)?作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。一些 logging-centric system,比如 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。事实上,push 模式和 pull 模式各有优劣
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。 push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息
对于 Kafka 而言,pull 模式更合适。 pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义
Kafka 如何保证可靠性
当我们讨论可靠性的时候,我们总会提到 保证 这个词语。可靠性保证是基础,我们基于这些基础之上构建我们的应用。比如关系型数据库的可靠性保证是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)
Kafka 中的可靠性保证有如下四点:
- 对于一个分区来说,它的消息是有序的。如果一个生产者向一个分区先写入消息 A,然后写入消息 B,那么消费者会先读取消息 A 再读取消息 B
- 当消息写入所有 in-sync 状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不一定刷新到磁盘。生产者可以等待不同时机的确认,比如等待分区主副本写入即返回,后者等待所有 in-sync 状态副本写入才返回
- 一旦消息已提交,那么只要有一个副本存活,数据不会丢失
- 消费者只能读取到已提交的消息
使用这些基础保证,我们构建一个可靠的系统,这时候需要考虑一个问题:究竟我们的应用需要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。因此,我们往往需要做权衡,一味的追求可靠性并不实际
Kafka 如何实现简单延时队列
所谓的延时队列,就是指消息被发送出去以后,并不想让消费者立即获取来消费,而是等待到了特定的时间,如果到了 30 分钟后,消费者才能获取这个消息进行消费,有时候也叫做“延迟队列”
延时队列在我们日常业务中很常见:
- 在购物 app 中,用户下单后有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单就会进入异常处理流程。这就有了延迟 30 分钟的需求了
- 用户在收到网购的商品 1 天后,发短信通知用户进行购物体验的评价
- 智能家具场景,在我离开家后一小时,关闭新风机。这时候其实是用户发送了一个指令到延时队列,当指令执行的时间到达以后,在将此指令推送到新风机中,关闭新风机
下面简单说一下通过延时等级划分的方式实现简单的延时队列
实现延时队列有一个极其简单的方案:消费者在 pull 到消息后,先判断当前消息是否已经到了可消费时间;如果没有,就把这条消息重新写回 Topic,等待下一次消费。 It sounds pretty good。但是这个问题就像为什么 setTimeout 不准确一样。当消息大量堆积的时候,原本消息被拉出来只需要等待 10s 就能消费了,但是重新入队以后,再次出来,可能是 1 个小时之后的事情了。显然这个方案不适宜投产,延迟太高
假设我们现在要实现一个支持 2 小时内延时的队列,并且精确到秒级,也就是 7200s。如果按照秒级粒度去划分消费类别,那我们就需要 7200 个 Topic;而 Topic 背后又对应着很多分区、很多副本、很多日志段,这会造成巨大的资源浪费,整个集群的吞吐也会明显下降。
此时你就会产品经理商量,是否允许有一定的误差呢?产品经理同意了。ok,这个问题突然就变得简单起来了

我们可以按照消息的预计发送时间,划分出来一些延时等级。例如我们这里是 2h,那么我们可以划分出来,5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1h、2h。延时的消息按照不同的延时等级,被放入不同的 topic 当中,如果是和等级不一致的消息,会被强制转为和等级一致的延时时间,这样延时的误差可以控制在两个延时等级的时间差范围内。例如,现在有个消息要在 19s 之后被消费,按照我们定下来的规则,他会被投递到 30s 的延时等级中,也就是说这个消息将会在 30s 后被消费,比起秒级延时队列,这条消息的误差晚了 11s 被消费。虽然存在着一定的误差,但是误差算是可控,并且这样只需要增加少量的 topic,就能实现延时队列的功能,而且非常简单
如图所示,Producer 写入数据时,实际上会先被发送到不同的 delay_* Topic 中,而消费者真正消费的是 real_topic_* 中的数据。这里可以为 Producer 增加一个拦截器,根据约定好的 header、timestamp、key 等信息进行改写,让开发者看起来像是直接把消息写进了 real_topic_*,但内部其实被转发到了对应的 delay_* Topic。举个例子:如果 Producer 要写入一条 8s 后消费的延时消息,那么它最终会被拦截并放入 10s 这一档的 delay_* Topic 中。
Delay Service 的内部实现可以简单理解为:Consumer 进程 + DelayQueue + Producer 进程。消息进入不同等级的 delay_* Topic 后,会先被 Delay Service 的 Consumer 拉取,再放入对应的 DelayQueue;等到时间到达后,再由 Producer 转发到 real_topic_*,供真实业务消费者处理。为了提升可用性,Delay Service 一般需要配合部署策略进行高可用设计。
这种延时的实现方案会有一定的误差,无法做到秒级的精确延时。但是对于精度要求不高的业务,只要延时等级设计合理,还是有一定的应用价值的
Kafka 原理总结
如果把前面的内容浓缩成一张心智图,那么 Kafka 的工作方式大致可以概括为:Producer 把消息写入 Topic,Topic 再被拆分为多个 Partition 分布到不同 Broker 上;Consumer 以消费者组的形式并行消费这些 Partition;副本机制负责在节点故障时维持可用性。
Kafka 和传统 MQ 的相似之处在于,它同样承担着“解耦生产者与消费者”的职责;不同之处在于,它把消息保存为一份可持久化、可回放的分布式日志。也正因为如此,Kafka 不只是“把消息送出去”,而是还要解决“如何高效存、如何高效读、如何可靠恢复”这些问题。
可以把 Kafka 的核心理解为下面几条:
- Topic 是逻辑概念,Partition 是真正承载数据的物理单元
- 单个 Partition 内消息有序,但多个 Partition 之间不保证全局顺序
- Consumer Group 负责把多个消费者组织起来,协作消费同一份 Topic 数据
- 副本机制负责在 Leader 故障时完成切换,提升可用性
- 消息写入本质上是追加到 Partition 日志末尾,再配合 Segment 和索引组织存储


关于消费模型,可以再抓住两个最重要的事实:
- 对同一个 Consumer Group 来说,一个 Partition 同时只能被组内一个 Consumer 消费
- 如果消费者数量多于 Partition 数量,多出来的消费者会空闲下来
这也是为什么分区数往往会影响消费并行度。通常情况下,想提升某个消费组的处理能力,要么优化单个消费者逻辑,要么增加 Partition 和消费者数量,并接受由此带来的重平衡成本。
至于消费进度,Kafka 通过 offset 来记录每个消费者组读到哪里。消息被消费后并不会立刻删除,它们会继续按保留策略留在日志中,因此 Kafka 天然支持“消息回放”这件事。

从写入路径看,Producer 会先把消息序列化,再按分区策略放入对应分区的批次缓冲区中,然后由后台线程统一发送给目标 Broker。常见的发送方式可以理解为:
- 同步发送:等待结果返回,再继续后续流程
- 异步发送:先发出去,由回调处理结果
- 批量发送:把同一分区的多条消息聚合后再统一发送
其中批量发送对吞吐提升非常明显。像 batch.size、linger.ms 这类参数,本质上就是在“延迟一点点”和“换取更高吞吐”之间做权衡。
这其中有个非常重要的参数“acks”,这个参数决定了 producer 要求 leader partition 收到确认的副本个数,如果 acks 设置数量为 0,表示 producer 不会等待 broker 的响应,所以,producer 无法知道消息是否发送成功,这样有可能会导致数据丢失,但同时,acks 值为 0 会得到最大的系统吞吐量。若 acks 设置为 1,表示 producer 会在 leader partition 收到消息时得到 broker 的一个确认,这样会有更好的可靠性,因为客户端会等待直到 broker 确认收到消息。若设置为 all(以前的版本是设置为-1,后来是 all),producer 会在所有备份的 partition 收到消息时得到 broker 的确认,这个设置可以得到最高的可靠性保证
Kafka 容灾/一致性
Kafka 容灾主要依赖于 Replication 机制,Replication 可以在 server 的配置文件中去配置。通常来讲,broker 的个数是至少大于 Replication 的。其实在一个集群中,broker 的个数一般是和最大的 Replication 个数齐平的。
理论上来讲,follower 上的日志和 leader 上的基本一致。再具体说明之前,需要先说 kafka 中的一个东西,ISR,它是一个“in sync”的 node list(in-sync replica 已同步的副本)。存在于 ISR 里的 follower 是会变化的,如果一个 follower 宕机,或者落后 leader 太多,leader 将把它从 ISR 中踢出。当它活过来/再次跟上时会再拉进来。“落后太多”是根据配置文件中的设定( 之后好像这个已经被删除了,因为这个值很难确定,而且高峰期会出现进进出出的情况)。Kafka 的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。对于 ISR 中的同步副本它通常要满足几个条件:
- leader 副本是同步副本。(关于副本的内容之后会再说,首先知道有这么个要求)
- 与 zookeeper 之间有一个活跃的会话,也即在过去 6S(可配置)内向 zookeeper 发送过心跳。
- 在过去的 10S 内(可配置)从 leader 副本那里获得过信息。
- 在过去 10S 内从 leader 副本那里获取过最新的信息。(光从 leader 那里获取信息是不够的,还必须是几乎零延迟的)
如果 leader 挂了,需要在 followers 中选择一个新的 leader。但是 followers 本身是可能有问题的,follower 可能“落后”或者 follower 也挂了,所以必须选择高质量的 follower 作为 leader。必须保证,一旦一个消息被提交了,但是 leader 挂掉了,新选出的 leader 必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的 leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为 leader。Kafka 并不是使用这种方法。前面也说了,kafka 维护这一个 ISR 队列,在这个队列中,follower 是和 leader 保持高度一致的,强同步情况下,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。这里就是由 request.required.acks 来决定的(这个 acks 和 producer 中的 acks 是同一个)。因此这个集合中的节点是最优先考虑容灾的。想要一个数据都不丢失,acks 必须设置为 all。ISR 中有 n+1 个节点,就可以允许在 n 个节点挂掉的情况下不会丢失消息并正常提供服务。
这里就有一个问题,如果所有的节点都挂了怎么办?主要策略有两种:
- 等待 ISR 中的任何一个节点恢复并担任 leader。
- 选择所有节点中(不只是 ISR)第一个恢复的节点作为 leader。
但是,对于 1 而言,ISR 中的节点如果长时间不恢复,那么 kafka 就会长时间不能使用。对于 2 而言,第一个醒的节点如果不是 ISR 中的,那么它的数据可能丢失的很多。总之这是个权衡问题,基于具体的项目具体对待
副本的复制方式
- Follower 根据自身拥有多少个需要同步的 topic Partition 来创建相对应的 partitionFetchState,记录了从 leader 的哪个 offset 开始获取数据
- follower 会根据 leader 的 brokeId 和 topicPartition 经过 hash 计算的 partitionId(这里其实就是前面怎么把 partiton 去分到 broker 上)来创建复制线程 ReplicationFetchThread
- ReplicaFetchThread 会根据 partitionFetchState 提供的信息不停地从 leader 获取数据,每次成功复制后,都会更新 partitionFetchState 的 fetchOffset
- 如果 fetchOffset 越界,则会对 followerPartitionLog 进行 truncate,然后冲去拉取 offset
- 每次 log 的增加,都可能会触发一个新的 logSegement 的产生,原因可以是 log index or time index 容量满了,或者上次 append log 太久,超过了阈值
分区副本与 leader 不同步的情况
- 慢副本:在一定周期内 follower 不能追赶上 leader。最常见的原因之一是 I/O 瓶颈导致 follower 复制速度慢于 leader 拉取速度
- 卡主副本:在一定周期时间内 follower 停止从 leader 拉取请求/可能由于 GC 暂停或 follower 失效死亡
- 新启动副本:当用户给主题增加副本因子时,新的 follower 不在同步副本列表中,直到他们完全赶上了 leader 日志
数据的一致性:在 kafka 中,数据一致性的含义是若某条消息对 Consumer 可见,那么即使 Leader 宕机了,在新 Leader 上数据依然可以被读到。HighWaterMark 简称 HW: Partition 的高水位,取一个 partition 对应的 ISR 中最小的 LEO(日志末端位移(log end offset),它指向的是下一条消息的位移)作为 HW,消费者最多只能消费到 HW 所在的位置,另外每个 replica 都有 highWatermark,leader 和 follower 各自负责更新自己的 highWatermark 状态,highWatermark <= leader. LogEndOffset。对于 Leader 新写入的 message,Consumer 不能立刻消费,Leader 会等待该消息被所有 ISR 中的 replica 同步后,更新 HW,此时该消息才能被 Consumer 消费,即 Consumer 最多只能消费到 HW 位置。这样就保证了如果 Leader Broker 失效,该消息仍然可以从新选举的 Leader 中获取。对于来自内部 Broker 的读取请求,没有 HW 的限制。同时,Follower 也会维护一份自己的 HW,Folloer.HW = min(Leader.HW,Follower.LEO)
高水位截止:(补充一下高水位截止的含义,因为 leader 是取最小的 HW,那么就可能存在 follower 比 leader 高的情况,这时 follower 需要把多出来的部分 truncate,称为高水位截止)之所以把这里再次拿出来说的原因是,我在看到这个地方的时候就产生一种疑问:高水位截止会不会导致数据丢失?这里我们就需要明白几点,leader 和 follower 的 LEO 和 HW 是什么时候更新的,在此之前,我们需要重新来理解一下 kafka 的 HW 和 LEO 机制(上述的数据一致性部分,请参考起来看,因为 kafka 的一致性确实依赖于 HW 和 LEO 机制)

LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO=10,那么表示该副本保存了 10 条消息,位移值范围是[0, 9]。另外,leader LEO 和 follower LEO 的更新是有区别的。
HW:即上面提到的水位值。对于同一个副本对象而言,其 HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(replicated)
如图:HW 的位置是 7,那么 0-7 是都是已经备份好的,LEO 在 15,那么 8-14 都是还没开始备份的,15 则是下一条消息的位置
Kafka 分区下会有很多个副本用于实现冗余来进一步实现高可用。副本根据角色的不同可大致分为 3 类:
- leader 副本:响应 clients 端读写请求的副本
- follower 副本:被动地备份 leader 副本中的数据,不能响应 clients 端读写请求。
- appendISR 副本:包含了 leader 副本和所有与 leader 副本保持同步的 follower 副本(同步副本的判断之前已经提过了)
这里需要大家明白一个很重要的点:Kafka 有两套 follower 副本 LEO:1. 一套 LEO 保存在 follower 副本所在 broker 的副本管理机中;2. 另一套 LEO 保存在 leader 副本所在 broker 的副本管理机中——换句话说,leader 副本机器上保存了所有的 follower 副本的 LEO(其实这里也不难理解,follower 肯定自己有一套 LEO,leader 需要知道每个 follower 的 LEO 来帮助自己去判断是否可以更新 HW,所以 leader 也会存一套)。
看到这里,我们就需要知道,leader 副本的 LEO 和 HW,follower 副本的 LEO 和 HW 以及 leader 上存的 follower 的 LEO 什么时候更新,只有清楚了这些更新时间才能去判断是否会产生数据丢失或者数据不一致性问题。(为了避免误会,说明一下,下面的顺序并没有任何意义,并非每个更新的顺序,请一切参考内容)
follower 副本更新 LEO:写入时就会更新,follower 发送 FETCH 请求后,leader 将数据返回给 follower,此时 follower 开始向底层 log 写数据,从而自动地更新 LEO 值。
leader 上的 follower 副本更新 LEO:leader 接收到 follower 发送的 FETCH 请求,它首先会从自己的 log 中读取相应的数据,但是在给 follower 返回数据之前它先去更新 follower 的 LEO。
leader 的 LEO 更新:和 follower 一样,写入时就会更新。
follower 更新 HW:在更新 LEO 之后,follower 向 log 写完数据时会尝试更新它自己的 HW 值。具体做法就是比较当前 LEO 值与 FETCH 响应中 leader 的 HW 值,取两者的小者作为新的 HW 值。这里就会明白一点,follower HW 永远不可能超过 leader,因为 leader 的 HW 是整个分区的 HW,所以 leader 的 HW 一定是最高的。
leader 更新 HW:这个地方是最重要的,上面也说了,leader 的 HW 是整个分区的 HW,那么就意味着,这个 HW 直接决定了 consumer 可不可见。所以它的更新就显得很关键了。大概有 4 种情况下 leader 会尝试去更新分区 HW——切记是尝试,有可能因为不满足条件而不做任何更新:
- 副本成为 leader 副本时:当某个副本成为了分区的 leader 副本,Kafka 会尝试去更新分区 HW。这是显而易见的道理,毕竟分区 leader 发生了变更,这个副本的状态是一定要检查的。
- broker 出现崩溃导致副本被踢出 ISR 时:若有 broker 崩溃则必须查看下是否会波及此分区,因此检查下分区 HW 值是否需要更新是有必要的。
- producer 向 leader 副本写入消息时:因为写入消息会更新 leader 的 LEO,故有必要再查看下 HW 值是否也需要修改
- leader 处理 follower FETCH 请求时:当 leader 处理 follower 的 FETCH 请求时首先会从底层的 log 读取数据,之后会尝试更新分区 HW 值
当 kafka 正常工作的时候,leader 尝试更新 HW 是有 producer 请求和 follower 的 FETCH 请求时。这时 leader 会考虑所有符合条件的副本 LEO,当然也包括 leader 自身的 LEO。两个条件分别是:
- 处于 ISR 中
- 副本 LEO 落后于 leader LEO 的时长不大于 replica.lag.time.max.ms 参数值(默认是 10s)
细心的同学可能会发现,第二个条件是满足同步副本的条件之一,而 ISR 中的就是同步副本。只满足第二个条件肯定是不行的,因为如果只满足第二个条件,可能会产生某个 follower 已经“追上”来了,但不在 ISR 中,这个肯定是不能参考的。换句话说,HW 取的是 ISR 中 LEO 的最小值。那么只有条件一可以不?在 ISR 中会不会产生时长大于 replica.lag.time.max.ms 的 follower?因为官方给出的 ISR 会自动踢出不满足同步副本的 follower。所以我个人认为只有条件一是可以的(官方给出的两个条件必须同时满足,一切还是以官方为主)。
现在我们来讨论一下,是否有可能丢数据或数据不一致的问题
看了上述解释,我想大家对流程应该理的差不多了。那么,我们就能得到一个事实:kafka 使用 HW 值来决定副本备份的进度,而 follower 的 HW 值的更新通常需要额外一轮 FETCH 请求才能完成(参考 followerHW 更新时间),那么会不会产生数据丢失或者不一致,答案是会的
数据丢失
假设我们现在只有一主一从两副本。一个 leader 一个 follower,某个时刻时,leader 收到了 follower 的 FETCH 请求,这时 leader 去更新自己的 HW,这个时候 follower 重启了,重启后需要重新去拉 HW。再去发一个 FETCH 请求,这个时候恰好 leader 挂了。那么就要重新选主,follower 就会成为新主,这个时候 follower 里的 HW 并没有更新,但是对于 consumer 而言,这个 HW 已经更新过了,也就是说,这个数据就不存在了。

数据不一致
我们还是只有一主一从两副本。一个 leader 一个 follower。假设此时 leader 收到了一条消息 m2,情况同上时,也就是 leader 刚更新完 HW,follower 还没来得及更新的那一刻,leader 和 follower 全挂了。这个时候要等最先复活的,之前的 follower 先复活了,并且这个时候成为了新的 leader,它的接收了新消息 m3,这个时候新 leader 的 HW 会+1。注意这个时候新 leader 的 HW 和原来 follower 的 HW 位置是一样的。然后旧的 leader 活了,成为了 follower,此时,两者的 HW 相同,但是这个位置的数据却不一致。

综上其实说明一个问题,kafka 使用 HW 值来决定副本备份的进度,而 follower 的 HW 值的更新通常需要额外一轮 FETCH 请求才能完成(参考 followerHW 更新时间),而这种设计是有问题的。
事实上,使用高水位截止来标识备份进度主要存在于 Kafka 0.11 之前的版本;在 0.11 之后,Kafka 开始使用 leader epoch 来标识备份进度。
现在我们来说,leader epoch 是怎么工作的,先了解几个基础概念:
Leader Epoch: 32 位,单调递增的数字。代表单个分区所处的 leader 时代。每发生一次 leader 转换,就+1。例如 leader epoch =2,说明处于第二 leader 时代。
Leader Epoch Start Offset:该 epoch 版本的 leader 写入第一条消息的位移。
Leader Epoch Sequence File: 存储 Leader Epoch 和 Leader Epoch Start Offset 对(epoch,offset),类似如下形式:
(0,100) , (1,200) , (3,500)
然后具体分析之前,我们先得知道 leader epoch 的具体流程,我把官方给出的流程列到这。(略)
通俗来讲,就是 Leader 端多开辟一段内存区域专门保存 leader 的 epoch 信息。leader broker 中会保存这样的一个缓存,并定期地写入到一个 Leader Epoch Sequence File 文件中。当 leader 写底层 log 时它会尝试更新整个缓存——如果这个 leader 首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为 leader 时会查询这部分缓存,获取出对应 leader 版本的位移,这就不会发生数据不一致和丢失的情况
更通俗来说就是每个副本都引入了新的状态来保存自己当 leader 时开始写入的第一条消息的 offset 以及 leader 版本。这样在恢复的时候完全使用这些信息而非水位来判断是否需要截断日志
现在我们去尝试着解决上面那两个问题
解决数据丢失:A 重启后,向 B 发送 LeaderEpochRequest 请求,此时由于 Leader Epoch 相等,所以 B 返回给 A 的是 B 的 LEO。此时 B 挂掉,A 接收到 LEO=2,与自己相同,所以不会截断,A 成为 leader,在文件中添加新的 Leader epoch 和开始偏移即可

这是官方给出的解释,我在这里有个问题,如果 A 刚重启完,还没来得及发 LeaderEpochRequest,B 就挂了,这个时候怎么办?因为这里不是按 HW 进行截止的,而是主要依赖于 LEO。如果此时 B 直接挂了,A 就会被选为 leader,那么就要依赖于 A 的 LEO,这个时候 A 的 LEO 是处于 2 的位置的,HW 是否会直接变成 2 而不进行截止。(此处是我的推论,未经证实,只是我看到的时候产生了疑问)
解决数据不一致:A、B 宕机后,B 先重启成为 leader,此时 Leader Epoch 由 0 变为 1,并且 LE 的开始偏移为 1。此时 A 重启,向 B 发送自己宕机时所处的 Leader Epoch,也就是 0。此时 B 返回 LE1 的开始偏移 1。A 发现自己的数据大于此值,便会截断自己的日志。从而保证数据的一致性
这个比较好理解,就不做过多的解释了,其实我觉得这个(epoch,sffset)主要是用来解决数据不一致问题的。。。
补充一下,这里其实还有问题,在极端情况下,也就是这样的情况下,你会发现消息 m2 没了。因为其实新复活的 B 并不能提供给 Am2,但这种就是 ISR 全挂了的情况,也算极端情况的一种
Kafka 的存储

前面已经多次提到 Kafka 的存储模型,这里再从“由外到内”的角度把它整理一遍:
- Topic 如何拆分为多个 Partition
- Partition 在磁盘上如何组织成目录
- 目录内部为什么会继续拆成多个 Segment
- Kafka 如何根据 offset 快速定位消息
先看最外层。Kafka 中的消息以 Topic 为逻辑分类单位,不同 Topic 彼此独立;每个 Topic 又可以拆成多个 Partition,每个 Partition 只保存该 Topic 的一部分消息。这样做既是为了水平扩展,也是为了并行读写。
从磁盘视角看,Partition 才是真正落地的存储单元。它通常对应日志目录下的一个子目录,命名规则一般是:
<topic_name>-<partition_id>
例如 orders-0、orders-1、orders-2。消息一旦进入某个 Partition,就会在这个 Partition 内获得单调递增的 offset。这个 offset 不是“全局唯一消息 ID”,而是“该消息在当前 Partition 内的位置编号”。
为什么 Kafka 要把 Topic 继续拆成多个 Partition?主要是因为:
- 可以把一个 Topic 的数据分散到多个 Broker 上,提升整体容量
- 可以让不同 Consumer 并行处理不同 Partition,提高消费能力
- 可以在保持分区内有序的前提下,兼顾吞吐和扩展性
关于 Partition 数量的设置,也没有绝对公式,但通常会综合考虑以下因素:
- 预期吞吐量是否需要并行扩展
- 消费者组的并发消费能力有多大
- Broker 数量是否足以均衡承载这些 Partition
- 分区越多,元数据、文件句柄和内存开销也会随之增加
再往内一层看,Partition 目录本身也不会只包含一个无限增大的日志文件。
Partition 的文件存储方式如下图所示:

一个 Partition 目录通常会被拆分成多个 Segment。可以把它理解为:一个大的分区日志,被切成很多按顺序排列的日志段。这样做有几个明显好处:
- 写入时始终只追加到当前活跃 Segment,逻辑简单
- 清理过期数据时,可以直接按 Segment 删除,而不是逐条删除消息
- 恢复和管理时不必面对一个无限膨胀的大文件
- 查找消息时可以先锁定 Segment,再进入局部查找
Producer 把消息发送到某个 Partition 后,Broker 会把消息追加到该 Partition 当前活跃 Segment 的末尾。当 Segment 达到大小阈值或满足滚动条件时,Broker 会关闭当前 Segment,并创建新的 Segment 继续写入。
一个典型的 Segment 往往会包含这些文件:
.log:真正存储消息内容的数据文件.index:按 offset 建立的稀疏索引文件.timeindex:按时间维度建立的辅助索引文件
Segment 的命名通常使用该日志段的起始 offset,并补齐为固定长度数字。这样 Kafka 在面对海量日志时,就能先借助文件名在多个 Segment 之间快速定位范围。
最后再看最关键的“按 offset 查消息”过程。整体上可以理解为三步:
- 根据目标 offset,在多个 Segment 之间做二分查找
- 进入目标 Segment 后,利用
.index稀疏索引找到最近的物理位置 - 回到
.log文件,从该位置开始顺序扫描,直到命中目标消息
这就是 Kafka 典型的查询思路:分段、索引、顺序扫描。它不靠昂贵的全量索引,而是通过“先缩小范围,再局部扫描”的方式,在存储成本和查询效率之间取得平衡。
最后再回到最初那个问题:为什么 Kafka 基于磁盘却还能这么快?答案并不是“磁盘比内存快”,而是 Kafka 通过顺序写、页缓存、批量处理、零拷贝和分区并行等机制,把磁盘、网络和操作系统的能力尽可能压榨出来了。这也是它能长期承担海量日志与事件流核心链路的根本原因。