用户画像基础(三)
2024-12-18 10:22:34 # 项目基础 # 用户画像 # 数仓基础/数据导入方式(sqoop/datax/seatunnel)/基础表结构

数据采集

数仓基础理论

做一个数仓的项目最先要做的是什么?

  • 采用什么模型去设计
  • 概念模型,逻辑模型怎么去划定
  • 物理模型怎么设计

数仓的几种模型:星型模型,雪花模型,星座模型

  • 星型模型

星型模型主要是维表和事实表,以事实表为中心,所有维度直接关联在事实表上,呈星型分布。

模型的样式会比较固定,事实表的使用频次会特别高

image-20240803001711236

  • 雪花模型

雪花模型,在星型模型的基础上,维度表上又关联了其他维度表。星型模型可以理解为,一个事实表关联多个维度表,雪花模型可以理解为一个事实表关联多个维度表,维度表再关联维度表。

可以理解为星型模型过滤到雪花模型

image-20240803002341752

  • 星座模型

用户画像是以星座模型为主的

星座模型,是对星型模型的扩展延伸,多张事实表共享维度表。

星座模型是很多数据仓库的常态,因为很多数据仓库都是多个事实表的。所以星座模型只反映是否有多个事实表,他们之间是否共享一些维度表。

image-20240803002450723

用户画像逻辑模型设计

这个就是数据域的设计

商城业务逻辑模型

image-20240803122341443

借贷业务逻辑模型

image-20240803122929866

首先人去借款,去用户主题下面,然后用户会进行借款,进行贷前的行为,先进行风控决策里面进行判断这个用户是否可借款,借款额度是多少什么的,年化利率必须是多少,然后反馈到客户,客户如果能接收就发起申请,到了贷中内,然后就会走到贷后,贷后就是各种还款计划等等,有些用户发起了贷前走到了风控,我们发现这个用户的风险程度很高,我们会把这个用户卖给其他公司,让其他公司做,这个就叫做尾量,这里就是我们赚差价的钱。营销的话是识别有些用户有借款想法但是没有借款就使用营销把意向用户变成用户

用户画像表的设计

stg这个层是因为量级太大的话我们经常使用的是增量采集,然后增量数据和全量数据进行full join
stg里面我们现在放的是日增量,用ods表去full join这个stg的表去获取全量的数据

项目 库名 表名 主题域 描述
商城 stg,ods ods_user_base_info_d 客户 客户基本信息表
商城 stg,ods ods_user_auth_info_d 客户 客户实名信息表
商城 stg,ods ods_user_suppl_info_d 客户 客户补充信息表
商城 ods ods_employee_base_info_d 员工 员工基本信息表
商城 ods ods_merchant_base_info_d 商家 商家基本信息表
商城 ods ods_order_base_info_d 订单 订单基本信息表
商城 ods ods_product_base_info_d 商品 商品基本信息表
商城 ods ods_user_evaluate_log_d 评价 商品评价日志表
商城 ods ods_logistics_log_info_d 物流 物流信息表
商城 ods ods_warehousing_base_info_d 仓储 仓储信息表
借贷 ods ods_user_base_info_d 客户 客户基本信息表
借贷 ods ods_user_auth_info_d 客户 客户实名信息表
借贷 ods ods_user_suppl_info_d 客户 客户补充信息表
借贷 ods ods_user_contact_base_info_d 联系人 联系人基本信息表
借贷 ods ods_credit_user_base_info_d 贷前 贷前用户信息表
借贷 ods ods_credit_order_base_info_d 贷前 贷前订单信息表
借贷 ods ods_loan_user_base_info_d 贷中 贷中用户信息表
借贷 ods ods_loan_order_base_info_d 贷中 贷中订单信息表
借贷 ods ods_repay_user_base_info_d 贷后 贷后用户信息表
借贷 ods ods_repay_order_base_info_d 贷后 贷后订单信息表
借贷 ods ods_flow_user_base_info_d 尾量 尾量用户信息表
借贷 ods ods_flow_order_base_info_d 尾量 尾量订单信息表
借贷 ods ods_decision_base_info_d 决策流 决策流信息表
借贷 ods ods_model_base_info_d 模型 模型结果表
借贷 ods ods_risk_factor_base_info_d 风险因子 风险因子信息表
流量数仓 ods ods_h5_event_i h5埋点 h5埋点信息采集表
流量数仓 ods ods_business_event_i 业务埋点 业务埋点信息采集表
三方采买 ods ods_third_{third_name}__i 外部三方 外部三方采买数据表

