Flink+ODPS历史累计计算-流批结合
2024-10-09 09:22:02 # 流批结合

Flink+ODPS历史累计计算-流批结合

基本介绍

ODPS:ODPS是阿里云的数据仓库和离线计算的内容,其实可以理解为数据库和离线计算,类似华为云的数据同步数据集成

背景

需求目前是根据用户注册的累计的车辆行驶里程,给用户发优惠卷,需要实时的计算出历史–这时刻的累计的里程数据

实现逻辑

这种我们考虑使用实时+离线的方式,离线计算历史到作日的累计数据,实时计算当日累计数据,然后把两条链路的数据进行汇总

离线计算逻辑

step1:用户历史数据初始化。假设该计算任务发布的时间为20231010,首先要对用户 历史~20231009 期间的历史数据进行汇总,得到一个 历史存量累计数据 history_data;

step2:从20231010起,对用户每日的增量跑步数据进行汇总,得到该日的增量累计数据 day_data;

step3:将每日的增量累计数据day_data 与 历史存量累计数据history_data 进行求和,作为新的历史存量累计数据 history_data(T-1) = day_data(T-1) + history_data(T-2) ;

step4:重复 step2 和step3 ,每日更新历史存量累计数据 history_data 。

历史全量数据只计算一次,以后计算的都是增量数据

实时链路设计

step1:用户新增的跑步记录通过MQ发送给Flink任务;

step2:Flink节点1对数据去重;

step3:Flink节点2对实时汇总统计 当日零点至此刻 用户的跑步累计数据;step4:将计算结果输出给下游

融合

我们得到了用户“历史–昨日”的累计数据和“当日凌晨–此刻”的累计数据,将两者相加就可以实时得到历史–此刻的数据

ODPS(数据分析) 计算出用户 [非当日的历史累计数据],为使用方便,会每天更新全量用户历史累计数据;

使用Flink节点1 实时计算用户当日上传的跑步累计数据;

使用 Flink节点2 实时的将离线数据和实时数据汇总起来;

将汇总结果写入Hbase结果表,同时发送个MQ消息给下游业务方。

alt text

遇到的问题

目前有两个问题

上面的lambda方案有个问题,每日凌晨零点过后,实时任务已开始计算新的一天数据,而离线任务计算尚未结束,这时会出现一个离线数据缺失的窗口期。重点分析一下框图中“实时数据+离线数据”的部分:
alt text

正常情况
当一个用户在T日实时上传了自己的记录,Flink节点1会计算出其 [当日0点起至此刻] 的累计数据data1,Flink节点2会根据该用户id取hbase维表里查询其 [历史~T-1日] 的累计数据 data2 (hbase表里数据由odps每日更新,即T-1日的存量累计汇总数据),将data1和data2二者汇总,就可得到 用户历史至此时刻的汇总数据;

异常情况
在凌晨(比如说,在00:00~00:30),ODPS正在计算最新分区数据(T-1日的数据)的期间,新的分区还没生成完,或者ODPS计算已经完成,但odps表同步base表同步任务还未完成,此时若发生了查询,会发生什么?

会使用老分区的数据(T-2日的数据,而不是期望的T-1日数据),导致数据不准。

【问题描述】

在凌晨时分,ODPS计算T-1日数据期间,如果发生了对T-1日的数据查询,则无法获取到期望的T-1日数据,会继续使用T-2日的数据

这里“无法获取正确数据”的时间长度 = ODPS计算时间 + ODPS同步数据到Hbase的时间

【原因】

Flink查询维表时 使用维表当前的数据快照,本次查询完成后再发生的维表更新不会对已有查询造成影响。

【举例】

case1(ODPS计算未完成)

27号,Flink任务计算27号当天的用户累计数据,同时查询odps维表的 26号分区 中该用户的历史累计数据,两者相加,得到27号的实时累计结果;

