seatunel学习笔记(一)
2024-10-23 10:37:30 # seatunnel # 学习笔记

seatunel学习笔记(一)

简介

用于离线和实时的数据同步,主要是编写配置文件实现,配置文件自动转为spark或flink的任务来进行数据库之间的数据同步

低代码,易维护

官方示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 配置Spark或Flink的参数
env {
# You can set flink configuration here
execution.parallelism = 1
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://hadoop102:9092/checkpoint"
}
# 在source所属的块中配置数据源
source {
SocketStream{
host = data1 # 这个连接的host主机名
result_table_name = "fake" # 这个flink或spark创建的临时表用于做后续的处理
field_name = "info" # 定义了从socket流中获取的数据的字段名叫做info
}
}
# 在transform的块中声明转换插件
transform { #下面这个transform转换的意思是按照#切分,第一个是name,第二个是age
Split{
separator = "#"
fields = ["name","age"]
}
sql { # 这里的意思是输出的是传入的内容和切分后的内容,格式按照上面切分的"name","age"两部分,中间加逗号的格式输出split(info)
sql = "select info, split(info) as info_row from fake"
}
}
# 在sink块中声明要输出到哪
sink {
ConsoleSink {}
}


在bin目录下有两个sh脚本,一个是提交flink任务,一个是提交spark任务
alt text

bin/start-seatunnel-flink.sh --config config/example01.conf

结果如下:
alt text

参数

config参数

这个参数是指定了应用配置文件的路径

variable参数

这个参数是给配置文件传值

比如我们之前的示例里面,如果我们要获取age大于多少的数据,我们就可以向内传值

1
2
3
4
sql {
sql = "select * from (select info,split(info) from fake) where age > '"${age}"'"
}

如果我们要传入多个值可以使用-i

1
bin/start-seatunnel-flink.sh --config/xxx.sh -i age=18 -i sex=man

注意

这里需要注意的是假如我们要写查询age大于多少的数据,我们需要写成以下的形式

1
select * from (select info , split(info) from fake) where age > '"${age}"'

我们需要套一层查询,因为where会先于select,split出的字段也无法用where过滤

flink参数

在seatunnel里面,seatunnel只是调用了flink的启动脚本,所以对于flink的启动脚本中的配置参数使用在seatunnel中也同样适用

比如我们可以指定flink任务的并行度

1
bin/start-seatunnel-flink.sh --config config/ -p 2\