用户画像采集方式

一般阿里使用的dataX更多,sqoop已经不更新了,但是sqoop会更加的稳定

sqoop:

  • 数据量大–Sqoop的性能在处理大量数据时可能受到限制,因为它使用MapReduce,而MapReduce的批处理性质可能导致较长的传输延迟。
  • Sqoop是Apache Hadoop生态系统的一部分,专门用于在Hadoop和关系型数据库之间传输数据。它支持将数据从关系型数据库(如MySQL、Oracle、SQL Server)导入到Hadoop中,也支持将数据从Hadoop导出到关系型数据库。
  • 可接受高并发的模式

sqoop采集是最常用的一种形式 import是将数据写入到hdfs列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
sqoop import \
-Dmapreduce.job.queuename=root \ --填写你的队列
--connect jdbc:mysql://user_base:3306/ \ --指定数据库连接URL
--driver com.mysql.jdbc.Driver \
--username ${username} \ --mysql的用户名
--password ${passwd} \ --mysql的密码
--query 'SELECT id,user_id,first_channel,second_channel,id_card_md5,id_card_bind_time,id_home,user_sex,user_birthday,user_age,mobile,mobile_home,user_bind_mobile_time,load_time FROM user_base.user where $CONDITIONS' \ --需要采集的列,这里的顺序会映射到hive表中,所以上面的顺序需要确定下来,和hive表的顺序相同,sqoop采集的字段百分之90都是存储在textfile的表中,因为存储为textfile表也就是在hdfs上正常的文本格式存储,hive表才能通过文本信息直接做映射都很方便
--delete-target-dir \
--target-dir /user/hive/warehouse/mall.db/ods_user_base_info_d/dt=${1}/ \ --需要写入的hdfs的地址
--hive-import \ --写入
--hive-overwrite -m 1 \ --覆盖写入,写入的并发数量
--hive-database mall \ --写入的hive库
--hive-table ods_user_base_info_d \ --写入的hive表
--hive-partition-key dt \ --写入hive表的分区字段名
--hive-partition-value ${1} \ --写入hive表的分区字段值
--fields-terminated-by "\\0001" \ --写入hive表字段的分割符
--lines-terminated-by "\n" \ --写入hive表列的分割符
--input-null-string \ --Hive =中的 Null 在底层是以“\N”来存储,而 MySQL 中的 Null 在底层就是 Null,为了保证数据两端的一致性
--input-null-non-string \
--split-by id --采集时用来划分并发的字段


这里的一些内容进行如下解释:

  • –hive-partition-value ${1}

    作用: 定义 Hive 表的分区值。

    例子:

    如果 ${1} 是传入的分区值,比如日期 2024-11-21,则 Hive 表会按分区存储数据,路径为:

    1
    /user/hive/warehouse/mall.db/ods_user_base_info_d/dt=2024-11-21/

    数据会写入分区字段 dt 的值为 2024-11-21 的分区中。

  • –fields-terminated-by “\0001”

    作用: 定义字段之间的分隔符。

    例子:

    在导入数据到 Hive 表时,每行的字段用 \u0001(不可见字符)分隔。例如:

    1
    1\u00012024-11-21\u0001John\u0001Doe'

    这样 Hive 能正确解析字段。

  • –lines-terminated-by “\n”

    作用: 定义行的分隔符。

    例子:

    数据文件中的每一行是以换行符 \n 结束的。例如:

    1
    2
    1\u00012024-11-21\u0001John\u0001Doe\n
    2\u00012024-11-22\u0001Jane\u0001Doe\n
  • –input-null-string 和 –input-null-non-string

    作用:

    保证 MySQL 和 Hive 之间对于 NULL 值的处理一致。

    –input-null-string: 用于处理 字符串类型的 NULL 值。

    –input-null-non-string: 用于处理 非字符串类型的 NULL 值。

    例子:

    MySQL 中:

    id name age
    1 NULL 25
    2 John NULL

    Hive 中:

