flink学习笔记大纲(一)
2024-06-20 13:41:32 # Flink # 学习笔记

flink学习笔记(一)

目录

分层API

概述

  • SQL
  • TABLE API
  • DataStream/DataSet
    • 转换
    • 链接
    • 聚合
    • 窗口
    • ….

集群角色

客户端Client

客户端获取提交给JobManager

JobManager

管理者进行调度下发任务给TaskManager

TaskManager

干活的

部署模式

会话模式

一般不使用

单作业模式

每提交一个作业就启动一个集群。借助Yarn,K8s框架等启动集群

应用模式

上面的两种模式都是需要客户端再提交给JobManager的,这个方式是直接将应用提交到JobManager来执行,每提交一个应用单独启动一个JobManager,执行完这个JobManager就关闭

运行架构

系统架构

整体构成

主要是JobManager和TaskManager

JobManager

一般只有一个

  • JobMaster:处理单独的任务,一个job对应一个jobMaster,作业提交的时候先接受客户端传来的jar,数据流图和作业图,然后根据这个向ReourceManager申请资源,申请资源后,将作业图转化成执行图,然后提交给TaskManager
  • ResourceManager:一个集群就一个ReourceManager,负责资源分配和调度。管理Solt,Solt就是一组CPU和内容,每一个并行的任务都需要一个Solt执行
  • 分发器:web ui,展示监控
TaskManager

每个TaskManager有很多的Solt,有多少Solt就能并行执行多少任务,启动任务后,TaskManager会向ResourceManager注册自己的Solt

作业提交流程

Yarn会话模式提交

alt text

Yarn单作业模式提交

alt text

重要概念

数据流图

wen ui上看到的source ,transformation,sink连接的图

算子数据传输

一对一

alt text

这种数据直接进入的就是一对一,类似于Spark的窄依赖

重分区

alt text

map之后,keyby会将数据放到不同的分区就是重分区

作业图与执行图

  • 逻辑流图:代码做出的DAG图
  • 作业图:客户端优化代码后交给JobManager的图,将并行度相同的算子合并
  • 执行图:JobManager收到作业图后,将并行度相同的算子合并,然后生成执行图
  • 物理图:JobManager将执行图发送给TaskManager,TaskManager根据这个执行计划生成物理图,明确数据存放位置等

任务槽

  • 概念:有几个任务槽就能开多少并行度
  • 任务槽共享:
    • 负载均衡
    • 当有TaskManager宕机的时候,可以自动将任务迁移到其他TaskManager上

DataStreamAPI

这里对常见的一些算子不做描述,写一些需要注意的点

物理分区算子

随机分区

shuffle一般不使用

轮询分区

rebalance使用负载均衡算法

alt text

重缩放分区

.rescale

rescale和rebalance的区别是,rebalance是上游所有任务和下游所有任务建立通信,这是一个笛卡尔积

rescale是每一个任务和下游对应的任务之间进行通信,节省资源。
建议使用rescale

alt text

广播

广播数据会在每个分区都保存一份,调用DataStream.broadcast()方法,将数据数据发送到下游所有分区上

全局分区

全局分区,调用.global,直接将所有数据放到下游一个并行度中,不使用,资源程序压力太大,等于将并行度设置为1

水位线

时间语义

  • 事件时间:数据产生的时间,flink中建议使用这个时间
  • 处理时间:数据到来的时间,不准确,中间可能有延迟

处理延迟和乱序水位线写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {

private Long delayTime = 5000L; // 延迟时间
private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳

@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// 每来一条数据就调用一次
maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 发射水位线,默认200ms调用一次
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
}

运行机制

  • 事件到达:每当一个新事件到达时,onEvent 方法被调用。此方法更新 maxTs 为目前流中观察到的最大时间戳。

  • 周期性发射水印:Flink 运行时会周期性地调用 onPeriodicEmit 方法(默认每 200 毫秒)。此方法发射一个新的水印,水印时间戳为 maxTs - delayTime - 1L。

  • 水印作用:发射的水印用于指导 Flink 何时可以触发窗口计算或其他基于时间的操作。水印时间戳表示在此时间之前的所有事件都已经到达,可以进行计算。

为什么减去 delayTime 和 1L

  • 减去 delayTime:允许事件在 delayTime 内乱序到达。例如,设定 delayTime 为 5 秒,那么即使事件延迟 5 秒到达,系统仍能正确处理。

  • 减去 1L:确保水印总是略微早于允许的延迟范围,避免因精确等于延迟范围导致的问题。

水位线分类

  • 有序水位线
  • 乱序水位线
  • 乱序+迟到数据
  • 自定义水位线

flink内置水位线

  • 有序内置水位线:WatermarkStrategy.forMonotonousTimestamps()
  • 乱序内置水位线:WatermarkStrategy. forBoundedOutOfOrderness()//需要传入最大乱序程度

水位线的传递

alt text

首先下游会将上游所有的水位线时间都保存起来,然后选择最小的水位线时间作为自己的时钟(木桶原理)

