Flink+ODPS历史累计计算-流批结合
基本介绍
ODPS:ODPS是阿里云的数据仓库和离线计算的内容,其实可以理解为数据库和离线计算,类似华为云的数据同步数据集成
背景
需求目前是根据用户注册的累计的车辆行驶里程,给用户发优惠卷,需要实时的计算出历史–这时刻的累计的里程数据
实现逻辑
这种我们考虑使用实时+离线的方式,离线计算历史到作日的累计数据,实时计算当日累计数据,然后把两条链路的数据进行汇总
离线计算逻辑
step1:用户历史数据初始化。假设该计算任务发布的时间为20231010,首先要对用户 历史~20231009 期间的历史数据进行汇总,得到一个 历史存量累计数据 history_data;
step2:从20231010起,对用户每日的增量跑步数据进行汇总,得到该日的增量累计数据 day_data;
step3:将每日的增量累计数据day_data 与 历史存量累计数据history_data 进行求和,作为新的历史存量累计数据 history_data(T-1) = day_data(T-1) + history_data(T-2) ;
step4:重复 step2 和step3 ,每日更新历史存量累计数据 history_data 。
历史全量数据只计算一次,以后计算的都是增量数据
实时链路设计
step1:用户新增的跑步记录通过MQ发送给Flink任务;
step2:Flink节点1对数据去重;
step3:Flink节点2对实时汇总统计 当日零点至此刻 用户的跑步累计数据;step4:将计算结果输出给下游
融合
我们得到了用户“历史–昨日”的累计数据和“当日凌晨–此刻”的累计数据,将两者相加就可以实时得到历史–此刻的数据
ODPS(数据分析) 计算出用户 [非当日的历史累计数据],为使用方便,会每天更新全量用户历史累计数据;
使用Flink节点1 实时计算用户当日上传的跑步累计数据;
使用 Flink节点2 实时的将离线数据和实时数据汇总起来;
将汇总结果写入Hbase结果表,同时发送个MQ消息给下游业务方。
遇到的问题
目前有两个问题
上面的lambda方案有个问题,每日凌晨零点过后,实时任务已开始计算新的一天数据,而离线任务计算尚未结束,这时会出现一个离线数据缺失的窗口期。重点分析一下框图中“实时数据+离线数据”的部分:
正常情况
当一个用户在T日实时上传了自己的记录,Flink节点1会计算出其 [当日0点起至此刻] 的累计数据data1,Flink节点2会根据该用户id取hbase维表里查询其 [历史~T-1日] 的累计数据 data2 (hbase表里数据由odps每日更新,即T-1日的存量累计汇总数据),将data1和data2二者汇总,就可得到 用户历史至此时刻的汇总数据;
异常情况
在凌晨(比如说,在00:00~00:30),ODPS正在计算最新分区数据(T-1日的数据)的期间,新的分区还没生成完,或者ODPS计算已经完成,但odps表同步base表同步任务还未完成,此时若发生了查询,会发生什么?
会使用老分区的数据(T-2日的数据,而不是期望的T-1日数据),导致数据不准。
【问题描述】
在凌晨时分,ODPS计算T-1日数据期间,如果发生了对T-1日的数据查询,则无法获取到期望的T-1日数据,会继续使用T-2日的数据
这里“无法获取正确数据”的时间长度 = ODPS计算时间 + ODPS同步数据到Hbase的时间
【原因】
Flink查询维表时 使用维表当前的数据快照,本次查询完成后再发生的维表更新不会对已有查询造成影响。
【举例】
case1(ODPS计算未完成)
27号,Flink任务计算27号当天的用户累计数据,同时查询odps维表的 26号分区 中该用户的历史累计数据,两者相加,得到27号的实时累计结果;
28号凌晨,ODPS正在计算27号分区的数据,任务还未结束,27号分区数据尚不可用;而Flink任务已经开始计算28号当天的用户累计数据,此刻发生了一次维表查询,期望从维表中查到该用户27号统计的历史累计数据,然而由于27号数据未准备好,则维表会返回26号的历史累计数据,这会导致数据计算错误,相当于丢失了该用户27号的数据。
case2(ODPS计算完成,但odps表同步habse表任务未完成)
28号凌晨,ODPS的计算已完成,odps表正在同步数据到hbase表期间,如果Flink发生了查询,期望获取用户27号的最新数据,但由于还没有更新完成,还是会用26号的数据,会造成类似的错误结果。
总结
其实也就是在零点的时候,离线要开始计算前一天的数据量和历史到前一天的数据量,但是这时候实时任务还在跑,离线任务计算的这个空档期,实时任务获取不到计算那天的数据,就会和前一天的数据相加,导致拉下了昨天的数据
优化方案
优化方案一:基于lamba方案改进
第一个时间窗口:数据计算时间窗口
这个时间窗口是ODPS计算的数据时间窗口,现在我们的计算是历史到t-1的离线数据+实时计算的t日的增量数据作为实时总量的数据
现在我们可以改成实时总量=历史~t-2的离线数据+t日和t—1的增量数据计算
举例来说:
27号 :
ODPS凌晨时开始计算 26号的存量数据(此时25号存量数据已经计算完成了)。
Flink1计算26和27号的汇总数据;Flink2使用odps表的25号存量数据与Flink1的26、27号汇总数据,得到27号历史~此刻的汇总数据。
28号 :
ODPS凌晨时开始计算 27号的存量数据(此时26号存量数据经过一天肯定计算完成了)。
Flink1计算27和28号的汇总数据;Flink2使用odps表的26号存量数据与Flink1的26、27号汇总数据,得到27号历史~此刻的汇总数据。
······
经过上述方案,可以解决“ODPS计算时间”造成的时间差问题。
这里有一个小细节需要注意以下
实时计算 [T日 +T-1日] 增量数据,T-1日数据既会参与T-1日的实时计算,也会参与T日的计算,会被用到两次(如上,27号的数据既要参与27日当日的实时计算,也要参与28号的实时计算)。可以写个UDTF将数据内容复制下,一条用于当日 ,一条用于明日
第二个时间窗口:数据同步到hbase表的时间窗口
我们这里可以使用hbase的双分区(双列)
我们要使用odps表的 T-2 分区数据, 在凌晨的时候还要无缝切换到新的T-2日分区,这里就涉及到两个分区数据的切换。那就在 hbase表里模拟出两个时间分区,分别存方odps的最近两个分区数据好了(hbase维表设计使用 “两列” 来模拟两个分区,也可以使用 “一列+两行” 的方案来解决)。
原来hbase表结构
更改后hbase表结构
优化方案一的实施
优化方案二:基于流计算
分析得到的主要的问题就是这两个时间窗口,不要离线链路,完全用实时链路来计算就可以没有这个问题了
step1: 初始化:将用户 [历史 ~ 昨日] 累计数据计算好,放入到hbase结果表;
step2: 从今日零点开始启动Flink任务,新来一条数据,Flink就去hbase结果表里查该用户原有累计数据,相加,实时更新hbase结果表;
该hbase结果表,既作为Flink写的结果表,又作为Flink查的维表;
老用户,历史累计与新来数据数值直接相加即可;
新用户,hbase结果表里没有该用户数据,则将历史累计值设为0,在相加,更新hbase结果表;同时将该新用户uid记录到一张odps增量分区表中(备用)。
注意
如果某些业务的QPS(每秒内查询次数)没有那么高,预估在400之下,建议直接使用flink+clickhouse的实时分析来计算,不用存放到hbase中
step1:将用户历史存量明细数据、实时增量明细数据 实时同步到Clickhouse中;
step2: 基于Clickhouse做实时数据分析,对外提供数据服务,每次查询时在Clickhouse中实时计算历史~今的统计数据。