1
2
1\u0001\N\u000125
2\u0001John\u0001\N

NULL 值会被转换为 \N,以便 Hive 正确识别。

这里有一些需要注意的点:

  • 一般我们query中的sql中的字段顺序需要和我们写入到hive表对应列的顺序相同
  • 我们在设置了hive-overwrite -m这个设置也就是并发数量的设置,如果不是1的话我们必须要设置下面的split-by设置,因为是这样的,sqoop并发的话需要一个并发字段,首先假如我们这里设置的并发数量是4,看我们这里设置的并发字段是id,就会根据id先去排序,然后根据排序平均划分4份,然后把这平均划分的四份分散到4个不同的线程去执行并发,如果设置了并发但是不写并发字段的话sqoop就不知道用哪个字段做并发就会报错

传输到mysql,使用比较少

1
2
3
4
5
6
7
8
sqoop export 

--connect jdbc:mysql://localhost/mydatabase \
--username myuser \
--password mypassword \
--table mytable \
--export-dir /user/hadoop/mydata

这个就是eval的脚本测试的时候用

1
2
3
4
5
6
sqoop eval
--connect jdbc:mysql://localhost/mydatabase \
--username myuser \
--password mypassword \
--query "SELECT * FROM mytable limit 100"

1、sqoop的默认并发数是4 - m 1. 每次采集数据的一个量级

2、sqoop数据倾斜问题 - m 10 60min. split by KEY (int) 100000/10 10000 0-10000 10000+100000/10 ;KEY

这个数据倾斜的问题大概率是这个split by key中的这个key的问题,key最好是int类型

3、sqoop底层执行的是什么任务 mr map-m 10 reduc-0
只有map任务,没有reduce

DataX:

  • DataX是阿里巴巴开源的数据交换工具,不仅支持Hadoop生态系统,还支持其他各种数据存储和处理系统,如关系型数据库、NoSQL数据库、Hive、HBase等。
  • DataX支持多种读写插件,可以更好地处理不同数据源和目标的性能需求,从而提供更高的灵活性和性能。
  • 全内存操作,不读写磁盘,对大数据量的处理上存在内存限制的问题

包装到shell里的一个命令:

Data_shell.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
source ~/.bashrc
dt=$1

table_name="ods_user_base_info_d"

# 这个是先看看这个文件夹下面有没有这个文件,先保证路径没有先进行路径删除
hadoop fs -rmr /user/hive/warehouse/mall.db/${table_name}/dt=${dt}
hadoop fs -mkdir -p /user/hive/warehouse/mall.db/${table_name}/dt=${dt}

python datax.py ods_user_base_info_d.json

if [ $? -ne 0 ];then
exit 1
fi

hive -e "load data inpath '/user/hive/warehouse/mall.db/ods_user_base_info_d/dt=${dt}' overwrite into table mall.ods_user_base_info_d partition (dt='${dt}');"

if [ $? -ne 0 ];then
echo "load data to Hive Fail!"
exit 1
fi

echo "load data to Hive Success!"