每个分区都有自己的水位线,自己的分区来数据就会更新自己分区的水位线,然后检查所有水位线,所有分区水位线的最小值和当前水位线相比,如果不同,就将最新的最小值水位线广播到下游去,如果没有变化就不广播

窗口

概念

首先理解窗口不是一个框,而是一个桶满足这个桶条件的数据都放在一个桶里面,等到窗口结束时间结束再处理桶里面的数据

分类

驱动类型分类

时间窗户

到点就处理数据

计数窗口

数据个数到了就开始处理数据

  1. 滑动计数窗口
    1
    2
    stream.keyby(....)
    .countWindow(10,3)
  2. 滚动计数窗口
    1
    2
    stream.keyBy(...)
    .countWindow(10)

分配数据规则分类

滚动窗口
1
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregrate(....)
滑动窗口
1
stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(3))).aggregrate(....)

会话窗口

设置超时时间,窗口长度和结束时间都不固定

1
stream.keyBy(...).window(SessionWindows.withGap(Time.seconds(5))).aggregrate(....)

窗口api

  • 按键分区:keyby后使用window
  • 非按键分区:keyby之前使用window,不推荐使用,相当于并行度设置了1

调用窗口api

1
2
3
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)

窗口函数

增量聚合函数

  • 归约函数(ReduceFunction)
  • 聚合函数(AggregateFunction)推荐使用

增量聚合函数就是每来一条数据就会进行聚合,AggregateFunction需要维护一个累加器,来一条数据就在累加器里面聚合,窗口结束后再输出

全窗口函数

  • 处理窗口函数(ProcessWindowFunction)

全窗口函数是等到窗口要输出结果的时候再取出数据进行计算

可以获取上下文信息,获取窗口的开始时间和结束时间

触发器

Trigger

控制窗口什么时候触发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public static class MyTrigger extends Trigger<Event, TimeWindow> {
@Override
public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
);
if (isFirstEvent.value() == null) {
for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 1000L) {
triggerContext.registerEventTimeTimer(i);
}
isFirstEvent.update(true);
}
return TriggerResult.CONTINUE;
}

/**
* 当注册事件时间定时器触发时候调用
* @param l
* @param timeWindow
* @param triggerContext
* @return
* @throws Exception
*/
@Override
public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.FIRE;
}

/**
* 当处理时间定时器触发时调用(本例中没有注册处理时间定时器)
* @param l
* @param timeWindow
* @param triggerContext
* @return
* @throws Exception
*/
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}

/**
* 当窗口关闭时调用,清除与该窗口相关的状态。
* 获取并清除 isFirstEvent 状态。
* @param timeWindow
* @param triggerContext
* @throws Exception
*/
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
);
isFirstEvent.clear();
}
}

移除器

Evictor:定义移除某些数据

允许延迟

AllowedLateness:定义窗口允许延迟多少时间,解决迟到数据的问题

1
2
3
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(1))

这里可以把迟到的数据放到侧输出流中

三种保障

这里是重点,是解决乱序数据+迟到数据的重点,进行三种保障,保障数据有序

三重保障

    1. 设置watermark延迟
    1. 允许窗口迟到数据,设置等待时间
    1. 将最后迟到的数据放在侧输出流中
      最大程度保证数据不丢失的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class ProcessLateDataExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 读取socket文本流
SingleOutputStreamOperator<Event> stream =
env.socketTextStream("192.168.1.100", 7777)
.map(new MapFunction<String, Event>() {
@Override
public Event map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
}
})
// 方式一:设置watermark延迟时间,2秒钟
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));

// 定义侧输出流标签
OutputTag<Event> outputTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 方式二:允许窗口处理迟到数据,设置1分钟的等待时间,这里的延迟的1分钟也是水位线触发,而不是真实的时间,而是事件时间。来到窗口整个事件的1分钟后的数据,窗口会直接全部关闭
.allowedLateness(Time.minutes(1))
// 方式三:将最后的迟到数据输出到侧输出流
.sideOutputLateData(outputTag)
//TODO 这里可以将之前的数据写入到一个表中,然后当迟到数据来的时候根据事件再更新这个表中的对应时间的count+1
.aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

result.print("result");
result.getSideOutput(outputTag).print("late");

// 为方便观察,可以将原始数据也输出
stream.print("input");

env.execute();
}

public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(Event value, Long accumulator) {
return accumulator + 1;
}

@Override
public Long getResult(Long accumulator) {
return accumulator;
}

@Override
public Long merge(Long a, Long b) {
return null;
}
}

public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {

@Override
public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
// 结合窗口信息,包装输出内容
Long start = context.window().getStart();
Long end = context.window().getEnd();
out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
}
}
}

处理函数

processFunction

ProcessElement()

onTimer()

keyedProcessFunction

先keyed再处理,包含定时器Timer

processWindowFunction

全窗口处理函数

elements.spliterator().getExactSizeIfKnown();获取窗口内有多少元素

processAllWindowFunction

AllWindowedStream调用.process()

处理整个窗口的数据

CoprocessFunction

合并两条流进行处理

ProcessJoinFunction

join两条流后进行处理

BroadcastProcessFunction

广播流处理

KeyedBroadcastProcessFunction

分区广播流处理