kafka问题总结(持续更新)
2024-10-28 18:00:18 # kafka # 常见问题

kafka问题总结

kafka乱序问题

kafka只能保证分区内有序,所以要不就直接设置成一个分区

或者可以开启幂等性

acks

  • acks=0 不管应答,咔咔就是写
  • acks=1 只要leader写入成功就算进行应答,不管follower是否同步成功
  • ack=-1 leader写入成功,并且follower同步成功才算应答

kafka为什么是有序的

  1. 分区顺序写入
  2. 分区内的唯一offset
  3. 按照key的分区键
  4. 消费者按照偏移量顺序读取
  5. 副本机制。isr机制

kafka创建大量topic会有什么影响

  1. 每个topic的每个分区都会占用一定的内存资源来存储元数据信息,broker需要维护更多的元数据
  2. kafka会定期向broker请求元数据,大量topic会导致元数据请求和响应的大小增大
  3. 增加元数据同步的开销
  4. zookeeper存储元数据,大量topic会增加zookeeper压力
  5. 运维困难

为什么要使用消息队列

其实消息队列的核心就是3个:解耦,异步,消峰

使用消息队列可以产生数据就放到消息队列里面,哪个系统哪个程序需要就直接去里面消费,哪个程序不需要消费了就取消消费就好会很方便,这就是解耦

假如我们有一个需求是接收到了一个请求就要在写库,需要在A,B,C,D四个系统都写库,那么时间就是A+B+C+D四个系统的写库时间,如果我们使用了消息队列,我们就可以异步执行了,速度会很快,这就是异步

消峰不做解释

缺点就是有很多的数据不一致等等问题,不过这些问题可以解决掉

kafka的高可用

kafka的leader,follower机制和isr机制和ack机制

如果出现kafka数据积压问题怎么办

假设我们消费程序出现问题,数据积压了好长时间,现在修复问题后重新恢复消费速度,也需要等待好几个小时消费完挤压数据,那么一般如何处理这种积压数据呢

我们需要进行紧急扩容,首先需要修复consumer问题,确保消费速度,将现在的所有consumer都停掉

然后新创建一个topic,并且把这个新创建的topic的partition设置为原来的10倍,假设原来是10个分区,现在就创建100个分区

写一个简单的消费程序,从原来积压的topic拉区数据,然后将数据使用轮询的方式均匀的分发到新创建的这100个分区上,从而避免数据处理的耗时

在临时建立的topic上新部署10倍的消费者,这些消费者分布在100个queue上,各自并发消费各自的数据,相当于消费速度提高了10倍,积压数据很快被消费掉

消费完所有的积压数据后,销毁临时的topic和consumer,恢复之前的架构,让最初的consumer回到原来的queue上继续消费数据

示例

下面我们来举个例子

有一个 Kafka topic,名为 original_topic,用于接收生产环境的数据。由于消费者处理逻辑复杂或网络问题,导致消费速度变慢,结果这个 topic 上积压了 1,000 万条数据,系统中有 10 个 partition。假设每个 consumer 每秒能处理 1000 条数据,恢复正常后,使用 3 个 consumer 每秒能处理 3000 条。因此,如果直接消费积压的数据,大概需要 1 小时 才能清完。

为了更快清理积压数据,我们决定通过临时扩容处理:

  1. 创建新topic并且不做处理将原始topic的数据拉取到新topic中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
     import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.Random;
    public class DispatcherConsumer {
    private static final String ORIGINAL_TOPIC = "original_topic";
    private static final String TEMP_TOPIC = "temp_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    public static void main(String[] args) {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "dispatcher_group");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    consumer.subscribe(Collections.singletonList(ORIGINAL_TOPIC));
    Random random = new Random();

    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> {
    // 轮询分配到 TEMP_TOPIC 的随机分区
    int partition = random.nextInt(100); // 假设 TEMP_TOPIC 有 100 个 partition
    ProducerRecord<String, String> producerRecord =
    new ProducerRecord<>(TEMP_TOPIC, partition, record.key(), record.value());
    producer.send(producerRecord);
    });
    }
    }
    }
  2. 从这个新的topc消费数据并进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TempConsumer {
private static final String TEMP_TOPIC = "temp_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";

public static void main(String[] args) {
// args[0] 是分区号
int partitionNumber = Integer.parseInt(args[0]);

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "temp_consumer_group_" + partitionNumber);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

TopicPartition partition = new TopicPartition(TEMP_TOPIC, partitionNumber);
consumer.assign(Collections.singletonList(partition));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
// 处理消息
System.out.println("Consumed from partition " + partitionNumber + ": " + record.value());
});
}
}
}

这里我们是用命令行的方式来指定分区号的因为这样在处理大量积压数据的时候:

  • 支持并行消费特定分区:一个消费者只消费一个分区就可以避免当出现问题的时候kafka启动自带的rebalance机制了,如果哪个分区消费出现问题直接重启一下这个分区的消费者就好了
  • 如果不指定分区号的话,kafka会自动分配,根据分区和consumer的数量进行分配,这样无法灵活扩展或者减少consumer数量
  • 如果某个 consumer 出现故障,只需要重新启动该分区对应的 TempConsumer 实例。这样不会影响到其他分区的消费流程
  • 确保每条消息只被一个 consumer 实例处理,从而避免数据不一致和消息丢失的问题。
  • 更加适用于临时扩容的情况
  1. 在清理完积压数据后,我们停止 DispatcherConsumerTempConsumer,恢复原始的 original_topic consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OriginalConsumer {
private static final String ORIGINAL_TOPIC = "original_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";

public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "original_consumer_group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(ORIGINAL_TOPIC));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
// 原始消息处理逻辑
System.out.println("Original consumer processed: " + record.value());
});
}
}
}
  1. 总结启动步骤
  • 执行步骤
  • 启动 DispatcherConsumer 以从 original_topic 消费数据并分发到 temp_topic。
  • 部署多个 TempConsumer 实例,每个实例指定一个不同的 partition。
  • 等积压清理完毕后,关闭 DispatcherConsumer 和所有 TempConsumer 实例。
  • 启动 OriginalConsumer,恢复原始架构。
  1. 问题:如何知道积压已经清理完毕了呢

    我们可以在分发数据的 DispatcherConsumer 程序中加入逻辑,用于定期检查每个 partition 的末尾偏移量(end offset),并将其与当前消费的位置(current offset)进行比较。当前偏移量等于末尾偏移量时,说明已经消费完毕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class BacklogMonitor {
private static final String ORIGINAL_TOPIC = "original_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";

public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "monitor_group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 获取每个 partition 的末尾偏移量
for (int partition = 0; partition < 10; partition++) { // 假设 original_topic 有 10 个 partition
TopicPartition topicPartition = new TopicPartition(ORIGINAL_TOPIC, partition);
consumer.assign(Collections.singletonList(topicPartition));

// 获取当前消费位置和末尾位置
long currentOffset = consumer.position(topicPartition);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singletonList(topicPartition));
long endOffset = endOffsets.get(topicPartition);

// 检查是否消费完毕
if (currentOffset < endOffset) {
System.out.println("Partition " + partition + " still has backlog.");
} else {
System.out.println("Partition " + partition + " backlog cleared.");
}
}
}
}