json的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
{
"job": {
"setting": {
"speed": {
"channel": 3 //并行任务数
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"querySql": [
" select id
,name
,version
,project_id
,strategy_id
,rule_code
,rule_type
,content
,lock_user
,lock_time
,create_time
,update_time
,create_user
,update_user
,strategy_depends
,is_origin
,on_demand
,output
from user

​ union all

​ select id
​ ,name
​ ,version
​ ,project_id
​ ,strategy_id
​ ,rule_code
​ ,rule_type
​ ,content
​ ,lock_user
​ ,lock_time
​ ,create_time
​ ,update_time
​ ,create_user
​ ,update_user
​ ,strategy_depends
​ ,is_origin
​ ,on_demand
​ ,output
​ from user2

​ union all

​ select id
​ ,name
​ ,version
​ ,project_id
​ ,strategy_id
​ ,rule_code
​ ,rule_type
​ ,content
​ ,lock_user
​ ,lock_time
​ ,create_time
​ ,update_time
​ ,create_user
​ ,update_user
​ ,strategy_depends
​ ,is_origin
​ ,on_demand
​ ,output
​ from user3

​ "
],
"jdbcUrl": [
"jdbc:mysql://user_base:3306/user?useUnicode=true&characterEncoding=utf8& "
]
}
],
"username": "user_name",
"password": "passwd"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"fileType": "text",
"fieldDelimiter": "\u0001", // hive表默认分隔符
"path": "/user/hive/warehouse/mall.db/ods_user_base_info_d/dt=${dt}",
"fileName": "data",
"defaultFS": "hdfs://wejoyservice",
"column": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
}

],
"writeMode": "append",
"hadoopConfig": {
"dfs.nameservices": "wejoyservice",
"dfs.ha.namenodes.wejoyservice": "nn1,nn2",
"dfs.namenode.rpc-address.wejoyservice.nn1": "10-96-25-10-master01.dp.local:8020",
"dfs.namenode.rpc-address.wejoyservice.nn2": "10-96-25-11-master02.dp.local:8020",
"dfs.client.failover.proxy.provider.wejoyservice": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" //使用 ConfiguredFailoverProxyProvider 来实现主备切换。
}
}
}
}
]
}
}

Seatunnel基本介绍

详情查看博客中seatunnel中的内容

Flat file:

  • 注意这张表的格式 是一个JSON格式的表,可以做到直接将一个json字段的key映射为表中的字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
CREATE EXTERNAL TABLE `ods.ods_third_{third_name}_i`(
`additionparams` string COMMENT 'from deserializer',
`attempt_number` string COMMENT 'from deserializer',
`callback_status` string COMMENT 'from deserializer',
`channel` string COMMENT 'from deserializer',
`cost` string COMMENT 'from deserializer',
`mongouniqueid` string COMMENT 'from deserializer',
`order_id` string COMMENT 'from deserializer',
`order_type` string COMMENT 'from deserializer',
`query_interface` string COMMENT 'from deserializer',
`query_result` struct \<result_msg:string,req_msg_id:string,result_code:string,query_result:struct<umkt_out_put_info:string,base_info:struct<umkt_result:string,customer_key:string,query_template:string>COMMENT 'from deserializer',
`query_source` string COMMENT 'from deserializer',
`stage` string COMMENT 'from deserializer',
`status` string COMMENT 'from deserializer',
`user_id` string COMMENT 'from deserializer',
`createtimestr` string COMMENT 'from deserializer',
`create_time` string COMMENT 'from deserializer',
`dstype` string COMMENT 'from deserializer',
`dstimestamp` string COMMENT 'from deserializer')
COMMENT '原始三方'
PARTITIONED BY (
`dt` string)
ROW FORMAT SERDE
'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'mapping.channel'='channel|platformChannel',
'mapping.create_time'='create_time|createTime',
'mapping.createtimestr'='createTimeStr|createtimeStr',
'mapping.ds'='channel|platformChannel',
'mapping.ds_id'='1',
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://wejoyservice/user/hive/warehouse/ods.db/ods.ods_third_i/third_name=xxx1'

表结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
--商城用户信息表
CREATE EXTERNAL TABLE mall.ods_user_base_info_d (
user_id bigint COMMENT '用户ID',
first_channel string COMMENT '注册first_channel', -- 这里是我们的第一个注册渠道
second_channel string COMMENT '注册second_channel', -- 这里是我们的第二个注册渠道
user_sex string COMMENT '性别',
user_birthday string COMMENT '用户生日',
user_age int COMMENT '用户年龄',
mobile bigint COMMENT '用户手机号',
mobile_home string COMMENT '手机号归属地',
user_bind_mobile_time string COMMENT '用户手机号注册时间', --一个用户可能会有多个手机号,每个手机号的注册时间比较重要
`load_time` string COMMENT '数据导入时间'
)
COMMENT '用户信息表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/mall.db/ods_user_base_info_d';
-- 客户客户实名信息表商城 这里是有一些加密内容的
CREATE EXTERNAL TABLE mall.ods_user_auth_info_d (
user_id bigint COMMENT '用户ID',
orc_info string COMMENT '用户orc信息', --身份证正反面的orc
user_face string COMMENT '用户人脸信息', --这里的orc信息,人脸信息这些二进制存下来很长,主要也不是为了查询,而是为了撞库
id_card_md5 string COMMENT '身份证号MD5',
id_card_bind_time string COMMENT '身份证号注册时间'
)
COMMENT '用户实名信息表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/mall.db/ods_user_auth_info_d';

