flink问题总结(持续更新)
2024-09-26 19:52:02 # flink # 问题总结

flink问题总结(持续更新)

flink反压如何排查

flink的反压首先我们需要检查一下taskManager和JobManager的日志

然后看一下web ui界面上的反压程度有多少,这里需要查看BackPressured是百分之多少。

然后在web ui上看一下处理的数据是否倾斜

使用火焰图进行分析,有没有大平顶的情况,有大平顶就是这个函数有问题

如果是source或者sink的第三方组件就对第三方组件进行处理。一般的处理就是攒批再读写

算子链断联有哪些方式

  1. 使用.disableChaining(); // 禁用算子链
  2. 使用startNewChain()是强制开启一个新算子链
  3. 设置forceNonParallel()强制不要开启并行,断开算子链
  4. 或者重分区就可以断开算子链了

flink的metrics介绍

一般监控反压的时候会使用到,一般Metrics和Buffer使用有关。

Buffer就是flink中存储数据的临时存储单元,数据会先写入到buffer,再从buffer传输

Metrics常见的有:

  • outpoolUsage:发送端的buffer使用率
  • inpoolUsage:接收端的buffer使用率
  • floatingBuffersUsage:接收端Floating Buffer使用率
  • exclusiveBuffersUsage:接收端Exclusive Buffer使用率

inpoolUsage=floatingBufferUsage+exclusiveBuffersUsage

如果发送端Buffer使用率低,但是接收端的高说明,如果上游的发送端是高的说明就是这里反压

如果发送端buffer使用率高,但是接收端低,有可能就是一条输入多条输出,这里就是反压

继续分析:

1)floatingBuffersUsage 为高,则表明反压正在传导至上游

2)同时exclusiveBuffersUsage为低,则表明可能有倾斜

checkpoint流程

详情查看:
http://www.luckiness.cc/2024/06/20/flink%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%E5%A4%A7%E7%BA%B2-%E4%BA%8C/#%E6%A3%80%E6%9F%A5%E7%82%B9%E7%9A%84%E4%BF%9D%E5%AD%98

端到端的一致性如果挂掉了怎么办

  1. 启动检查点
  2. 配置状态后端
  3. flink设置重启
  4. 保存点手动触发
  5. 需要使用监控,监控flink作业当前状态

flink写入hive能否端到端一致性,怎么保证幂等写入

  1. 启动检查点
  2. 事务写入:可以启动hive的事务表,指定事务id
  3. 分区和覆盖写入:每次写入的时候覆盖分区数据
  4. 创建唯一键约束:使用Merge语句保持检查并更新已经存在的记录

端到端的一致性,只需要保持两端的数据相同即可

数据倾斜场景

数据倾斜分三种情况:

  1. key之后的聚合出现数据倾斜

    需要先在本地进行一次聚合,减少发送到下游的数据量,然后在key前面加一个随机的前缀,把相同键的数据分散到不同子任务,然后再去除前缀全局聚合

  2. keyby聚合之前出现数据倾斜

    直接rescale

  3. keyby之后的窗口出现数据倾斜

    两阶段提交:

    1. 第一阶段聚合:
      1. 在每个key前面加一个随机前缀打散
      2. 按照随机前缀进行窗口聚合,使用十秒滚动窗口
      3. 输出结果包括键,聚合,和窗口结束时间
    2. 第二阶段聚合:
      1. 去掉随机前缀,然后第二阶段聚合
      2. 按照原来的键和窗口结束时间分组
      3. 对第一次聚合结果做最终聚合

缩减状态的方式有哪些

  1. 设置TTL状态过期时间
  2. 压缩状态里的数据
  3. 选择合适的数据结构存状态,比如map,不使用多个ValueState

旁路缓存能不能保证数据端到端的一致性

不能。旁路缓存是绕过缓存直接写入下游,一般是临时数据或者大量数据时候使用

端到端一致性是保证数据源到数据处理到数据存储的数据正确和一致

这两个不是一个概念

flink的一致性

端到端的精准一次

  1. flink内部:开启检查点
  2. 输入端,将kafka的偏移量保存为算子状态
  3. 输出端kafka两阶段提交,基于事务预提交,检查点保存完再提交事务正式提交