28号凌晨,ODPS正在计算27号分区的数据,任务还未结束,27号分区数据尚不可用;而Flink任务已经开始计算28号当天的用户累计数据,此刻发生了一次维表查询,期望从维表中查到该用户27号统计的历史累计数据,然而由于27号数据未准备好,则维表会返回26号的历史累计数据,这会导致数据计算错误,相当于丢失了该用户27号的数据。

case2(ODPS计算完成,但odps表同步habse表任务未完成)

28号凌晨,ODPS的计算已完成,odps表正在同步数据到hbase表期间,如果Flink发生了查询,期望获取用户27号的最新数据,但由于还没有更新完成,还是会用26号的数据,会造成类似的错误结果。

总结

其实也就是在零点的时候,离线要开始计算前一天的数据量和历史到前一天的数据量,但是这时候实时任务还在跑,离线任务计算的这个空档期,实时任务获取不到计算那天的数据,就会和前一天的数据相加,导致拉下了昨天的数据

优化方案

优化方案一:基于lamba方案改进

第一个时间窗口:数据计算时间窗口

这个时间窗口是ODPS计算的数据时间窗口,现在我们的计算是历史到t-1的离线数据+实时计算的t日的增量数据作为实时总量的数据

现在我们可以改成实时总量=历史~t-2的离线数据+t日和t—1的增量数据计算

举例来说:

27号 :

ODPS凌晨时开始计算 26号的存量数据(此时25号存量数据已经计算完成了)。

Flink1计算26和27号的汇总数据;Flink2使用odps表的25号存量数据与Flink1的26、27号汇总数据,得到27号历史~此刻的汇总数据。

28号 :

ODPS凌晨时开始计算 27号的存量数据(此时26号存量数据经过一天肯定计算完成了)。

Flink1计算27和28号的汇总数据;Flink2使用odps表的26号存量数据与Flink1的26、27号汇总数据,得到27号历史~此刻的汇总数据。

······

经过上述方案,可以解决“ODPS计算时间”造成的时间差问题。

这里有一个小细节需要注意以下
实时计算 [T日 +T-1日] 增量数据,T-1日数据既会参与T-1日的实时计算,也会参与T日的计算,会被用到两次(如上,27号的数据既要参与27日当日的实时计算,也要参与28号的实时计算)。可以写个UDTF将数据内容复制下,一条用于当日 ,一条用于明日

第二个时间窗口:数据同步到hbase表的时间窗口

我们这里可以使用hbase的双分区(双列)

我们要使用odps表的 T-2 分区数据, 在凌晨的时候还要无缝切换到新的T-2日分区,这里就涉及到两个分区数据的切换。那就在 hbase表里模拟出两个时间分区,分别存方odps的最近两个分区数据好了(hbase维表设计使用 “两列” 来模拟两个分区,也可以使用 “一列+两行” 的方案来解决)。

原来hbase表结构
alt text

更改后hbase表结构
alt text

优化方案一的实施

alt text

优化方案二:基于流计算

分析得到的主要的问题就是这两个时间窗口,不要离线链路,完全用实时链路来计算就可以没有这个问题了

step1: 初始化:将用户 [历史 ~ 昨日] 累计数据计算好,放入到hbase结果表;

step2: 从今日零点开始启动Flink任务,新来一条数据,Flink就去hbase结果表里查该用户原有累计数据,相加,实时更新hbase结果表;

该hbase结果表,既作为Flink写的结果表,又作为Flink查的维表;

老用户,历史累计与新来数据数值直接相加即可;

新用户,hbase结果表里没有该用户数据,则将历史累计值设为0,在相加,更新hbase结果表;同时将该新用户uid记录到一张odps增量分区表中(备用)。

alt text

注意

如果某些业务的QPS(每秒内查询次数)没有那么高,预估在400之下,建议直接使用flink+clickhouse的实时分析来计算,不用存放到hbase中

step1:将用户历史存量明细数据、实时增量明细数据 实时同步到Clickhouse中;

step2: 基于Clickhouse做实时数据分析,对外提供数据服务,每次查询时在Clickhouse中实时计算历史~今的统计数据。