Kafka学习笔记
2024-07-18 18:16:28 # kafka # 学习笔记

Kafka学习笔记

目录

基础架构

alt text

生产者

首先创建生产者,通过序列化器一般使用自带的序列化器,然后加上分区器,发送到缓存队列里面,这个队列大小是32m,每批次大小是16k。这个缓存队列叫双端队列,(这个队列里面其实还有一个内存池,发送批次数据的时候会创建批次大小并且从内存池中取出内存,后续数据发送到kafka集群时候就把内存再释放到内存池当中。)数据到队列中,Sender线程会拉取数据。拉取数据有两种情况:1. 数据积累到16k会发送数据 2. 数据如果迟迟没有达到16k,但是达到linger时间会发送数据。发送数据的时候会把每一个分区的数据每一个broker节点一个队列往外发送,如果kafka中没有及时应答,最多缓存五个请求。会有一个Selector将输入和输出打通,开始发送数据,集群收到会进行一个副本同步,会进行应答,应答有三种方式,ack=0,1,-1,应答成功会清理对应的请求,清理掉分区的数据,如果失败会重试,重试会一直重试,一直到发送成功为止

alt text

带有回调函数的异步发送

回调函数会返回一些发送的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 2 发送数据
for (int i = 0; i < 500; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {

if (exception == null){
System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
}
}
});

Thread.sleep(2);
}

kafka分区策略

alt text

三种分区方式

  1. 指定了分区就写在这个分区下面
  2. 没有指定分区但是有key,就把这个key的hash值和topic的分区数进行取余得到分区值
  3. 如果既没有分区也没有key使用粘性分区,随机选择一个分区使用,这个分区满了再换下一个分区
  4. 自定义分区
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
        public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

    // 获取数据 atguigu hello
    String msgValues = value.toString();

    int partition;

    if (msgValues.contains("atguigu")){
    partition = 0;
    }else {
    partition = 1;
    }

    return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
    }
分区方式实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1. 指定分区
ProducerRecord<String, String> recordWithPartition = new ProducerRecord<>(topic, 0, "key1", "value1");
RecordMetadata metadata1 = producer.send(recordWithPartition).get();
System.out.printf("Sent record with partition: (key=%s, value=%s) to partition=%d%n", "key1", "value1", metadata1.partition());

// 2. 基于键的哈希值分区
ProducerRecord<String, String> recordWithKey = new ProducerRecord<>(topic, "key2", "value2");
RecordMetadata metadata2 = producer.send(recordWithKey).get();
System.out.printf("Sent record with key: (key=%s, value=%s) to partition=%d%n", "key2", "value2", metadata2.partition());

// 3. 粘性分区(既没有指定分区,也没有提供键)
ProducerRecord<String, String> recordNoPartitionNoKey = new ProducerRecord<>(topic, "value3");
RecordMetadata metadata3 = producer.send(recordNoPartitionNoKey).get();
System.out.printf("Sent record without key and partition: (value=%s) to partition=%d%n", "value3", metadata3.partition());

生产者如何提高吞吐量

提高吞吐量:

指的是在单位时间内能够发送更多的消息。提高吞吐量的关键在于优化消息的生产和发送流程,以最大化数据的传输效率和系统资源的利用率。

  1. 设置缓冲区大小
  2. 设置批次大小
  3. 减少linger时间
  4. 设置压缩
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    // 0 配置
    Properties properties = new Properties();

    // 连接kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

    // 序列化
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

    // 缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

    // 批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

    // linger.ms
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

    // 压缩
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


    // 1 创建生产者
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

ack应答

alt text

alt text

alt text

总结

ack=0一般不用,不等数据落盘容易丢数据

ack=1用于一些普通日志数,leader收到数据后应答,但是有可能leader会挂掉,但是follower还没来得及同步副本,也可能丢数据

ack=-1,leader和isr队列中的所有节点收到数据后应答

leader维护一个isr队列,当follower迟迟不和leader保持通信就会认为是挂了,踢出isr队列,不向这个follower同步数据

数据完全可靠条件:ack=-1+分区副本大于等于2+isr里应答的最小副本数量大于等于2

事务

