flink学习笔记大纲(二)
2024-06-20 18:00:42 # Flink # 学习笔记

flink学习笔记大纲(二)

目录

多流转换

分流

侧输出流

可以使用侧输出流做分流

合流

Union

要求数据类型必须相同

水位线按照最慢的数据时间开始计算

Connect

一国两制,key相同的数据放在一起

CoprocessFunction

Join

基于时间窗口进行合流

  1. 窗口联结

    假设我们有两个流,A流和B流,每个流中包含用户的活动记录。我们希望在一个5秒的时间窗口内联结这两个流,找出同时在两个流中都有活动记录的用户。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, String>> streamA = env.fromElements(
    Tuple2.of("user1", "login"),
    Tuple2.of("user2", "logout")
    );

    DataStream<Tuple2<String, String>> streamB = env.fromElements(
    Tuple2.of("user1", "click"),
    Tuple2.of("user3", "click")
    );

    streamA
    .join(streamB)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .apply((tupleA, tupleB) -> Tuple2.of(tupleA.f0, "activity"))
    .print();

    env.execute("Window Join Example");

    在这个示例中,我们使用一个5秒的滚动时间窗口来联结两个流。如果两个流中同一用户的活动记录出现在同一个5秒的时间窗口内,它们就会被联结。

  2. 间隔联结

    间隔联结允许对两个流进行联结,并在指定的时间间隔内进行匹配。与窗口联结不同的是,间隔联结不使用固定的窗口,而是在指定的时间范围内进行联结。

    假设我们有两个流,A流和B流,每个流中包含传感器的读数。我们希望在A流的事件时间前后2秒内找到B流中的匹配记录。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Long>> streamA = env.fromElements(
    Tuple2.of("sensor1", 1000L),
    Tuple2.of("sensor2", 2000L)
    ).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
    @Override
    public long extractAscendingTimestamp(Tuple2<String, Long> element) {
    return element.f1;
    }
    });

    DataStream<Tuple2<String, Long>> streamB = env.fromElements(
    Tuple2.of("sensor1", 1500L),
    Tuple2.of("sensor3", 2500L)
    ).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
    @Override
    public long extractAscendingTimestamp(Tuple2<String, Long> element) {
    return element.f1;
    }
    });

    streamA
    .keyBy(tuple -> tuple.f0)
    .intervalJoin(streamB.keyBy(tuple -> tuple.f0))
    .between(Time.seconds(-2), Time.seconds(2))
    .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>>() {
    @Override
    public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right, Context ctx, Collector<Tuple2<String, Long>> out) {
    out.collect(Tuple2.of(left.f0, left.f1 + right.f1));
    }
    }).print();

    env.execute("Interval Join Example");


    在这个示例中,我们使用间隔联结操作,指定时间范围为[-2秒, 2秒]。这意味着,如果A流中的传感器读数事件时间与B流中的传感器读数事件时间相差不超过2秒,它们就会被联结。

状态

状态分类

托管状态

自动持久化保存,故障自动恢复,状态自动重组,一般直接使用托管状态

算子状态

当前分区的并行子任务的状态完全独立

按键分区状态(和key相关)

必须先keyby之后再使用,每一个分区可能会有不同的key,只针对当前的key有效,最常用

管理状态

需要自己去管理和维护状态

按键分区状态

底层实现

底层是一个map键值对,直接访问值也就是当前的状态

结构类型

  1. 值状态ValueState

  2. 列表状态ListState

  3. 映射状态MapState

  4. 归约状态ReducingState :

    保存的是归约之后的聚合结果,保存类型值状态

    每次add都是把新数据和之前的数据进行归约后再进行保存,愈合的数据类型必须相同

    1
    2
    3
    4
    5
    6
    7
    8
    myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Event>("my-reduce",
    new ReduceFunction<Event>() {
    @Override
    public Event reduce(Event value1, Event value2) throws Exception {
    return new Event(value1.user, value1.url, value2.timestamp);
    }
    }
    , Event.class));
  5. 聚合状态AggregatingState

    也是聚合结果保存,传入一个AggregateFunction,聚合的数据类型可以不同

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    myAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Long, String>("my-agg",
    new AggregateFunction<Event, Long, String>() {
    @Override
    public Long createAccumulator() {
    return 0L;
    }

    @Override
    public Long add(Event value, Long accmulator) {
    return accmulator + 1;
    }

    @Override
    public String getResult(Long accumulator) {
    return "count: " + accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
    return a + b;
    }
    }
    , Long.class));

