Paimon Study Notes (Part 2)
2024-07-04 17:45:30 # paimon # study notes

Streaming Queries

By default, a streaming read produces the latest snapshot of the table at first startup, then continues reading the latest changes.

First, you need:

SET 'execution.checkpointing.interval'='30s';
SET 'execution.runtime-mode' = 'streaming';

You can also read only from the latest changes by setting the scan mode:

SELECT * FROM ws_t /*+ OPTIONS('scan.mode' = 'latest') */;


Time Travel

If you only want to process today's data and onward, you can use a partition filter:

SELECT * FROM test_p WHERE dt > '2023-07-01'

If the table is not partitioned, or you cannot filter by partition, you can use a time-traveling streaming read.

Read change data starting from a specified snapshot ID:

SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '1') */;

Read starting from a specified timestamp:

SELECT * FROM ws_t /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;

A batch time-travel read returns the full table data as of snapshot ID 1, whereas streaming time travel works more like an incremental query: it reads all the changes that come after the specified snapshot ID.

Whichever snapshot you choose as the starting point, everything before it is excluded: the read begins at that point and earlier history does not count.
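
A sketch of the batch counterpart, assuming the same ws_t table as above: in batch runtime mode, the same hint returns the full data as of the specified snapshot rather than a change stream.

SET 'execution.runtime-mode' = 'batch';
SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '1') */;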

Reading the specified snapshot's data at first startup, then continuing to read changes

If what you want is, say, the full data as of snapshot ID 3 plus all the incremental changes after it, you need to specify the scan mode; in business scenarios this is probably the more common pattern. See the sketch below.
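A minimal sketch of that scan mode, assuming the ws_t table above and snapshot ID 3: 'from-snapshot-full' reads the full data of the specified snapshot on the first startup, then keeps reading the changes that follow.

SELECT * FROM ws_t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '3') */;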

Consumer Id

This feature is similar to a Flink savepoint, and to a consumer group ID in Kafka.

When reading a Paimon table, specify a consumer-id to start a streaming query. If you stop that query, insert a row, and then start another streaming query with the same consumer-id, the new query sees only the newly inserted data; the content consumed earlier is not read again. If you drop the consumer-id, reading starts from the earliest data again. A sketch follows.
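
A minimal sketch, using an arbitrary consumer name 'myid' (assuming the ws_t table above):

SELECT * FROM ws_t /*+ OPTIONS('consumer-id' = 'myid') */;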

Query Optimization

Prefix Index

CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
    PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key
)

Given this table definition, consider the following queries:

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders
WHERE catalog_id=1025
AND order_id>2035 AND order_id<6000;

All three of these queries are accelerated: thanks to the prefix index, filters that follow the sort order of the key prefix speed up the query considerably.

SELECT * FROM orders WHERE order_id=29495;

SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

These two queries cannot be accelerated as well, since neither filters on the leftmost prefix of the primary key.

System Tables

Snapshots Table

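For example, assuming the ws_t table from earlier, the snapshots system table can be queried by appending $snapshots to the table name:

SELECT * FROM ws_t$snapshots;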

Schemas Table

Contains the table's schema information.
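
A sketch, under the same ws_t assumption:

SELECT * FROM ws_t$schemas;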

Options Table
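
Shows the options that were explicitly set on the table (a sketch, same ws_t assumption):

SELECT * FROM ws_t$options;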

Audit Log Table

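The audit log exposes a rowkind column (+I, -U, +U, -D) so change types can be inspected (a sketch, same ws_t assumption):

SELECT * FROM ws_t$audit_log;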

Dimension Table Join (Lookup Join)

This is very useful.

It can be used to enrich streaming data with dimension fields.

Suppose we have a stream that carries only a sku_id, and we need to look up the record this ID refers to in an external system; that is a lookup join. If the external system is Paimon, Paimon supports being the target of a lookup join, so Flink SQL can query it directly. If Hudi were used instead, Hudi cannot be looked up directly: connecting and querying requires more resources, and running an extra Hudi system raises operational costs and adds failure points.

Enrich dimension data based on customer_id:

USE CATALOG fs_catalog;

CREATE TABLE customers (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    country STRING,
    zip STRING
);

INSERT INTO customers VALUES(1,'zs','ch','123'),(2,'ls','ch','456'), (3,'ww','ch','789');


CREATE TEMPORARY TABLE Orders (
    order_id INT,
    total INT,
    customer_id INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1',
    'fields.order_id.kind'='sequence',
    'fields.order_id.start'='1',
    'fields.order_id.end'='1000000',
    'fields.total.kind'='random',
    'fields.total.min'='1',
    'fields.total.max'='1000',
    'fields.customer_id.kind'='random',
    'fields.customer_id.min'='1',
    'fields.customer_id.max'='3'
);


SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;


This is the Flink SQL syntax for a lookup join: FOR SYSTEM_TIME AS OF o.proc_time AS c specifies the processing-time field of the fact table.

CDC Integration

Paimon can synchronize multiple tables or an entire database.

Currently only MySQL and Kafka are supported as sources.

Currently only the Canal CDC format is supported.

Canal is Alibaba's approach to CDC, focused on MySQL: it captures MySQL incremental data and puts it into Kafka. The captured content looks like the following:

{
    "data": [{
        "id": "1",
        "spu_name": "小米12sultra",
        "description": "小米12",
        "category3_id": "61",
        "tm_id": "1",
        "create_time": "2021-12-14 00:00:00",
        "operate_time": null
    }],
    "database": "gmall",
    "es": 1689151700000,
    "id": 9,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint",
        "spu_name": "varchar(200)",
        "description": "varchar(1000)",
        "category3_id": "bigint",
        "tm_id": "bigint",
        "create_time": "datetime",
        "operate_time": "datetime"
    },
    "old": [{
        "description": "小米10"
    }],
    "pkNames": ["id"],
    "sql": "",
    "sqlType": {
        "id": -5,
        "spu_name": 12,
        "description": 12,
        "category3_id": -5,
        "tm_id": -5,
        "create_time": 93,
        "operate_time": 93
    },
    "table": "spu_info",
    "ts": 1689151700998,
    "type": "UPDATE"
}

| Field | Meaning | Example |
| --- | --- | --- |
| data | The concrete record(s): the row data after the change | [{"id": "1", "spu_name": "小米12sultra", "description": "小米12", "category3_id": "61", "tm_id": "1", "create_time": "2021-12-14 00:00:00", "operate_time": null}] |
| database | Database in which the change occurred | gmall |
| es | Event timestamp: when the change happened (milliseconds) | 1689151700000 |
| id | Unique identifier of the event | 9 |
| isDdl | Whether this is a DDL operation; false means a DML operation | false |
| mysqlType | MySQL data type of each field | {"id": "bigint", "spu_name": "varchar(200)", "description": "varchar(1000)", "category3_id": "bigint", "tm_id": "bigint", "create_time": "datetime", "operate_time": "datetime"} |
| old | The pre-change values; present only for updates | [{"description": "小米10"}] |
| pkNames | Primary key fields of the table | ["id"] |
| sql | SQL statement (empty in this example) | "" |
| sqlType | JDBC data type of each field | {"id": -5, "spu_name": 12, "description": 12, "category3_id": -5, "tm_id": -5, "create_time": 93, "operate_time": 93} |
| table | Table in which the change occurred | spu_info |
| ts | Timestamp when the event was recorded (milliseconds) | 1689151700998 |
| type | Operation type; here an UPDATE | UPDATE |

MySQL CDC Integration

Synchronizing a Table

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
mysql-sync-table \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition-keys <partition-keys>] \
[--primary-keys <primary-keys>] \
[--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

| Option | Description |
| --- | --- |
| --warehouse | Path to the Paimon warehouse. |
| --database | Database name in the Paimon catalog. |
| --table | Paimon table name. |
| --partition-keys | Partition keys of the Paimon table. Join multiple keys with commas, e.g. "dt,hh,mm". |
| --primary-keys | Primary keys of the Paimon table. Join multiple keys with commas, e.g. "buyer_id,seller_id". |
| --computed-column | Definitions of computed columns. Argument fields come from the MySQL table field names. |
| --mysql-conf | Configuration for the Flink CDC MySQL source, each specified as "key=value". Hostname, username, password, database name and table name are required; the rest are optional. |
| --catalog-conf | Configuration for the Paimon catalog, each specified as "key=value". |
| --table-conf | Configuration for the Paimon table sink, each specified as "key=value". |

Hands-on

If the specified Paimon table does not exist, this action creates it automatically. Its schema is derived from all the specified MySQL tables. If the Paimon table already exists, its schema is compared against the schemas of all the specified MySQL tables.

  1. Synchronize one MySQL table to one Paimon table:
bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
mysql-sync-table \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table order_info_cdc \
--primary-keys id \
--mysql-conf hostname=hadoop102 \
--mysql-conf username=root \
--mysql-conf password=000000 \
--mysql-conf database-name=gmall \
--mysql-conf table-name='order_info' \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hadoop102:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
  2. Synchronize multiple MySQL tables into one Paimon table:

    bin/flink run \
    /opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
    mysql-sync-table \
    --warehouse hdfs://hadoop102:8020/paimon/hive \
    --database test \
    --table order_cdc \
    --primary-keys id \
    --mysql-conf hostname=hadoop102 \
    --mysql-conf username=root \
    --mysql-conf password=000000 \
    --mysql-conf database-name=gmall \
    --mysql-conf table-name='order_.*' \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://hadoop102:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4

    Only the pattern-matching form is supported here. Listing two explicit table names in order to synchronize two tables' data directly into one table is not supported.

Synchronizing a Database

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
mysql-sync-database \
--warehouse <warehouse-path> \
--database <database-name> \
[--ignore-incompatible <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

| Option | Description |
| --- | --- |
| --warehouse | Path to the Paimon warehouse. |
| --database | Database name in the Paimon catalog. |
| --ignore-incompatible | Defaults to false, in which case an exception is thrown if a table of the same name exists in Paimon with an incompatible schema. Set it to true explicitly to ignore incompatible tables and the exception. |
| --table-prefix | Prefix for all Paimon tables to be synchronized, e.g. specify "--table-prefix ods_" if you want every synchronized table prefixed with "ods_". |
| --table-suffix | Suffix for all Paimon tables to be synchronized. Used the same way as "--table-prefix". |
| --including-tables | Specifies which source tables to synchronize. Separate multiple tables with "\|"; regular expressions are supported. |
| --excluding-tables | Specifies which source tables not to synchronize. Used the same way as "--including-tables". When both are given, "--excluding-tables" takes precedence over "--including-tables". |
| --mysql-conf | Configuration for the Flink CDC MySQL source, each specified as "key=value". Hostname, username, password, database name and table name are required; the rest are optional. |
| --catalog-conf | Configuration for the Paimon catalog, each specified as "key=value". |
| --table-conf | Configuration for the Paimon table sink, each specified as "key=value". |

Only tables with primary keys will be synchronized.

For each MySQL table to be synchronized, if the corresponding Paimon table does not exist, the action creates it automatically. Its schema is derived from all the specified MySQL tables. If the Paimon table already exists, its schema is compared against the schemas of all the specified MySQL tables.

Examples
  1. Synchronize the tables under a database:

    bin/flink run \
    /opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
    mysql-sync-database \
    --warehouse hdfs://hadoop102:8020/paimon/hive \
    --database test \
    --table-prefix "ods_" \
    --table-suffix "_cdc" \
    --mysql-conf hostname=hadoop102 \
    --mysql-conf username=root \
    --mysql-conf password=000000 \
    --mysql-conf database-name=gmall \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://hadoop102:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4 \
    --including-tables 'user_info|order_info|activity_rule'

  2. Dynamically adding tables

    This fits the following scenario: a sync job is running when we notice a table was left out and needs to be added, but the current tables are still mid-sync and will take a long time to finish, and stopping the job means deleting and re-syncing, which also takes long. In that case, use this method.

    Suppose the sync job we originally submitted looks like this:

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    mysql-sync-database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=source_db \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://hive-metastore:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4 \
    --including-tables 'product|user|address'

    Suppose we also want to synchronize a table of historical data. We can recover from a previous savepoint of the job and reuse its existing state; the recovered job will first take a snapshot of the newly added table, then automatically read its changelog.

    <FLINK_HOME>/bin/flink run \
    --fromSavepoint savepointPath \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    mysql-sync-database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=source_db \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://hive-metastore:9083 \
    --table-conf bucket=4 \
    --including-tables 'product|user|address|order|custom'

    Compared with the original command, only --fromSavepoint savepointPath and --including-tables 'product|user|address|order|custom' need to be changed.

Kafka CDC Integration

Here the data in Kafka is the MySQL incremental data we captured with Canal and pushed into Kafka. We then use Paimon's CDC feature to read the Canal changelog from Kafka and, based on the database, table, and other fields on each record, write each change into the corresponding Paimon table.

Synchronizing a Table

Synchronize one or more tables from a single Kafka topic into one Paimon table.

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
kafka-sync-table \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition-keys <partition-keys>] \
[--primary-keys <primary-keys>] \
[--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \
[--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

| Option | Description |
| --- | --- |
| --warehouse | Path to the Paimon warehouse. |
| --database | Database name in the Paimon catalog. |
| --table | Paimon table name. |
| --partition-keys | Partition keys of the Paimon table. Join multiple keys with commas, e.g. "dt,hh,mm". |
| --primary-keys | Primary keys of the Paimon table. Join multiple keys with commas, e.g. "buyer_id,seller_id". |
| --computed-column | Definitions of computed columns. Argument fields come from the Kafka topic's table field names. |
| --kafka-conf | Configuration for the Flink Kafka source, each specified as "key=value". properties.bootstrap.servers, topic, properties.group.id and value.format are required; the rest are optional. |
| --catalog-conf | Configuration for the Paimon catalog, each specified as "key=value". |
| --table-conf | Configuration for the Paimon table sink, each specified as "key=value". |

Hands-on
  1. Put Canal-JSON formatted data into a Kafka topic:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal

  2. Insert the following data:

    {"data":[{"id":"6","login_name":"t7dk2h","nick_name":"冰冰11","passwd":null,"name":"淳于冰","phone_num":"13178654378","email":"t7dk2h@263.net","head_img":null,"user_level":"1","birthday":"1997-12-08","gender":null,"create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689150607000,"id":1,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"冰冰"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151566836,"type":"UPDATE"}
    {"data":[{"id":"7","login_name":"vihcj30p1","nick_name":"豪心22","passwd":null,"name":"魏豪心","phone_num":"13956932645","email":"vihcj30p1@live.com","head_img":null,"user_level":"1","birthday":"1991-06-07","gender":"M","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151623000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"豪心"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151623139,"type":"UPDATE"}
    {"data":[{"id":"8","login_name":"02r2ahx","nick_name":"卿卿33","passwd":null,"name":"穆卿","phone_num":"13412413361","email":"02r2ahx@sohu.com","head_img":null,"user_level":"1","birthday":"2001-07-08","gender":"F","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151626000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"卿卿"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151626863,"type":"UPDATE"}
    {"data":[{"id":"9","login_name":"mjhrxnu","nick_name":"武新44","passwd":null,"name":"罗武新","phone_num":"13617856358","email":"mjhrxnu@yahoo.com","head_img":null,"user_level":"1","birthday":"2001-08-08","gender":null,"create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151630000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"武新"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151630781,"type":"UPDATE"}
    {"data":[{"id":"10","login_name":"kwua2155","nick_name":"纨纨55","passwd":null,"name":"姜纨","phone_num":"13742843828","email":"kwua2155@163.net","head_img":null,"user_level":"3","birthday":"1997-11-08","gender":"F","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151633000,"id":5,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"纨纨"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151633697,"type":"UPDATE"}
  3. Synchronize from one Kafka topic to a Paimon table:

    bin/flink run \
    /opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
    kafka-sync-table \
    --warehouse hdfs://hadoop102:8020/paimon/hive \
    --database test \
    --table kafka_user_info_cdc \
    --primary-keys id \
    --kafka-conf properties.bootstrap.servers=hadoop102:9092 \
    --kafka-conf topic=paimon_canal \
    --kafka-conf properties.group.id=atguigu \
    --kafka-conf scan.startup.mode=earliest-offset \
    --kafka-conf value.format=canal-json \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://hadoop102:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4

Synchronizing a Database

Synchronize one or more topics into a single Paimon database.

Because a topic carries canal-json records, one topic may contain multiple tables, and multiple topics may likewise contain multiple tables.

Only tables with primary keys will be synchronized.

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
kafka-sync-database \
--warehouse <warehouse-path> \
--database <database-name> \
[--schema-init-max-read <int>] \
[--ignore-incompatible <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
[--including-tables <table-name|name-regular-expr>] \
[--excluding-tables <table-name|name-regular-expr>] \
[--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

| Option | Description |
| --- | --- |
| --warehouse | Path to the Paimon warehouse. |
| --database | Database name in the Paimon catalog. |
| --schema-init-max-read | If your tables all come from one topic, you can set this to limit how many records are read to initialize the schemas of the tables to be synchronized. Defaults to 1000. |
| --ignore-incompatible | Defaults to false, in which case an exception is thrown if a table of the same name exists in Paimon with an incompatible schema. Set it to true explicitly to ignore incompatible tables and the exception. |
| --table-prefix | Prefix for all Paimon tables to be synchronized, e.g. specify "--table-prefix ods_" if you want every synchronized table prefixed with "ods_". |
| --table-suffix | Suffix for all Paimon tables to be synchronized. Used the same way as "--table-prefix". |
| --including-tables | Specifies which source tables to synchronize. Separate multiple tables with "\|"; regular expressions are supported. |
| --excluding-tables | Specifies which source tables not to synchronize. Used the same way as "--including-tables". When both are given, "--excluding-tables" takes precedence over "--including-tables". |
| --kafka-conf | Configuration for the Flink Kafka source, each specified as "key=value". properties.bootstrap.servers, topic, properties.group.id and value.format are required; see its documentation for the full list. |
| --catalog-conf | Configuration for the Paimon catalog, each specified as "key=value". See the catalog configuration documentation for the full list. |
| --table-conf | Configuration for the Paimon table sink, each specified as "key=value". See the table configuration documentation for the full list. |

Examples
  1. kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal_2

  2. Insert the following data (note: no blank lines):

    {"data":[{"id":"6","login_name":"t7dk2h","nick_name":"冰冰11","passwd":null,"name":"淳于冰","phone_num":"13178654378","email":"t7dk2h@263.net","head_img":null,"user_level":"1","birthday":"1997-12-08","gender":null,"create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689150607000,"id":1,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"冰冰"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151566836,"type":"UPDATE"}
    {"data":[{"id":"7","login_name":"vihcj30p1","nick_name":"豪心22","passwd":null,"name":"魏豪心","phone_num":"13956932645","email":"vihcj30p1@live.com","head_img":null,"user_level":"1","birthday":"1991-06-07","gender":"M","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151623000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"豪心"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151623139,"type":"UPDATE"}
    {"data":[{"id":"8","login_name":"02r2ahx","nick_name":"卿卿33","passwd":null,"name":"穆卿","phone_num":"13412413361","email":"02r2ahx@sohu.com","head_img":null,"user_level":"1","birthday":"2001-07-08","gender":"F","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151626000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"卿卿"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151626863,"type":"UPDATE"}
    {"data":[{"id":"9","login_name":"mjhrxnu","nick_name":"武新44","passwd":null,"name":"罗武新","phone_num":"13617856358","email":"mjhrxnu@yahoo.com","head_img":null,"user_level":"1","birthday":"2001-08-08","gender":null,"create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151630000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"武新"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151630781,"type":"UPDATE"}
    {"data":[{"id":"10","login_name":"kwua2155","nick_name":"纨纨55","passwd":null,"name":"姜纨","phone_num":"13742843828","email":"kwua2155@163.net","head_img":null,"user_level":"3","birthday":"1997-11-08","gender":"F","create_time":"2022-06-08 00:00:00","operate_time":null,"status":null}],"database":"gmall","es":1689151633000,"id":5,"isDdl":false,"mysqlType":{"id":"bigint","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime","status":"varchar(200)"},"old":[{"nick_name":"纨纨"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93,"status":12},"table":"user_info","ts":1689151633697,"type":"UPDATE"}
    {"data":[{"id":"12","spu_name":"华为智慧屏 4K全面屏智能电视机1","description":"华为智慧屏 4K全面屏智能电视机","category3_id":"86","tm_id":"3","create_time":"2021-12-14 00:00:00","operate_time":null}],"database":"gmall","es":1689151648000,"id":6,"isDdl":false,"mysqlType":{"id":"bigint","spu_name":"varchar(200)","description":"varchar(1000)","category3_id":"bigint","tm_id":"bigint","create_time":"datetime","operate_time":"datetime"},"old":[{"spu_name":"华为智慧屏 4K全面屏智能电视机"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"spu_name":12,"description":12,"category3_id":-5,"tm_id":-5,"create_time":93,"operate_time":93},"table":"spu_info","ts":1689151648872,"type":"UPDATE"}
    {"data":[{"id":"3","spu_name":"Apple iPhone 13","description":"Apple iPhone 13","category3_id":"61","tm_id":"2","create_time":"2021-12-14 00:00:00","operate_time":null}],"database":"gmall","es":1689151661000,"id":7,"isDdl":false,"mysqlType":{"id":"bigint","spu_name":"varchar(200)","description":"varchar(1000)","category3_id":"bigint","tm_id":"bigint","create_time":"datetime","operate_time":"datetime"},"old":[{"spu_name":"Apple iPhone 12","description":"Apple iPhone 12"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"spu_name":12,"description":12,"category3_id":-5,"tm_id":-5,"create_time":93,"operate_time":93},"table":"spu_info","ts":1689151661828,"type":"UPDATE"}
    {"data":[{"id":"4","spu_name":"HUAWEI P50","description":"HUAWEI P50","category3_id":"61","tm_id":"3","create_time":"2021-12-14 00:00:00","operate_time":null}],"database":"gmall","es":1689151669000,"id":8,"isDdl":false,"mysqlType":{"id":"bigint","spu_name":"varchar(200)","description":"varchar(1000)","category3_id":"bigint","tm_id":"bigint","create_time":"datetime","operate_time":"datetime"},"old":[{"spu_name":"HUAWEI P40","description":"HUAWEI P40"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"spu_name":12,"description":12,"category3_id":-5,"tm_id":-5,"create_time":93,"operate_time":93},"table":"spu_info","ts":1689151669966,"type":"UPDATE"}
    {"data":[{"id":"1","spu_name":"小米12sultra","description":"小米12","category3_id":"61","tm_id":"1","create_time":"2021-12-14 00:00:00","operate_time":null}],"database":"gmall","es":1689151700000,"id":9,"isDdl":false,"mysqlType":{"id":"bigint","spu_name":"varchar(200)","description":"varchar(1000)","category3_id":"bigint","tm_id":"bigint","create_time":"datetime","operate_time":"datetime"},"old":[{"description":"小米10"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"spu_name":12,"description":12,"category3_id":-5,"tm_id":-5,"create_time":93,"operate_time":93},"table":"spu_info","ts":1689151700998,"type":"UPDATE"}
  3. Synchronize from one Kafka topic into a Paimon database:

    bin/flink run \
    /opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
    kafka-sync-database \
    --warehouse hdfs://hadoop102:8020/paimon/hive \
    --database test \
    --table-prefix "t1_" \
    --table-suffix "_cdc" \
    --schema-init-max-read 500 \
    --kafka-conf properties.bootstrap.servers=hadoop102:9092 \
    --kafka-conf topic=paimon_canal_2 \
    --kafka-conf properties.group.id=atguigu \
    --kafka-conf scan.startup.mode=earliest-offset \
    --kafka-conf value.format=canal-json \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://hadoop102:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4
  4. Synchronize from multiple Kafka topics into a Paimon database:

    bin/flink run \
    /opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
    kafka-sync-database \
    --warehouse hdfs://hadoop102:8020/paimon/hive \
    --database test \
    --table-prefix "t2_" \
    --table-suffix "_cdc" \
    --kafka-conf properties.bootstrap.servers=hadoop102:9092 \
    --kafka-conf topic="paimon_canal;paimon_canal_1" \
    --kafka-conf properties.group.id=atguigu \
    --kafka-conf scan.startup.mode=earliest-offset \
    --kafka-conf value.format=canal-json \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://hadoop102:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4

Supported Schema Changes

The schema changes currently supported are:

  1. Adding columns
  2. Changing column types

Dropping columns is not supported by CDC ingestion.

That is, if we are CDC-syncing a MySQL table and a column is added, the change is captured: the Paimon table's schema evolves along with it and a snapshot is stored. But if a column is dropped, that change cannot be captured. See the sketch below.
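
A sketch with a hypothetical column name, assuming the order_info table from earlier is being synchronized; the first two MySQL-side statements would be propagated to the Paimon table, while the last would not be captured:

-- executed on the MySQL side
ALTER TABLE order_info ADD COLUMN delivery_remark VARCHAR(255);    -- add column: propagated to Paimon
ALTER TABLE order_info MODIFY COLUMN delivery_remark VARCHAR(500); -- widen column type: propagated
ALTER TABLE order_info DROP COLUMN delivery_remark;                -- drop column: NOT captured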