开启事务必须开启幂等性

幂等性

幂等性就是保证不管发送多少重复数据,broker端都只持久化一条数据,不会重复

alt text

必须自定义一个唯一的transactional.id

事务使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class IdempotentProducerExample {

public static void main(String[] args) throws ExecutionException, InterruptedException {
String topic = "my-topic";

// Kafka生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 启用幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

// 设置事务ID
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-unique-transactional-id");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 初始化事务
producer.initTransactions();

try {
// 开始事务
producer.beginTransaction();

for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), "message-" + i);
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
record.key(), record.value(), metadata.partition(), metadata.offset());
}

// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 事务回滚
producer.abortTransaction();
e.printStackTrace();
} finally {
producer.close();
}
}
}

数据乱序处理

如果未启动幂等性:max.in.fight.requests.per.connection设置为1
如果启动幂等性:max.in.fight.requests.per.connection需要小于等于5

Broker

zk中的kafka信息

alt text

Broker的工作流程

  1. 首先启动的时候会将broker的启动信息会在zk里面注册。
  2. 然后抢占信息,谁抢的快谁就是监控的Controller。
  3. 监控的Controller会健康brokers的节点变化,然后选举Leader,Leader的选举规则是根据isr里面存活的节点,然后按照ar的顺序选举。这里的isr就是能连通的所有节点,ar就是kafka里面的所有副本
  4. leader负责读写操作,follower负责数据的同步
  5. 选举完,Controller会将选举信息上传到zk中
  6. 其他的Controller会从zk中同步选举的信息
  7. 如果有一个节点挂了,监听的Controller会监听到变化然后去获取isr,重新选择leader,然后更新isr和leader,再重复前面的内容

alt text

Leader和Follower的故障处理

follower故障处理细节

alt text

  1. 当这个follower出现故障后会被踢出isr

  2. 在这期间leader和follower会继续接受数据

  3. 等这个follower恢复的时候会读取当前磁盘中的hw,将自己高于这个hw的数据都删除掉,因为这些数据都是没有验证的数据,然后去追赶这个hw,当follower追赶上当前的hw,就可以加入isr了

  4. leo是每个副本的最后一个offset也就是每个副本最新的数据

    hw是所有副本的最小的数据是全局的

leader故障处理细节

alt text

  1. leader发生故障后会选出一个新的leader
  2. 然后为了保证数据的一致性,其余的follower会将自身文件高于hw的部分截掉,然后从这个新的leader同步数据

这样只能保证副本的一致性,无法保证数据不丢失或者不重复

生产经验

手动调整分区副本,创建副本存储计划

手动调整分区副本存储的步骤如下:

(1)创建一个新的topic,名称为three。

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three

(2)查看分区副本存储情况。

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three

(3)创建副本存储计划(所有副本都指定存储在broker0、broker1中)。

vim increase-replication-factor.json

输入如下内容:

1
2
3
4
5
6
7
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}

(4)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

(6)查看分区副本存储情况。

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three

leader Partition自动平衡

auto.leader.rebalance.enable 建议设置false

增加副本因子

由于某个主题很重要,所以我们要对这个重要的主题增加副本

1)创建topic

bin/kafka-topics.sh –bootstrap-server hadoop102:9092 –create –partitions 3 –replication-factor 1 –topic four

2)手动增加副本存储

  • 创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中)。

vim increase-replication-factor.json

输入如下内容:

1
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}

(2)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

文件存储机制

概念

首先kafka中topic是逻辑上的概念,partition是物理上的概念,每个partition对应一个log文件,数据是不断追加到log文件的末尾的。为了防止文件过大定位太慢,kafka使用分片和稀疏索引机制,每个log分为多个segment,segment中包括index文件,log文件和timeindex文件,文件命名规则是:topic名称+分区序号

log文件和index文件详解

如何在log文件中找到offset为600的数据

  1. 首先根据segment文件的名字找打segment文件。这个文件里面存储的是相对offset,通过相对offset+segment文件名获取绝对offset
  2. 找到小于等于目标offset的最大的offset对应的索引
  3. 然后定位到log文件
  4. 向下遍历找到想要的record

alt text