生存时间(TTL)

可以设置多长时间之后,状态就过期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class StateTtlExample {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 示例数据流
DataStream<Tuple2<String, Integer>> stream = env.fromElements(
Tuple2.of("key1", 1),
Tuple2.of("key2", 2),
Tuple2.of("key1", 3)
);

// 将流按键分区
KeyedStream<Tuple2<String, Integer>, String> keyedStream = stream.keyBy(value -> value.f0);

// 配置状态 TTL
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10)) // 设置状态的 TTL 为 10 秒
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 在创建和写入时更新 TTL
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) // 在未清理的情况下返回过期状态
.build();

keyedStream.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {
// 定义一个 ValueState,并应用 TTL 配置
private transient ValueState<Integer> state;

@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("state", Integer.class);
descriptor.enableTimeToLive(ttlConfig); // 启用 TTL
state = getRuntimeContext().getState(descriptor);
}

@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
Integer currentState = state.value();
if (currentState == null) {
currentState = 0;
}
currentState += value.f1;
state.update(currentState); // 更新状态
out.collect("Key: " + value.f0 + ", State: " + currentState);
}
}).print();

env.execute("State TTL Example");
}
}

这里的处理的事件只能是处理时间的TTL,而不是事件时间的TTL

状态持久化

检查点

保证数据不丢,故障恢复

写入检查点必须是算子状态保存起才可以

内部方法
  1. snapshotState:将局部变量中的数据写入到检查点中。在这个方法中,你需要将当前的状态保存到检查点存储中,以便在作业失败时可以从检查点恢复。
  2. initializeState:获取算子状态后,判断故障,将检查点中的数据重写添加到局部变量中。在这个方法中,你需要从检查点存储中恢复状态或进行状态的初始化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

public class CheckpointedFunctionExample {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置检查点,设置检查点间隔时间
env.enableCheckpointing(10000);

env.addSource(new SimpleSource()).print();

env.execute("CheckpointedFunction Example");
}

public static class SimpleSource implements SourceFunction<Long>, CheckpointedFunction {

private volatile boolean isRunning = true;
private long count = 0L;

// 用于持久化状态的 ListState
private transient ListState<Long> checkpointedState;

@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (isRunning && count < 1000) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(count);
count++;
}
Thread.sleep(100);
}
}

@Override
public void cancel() {
isRunning = false;
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear(); // 清空现有状态
checkpointedState.add(count); // 保存当前 count 的值
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Long> descriptor =
new ListStateDescriptor<>(
"countState",
TypeInformation.of(new TypeHint<Long>() {})
);

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

// 如果从检查点恢复,则恢复状态
if (context.isRestored()) {
for (Long state : checkpointedState.get()) {
count = state;
}
}
}
}
}

创建检查点:每隔一段时间(如 10 秒),Flink 会调用 snapshotState 方法来保存当前状态。

从检查点恢复:在任务启动或从故障中恢复时,Flink 会调用 initializeState 方法来初始化或恢复状态。

广播状态

广播状态底层是一个 MapState,它存储了 key-value 键值对。

状态后端

检查点机制

首先所有的jobManager向taskManager发布命令要做检查点了,所有的taskManager开始将当前正在进行的所有任务都保存,然后序列化进去到文件系统,然后taskManager向jobManager发送一个检查点成功的信息,然后JobManager发出所有检查点保存完成的报告

上面这些流程有一个专职人员做,这个专职人员就是状态后端

状态后端分类

  1. 哈希表状态后端(默认)(HashMapStaeBackend)
    状态放到taskManager的堆内存里面,键值对的存储结构是耗费内存,但是读取快

  2. 内嵌RocksDB状态后端
    嵌入的数据库,会存放到taskManager的本地数据目录,读写慢一点,异步快照,增量保存检查点

  3. 配置状态后端

    1. 首先启动检查点

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class StateBackendExample {
      public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(1000L); // 设置检查点间隔,单位为毫秒

      // 配置状态后端
      // env.setStateBackend(new MemoryStateBackend()); // 使用内存状态后端
      env.setStateBackend(new EmbeddedRocksDBStateBackend()); // 使用嵌入式 RocksDB 状态后端

      // 继续定义数据源和算子...

      env.execute("State Backend Example");
      }
      }
    2. 配置RocksDB状态后端

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class EmbeddedRocksDBStateBackendExample {
      public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(1000L); // 设置检查点间隔,单位为毫秒

      // 设置嵌入式 RocksDB 状态后端
      String rocksDBPath = "file:///tmp/flink/checkpoints";
      env.setStateBackend(new RocksDBStateBackend(rocksDBPath, true)); // true 表示启用增量检查点

      // 继续定义数据源和算子...

      env.execute("Embedded RocksDB State Backend Example");
      }
      }

