Paimon Study Notes (Part 2)
Streaming Queries

By default, a streaming read produces the table's latest snapshot on first startup and then continuously reads the latest changes.

A checkpointing interval must be set first:

```sql
SET 'execution.checkpointing.interval' = '30s';
```

You can also read only from the latest changes by setting the scan mode:

```sql
SELECT * FROM ws_t /*+ OPTIONS('scan.mode' = 'latest') */;
```
Time Travel

If you only want to process today's data and later, you can use a partition filter:

```sql
SELECT * FROM test_p WHERE dt > '2023-07-01';
```

If the table is not partitioned, or you cannot filter by partition, you can use a time-traveling streaming read.

Read change data starting from a specific snapshot id:

```sql
SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '1') */;
```

Read starting from a specific timestamp:

```sql
SELECT * FROM ws_t /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;
```
A batch time-travel read returns the full data as of the specified snapshot (e.g. snapshot id 1). A streaming time-travel read is more like an incremental query: it reads all changes that come after the specified snapshot id. Either way, the chosen point is the starting line, and everything before it is excluded.

On first startup the streaming read takes the specified snapshot as its position and then continues reading changes.

If what you want is the full data as of, say, snapshot id 3 plus all incremental changes after it, you need to specify the scan mode explicitly; in practice this is probably the more common requirement.
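A minimal sketch of that pattern, assuming the `from-snapshot-full` scan mode, which produces the specified snapshot in full on first startup and then keeps reading subsequent changes:

```sql
-- Full data of snapshot 3 first, then all later changes (streaming mode)
SELECT * FROM ws_t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '3') */;
```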
Consumer Id
This feature is similar to a Flink savepoint, or to a consumer group id in Kafka.

When reading a Paimon table, specify a consumer-id to start the streaming query. If you stop that query, insert a row, and then start another streaming query with the same consumer-id, only the newly inserted row is returned; data that was already consumed is not read again. Without the consumer-id, the read would start from the earliest data.
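For example, a sketch (the consumer name `myid` is arbitrary):

```sql
-- The first run consumes from the earliest data and records progress under 'myid';
-- later runs with the same consumer-id resume from the recorded position
SELECT * FROM ws_t /*+ OPTIONS('consumer-id' = 'myid') */;
```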
Query Optimization

Prefix Index

```sql
CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    ....., -- remaining columns elided in the source
    PRIMARY KEY (catalog_id, order_id) NOT ENFORCED
);
```

This CREATE TABLE statement declares a composite primary key. Queries that filter on the leftmost prefix of that key are accelerated:

```sql
SELECT * FROM orders WHERE catalog_id=1025;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id>2035 AND order_id<6000;
```

All three of these query forms are accelerated: thanks to the prefix index, scanning in key order skips irrelevant data very effectively.

```sql
SELECT * FROM orders WHERE order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
```

These two forms do not accelerate well, because neither one restricts the leading column of the primary key.
System Tables

Snapshots Table

Schemas Table

Table schema information.

Options Table

Audit Log Table
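System tables are queried by appending `$<name>` to the table name; a quick sketch against the `ws_t` table used earlier:

```sql
SELECT * FROM ws_t$snapshots;  -- snapshot metadata: snapshot id, commit time, commit kind, ...
SELECT * FROM ws_t$schemas;    -- table schema history
SELECT * FROM ws_t$options;    -- options that were set via DDL
SELECT * FROM ws_t$audit_log;  -- changelog with an extra rowkind column
```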
Lookup Join (Dimension Table Join)

This is very useful: it can enrich streaming data with dimension fields.

Suppose we have a stream that carries only a sku_id, and we need to take that id to an external system to fetch the data associated with it; that is a lookup join. If the external system is Paimon, this works directly: Paimon supports being the lookup side of a join, so Flink SQL can query it in place. If the dimension data lived in Hudi instead, it could not be looked up directly; connecting and querying would need more resources, and running one more system adds operational cost and more places to fail.
Enrich dimension data by customer_id:
```sql
USE CATALOG fs_catalog;
```
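The rest of the setup block is truncated in the source. A minimal sketch of what it plausibly contained, assuming a Paimon dimension table named `customers` (columns inferred from the join query below); the fact table `Orders` must additionally declare a processing-time attribute such as `proc_time AS PROCTIME()`:

```sql
-- Hypothetical reconstruction of the dimension table used by the lookup join
CREATE TABLE customers (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    country STRING,
    zip STRING
);
```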
```sql
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

This is the Flink SQL lookup-join syntax: `FOR SYSTEM_TIME AS OF o.proc_time AS c` specifies the processing-time field of the fact table.
CDC Integration

Paimon can synchronize multiple tables at once or an entire database.

Currently only MySQL and Kafka sources are supported, and only the Canal CDC format.

Canal is Alibaba's CDC tool. It focuses on MySQL: it captures MySQL incremental data and publishes it to Kafka. A captured record has the following format and includes these fields:
```json
{
  "data": [{"id": "1", "spu_name": "小米12sultra", "description": "小米12", "category3_id": "61", "tm_id": "1", "create_time": "2021-12-14 00:00:00", "operate_time": null}],
  "database": "gmall",
  "es": 1689151700000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {"id": "bigint", "spu_name": "varchar(200)", "description": "varchar(1000)", "category3_id": "bigint", "tm_id": "bigint", "create_time": "datetime", "operate_time": "datetime"},
  "old": [{"description": "小米10"}],
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {"id": -5, "spu_name": 12, "description": 12, "category3_id": -5, "tm_id": -5, "create_time": 93, "operate_time": 93},
  "table": "spu_info",
  "ts": 1689151700998,
  "type": "UPDATE"
}
```
| Field | Meaning | Example |
|---|---|---|
| data | The changed record(s): the row data in the table after the change | `[{"id": "1", "spu_name": "小米12sultra", "description": "小米12", "category3_id": "61", "tm_id": "1", "create_time": "2021-12-14 00:00:00", "operate_time": null}]` |
| database | Name of the database in which the change occurred | gmall |
| es | Event timestamp: when the event occurred, in milliseconds | 1689151700000 |
| id | Unique identifier of the event | 9 |
| isDdl | Whether this is a DDL operation; false means it is a DML operation | false |
| mysqlType | The MySQL data type of each field | `{"id": "bigint", "spu_name": "varchar(200)", "description": "varchar(1000)", "category3_id": "bigint", "tm_id": "bigint", "create_time": "datetime", "operate_time": "datetime"}` |
| old | The pre-change values; present only for update operations | `[{"description": "小米10"}]` |
| pkNames | Primary key field(s) of the table | `["id"]` |
| sql | The SQL statement (empty in this example) | "" |
| sqlType | The JDBC data type of each field | `{"id": -5, "spu_name": 12, "description": 12, "category3_id": -5, "tm_id": -5, "create_time": 93, "operate_time": 93}` |
| table | Name of the table in which the change occurred | spu_info |
| ts | Timestamp (milliseconds) when the event was recorded | 1689151700998 |
| type | Operation type; UPDATE here | UPDATE |
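As an aside, plain Flink SQL can also consume this format directly through the built-in `canal-json` format. A sketch modeled on the `spu_info` record above (topic and broker names reused from the examples later in these notes):

```sql
-- Kafka source over canal-json records; Flink maps each record's
-- data/old/type fields onto changelog rows (inserts, updates, deletes)
CREATE TABLE spu_info_cdc (
    id BIGINT,
    spu_name STRING,
    description STRING,
    category3_id BIGINT,
    tm_id BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'paimon_canal',
    'properties.bootstrap.servers' = 'hadoop102:9092',
    'properties.group.id' = 'test-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'canal-json'
);
```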
MySQL CDC Integration

Synchronizing a Table

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    mysql-sync-table \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition-keys <partition-keys>] \
    [--primary-keys <primary-keys>] \
    [--computed-column <'column-name=expr-name(args[, ...])'>] \
    [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf ...]] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf ...]] \
    [--table-conf <paimon-table-sink-conf> [--table-conf ...]]
```
| Option | Description |
|---|---|
| --warehouse | The path to the Paimon warehouse. |
| --database | The database name in the Paimon catalog. |
| --table | The Paimon table name. |
| --partition-keys | The partition keys of the Paimon table. Join multiple keys with commas, e.g. "dt,hh,mm". |
| --primary-keys | The primary keys of the Paimon table. Join multiple keys with commas, e.g. "buyer_id,seller_id". |
| --computed-column | Definitions of computed columns. The argument fields come from the MySQL table field names. |
| --mysql-conf | Configuration for the Flink CDC MySQL source table. Each configuration should be specified in "key=value" format. hostname, username, password, database-name and table-name are required; the others are optional. |
| --catalog-conf | Configuration for the Paimon catalog. Each configuration should be specified in "key=value" format. |
| --table-conf | Configuration for the Paimon table sink. Each configuration should be specified in "key=value" format. |
Hands-on

If the specified Paimon table does not exist, this action creates it automatically, deriving its schema from all the specified MySQL tables. If the Paimon table already exists, its schema is compared against the schemas of all the specified MySQL tables.

- Synchronize one MySQL table to one Paimon table:
```bash
bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
mysql-sync-table \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table order_cdc \
--primary-keys id \
--mysql-conf hostname=hadoop102 \
--mysql-conf username=root \
--mysql-conf password=000000 \
--mysql-conf database-name=gmall \
--mysql-conf table-name=order_info \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hadoop102:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
# (the command body is truncated in the source; reconstructed from the
#  multi-table example below, with table-name=order_info assumed)
```
- Synchronize multiple MySQL tables to one Paimon table:
```bash
bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
mysql-sync-table \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table order_cdc \
--primary-keys id \
--mysql-conf hostname=hadoop102 \
--mysql-conf username=root \
--mysql-conf password=000000 \
--mysql-conf database-name=gmall \
--mysql-conf table-name='order_.*' \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hadoop102:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```

Only the pattern-matching form is supported here: you cannot list two table names explicitly to merge two specific tables into one Paimon table.
Synchronizing a Database

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    mysql-sync-database \
    --warehouse <warehouse-path> \
    --database <database-name> \
    [--ignore-incompatible <true/false>] \
    [--table-prefix <paimon-table-prefix>] \
    [--table-suffix <paimon-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf ...]] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf ...]] \
    [--table-conf <paimon-table-sink-conf> [--table-conf ...]]
```
| Option | Description |
|---|---|
| --warehouse | The path to the Paimon warehouse. |
| --database | The database name in the Paimon catalog. |
| --ignore-incompatible | Defaults to false: if a Paimon table with the same name as a MySQL table already exists and their schemas are incompatible, an exception is thrown. You can explicitly set it to true to ignore incompatible tables and the exception. |
| --table-prefix | Prefix for all Paimon tables to be synchronized. For example, if you want all synchronized tables to start with "ods_", specify "--table-prefix ods_". |
| --table-suffix | Suffix for all Paimon tables to be synchronized. Usage is the same as "--table-prefix". |
| --including-tables | Specifies which source tables to synchronize. Separate multiple tables with "\|", e.g. 'a\|b\|c'; regular expressions are supported. |
| --excluding-tables | Specifies which source tables not to synchronize. Usage is the same as "--including-tables". If both are specified, "--excluding-tables" takes precedence over "--including-tables". |
| --mysql-conf | Configuration for the Flink CDC MySQL source table. Each configuration should be specified in "key=value" format. hostname, username, password, database-name and table-name are required; the others are optional. |
| --catalog-conf | Configuration for the Paimon catalog. Each configuration should be specified in "key=value" format. |
| --table-conf | Configuration for the Paimon table sink. Each configuration should be specified in "key=value" format. |
Only tables with primary keys will be synchronized.

For each MySQL table to be synchronized, if the corresponding Paimon table does not exist, the action creates it automatically, deriving its schema from all the specified MySQL tables. If the Paimon table already exists, its schema is compared against the schemas of all the specified MySQL tables.

Example

Synchronize the tables under a database:
```bash
bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
mysql-sync-database \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table-prefix "ods_" \
--table-suffix "_cdc" \
--mysql-conf hostname=hadoop102 \
--mysql-conf username=root \
--mysql-conf password=000000 \
--mysql-conf database-name=gmall \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hadoop102:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4 \
--including-tables 'user_info|order_info|activity_rule'
```

Adding Tables Dynamically
This applies to the following scenario: a sync job is already running, and we discover that one table was left out and also needs synchronizing. The tables already in flight will take a long time to finish, and stopping the job, deleting the data, and starting over would take a long time too. In that situation we can use this method.

Suppose the sync job we originally submitted looks like this:
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
mysql-sync-database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=source_db \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4 \
--including-tables 'product|user|address'
```

If we now also want to synchronize a table of historical data, we can restore from a previous savepoint of the job and reuse its existing state. The restored job first takes a snapshot of the newly added table, then automatically continues reading the changelog.
```bash
<FLINK_HOME>/bin/flink run \
--fromSavepoint savepointPath \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
mysql-sync-database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=source_db \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
--including-tables 'product|user|address|order|custom'
```

Compared with the original submission, only `--fromSavepoint savepointPath` and `--including-tables 'product|user|address|order|custom'` need to change.
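Before resubmitting, the running job has to be stopped with a savepoint. A sketch using Flink's standard CLI (the job id and savepoint directory are placeholders):

```bash
# Stop the running sync job and write a savepoint; pass the printed
# savepoint path to the new submission via --fromSavepoint
<FLINK_HOME>/bin/flink stop --savepointPath /tmp/paimon-savepoints <jobId>
```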
Kafka CDC Integration

Here the data in Kafka is MySQL incremental data captured by Canal and published to Kafka. Paimon's CDC feature consumes those Canal records from Kafka and, using the database, table, and other metadata carried on each record, writes every change to the corresponding Paimon table.

Synchronizing a Table

Synchronize one or more tables from one Kafka topic into a single Paimon table.
```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    kafka-sync-table \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition-keys <partition-keys>] \
    [--primary-keys <primary-keys>] \
    [--computed-column <'column-name=expr-name(args[, ...])'>] \
    [--kafka-conf <kafka-source-conf> [--kafka-conf ...]] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf ...]] \
    [--table-conf <paimon-table-sink-conf> [--table-conf ...]]
```
| Option | Description |
|---|---|
| --warehouse | The path to the Paimon warehouse. |
| --database | The database name in the Paimon catalog. |
| --table | The Paimon table name. |
| --partition-keys | The partition keys of the Paimon table. Join multiple keys with commas, e.g. "dt,hh,mm". |
| --primary-keys | The primary keys of the Paimon table. Join multiple keys with commas, e.g. "buyer_id,seller_id". |
| --computed-column | Definitions of computed columns. The argument fields come from the field names of the tables in the Kafka topic. |
| --kafka-conf | Configuration for the Flink Kafka source. Each configuration should be specified in "key=value" format. properties.bootstrap.servers, topic, properties.group.id and value.format are required; the others are optional. |
| --catalog-conf | Configuration for the Paimon catalog. Each configuration should be specified in "key=value" format. |
| --table-conf | Configuration for the Paimon table sink. Each configuration should be specified in "key=value" format. |
Hands-on

- Prepare data in canal-json format and put it into a Kafka topic:

```bash
kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal
```

Insert the following data:
```json
{"data":[{"id":"6","login_name":"t7dk2h","nick_name":"冰冰11","passwd":null,"name":"淳于冰","phone_num":"13178654378","email":"t7dk2h@263.net","head_img":null,"user_level":"1","birthday":"1997-12-08","gender":null,"create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689150607000,"id":1,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"冰冰"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151566836,"type":"UPDATE"}
{"data":[{"id":"7","login_name":"vihcj30p1","nick_name":"豪心22","passwd":null,"name":"魏豪心","phone_num":"13956932645","email":"vihcj30p1@live.com","head_img":null,"user_level":"1","birthday":"1991-06-07","gender":"M","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151623000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"豪心"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151623139,"type":"UPDATE"}
{"data":[{"id":"8","login_name":"02r2ahx","nick_name":"卿卿33","passwd":null,"name":"穆卿","phone_num":"13412413361","email":"02r2ahx@sohu.com","head_img":null,"user_level":"1","birthday":"2001-07-08","gender":"F","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151626000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"卿卿"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151626863,"type":"UPDATE"}
{"data":[{"id":"9","login_name":"mjhrxnu","nick_name":"武新44","passwd":null,"name":"罗武新","phone_num":"13617856358","email":"mjhrxnu@yahoo.com","head_img":null,"user_level":"1","birthday":"2001-08-08","gender":null,"create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151630000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"武新"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151630781,"type":"UPDATE"}
{"data":[{"id":"10","login_name":"kwua2155","nick_name":"纨纨55","passwd":null,"name":"姜纨","phone_num":"13742843828","email":"kwua2155@163.net","head_img":null,"user_level":"3","birthday":"1997-11-08","gender":"F","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151633000,"id":5,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"纨纨"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151633697,"type":"UPDATE"}
```

Synchronize one Kafka topic to a Paimon table:
```bash
bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
kafka-sync-table \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table kafka_user_info_cdc \
--primary-keys id \
--kafka-conf properties.bootstrap.servers=hadoop102:9092 \
--kafka-conf topic=paimon_canal \
--kafka-conf properties.group.id=atguigu \
--kafka-conf scan.startup.mode=earliest-offset \
--kafka-conf value.format=canal-json \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hadoop102:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```
Synchronizing a Database

Synchronize one or more topics into a single Paimon database.

Because a topic carries canal-json records, a single topic can contain multiple tables, and multiple topics can as well.

Only tables with primary keys will be synchronized.

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    kafka-sync-database \
    --warehouse <warehouse-path> \
    --database <database-name> \
    [--schema-init-max-read <int>] \
    [--ignore-incompatible <true/false>] \
    [--table-prefix <paimon-table-prefix>] \
    [--table-suffix <paimon-table-suffix>] \
    [--including-tables <table-name|name-regular-expr>] \
    [--excluding-tables <table-name|name-regular-expr>] \
    [--kafka-conf <kafka-source-conf> [--kafka-conf ...]] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf ...]] \
    [--table-conf <paimon-table-sink-conf> [--table-conf ...]]
```
| Option | Description |
|---|---|
| --warehouse | The path to the Paimon warehouse. |
| --database | The database name in the Paimon catalog. |
| --schema-init-max-read | If your tables all come from a single topic, you can set this parameter to bound how many records are read to initialize the set of tables to synchronize. Defaults to 1000. |
| --ignore-incompatible | Defaults to false: if a Paimon table with the same name as a source table already exists and their schemas are incompatible, an exception is thrown. You can explicitly set it to true to ignore incompatible tables and the exception. |
| --table-prefix | Prefix for all Paimon tables to be synchronized. For example, if you want all synchronized tables to start with "ods_", specify "--table-prefix ods_". |
| --table-suffix | Suffix for all Paimon tables to be synchronized. Usage is the same as "--table-prefix". |
| --including-tables | Specifies which source tables to synchronize. Separate multiple tables with "\|", e.g. 'a\|b\|c'; regular expressions are supported. |
| --excluding-tables | Specifies which source tables not to synchronize. Usage is the same as "--including-tables". If both are specified, "--excluding-tables" takes precedence over "--including-tables". |
| --kafka-conf | Configuration for the Flink Kafka source. Each configuration should be specified in "key=value" format. properties.bootstrap.servers, topic, properties.group.id and value.format are required; the others are optional. See the Kafka connector documentation for the full list. |
| --catalog-conf | Configuration for the Paimon catalog. Each configuration should be specified in "key=value" format. See the catalog documentation for the full list. |
| --table-conf | Configuration for the Paimon table sink. Each configuration should be specified in "key=value" format. See the table configuration documentation for the full list. |
Example

```bash
kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal_2
```

Insert the following data (make sure there are no blank lines):
```json
{"data":[{"id":"6","login_name":"t7dk2h","nick_name":"冰冰11","passwd":null,"name":"淳于冰","phone_num":"13178654378","email":"t7dk2h@263.net","head_img":null,"user_level":"1","birthday":"1997-12-08","gender":null,"create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689150607000,"id":1,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"冰冰"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151566836,"type":"UPDATE"}
{"data":[{"id":"7","login_name":"vihcj30p1","nick_name":"豪心22","passwd":null,"name":"魏豪心","phone_num":"13956932645","email":"vihcj30p1@live.com","head_img":null,"user_level":"1","birthday":"1991-06-07","gender":"M","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151623000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"豪心"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151623139,"type":"UPDATE"}
{"data":[{"id":"8","login_name":"02r2ahx","nick_name":"卿卿33","passwd":null,"name":"穆卿","phone_num":"13412413361","email":"02r2ahx@sohu.com","head_img":null,"user_level":"1","birthday":"2001-07-08","gender":"F","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151626000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"卿卿"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151626863,"type":"UPDATE"}
{"data":[{"id":"9","login_name":"mjhrxnu","nick_name":"武新44","passwd":null,"name":"罗武新","phone_num":"13617856358","email":"mjhrxnu@yahoo.com","head_img":null,"user_level":"1","birthday":"2001-08-08","gender":null,"create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151630000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"武新"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151630781,"type":"UPDATE"}
{"data":[{"id":"10","login_name":"kwua2155","nick_name":"纨纨55","passwd":null,"name":"姜纨","phone_num":"13742843828","email":"kwua2155@163.net","head_img":null,"user_level":"3","birthday":"1997-11-08","gender":"F","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151633000,"id":5,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"纨纨"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151633697,"type":"UPDATE"}
{"data":[{"id":"12","spu_name":"华为智慧屏 4K全面屏智能电视机1","description":"华为智慧屏 4K全面屏智能电视机","category3_id":"86","tm_id":"3","create_time":"2021-12-14 00:00:00","operate_time":null}],"database":"gmall","es":1689151648000,"id":6,"isDdl":false,"mysqlType":{"id":"bigint","spu_name":"varchar(200)","description":"varchar(1000)","category3_id":"bigint","tm_id":"bigint","create_time":"datetime","operate_time":"datetime"},"old":[{"spu_name":"华为智慧屏 4K全面屏智能电视机"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"spu_name":12,"description":12,"category3_id":-5,"tm_id":-5,"create_time":93,"operate_time":93},"table":"spu_info","ts":1689151648872,"type":"UPDATE"}
{"data":[{"id":"3","spu_name":"Apple iPhone 13","description":"Apple iPhone 13","category3_id":"61","tm_id":"2","create_time":"2021-12-14 00:00:00","operate_time":null}],"database":"gmall","es":1689151661000,"id":7,"isDdl":false,"mysqlType":{"id":"bigint","spu_name":"varchar(200)","description":"varchar(1000)","category3_id":"bigint","tm_id":"bigint","create_time":"datetime","operate_time":"datetime"},"old":[{"spu_name":"Apple iPhone 12","description":"Apple iPhone 12"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"spu_name":12,"description":12,"category3_id":-5,"tm_id":-5,"create_time":93,"operate_time":93},"table":"spu_info","ts":1689151661828,"type":"UPDATE"}
{"data":[{"id":"4","spu_name":"HUAWEI P50","description":"HUAWEI P50","category3_id":"61","tm_id":"3","create_time":"2021-12-14 00:00:00","operate_time":null}],"database":"gmall","es":1689151669000,"id":8,"isDdl":false,"mysqlType":{"id":"bigint","spu_name":"varchar(200)","description":"varchar(1000)","category3_id":"bigint","tm_id":"bigint","create_time":"datetime","operate_time":"datetime"},"old":[{"spu_name":"HUAWEI P40","description":"HUAWEI P40"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"spu_name":12,"description":12,"category3_id":-5,"tm_id":-5,"create_time":93,"operate_time":93},"table":"spu_info","ts":1689151669966,"type":"UPDATE"}
{"data":[{"id":"1","spu_name":"小米12sultra","description":"小米12","category3_id":"61","tm_id":"1","create_time":"2021-12-14 00:00:00","operate_time":null}],"database":"gmall","es":1689151700000,"id":9,"isDdl":false,"mysqlType":{"id":"bigint","spu_name":"varchar(200)","description":"varchar(1000)","category3_id":"bigint","tm_id":"bigint","create_time":"datetime","operate_time":"datetime"},"old":[{"description":"小米10"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"spu_name":12,"description":12,"category3_id":-5,"tm_id":-5,"create_time":93,"operate_time":93},"table":"spu_info","ts":1689151700998,"type":"UPDATE"}
```

Synchronize one Kafka topic into a Paimon database:
```bash
bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
kafka-sync-database \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table-prefix "t1_" \
--table-suffix "_cdc" \
--schema-init-max-read 500 \
--kafka-conf properties.bootstrap.servers=hadoop102:9092 \
--kafka-conf topic=paimon_canal_2 \
--kafka-conf properties.group.id=atguigu \
--kafka-conf scan.startup.mode=earliest-offset \
--kafka-conf value.format=canal-json \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hadoop102:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```

Synchronize multiple Kafka topics into a Paimon database:
```bash
bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
kafka-sync-database \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table-prefix "t2_" \
--table-suffix "_cdc" \
--kafka-conf properties.bootstrap.servers=hadoop102:9092 \
--kafka-conf topic="paimon_canal;paimon_canal_1" \
--kafka-conf properties.group.id=atguigu \
--kafka-conf scan.startup.mode=earliest-offset \
--kafka-conf value.format=canal-json \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hadoop102:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```
Supported Schema Changes

Currently supported schema changes include:

- Adding columns
- Changing column types

Dropping columns is not supported by the CDC sync.

In other words, while CDC-syncing a MySQL table, if a column is added, the change is captured: the Paimon table's schema is evolved along with it, and a new snapshot is stored. But if a column is dropped, that change cannot be captured.
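For instance, on the MySQL side (a sketch; the `order_info` table and `remark` column are only illustrative):

```sql
-- Captured by the sync: the Paimon table gains the new column
ALTER TABLE order_info ADD COLUMN remark VARCHAR(100);

-- Also captured: changing (widening) a column type
ALTER TABLE order_info MODIFY COLUMN remark VARCHAR(200);

-- NOT captured: dropping a column leaves the Paimon schema unchanged
-- ALTER TABLE order_info DROP COLUMN remark;
```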