注意

  1. kafka的index是稀疏索引,每向log文件中写入4kb数据,就会向index文件写入一条索引。

  2. index文件中保存的offset是相对offset

文件清理策略

日志保存时间默认为7天,一旦超过了设置的时间有两种清理策略

  1. delete:直接将过期数据删除
  2. compact:将相同key的数据的不同value值保留最后一个版本

kafka如何高效读写数据

  1. kafka本身就是分布式的集群,分区技术,并行度高4
  2. 读数据使用稀疏索引,方便定位
  3. 顺序写入磁盘,kafka是追加写,不是更新
  4. 零拷贝和页缓存技术

零拷贝

在Kafka中,零拷贝指的是数据在传输过程中,避免了不必要的内存复制操作,从而提高了数据传输的效率和性能。具体来说:

生产者写入数据:

当Kafka生产者产生消息时,数据被写入到内核的页缓存中,而不是首先拷贝到用户空间再写入。
数据被直接写入到Linux系统的页缓存中,这个过程中不会涉及到从用户空间到内核空间的复制操作,称为零拷贝写入。
消费者读取数据:

Kafka消费者从页缓存中读取数据,同样避免了将数据从内核复制到用户空间的额外开销。
消费者能够直接访问页缓存中的数据,提高了数据读取的效率。

页缓存

Linux操作系统使用页缓存(Page Cache)来缓存文件系统中的数据,这些数据可以是文件的内容,包括Kafka日志文件中的消息数据。页缓存技术的关键点包括:

数据落盘策略:
当数据被写入到页缓存后,并不会立即写入磁盘,而是由Linux系统根据一定的策略(如写时复制、延迟写入等)来决定何时将数据同步到文件系统的文件(如Kafka的日志段文件)中。
这种延迟写入的机制能够减少频繁的磁盘访问,提高了整体的写入性能。

零拷贝和页缓存技术举例说明

假设有一个名为 my_topic 的Kafka主题,它有一个分区(Partition) my_topic-0,并且分区中包含多个日志段文件(Segment Files)用于存储消息数据。

生产者写入数据:

Kafka生产者生成一条新消息,并将其发送到主题 my_topic 的分区 my_topic-0。
生产者将消息数据直接写入到Linux系统的页缓存中,避免了额外的数据复制操作。
数据落盘:

Linux系统根据自身的策略,例如延迟写入策略,决定何时将页缓存中的数据(包括Kafka生产者写入的消息)同步到文件系统的日志段文件中。
这意味着数据首先存储在内存中,减少了频繁的磁盘访问,从而提高了写入性能和系统的响应速度。
消费者读取数据:

Kafka消费者从页缓存中直接读取数据,而不需要将数据再次复制到用户空间。
消费者能够有效地访问到页缓存中的最新数据,减少了读取操作的延迟。
网络传输:

由于Kafka消费者直接从页缓存读取数据,并且数据已经在内核中进行了预处理和缓存,因此发送到消费者的数据可以直接通过网卡进行传输,而不需要额外的应用层处理或复制操作。
这进一步降低了数据传输的延迟,提高了整体系统的效率和性能。
通过这种方式,Kafka利用了操作系统的页缓存和零拷贝技术,实现了高效的数据写入和读取,适用于大规模数据处理和实时消息传输的场景。

kafka消费者

总体工作流程

  1. 生产者向每一个分区的leader发送数据,每一个分区的follower拉取自己分区的leader的数据作为副本
  2. 消费者消费数据,每个分区的数据只能由一个消费者组中的一个消费者消费
  3. 每个消费者的offset存放在kafka的_consumer_offsets主题下

消费者工作原理

alt text

消费者组初始化流程

  1. 首先选择出coordinator节点,这个节点是用消费者组的groupid%50来计算出哪个分区是这个消费者组的老大(leader消费者)
  2. coordinator会讲话要消费的topic情况发送给上面选出的leader消费者
  3. leader消费者会制定消费方案
  4. leader会将制定好的方案发送给coordinator,然后coordinator会下发给各个消费者
  5. 注意注意注意这里的每个消费者都需要和coordinator保持心跳,如果超过45s没有心跳,这个消费者就会被移除,就会触发再平衡。