容错机制

检查点

检查点的保存

每隔一段时间就做一次保存
保存检查点的时候需要当按照数据梳理完的状态进行保存,不需要所有的并行子任务一起都同一时间保存下来
可以先将当前source中的数据进行准备保存,让所有数据都处理完同一条数据的时候再进行一次状态的保存
这里可以理解为构建一个“事务”,当所有并行子任务当前正在处理的数据都处理完再进行保存检查点;如果在保存检查点的时候有的数据是在传输的过程中没过来,那到时候就进行事务的回滚

检查点协调机制:
Flink 使用一个名为 Checkpoint Coordinator 的组件来协调检查点的创建。
当达到检查点的间隔时间时,Checkpoint Coordinator 向所有并行子任务发送一个检查点触发请求。
每个子任务在收到请求后,开始准备保存当前的状态,但不会立即停止数据处理。

保存状态:每个子任务会在处理完当前数据后,保存自己的状态。这意味着每个子任务在保存状态时,都会确保自己当前处理的数据是一致的。
这种机制确保了在保存状态时,不会因为部分数据正在传输而导致不一致性。

事务性检查点:
检查点机制类似于数据库中的事务。当所有并行子任务都保存了自己的状态后,检查点才被认为是成功的。
如果在某个子任务保存状态的过程中出现了错误(例如数据丢失或网络故障),Flink 会回滚到上一个成功的检查点,并重新处理数据。
这种事务性机制确保了数据的一致性和可靠性。

检查点的恢复

  1. 重启作业:Flink 自动重启作业,所有任务的状态清空。
  2. 恢复检查点:JobManager 加载最近的检查点(如检查点保存于 S3)。
  3. 分发状态快照:JobManager 将检查点中的状态快照分发给各个任务。
  4. 恢复状态和偏移量:任务从状态快照中恢复其状态,source 任务根据检查点记录的偏移量重放数据。
  5. 重新处理数据:任务从检查点恢复后,重新处理从检查点到故障发生之间的数据,确保数据处理的连续性和一致性。

检查点算法

  1. 检查点分界线Barrier

    因为目前是流处理,所以我们要打上一个检查点的标记

  2. 异步分界线快照

    这是Chandy-Lamport算法变体,与水位线类似

    上游任务向多个下游任务发送barrier需要广播出去
    当多个上游任务向同一个下游任务传递barrier,下游任务需要执行“分界线对齐”,等到所有并行分区的barrier到齐再开始状态的保存

  3. 示例

    1. alt text

    2. alt text

      • 首先jobManager发送检查点保存的指令,所有taskManager向所有的source任务发送命令
      • source会直接插入一个分界线,这里我们不要求两个分区的处理偏移量相同
      • source将当前状态和偏移量保存到持久化存储,source后面的内容同步在处理不受影响
    3. alt text

      • 第一个任务的状态快照保存完成会向JobManager确认当前任务的检查点保存完毕,然后下游广播出去传递
      • 这里不能确认检查点真正完成,需要所有任务的检查点都确认完成才行
      • barrier下游广播的时候后面的其他任务正常处理不受影响
    4. alt text

      • 在这里我们看到第二条流中的barrier比第一条流中的barrier先到,第二条流当前的所有正在计算的数据已经处理完了,第一条流中还有的数据没有处理完。这里第二条流的barrier需要等待第一条流的barrier也到来,也就是等待第一条流中当前的数据也处理完才行,也就是分界线对齐
    5. alt text

      • 等到所有流上的barrier都处理完了,就可以将当前状态存储起来了。
      • 分界线意义所在:
      • 我们要判断的保存的时间节点是source1处理完三个数据和source2处理完1个数据的时间节点,只处理完一个word1的话,我们不能确认两个流都处理完数据了
    6. 这里如果分区对应的barrier还没来,那么就正常处理状态改变。假如第二条流的速度很快,如果第二个流处理速度很快的话,第一个分区的barrier已经来了,那么第一个分区就不能再加1了,就需要缓存起来,不能正常处理

