flink学习笔记大纲(二)
目录
多流转换
分流
侧输出流
可以使用侧输出流做分流
合流
Union
要求数据类型必须相同
水位线按照最慢的数据时间开始计算
Connect
一国两制,key相同的数据放在一起
CoprocessFunction
Join
基于时间窗口进行合流
窗口联结
假设我们有两个流,A流和B流,每个流中包含用户的活动记录。我们希望在一个5秒的时间窗口内联结这两个流,找出同时在两个流中都有活动记录的用户。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, String>> streamA = env.fromElements(
Tuple2.of("user1", "login"),
Tuple2.of("user2", "logout")
);
DataStream<Tuple2<String, String>> streamB = env.fromElements(
Tuple2.of("user1", "click"),
Tuple2.of("user3", "click")
);
streamA
.join(streamB)
.where(tuple -> tuple.f0)
.equalTo(tuple -> tuple.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply((tupleA, tupleB) -> Tuple2.of(tupleA.f0, "activity"))
.print();
env.execute("Window Join Example");在这个示例中,我们使用一个5秒的滚动时间窗口来联结两个流。如果两个流中同一用户的活动记录出现在同一个5秒的时间窗口内,它们就会被联结。
间隔联结
间隔联结允许对两个流进行联结,并在指定的时间间隔内进行匹配。与窗口联结不同的是,间隔联结不使用固定的窗口,而是在指定的时间范围内进行联结。
假设我们有两个流,A流和B流,每个流中包含传感器的读数。我们希望在A流的事件时间前后2秒内找到B流中的匹配记录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Long>> streamA = env.fromElements(
Tuple2.of("sensor1", 1000L),
Tuple2.of("sensor2", 2000L)
).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
public long extractAscendingTimestamp(Tuple2<String, Long> element) {
return element.f1;
}
});
DataStream<Tuple2<String, Long>> streamB = env.fromElements(
Tuple2.of("sensor1", 1500L),
Tuple2.of("sensor3", 2500L)
).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
public long extractAscendingTimestamp(Tuple2<String, Long> element) {
return element.f1;
}
});
streamA
.keyBy(tuple -> tuple.f0)
.intervalJoin(streamB.keyBy(tuple -> tuple.f0))
.between(Time.seconds(-2), Time.seconds(2))
.process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>>() {
public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right, Context ctx, Collector<Tuple2<String, Long>> out) {
out.collect(Tuple2.of(left.f0, left.f1 + right.f1));
}
}).print();
env.execute("Interval Join Example");在这个示例中,我们使用间隔联结操作,指定时间范围为[-2秒, 2秒]。这意味着,如果A流中的传感器读数事件时间与B流中的传感器读数事件时间相差不超过2秒,它们就会被联结。
状态
状态分类
托管状态
自动持久化保存,故障自动恢复,状态自动重组,一般直接使用托管状态
算子状态
当前分区的并行子任务的状态完全独立
按键分区状态(和key相关)
必须先keyby之后再使用,每一个分区可能会有不同的key,只针对当前的key有效,最常用
管理状态
需要自己去管理和维护状态
按键分区状态
底层实现
底层是一个map键值对,直接访问值也就是当前的状态
结构类型
值状态ValueState
列表状态ListState
映射状态MapState
归约状态ReducingState :
保存的是归约之后的聚合结果,保存类型值状态
每次add都是把新数据和之前的数据进行归约后再进行保存,愈合的数据类型必须相同
1
2
3
4
5
6
7
8myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Event>("my-reduce",
new ReduceFunction<Event>() {
public Event reduce(Event value1, Event value2) throws Exception {
return new Event(value1.user, value1.url, value2.timestamp);
}
}
, Event.class));聚合状态AggregatingState
也是聚合结果保存,传入一个AggregateFunction,聚合的数据类型可以不同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23myAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Long, String>("my-agg",
new AggregateFunction<Event, Long, String>() {
public Long createAccumulator() {
return 0L;
}
public Long add(Event value, Long accmulator) {
return accmulator + 1;
}
public String getResult(Long accumulator) {
return "count: " + accumulator;
}
public Long merge(Long a, Long b) {
return a + b;
}
}
, Long.class));
生存时间(TTL)
可以设置多长时间之后,状态就过期
1 | public class StateTtlExample { |
这里的处理的事件只能是处理时间的TTL,而不是事件时间的TTL
状态持久化
检查点
保证数据不丢,故障恢复
写入检查点必须是算子状态保存起才可以
内部方法
- snapshotState:将局部变量中的数据写入到检查点中。在这个方法中,你需要将当前的状态保存到检查点存储中,以便在作业失败时可以从检查点恢复。
- initializeState:获取算子状态后,判断故障,将检查点中的数据重写添加到局部变量中。在这个方法中,你需要从检查点存储中恢复状态或进行状态的初始化。
1 |
|
创建检查点:每隔一段时间(如 10 秒),Flink 会调用 snapshotState 方法来保存当前状态。
从检查点恢复:在任务启动或从故障中恢复时,Flink 会调用 initializeState 方法来初始化或恢复状态。
广播状态
广播状态底层是一个 MapState,它存储了 key-value 键值对。
状态后端
检查点机制
首先所有的jobManager向taskManager发布命令要做检查点了,所有的taskManager开始将当前正在进行的所有任务都保存,然后序列化进去到文件系统,然后taskManager向jobManager发送一个检查点成功的信息,然后JobManager发出所有检查点保存完成的报告
上面这些流程有一个专职人员做,这个专职人员就是状态后端
状态后端分类
哈希表状态后端(默认)(HashMapStaeBackend)
状态放到taskManager的堆内存里面,键值对的存储结构是耗费内存,但是读取快内嵌RocksDB状态后端
嵌入的数据库,会存放到taskManager的本地数据目录,读写慢一点,异步快照,增量保存检查点配置状态后端
首先启动检查点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StateBackendExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000L); // 设置检查点间隔,单位为毫秒
// 配置状态后端
// env.setStateBackend(new MemoryStateBackend()); // 使用内存状态后端
env.setStateBackend(new EmbeddedRocksDBStateBackend()); // 使用嵌入式 RocksDB 状态后端
// 继续定义数据源和算子...
env.execute("State Backend Example");
}
}配置RocksDB状态后端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class EmbeddedRocksDBStateBackendExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000L); // 设置检查点间隔,单位为毫秒
// 设置嵌入式 RocksDB 状态后端
String rocksDBPath = "file:///tmp/flink/checkpoints";
env.setStateBackend(new RocksDBStateBackend(rocksDBPath, true)); // true 表示启用增量检查点
// 继续定义数据源和算子...
env.execute("Embedded RocksDB State Backend Example");
}
}
容错机制
检查点
检查点的保存
每隔一段时间就做一次保存
保存检查点的时候需要当按照数据梳理完的状态进行保存,不需要所有的并行子任务一起都同一时间保存下来
可以先将当前source中的数据进行准备保存,让所有数据都处理完同一条数据的时候再进行一次状态的保存
这里可以理解为构建一个“事务”,当所有并行子任务当前正在处理的数据都处理完再进行保存检查点;如果在保存检查点的时候有的数据是在传输的过程中没过来,那到时候就进行事务的回滚
检查点协调机制:
Flink 使用一个名为 Checkpoint Coordinator 的组件来协调检查点的创建。
当达到检查点的间隔时间时,Checkpoint Coordinator 向所有并行子任务发送一个检查点触发请求。
每个子任务在收到请求后,开始准备保存当前的状态,但不会立即停止数据处理。
保存状态:每个子任务会在处理完当前数据后,保存自己的状态。这意味着每个子任务在保存状态时,都会确保自己当前处理的数据是一致的。
这种机制确保了在保存状态时,不会因为部分数据正在传输而导致不一致性。
事务性检查点:
检查点机制类似于数据库中的事务。当所有并行子任务都保存了自己的状态后,检查点才被认为是成功的。
如果在某个子任务保存状态的过程中出现了错误(例如数据丢失或网络故障),Flink 会回滚到上一个成功的检查点,并重新处理数据。
这种事务性机制确保了数据的一致性和可靠性。
检查点的恢复
- 重启作业:Flink 自动重启作业,所有任务的状态清空。
- 恢复检查点:JobManager 加载最近的检查点(如检查点保存于 S3)。
- 分发状态快照:JobManager 将检查点中的状态快照分发给各个任务。
- 恢复状态和偏移量:任务从状态快照中恢复其状态,source 任务根据检查点记录的偏移量重放数据。
- 重新处理数据:任务从检查点恢复后,重新处理从检查点到故障发生之间的数据,确保数据处理的连续性和一致性。
检查点算法
检查点分界线Barrier
因为目前是流处理,所以我们要打上一个检查点的标记
异步分界线快照
这是Chandy-Lamport算法变体,与水位线类似
上游任务向多个下游任务发送barrier需要广播出去
当多个上游任务向同一个下游任务传递barrier,下游任务需要执行“分界线对齐”,等到所有并行分区的barrier到齐再开始状态的保存示例
- 首先jobManager发送检查点保存的指令,所有taskManager向所有的source任务发送命令
- source会直接插入一个分界线,这里我们不要求两个分区的处理偏移量相同
- source将当前状态和偏移量保存到持久化存储,source后面的内容同步在处理不受影响
- 第一个任务的状态快照保存完成会向JobManager确认当前任务的检查点保存完毕,然后下游广播出去传递
- 这里不能确认检查点真正完成,需要所有任务的检查点都确认完成才行
- barrier下游广播的时候后面的其他任务正常处理不受影响
- 在这里我们看到第二条流中的barrier比第一条流中的barrier先到,第二条流当前的所有正在计算的数据已经处理完了,第一条流中还有的数据没有处理完。这里第二条流的barrier需要等待第一条流的barrier也到来,也就是等待第一条流中当前的数据也处理完才行,也就是分界线对齐
- 等到所有流上的barrier都处理完了,就可以将当前状态存储起来了。
- 分界线意义所在:
- 我们要判断的保存的时间节点是source1处理完三个数据和source2处理完1个数据的时间节点,只处理完一个word1的话,我们不能确认两个流都处理完数据了
这里如果分区对应的barrier还没来,那么就正常处理状态改变。假如第二条流的速度很快,如果第二个流处理速度很快的话,第一个分区的barrier已经来了,那么第一个分区就不能再加1了,就需要缓存起来,不能正常处理
(此时的Sum 2收到了来自上游两个Map任务的barrier,说明第一条流第三个数据、第二条流第一个数据都已经处理完,可以进行状态的保存了;而Sum 1只收到了来自Map 2的barrier,所以这时需要等待分界线对齐。在等待的过程中,如果分界线尚未到达的分区任务Map 1又传来了数据(hello, 1),说明这是需要保存到检查点的,Sum任务应该正常继续处理数据,状态更新为3;而如果分界线已经到达的分区任务Map 2又传来数据,这已经是下一个检查点要保存的内容了,就不应立即处理,而是要缓存起来、等到状态保存之后再做处理。)
7. 当各个分区的分界点都对齐后,对当前状态做快照,保存起来,通知JobManager保存完毕,JobManager检查检查点已经缓存成功,接下来所有任务就正常执行
检查点保存
- 如果对实时性和容错性要求高就配置这个检查点的时间短一些
- 如果对系统性能要求高一点,就需要对这个时间做合理的设置
检查点配置
1 | public class CheckpointConfigExample { |
这里的非对齐检查点的作用让数据在反压的情况下,也能快速创建检查点
这里的最大并行检查点数据的作用是提供检查点吞吐量,减少检查点延迟checkpointConfig.setMaxConcurrentCheckpoints(2);
保存点
作用
有计划的手动备份和恢复,在任何需要的时候创建保存点
做flink版本升级
调整并行度
暂停应用程序
建议在addSource后加上.uid(名称)
状态一致性
事务
原子性(Atomicity):即不可分割性,事务中的操作要么全不做,要么全做
一致性(Consistency):一个事务在执行前后,数据库都必须处于正确的状态,满足完整性约束
隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行
持久性(Durability):事务处理完成后,对数据的修改就是永久的,即便系统故障也不会丢失
概念
状态一致性就是计算结果要保证准确
一条数据应该不丢失,也不重复计算
遇到故障后可以恢复状态,结果完全正确
分类
- 最多一次:不影响数据处理,结果不太准确为代价,什么也不干直接恢复
- 至少一次:当前数据重复处理,保证数据不丢失,最简单的是发生故障重放数据
- 精准一次:可以重放数据并且数据只处理一次
端到端的状态一致性
数据源+流处理器+外部存储系统
端到端精准一次
概述
- flink内部:开启检查点
- 输入端:flink将当前的kafka偏移量保存为算子状态
- 输出端:两阶段提交,写入kafka的时候基于事务进行预提交,等到检查点保存完毕,再提交事务正式提交
这里详细对照kafka学习笔记
具体步骤
- JobManager通知各个TaskManager启动检查点保存,Source任务会将检查点分界线(barrier)注入数据流。这个barrier可以将数据流中的数据,分为进入当前检查点的集合和进入下一个检查点的集合。
- 分界线(barrier)会在算子间传递下去。每个算子收到barrier时,会将当前的状态做个快照,保存到状态后端。
Source任务将barrier插入数据流后,也会将当前读取数据的偏移量作为状态写入检查点,存入状态后端;然后把barrier向下游传递,自己就可以继续读取数据了。
接下来barrier传递到了内部的Window算子,它同样会对自己的状态进行快照保存,写入远程的持久化存储。 - 分界线(barrier)终于传到了Sink任务,这时Sink任务会开启一个事务。接下来到来的所有数据,Sink任务都会通过这个事务来写入Kafka。这里barrier是检查点的分界线,也是事务的分界线。由于之前的检查点可能尚未完成,因此上一个事务也可能尚未提交;此时barrier的到来开启了新的事务,上一个事务尽管可能没有被提交,但也不再接收新的数据了。
对于Kafka而言,提交的数据会被标记为“未确认”(uncommitted)。这个过程就是所谓的“预提交”(pre-commit)。 - 当所有算子的快照都完成,也就是这次的检查点保存最终完成时,JobManager会向所有任务发确认通知,告诉大家当前检查点已成功保存
当Sink任务收到确认通知后,就会正式提交之前的事务,把之前“未确认”的数据标为“已确认”,接下来就可以正常消费了。
在任务运行中的任何阶段失败,都会从上一次的状态恢复,所有没有正式提交的数据也会回滚。这样,Flink和Kafka连接构成的流处理系统,就实现了端到端的exactly-once状态一致性。
需要配置
(1)必须启用检查点
(2)在FlinkKafkaProducer的构造函数中传入参数Semantic.EXACTLY_ONCE
(3)配置Kafka读取数据的消费者的隔离级别
这里所说的Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而Kafka中默认的隔离级别isolation.level是read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置
为read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。
(4)事务超时配置
Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时,而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以在检查点保存时间很长时,有可能出现Kafka已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。
示例
1 | public class ExactlyOnceExample { |