这个再平衡非常的影响性能,最好不要触发再平衡

alt text

消费者组的详细消费流程

alt text

这里的图片是从右往左看的

首先消费者发送消费请求给NetWork客户端,发送请求导对应分区,请求返回一个回调函数OnSuccess,放到一个completeFetches队列中,队列中会有很多返回的数据,然后消费者向这个队列中拉取数据,默认500条拉取一次。

这里注意:每批次抓取的大小默认1字节,也就是当分区内有一个字节的数据就直接拉取,数据最小值超时时间默认是500ms,也就是500ms没有拉取导小于设置的抓取大小的数据就直接都拉取到对列。每批次拉取的最大大小是50m

分区的分配和再平衡

有四种分区,Range,RoundRobin,Sticky,CooperativeSticky

kafka默认的是Range+CooperativeStricky

Range

分区按照序号排序,分区数/消费者数获取每个消费者可以消费多少分区然后分配
alt text

再分配:

(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到3、4号分区数据。
2号消费者:消费到5、6号分区数据。
0号消费者的任务会整体被分配到1号消费者或者2号消费者。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、1、2、3号分区数据。
2号消费者:消费到4、5、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照range方式分配。

RoundRobin

轮询分配
alt text

再分配:

(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到2、5号分区数据
2号消费者:消费到4、1号分区数据
0号消费者的任务会按照RoundRobin的方式,把数据轮询分成0 、6和3号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、2、4、6号分区数据
2号消费者:消费到1、3、5号分区数据
说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。

Sticky

粘性分区

尽量防止分区

再分配:

(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到2、5、3号分区数据。
2号消费者:消费到4、6号分区数据。
0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到2、3、5号分区数据。
2号消费者:消费到0、1、4、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。

offset

默认维护位置

维护在_consumer_offsets主题中,kv的方式存储,k是groupid+topic+分区号,v就是当前的offset的值,每隔一段时间就会对这个数进行compact,保存最新的数据

自动提交offset

enable.auto.commit 默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

手动提交offset

同步提交offset

consumer.commitSync();

异步提交offset

consumer.commitAsync()

相同与不同

同步提交offset和异步提交offset的相同点是都会选择一批数据中最高的偏移量提交,不同是同步提交会阻塞线程,一直到提交成功,自动失败重试。异步提交没有失败重试可能会提交失败

同步提交必须等待offset提交完再去消费下一批数据
异步提交是发送完提交offset请求就去消费下一批数据

同步提交:由于等待提交结果,可以确保offset已经成功提交到Kafka。这对于关键任务或需要严格一致性的应用场景尤为重要。

异步提交:提交结果在后台处理,即使提交失败,消费者线程也不会直接受到影响。但是,需要额外的机制处理可能的提交失败。

指定offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public static void main(String[] args) {

// 0 配置信息
Properties properties = new Properties();

// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");

// 1 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

// 2 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();

// 保证分区分配方案已经制定完毕
//获取消费者的分区分配信息,并确保消费者已经分配到了分区。消费者需要先执行一次poll操作来触发分区分配。
while (assignment.size() == 0){//如果大小等于0,就说明没有获取任何的分区分配方案
kafkaConsumer.poll(Duration.ofSeconds(1));

assignment = kafkaConsumer.assignment();
}

// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition,600);
}

// 3 消费数据
while (true){

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

System.out.println(consumerRecord);
}
}
}

指定时间消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public static void main(String[] args) {

// 0 配置信息
Properties properties = new Properties();

// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

// key value反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");

// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

Set<TopicPartition> assignment = new HashSet<>();

while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}

HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();

// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从1天前开始消费的每个分区的offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);

// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);

// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}

// 3 消费该主题数据
while (true) {

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}

注意:这个指定时间消费是重点,可以用作flink补数

漏消费和重复消费

重复消费

自动提交offset引起

已经消费数据了但是offset没有提交

漏消费

手动提交offset了,数据没有落盘,消费者挂了

先提交offset后消费数据

解决方案

  1. 开启消费者事务

  2. 可以开启自动提交offset,然后中间用redis进行数据去重