(此时的Sum 2收到了来自上游两个Map任务的barrier,说明第一条流第三个数据、第二条流第一个数据都已经处理完,可以进行状态的保存了;而Sum 1只收到了来自Map 2的barrier,所以这时需要等待分界线对齐。在等待的过程中,如果分界线尚未到达的分区任务Map 1又传来了数据(hello, 1),说明这是需要保存到检查点的,Sum任务应该正常继续处理数据,状态更新为3;而如果分界线已经到达的分区任务Map 2又传来数据,这已经是下一个检查点要保存的内容了,就不应立即处理,而是要缓存起来、等到状态保存之后再做处理。)
7. 当各个分区的分界点都对齐后,对当前状态做快照,保存起来,通知JobManager保存完毕,JobManager检查检查点已经缓存成功,接下来所有任务就正常执行

检查点保存

  1. 如果对实时性和容错性要求高就配置这个检查点的时间短一些
  2. 如果对系统性能要求高一点,就需要对这个时间做合理的设置

检查点配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class CheckpointConfigExample {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 配置存储检查点到JobManager堆内存
// env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

// 配置存储检查点到文件系统 (HDFS)
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

CheckpointConfig checkpointConfig = env.getCheckpointConfig();

// 设置检查点模式为 "精确一次" (EXACTLY_ONCE)
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置前后两个检查点之间的最小间隔时间
checkpointConfig.setMinPauseBetweenCheckpoints(500);

// 设置检查点超时时间
checkpointConfig.setCheckpointTimeout(60000);

// 设置最大的并行检查点数量
checkpointConfig.setMaxConcurrentCheckpoints(1);

// 开启检查点的外部持久化
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 开启非对齐检查点
checkpointConfig.enableUnalignedCheckpoints();

// 添加示例数据源和处理逻辑
env.addSource(new SimpleSource()).print();

env.execute("Checkpoint Config Example");
}

public static class SimpleSource implements SourceFunction<Long>, CheckpointedFunction {

private volatile boolean isRunning = true;
private long count = 0L;

// 用于持久化状态的 ListState
private transient ListState<Long> checkpointedState;

@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (isRunning && count < 1000) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(count);
count++;
}
Thread.sleep(100);
}
}

@Override
public void cancel() {
isRunning = false;
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear(); // 清空现有状态
checkpointedState.add(count); // 保存当前 count 的值
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Long> descriptor =
new ListStateDescriptor<>(
"countState",
TypeInformation.of(new TypeHint<Long>() {})
);

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

// 如果从检查点恢复,则恢复状态
if (context.isRestored()) {
for (Long state : checkpointedState.get()) {
count = state;
}
}
}
}
}

这里的非对齐检查点的作用让数据在反压的情况下,也能快速创建检查点

这里的最大并行检查点数据的作用是提供检查点吞吐量,减少检查点延迟
checkpointConfig.setMaxConcurrentCheckpoints(2);

保存点

作用

  • 有计划的手动备份和恢复,在任何需要的时候创建保存点

  • 做flink版本升级

  • 调整并行度

  • 暂停应用程序

  • 建议在addSource后加上.uid(名称)

状态一致性

事务

原子性(Atomicity):即不可分割性,事务中的操作要么全不做,要么全做

一致性(Consistency):一个事务在执行前后,数据库都必须处于正确的状态,满足完整性约束

隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行

持久性(Durability):事务处理完成后,对数据的修改就是永久的,即便系统故障也不会丢失

概念

状态一致性就是计算结果要保证准确
一条数据应该不丢失,也不重复计算
遇到故障后可以恢复状态,结果完全正确

分类

  • 最多一次:不影响数据处理,结果不太准确为代价,什么也不干直接恢复
  • 至少一次:当前数据重复处理,保证数据不丢失,最简单的是发生故障重放数据
  • 精准一次:可以重放数据并且数据只处理一次

端到端的状态一致性

数据源+流处理器+外部存储系统

端到端精准一次

概述

  • flink内部:开启检查点
  • 输入端:flink将当前的kafka偏移量保存为算子状态
  • 输出端:两阶段提交,写入kafka的时候基于事务进行预提交,等到检查点保存完毕,再提交事务正式提交

这里详细对照kafka学习笔记