具体步骤:

  1. JobManager会通知TaskManager要开启检查点了,Source会把barrier分界线注入到数据流
  2. 算子收到barrier分界线会对当前的状态做快照,然后保存到状态后端。barrier会把offset也作为状态写入到检查点,存入状态后端
  3. 分界线传到sink任务,sink会开启一个事务,所有到的数据会通过事务写入到kafka,对于kafka提交的数据会标记未确认和已确认的两种状态
  4. 所有算子的快照都完成,JobManager会发送确认通知,检查点保存成功,sink接收到就会提交事务,把事务从未确认改成已确认
  5. 如果任务失败了,就会状态回滚,未提交的数据也会回滚

flink如何保证数据不丢失

  1. 状态管理
  2. checkpoint
  3. exactly once语义:两阶段提交:数据写入到外部系统之前会先将数据和状态保存到检查点,检查点保存成功后,再提交到外部系统
  4. 端到端一致性
  5. 支持多种状态后端
  6. 可以配置重启策略

说一下flink的状态后端

默认存在堆内存里面

可以配置rocks DB中和FS中也就是HDFS或者S3中

检查点的状态存储在状态后端,检查点的内容详细查看检查点

flink状态后端怎么做去重

  1. 使用ValueState实现简单去重

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
       import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class UniqueFilter {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "a", "c", "b", "d")
    .keyBy(value -> value)
    .process(new DeduplicateFunction())
    .print();

    env.execute("Deduplication Example");
    }

    public static class DeduplicateFunction extends KeyedProcessFunction<String, String, String> {
    private transient ValueState<Boolean> seenState;

    @Override
    public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
    "seen", // 状态的名字
    Types.BOOLEAN // 状态存储的数据类型
    );
    seenState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
    if (seenState.value() == null) {
    // 如果这是第一次看到这个值,输出它并更新状态
    out.collect(value);
    seenState.update(true);
    }
    }
    }
    }
  2. 使用MapState实现带有时间窗口的去重

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class WindowedUniqueFilter {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "a", "c", "b", "d")
    .keyBy(value -> value)
    .process(new WindowedDeduplicateFunction(10000L)) // 10秒窗口
    .print();

    env.execute("Windowed Deduplication Example");
    }

    public static class WindowedDeduplicateFunction extends KeyedProcessFunction<String, String, String> {
    private final long windowSize;
    private transient MapState<String, Long> seenState;

    public WindowedDeduplicateFunction(long windowSize) {
    this.windowSize = windowSize;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>(
    "seen",
    Types.STRING,
    Types.LONG
    );
    seenState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
    long currentTime = ctx.timerService().currentProcessingTime();
    if (!seenState.contains(value) || currentTime - seenState.get(value) > windowSize) {
    // 如果这是第一次看到这个值,或者超过了时间窗口,输出它并更新状态
    out.collect(value);
    seenState.put(value, currentTime);
    }
    }
    }
    }

    每当处理一个新元素时,我们检查当前时间与该元素上次出现时间的差异。如果差异大于窗口大小(例如10秒),我们输出该元素并更新其状态。否则,我们忽略该元素。

flink的优势和其他框架的区别

架构

spark streaming的架构主要是基于spark的微批处理,每个batch依赖driver

flink也是经典的主从架构,flink启动后会先根据用户代码处理成streamgraph,然后优化为job graph,jobManager会根据jobgraph生成执行流图,这个执行流图才是真正flink执行的数据结构,多个执行流图分布在集群中生成一个拓扑结构

反压

spark streaming为了实现反压,根据PID算法,通过任务结束时间,处理时长,处理数据条数等计算了一个速率,控制速率来实现反压

flink的反压很简单,就是在数据传输中,使用分布式的阻塞队列,队列满了的话发送者会被天然堵塞住

flink中的connect和join有什么区别

  1. 数据流性质:

    connect:将两个数据流合并为一个流,但它们的数据类型可以不同。通常用于需要处理多种类型事件的场景,合并后可以使用 CoMap 或 CoFlatMap 进行处理。
    join:在两个数据流之间进行关联,通常需要相同的键或条件。它创建一个新流,包含满足条件的组合数据,常用于关联相关数据(例如,将用户信息与用户行为进行关联)。

  2. 输出类型:

connect:输出的类型由两个流的数据类型决定,可以使用不同的处理逻辑来处理每个流的数据。
join:输出的数据类型通常是结合两个流的数据,形成新的结果类型,通常是一个复合对象或元组。
3. 时间窗口:

connect:通常不涉及时间窗口的概念,更多的是实时处理。
join:可以通过时间窗口进行连接,例如事件时间或处理时间窗口,适合需要时间上下文的场景。
4. 应用场景:

connect:适合处理不同类型的数据流,需要分别处理的场景。
join:适合需要将相关数据结合在一起的场景,如将用户数据与交易数据关联。