--员工员工基本信息表
CREATE EXTERNAL TABLE mall.ods_employee_base_info_d (
uid bigint COMMENT '员工ID',
mobile int COMMENT '用户手机号',
mobile_home string COMMENT '手机号归属地',
entry_time string COMMENT '入职时间', --入职和离职时间是核心
dimission_time string COMMENT '离职时间'
)
COMMENT '员工员工基本信息表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/mall.db/ods_employee_base_info_d';

--商家商家基本信息表
CREATE EXTERNAL TABLE mall.ods_merchant_base_info_d (
uuid bigint COMMENT '商家ID',
mobile int COMMENT '商家手机号',
settlement_time string COMMENT '入驻时间',
offsite_time string COMMENT '离场时间',
merchant_level int COMMENT '商家等级'
)
COMMENT '商家基本信息表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/mall.db/ods_merchant_base_info_d';

--订单基本信息表
CREATE EXTERNAL TABLE mall.ods_order_base_info_d (
order_id bigint COMMENT '订单ID',
user_id bigint COMMENT '客户ID',
uuid bigint COMMENT '商家ID',
product_id bigint COMMENT '商品ID',
order_generate_time string COMMENT '订单生成时间',
order_status string COMMENT '订单状态'
)
COMMENT '订单基本信息表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/mall.db/ods_order_base_info_d';

--商品基本信息表
CREATE EXTERNAL TABLE mall.ods_product_base_info_d (
product_id bigint COMMENT '商品ID',
sku string COMMENT '商品SKU',
product_information string COMMENT '商品详情',
product_price string COMMENT '商品价格'
)
COMMENT '商品基本信息表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/mall.db/ods_product_base_info_d';

--贷前订单信息表借贷
CREATE EXTERNAL TABLE loan.ods_credit_order_base_info_d (
credit_id bigint COMMENT '贷前订单ID',
credit_amount string COMMENT '贷前订单金额',
apply_status string COMMENT '订单申请状态',
apply_time string COMMENT '订单申请时间',
audit_time string COMMENT '订单通过时间'
)
COMMENT '贷前订单信息表借贷'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/loan.db/ods_credit_order_base_info_d';

--贷中订单信息表借贷
CREATE EXTERNAL TABLE loan.ods_loan_order_base_info_d (
loan_id bigint COMMENT '贷中订单ID',
loan_day int COMMENT '借款天数',
interest_rate string COMMENT '借款利率',
due_amount string COMMENT '应还本金',
act_amount string COMMENT '实还本金',
repay_type string COMMENT '还款方式'
)
COMMENT '贷中订单信息表借贷'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/loan.db/ods_loan_order_base_info_d';

--决策流信息表
CREATE EXTERNAL TABLE loan.ods_decision_base_info_d (
decision_id bigint COMMENT '决策流ID',
decision_tree string COMMENT '决策树',
decision_status string COMMENT '决策状态'
)
COMMENT '决策流信息表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/loan.db/ods_decision_base_info_d';


--模型结果表
CREATE EXTERNAL TABLE loan.ods_model_base_info_d (
model_id bigint COMMENT '模型ID',
model_mark string COMMENT '模型分数',
model_status string COMMENT '模型状态'
)
COMMENT '模型结果表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/loan.db/ods_model_base_info_d';

--风险因子信息表
CREATE EXTERNAL TABLE loan.ods_risk_factor_base_info_d (
factor_id bigint COMMENT '因子ID',
factor_name string COMMENT '因子名',
factor_mark string COMMENT '因子分数'
)
COMMENT '风险因子信息表'
PARTITIONED BY (
`dt` string)
row format delimited fields terminated by ','
STORED AS TEXTFILE
location 'file:/root/wangqi/spark-warehouse/loan.db/ods_risk_factor_base_info_d';