Kafka学习笔记
目录
基础架构
生产者
首先创建生产者,通过序列化器一般使用自带的序列化器,然后加上分区器,发送到缓存队列里面,这个队列大小是32m,每批次大小是16k。这个缓存队列叫双端队列,(这个队列里面其实还有一个内存池,发送批次数据的时候会创建批次大小并且从内存池中取出内存,后续数据发送到kafka集群时候就把内存再释放到内存池当中。)数据到队列中,Sender线程会拉取数据。拉取数据有两种情况:1. 数据积累到16k会发送数据 2. 数据如果迟迟没有达到16k,但是达到linger时间会发送数据。发送数据的时候会把每一个分区的数据每一个broker节点一个队列往外发送,如果kafka中没有及时应答,最多缓存五个请求。会有一个Selector将输入和输出打通,开始发送数据,集群收到会进行一个副本同步,会进行应答,应答有三种方式,ack=0,1,-1,应答成功会清理对应的请求,清理掉分区的数据,如果失败会重试,重试会一直重试,一直到发送成功为止
带有回调函数的异步发送
回调函数会返回一些发送的信息
1 | // 2 发送数据 |
kafka分区策略
三种分区方式
- 指定了分区就写在这个分区下面
- 没有指定分区但是有key,就把这个key的hash值和topic的分区数进行取余得到分区值
- 如果既没有分区也没有key使用粘性分区,随机选择一个分区使用,这个分区满了再换下一个分区
- 自定义分区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class MyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取数据 atguigu hello
String msgValues = value.toString();
int partition;
if (msgValues.contains("atguigu")){
partition = 0;
}else {
partition = 1;
}
return partition;
}
public void close() {
}
public void configure(Map<String, ?> configs) {
}
}
分区方式实例
1 | // 1. 指定分区 |
生产者如何提高吞吐量
提高吞吐量:
指的是在单位时间内能够发送更多的消息。提高吞吐量的关键在于优化消息的生产和发送流程,以最大化数据的传输效率和系统资源的利用率。
- 设置缓冲区大小
- 设置批次大小
- 减少linger时间
- 设置压缩
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// 0 配置
Properties properties = new Properties();
// 连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 1 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ack应答
总结
ack=0一般不用,不等数据落盘容易丢数据
ack=1用于一些普通日志数,leader收到数据后应答,但是有可能leader会挂掉,但是follower还没来得及同步副本,也可能丢数据
ack=-1,leader和isr队列中的所有节点收到数据后应答
leader维护一个isr队列,当follower迟迟不和leader保持通信就会认为是挂了,踢出isr队列,不向这个follower同步数据
数据完全可靠条件:ack=-1+分区副本大于等于2+isr里应答的最小副本数量大于等于2
事务
开启事务必须开启幂等性
幂等性
幂等性就是保证不管发送多少重复数据,broker端都只持久化一条数据,不会重复
必须自定义一个唯一的transactional.id
事务使用
1 | public class IdempotentProducerExample { |
数据乱序处理
如果未启动幂等性:max.in.fight.requests.per.connection设置为1
如果启动幂等性:max.in.fight.requests.per.connection需要小于等于5
Broker
zk中的kafka信息
Broker的工作流程
- 首先启动的时候会将broker的启动信息会在zk里面注册。
- 然后抢占信息,谁抢的快谁就是监控的Controller。
- 监控的Controller会健康brokers的节点变化,然后选举Leader,Leader的选举规则是根据isr里面存活的节点,然后按照ar的顺序选举。这里的isr就是能连通的所有节点,ar就是kafka里面的所有副本
- leader负责读写操作,follower负责数据的同步
- 选举完,Controller会将选举信息上传到zk中
- 其他的Controller会从zk中同步选举的信息
- 如果有一个节点挂了,监听的Controller会监听到变化然后去获取isr,重新选择leader,然后更新isr和leader,再重复前面的内容
Leader和Follower的故障处理
follower故障处理细节
当这个follower出现故障后会被踢出isr
在这期间leader和follower会继续接受数据
等这个follower恢复的时候会读取当前磁盘中的hw,将自己高于这个hw的数据都删除掉,因为这些数据都是没有验证的数据,然后去追赶这个hw,当follower追赶上当前的hw,就可以加入isr了
leo是每个副本的最后一个offset也就是每个副本最新的数据
hw是所有副本的最小的数据是全局的
leader故障处理细节
- leader发生故障后会选出一个新的leader
- 然后为了保证数据的一致性,其余的follower会将自身文件高于hw的部分截掉,然后从这个新的leader同步数据
这样只能保证副本的一致性,无法保证数据不丢失或者不重复
生产经验
手动调整分区副本,创建副本存储计划
手动调整分区副本存储的步骤如下:
(1)创建一个新的topic,名称为three。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three
(2)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
(3)创建副本存储计划(所有副本都指定存储在broker0、broker1中)。
vim increase-replication-factor.json
输入如下内容:
1 | { |
(4)执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
(5)验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
(6)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
leader Partition自动平衡
auto.leader.rebalance.enable
建议设置false
增加副本因子
由于某个主题很重要,所以我们要对这个重要的主题增加副本
1)创建topic
bin/kafka-topics.sh –bootstrap-server hadoop102:9092 –create –partitions 3 –replication-factor 1 –topic four
2)手动增加副本存储
- 创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中)。
vim increase-replication-factor.json
输入如下内容:
1 | {"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]} |
(2)执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
文件存储机制
概念
首先kafka中topic是逻辑上的概念,partition是物理上的概念,每个partition对应一个log文件,数据是不断追加到log文件的末尾的。为了防止文件过大定位太慢,kafka使用分片和稀疏索引机制,每个log分为多个segment,segment中包括index文件,log文件和timeindex文件,文件命名规则是:topic名称+分区序号
log文件和index文件详解
如何在log文件中找到offset为600的数据
- 首先根据segment文件的名字找打segment文件。这个文件里面存储的是相对offset,通过相对offset+segment文件名获取绝对offset
- 找到小于等于目标offset的最大的offset对应的索引
- 然后定位到log文件
- 向下遍历找到想要的record
注意:
kafka的index是稀疏索引,每向log文件中写入4kb数据,就会向index文件写入一条索引。
index文件中保存的offset是相对offset
文件清理策略
日志保存时间默认为7天,一旦超过了设置的时间有两种清理策略
- delete:直接将过期数据删除
- compact:将相同key的数据的不同value值保留最后一个版本
kafka如何高效读写数据
- kafka本身就是分布式的集群,分区技术,并行度高4
- 读数据使用稀疏索引,方便定位
- 顺序写入磁盘,kafka是追加写,不是更新
- 零拷贝和页缓存技术
零拷贝
在Kafka中,零拷贝指的是数据在传输过程中,避免了不必要的内存复制操作,从而提高了数据传输的效率和性能。具体来说:
生产者写入数据:
当Kafka生产者产生消息时,数据被写入到内核的页缓存中,而不是首先拷贝到用户空间再写入。
数据被直接写入到Linux系统的页缓存中,这个过程中不会涉及到从用户空间到内核空间的复制操作,称为零拷贝写入。
消费者读取数据:
Kafka消费者从页缓存中读取数据,同样避免了将数据从内核复制到用户空间的额外开销。
消费者能够直接访问页缓存中的数据,提高了数据读取的效率。
页缓存
Linux操作系统使用页缓存(Page Cache)来缓存文件系统中的数据,这些数据可以是文件的内容,包括Kafka日志文件中的消息数据。页缓存技术的关键点包括:
数据落盘策略:
当数据被写入到页缓存后,并不会立即写入磁盘,而是由Linux系统根据一定的策略(如写时复制、延迟写入等)来决定何时将数据同步到文件系统的文件(如Kafka的日志段文件)中。
这种延迟写入的机制能够减少频繁的磁盘访问,提高了整体的写入性能。
零拷贝和页缓存技术举例说明
假设有一个名为 my_topic 的Kafka主题,它有一个分区(Partition) my_topic-0,并且分区中包含多个日志段文件(Segment Files)用于存储消息数据。
生产者写入数据:
Kafka生产者生成一条新消息,并将其发送到主题 my_topic 的分区 my_topic-0。
生产者将消息数据直接写入到Linux系统的页缓存中,避免了额外的数据复制操作。
数据落盘:
Linux系统根据自身的策略,例如延迟写入策略,决定何时将页缓存中的数据(包括Kafka生产者写入的消息)同步到文件系统的日志段文件中。
这意味着数据首先存储在内存中,减少了频繁的磁盘访问,从而提高了写入性能和系统的响应速度。
消费者读取数据:
Kafka消费者从页缓存中直接读取数据,而不需要将数据再次复制到用户空间。
消费者能够有效地访问到页缓存中的最新数据,减少了读取操作的延迟。
网络传输:
由于Kafka消费者直接从页缓存读取数据,并且数据已经在内核中进行了预处理和缓存,因此发送到消费者的数据可以直接通过网卡进行传输,而不需要额外的应用层处理或复制操作。
这进一步降低了数据传输的延迟,提高了整体系统的效率和性能。
通过这种方式,Kafka利用了操作系统的页缓存和零拷贝技术,实现了高效的数据写入和读取,适用于大规模数据处理和实时消息传输的场景。
kafka消费者
总体工作流程
- 生产者向每一个分区的leader发送数据,每一个分区的follower拉取自己分区的leader的数据作为副本
- 消费者消费数据,每个分区的数据只能由一个消费者组中的一个消费者消费
- 每个消费者的offset存放在kafka的_consumer_offsets主题下
消费者工作原理
消费者组初始化流程
- 首先选择出coordinator节点,这个节点是用消费者组的groupid%50来计算出哪个分区是这个消费者组的老大(leader消费者)
- coordinator会讲话要消费的topic情况发送给上面选出的leader消费者
- leader消费者会制定消费方案
- leader会将制定好的方案发送给coordinator,然后coordinator会下发给各个消费者
- 注意注意注意这里的每个消费者都需要和coordinator保持心跳,如果超过45s没有心跳,这个消费者就会被移除,就会触发再平衡。
这个再平衡非常的影响性能,最好不要触发再平衡
消费者组的详细消费流程
这里的图片是从右往左看的
首先消费者发送消费请求给NetWork客户端,发送请求导对应分区,请求返回一个回调函数OnSuccess,放到一个completeFetches队列中,队列中会有很多返回的数据,然后消费者向这个队列中拉取数据,默认500条拉取一次。
这里注意:每批次抓取的大小默认1字节,也就是当分区内有一个字节的数据就直接拉取,数据最小值超时时间默认是500ms,也就是500ms没有拉取导小于设置的抓取大小的数据就直接都拉取到对列。每批次拉取的最大大小是50m
分区的分配和再平衡
有四种分区,Range,RoundRobin,Sticky,CooperativeSticky
kafka默认的是Range+CooperativeStricky
Range
分区按照序号排序,分区数/消费者数获取每个消费者可以消费多少分区然后分配
再分配:
(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到3、4号分区数据。
2号消费者:消费到5、6号分区数据。
0号消费者的任务会整体被分配到1号消费者或者2号消费者。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、1、2、3号分区数据。
2号消费者:消费到4、5、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照range方式分配。
RoundRobin
轮询分配
再分配:
(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到2、5号分区数据
2号消费者:消费到4、1号分区数据
0号消费者的任务会按照RoundRobin的方式,把数据轮询分成0 、6和3号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、2、4、6号分区数据
2号消费者:消费到1、3、5号分区数据
说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。
Sticky
粘性分区
尽量防止分区
再分配:
(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到2、5、3号分区数据。
2号消费者:消费到4、6号分区数据。
0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到2、3、5号分区数据。
2号消费者:消费到0、1、4、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。
offset
默认维护位置
维护在_consumer_offsets主题中,kv的方式存储,k是groupid+topic+分区号,v就是当前的offset的值,每隔一段时间就会对这个数进行compact,保存最新的数据
自动提交offset
enable.auto.commit 默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
手动提交offset
同步提交offset
consumer.commitSync();
异步提交offset
consumer.commitAsync()
相同与不同
同步提交offset和异步提交offset的相同点是都会选择一批数据中最高的偏移量提交,不同是同步提交会阻塞线程,一直到提交成功,自动失败重试。异步提交没有失败重试可能会提交失败
同步提交必须等待offset提交完再去消费下一批数据
异步提交是发送完提交offset请求就去消费下一批数据
同步提交:由于等待提交结果,可以确保offset已经成功提交到Kafka。这对于关键任务或需要严格一致性的应用场景尤为重要。
异步提交:提交结果在后台处理,即使提交失败,消费者线程也不会直接受到影响。但是,需要额外的机制处理可能的提交失败。
指定offset
1 | public static void main(String[] args) { |
指定时间消费
1 | public static void main(String[] args) { |
注意:这个指定时间消费是重点,可以用作flink补数
漏消费和重复消费
重复消费
自动提交offset引起
已经消费数据了但是offset没有提交
漏消费
手动提交offset了,数据没有落盘,消费者挂了
先提交offset后消费数据
解决方案
开启消费者事务
可以开启自动提交offset,然后中间用redis进行数据去重