具体步骤

  1. JobManager通知各个TaskManager启动检查点保存,Source任务会将检查点分界线(barrier)注入数据流。这个barrier可以将数据流中的数据,分为进入当前检查点的集合和进入下一个检查点的集合。
  2. 分界线(barrier)会在算子间传递下去。每个算子收到barrier时,会将当前的状态做个快照,保存到状态后端。
    Source任务将barrier插入数据流后,也会将当前读取数据的偏移量作为状态写入检查点,存入状态后端;然后把barrier向下游传递,自己就可以继续读取数据了。
    接下来barrier传递到了内部的Window算子,它同样会对自己的状态进行快照保存,写入远程的持久化存储。
  3. 分界线(barrier)终于传到了Sink任务,这时Sink任务会开启一个事务。接下来到来的所有数据,Sink任务都会通过这个事务来写入Kafka。这里barrier是检查点的分界线,也是事务的分界线。由于之前的检查点可能尚未完成,因此上一个事务也可能尚未提交;此时barrier的到来开启了新的事务,上一个事务尽管可能没有被提交,但也不再接收新的数据了。
    对于Kafka而言,提交的数据会被标记为“未确认”(uncommitted)。这个过程就是所谓的“预提交”(pre-commit)。
  4. 当所有算子的快照都完成,也就是这次的检查点保存最终完成时,JobManager会向所有任务发确认通知,告诉大家当前检查点已成功保存
    当Sink任务收到确认通知后,就会正式提交之前的事务,把之前“未确认”的数据标为“已确认”,接下来就可以正常消费了。
    在任务运行中的任何阶段失败,都会从上一次的状态恢复,所有没有正式提交的数据也会回滚。这样,Flink和Kafka连接构成的流处理系统,就实现了端到端的exactly-once状态一致性。

需要配置

(1)必须启用检查点
(2)在FlinkKafkaProducer的构造函数中传入参数Semantic.EXACTLY_ONCE
(3)配置Kafka读取数据的消费者的隔离级别
这里所说的Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而Kafka中默认的隔离级别isolation.level是read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置
为read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。
(4)事务超时配置
Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时,而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以在检查点保存时间很长时,有可能出现Kafka已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class ExactlyOnceExample {

public static void main(String[] args) throws Exception {
// 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用检查点,每 10 秒创建一次检查点
env.enableCheckpointing(10000L); // 设置检查点间隔为 10000 毫秒(10 秒)

// 设置检查点模式为 "精确一次" (exactly-once)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 设置检查点模式为精确一次

// 设置前后两个检查点之间的最小间隔时间为 500 毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 设置检查点最小间隔时间,防止检查点处理时间过长

// 设置检查点超时时间为 60000 毫秒(60 秒)
env.getCheckpointConfig().setCheckpointTimeout(60000); // 设置检查点超时时间,超时则放弃检查点以节省性能

// 设置最大并行检查点数量为 1确保在同一时间只能有一个检查点进行,从而保证系统的一致性。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 设置最大并行检查点数量,确保单一时间只有一个检查点

// 启用外部持久化检查点,当作业取消时保留检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 配置检查点外部持久化,取消作业时保留检查点

// 启用非对齐检查点
env.getCheckpointConfig().enableUnalignedCheckpoints(); // 启用非对齐检查点,减少反压时间

// 配置重启策略,设置固定延迟重启,最多重试 3 次,每次延迟 10 秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000)); // 设置重启策略,3 次重试,每次延迟 10 秒

// 配置 Kafka 消费者属性
Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 设置 Kafka 服务器地址
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group"); // 设置消费者组 ID
consumerProperties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 设置隔离级别为读取已提交

// 创建 Kafka 消费者,订阅 "input-topic" 主题
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"input-topic",
new SimpleStringSchema(),
consumerProperties
);

// 配置 Kafka 生产者属性
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "localhost:9092"); // 设置 Kafka 服务器地址
producerProperties.setProperty("transaction.timeout.ms", "900000"); // 设置事务超时时间为 15 分钟(900000 毫秒)

// 创建 Kafka 生产者,写入 "output-topic" 主题
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"output-topic",
new SimpleStringSchema(),
producerProperties,
Semantic.EXACTLY_ONCE // 设置语义为 "精确一次"
);

// 添加 Kafka 数据源
env.addSource(kafkaConsumer)
.name("Kafka Source") // 设置数据源名称
.uid("kafka-source") // 设置数据源 UID
.map(value -> "Processed: " + value) // 处理数据,在消息前添加 "Processed: "
.name("Map Function") // 设置 Map 函数名称
.uid("map-function") // 设置 Map 函数 UID
.addSink(kafkaProducer) // 添加 Kafka Sink
.name("Kafka Sink") // 设置 Sink 名称
.uid("kafka-sink"); // 设置 Sink UID

// 执行作业
env.execute("Exactly Once Example"); // 执行 Flink 作业,作业名称为 "Exactly Once Example"
}
}