电商业务简介
电商业务流程电商常识
SKU和SPU平台属性和销售属性 电商业务数据
电商系统表结构
活动信息表(activity_info)活动规则表(activity_rule)活动商品关联表(activity_sku)平台属性表(base_attr_info)平台属性值表(base_attr_value)一级分类表(base_category1)二级分类表(base_category2)三级分类表(base_category3)字典表(base_dic)省份表(base_province)地区表(base_region)品牌表(base_trademark)购物车表(cart_info)评价表(comment_info)优惠券信息表(coupon_info)优惠券优惠范围表(coupon_range)优惠券领用表(coupon_use)收藏表(favor_info)订单明细表(order_detail)订单明细活动关联表(order_detail_activity)订单明细优惠券关联表(order_detail_coupon)订单表(order_info)退单表(order_refund_info)订单状态流水表(order_status_log)支付表(payment_info)退款表(refund_payment)SKU平台属性值表(sku_attr_value)SKU信息表(sku_info)SKU销售属性表(sku_sale_attr_value)SPU信息表(spu_info)SPU销售属性表(spu_sale_attr)SPU销售属性值表(spu_sale_attr_value)用户地址表(user_address)用户信息表(user_info) 模拟生成业务数据
MySQL安装业务数据生成业务数据梳理工具 业务数据采集模块
业务数据同步概述
数据同步策略概述数据同步策略选择数据同步工具概述数据同步工具部署 全量表数据同步
数据通道DataX 配置文件DataX 配置文件生成脚本测试生成的 DataX 配置文件全量表数据同步脚本全量表同步总结 增量表数据同步
数据通道Maxwell 配置Flume 配置增量表首日全量同步增量表同步总结 数仓环境准备
电商业务简介 电商业务流程以用户的浏览足迹为例说明
用户点开电商首页开始浏览,通过分类查询或通过全文搜索寻找自己中意的商品 , 将商品添加到购物车后,对商品进行结算,这时候购物车的管理和商品订单信息的生成都会对业务数据库产生影响,会生成相应的订单数据和支付数据
订单正式生成之后,还会对订单进行跟踪处理,直到订单全部完成
电商的业务流程 : 用户前台浏览商品时的商品详情的管理,用户商品加入购物车进行支付时用户个人中心&支付服务的管理,用户支付完成后订单后台服务的管理
电商常识 SKU和SPUSKU (Stock Keeping Unit)(库存量基本单位): 产品统一编号 , 每种产品均对应有唯一的SKU号 , 如 : 一台银色、128G内存的、支持联通网络的iPhoneX
SPU( Standard Product Unit ):是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息集合 , 如 : iPhoneX手机
SPU : 一类商品
同一SPU的商品可以共用 :
商品图片海报销售属性
平台属性和销售属性平台属性 :
销售属性 :
电商业务数据 电商系统表结构中心 :
订单表 :用户表 : 用户的详细信息SKU商品表 : 商品的详细信息活动表优惠券表
延伸 :
优惠券领用表支付流水表 : 该订单的支付详情活动订单表订单详情表 : 订单的商品数量订单状态表商品评论表编码字典表退单表SPU商品表
电商业务表 :
后台管理系统 :
活动信息表(activity_info)| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 活动id | bigint(20) |
| activity_name | 活动名称 | varchar(200) |
| activity_type | 活动类型(1:满减,2:折扣) | varchar(10) |
| activity_desc | 活动描述 | varchar(2000) |
| start_time | 开始时间 | datetime(0) |
| end_time | 结束时间 | datetime(0) |
| create_time | 创建时间 | datetime(0) |
字段名
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 编号 | int(11) |
| activity_id | 活动ID | int(11) |
| activity_type | 活动类型 | varchar(20) |
| condition_amount | 满减金额 | decimal(16, 2) |
| condition_num | 满减件数 | bigint(20) |
| benefit_amount | 优惠金额 | decimal(16, 2) |
| benefit_discount | 优惠折扣 | decimal(10, 2) |
| benefit_level | 优惠级别 | bigint(20) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 编号 | bigint(20) |
| activity_id | 活动 id | bigint(20) |
| sku_id | sku_id | bigint(20) |
| create_time | 创建时间 | datetime(0) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 编号 | bigint(20) |
| attr_name | 属性名称 | varchar(100) |
| category_id | 分类id | bigint(20) |
| category_level | 分类层级 | int(11) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 编号 | bigint(20) |
| value_name | 属性值名称 | varchar(100) |
| attr_id | 属性id | bigint(20) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 编号 | bigint(20) |
| name | 分类名称 | varchar(10) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 编号 | bigint(20) |
| name | 二级分类名称 | varchar(200) |
| category1_id | 一级分类编号 | bigint(20) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 编号 | bigint(20) |
| name | 三级分类名称 | varchar(200) |
| category2_id | 二级分类编号 | bigint(20) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| dic_code | 编号 | varchar(10) |
| dic_name | 编码名称 | varchar(100) |
| parent_code | 父编号 | varchar(10) |
| create_time | 创建日期 | datetime(0) |
| operate_time | 修改日期 | datetime(0) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | id | bigint(20) |
| name | 省名称 | varchar(20) |
| region_id | 大区id | varchar(20) |
| area_code | 行政区位码 | varchar(20) |
| iso_code | 国际编码 | varchar(20) |
| iso_3166_2 | ISO3166编码 | varchar(20) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 大区id | varchar(20) |
| region_name | 大区名称 | varchar(20) |
| 字段名 | 字段说明 | 类型 |
|---|---|---|
| id | 编号 | bigint(20) |
| tm_name | 属性值 | |
| logo_url | 品牌logo的图片路径 | varchar(20) |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| user_id | 用户id |
| sku_id | skuid |
| cart_price | 放入购物车时价格 |
| sku_num | 数量 |
| img_url | 图片文件 |
| sku_name | sku名称 (冗余) |
| is_checked | |
| create_time | 创建时间 |
| operate_time | 修改时间 |
| is_ordered | 是否已经下单 |
| order_time | 下单时间 |
| source_type | 来源类型 |
| source_id | 来源编号 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| user_id | 用户id |
| nick_name | 用户昵称 |
| head_img | |
| sku_id | skuid |
| spu_id | 商品id |
| order_id | 订单编号 |
| appraise | 评价 1 好评 2 中评 3 差评 |
| comment_txt | 评价内容 |
| create_time | 创建时间 |
| operate_time | 修改时间 |
| 字段名 | 字段说明 |
|---|---|
| id | 购物券编号 |
| coupon_name | 购物券名称 |
| coupon_type | 购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券 |
| condition_amount | 满额数(3) |
| condition_num | 满件数(4) |
| activity_id | 活动编号 |
| benefit_amount | 减金额(1 3) |
| benefit_discount | 折扣(2 4) |
| create_time | 创建时间 |
| range_type | 范围类型 1、商品(spuid) 2、品类(三级分类id) 3、品牌 |
| limit_num | 最多领用次数 |
| taken_count | 已领用次数 |
| start_time | 可以领取的开始日期 |
| end_time | 可以领取的结束日期 |
| operate_time | 修改时间 |
| expire_time | 过期时间 |
| range_desc | 范围描述 |
| 字段名 | 字段说明 |
|---|---|
| id | 购物券编号 |
| coupon_id | 优惠券id |
| range_type | 范围类型 1、商品(spuid) 2、品类(三级分类id) 3、品牌 |
| range_id |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| coupon_id | 购物券ID |
| user_id | 用户ID |
| order_id | 订单ID |
| coupon_status | 购物券状态(1:未使用 2:已使用) |
| get_time | 获取时间 |
| using_time | 使用时间 |
| used_time | 支付时间 |
| expire_time | 过期时间 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| user_id | 用户名称 |
| sku_id | skuid |
| spu_id | 商品id |
| is_cancel | 是否已取消 0 正常 1 已取消 |
| create_time | 创建时间 |
| cancel_time | 修改时间 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| order_id | 订单编号 |
| sku_id | sku_id |
| sku_name | sku名称(冗余) |
| img_url | 图片名称(冗余) |
| order_price | 购买价格(下单时sku价格) |
| sku_num | 购买个数 |
| create_time | 创建时间 |
| source_type | 来源类型 |
| source_id | 来源编号 |
| split_total_amount | 分摊总金额 |
| split_activity_amount | 分摊活动减免金额 |
| split_coupon_amount | 分摊优惠券减免金额 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| order_id | 订单id |
| order_detail_id | 订单明细id |
| activity_id | 活动ID |
| activity_rule_id | 活动规则 |
| sku_id | skuID |
| create_time | 获取时间 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| order_id | 订单id |
| order_detail_id | 订单明细id |
| coupon_id | 购物券ID |
| coupon_use_id | 购物券领用id |
| sku_id | skuID |
| create_time | 获取时间 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| consignee | 收货人 |
| consignee_tel | 收件人电话 |
| total_amount | 总金额 |
| order_status | 订单状态 |
| user_id | 用户id |
| payment_way | 付款方式 |
| delivery_address | 送货地址 |
| order_comment | 订单备注 |
| out_trade_no | 订单交易编号(第三方支付用) |
| trade_body | 订单描述(第三方支付用) |
| create_time | 创建时间 |
| operate_time | 操作时间 |
| expire_time | 失效时间 |
| process_status | 进度状态 |
| tracking_no | 物流单编号 |
| parent_order_id | 父订单编号 |
| img_url | 图片路径 |
| province_id | 地区 |
| activity_reduce_amount | 促销金额 |
| coupon_reduce_amount | 优惠券 |
| original_total_amount | 原价金额 |
| freight_fee | 运费 |
| freight_fee_reduce | 运费减免 |
| refundable_time | 可退款日期(签收后30天) |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| user_id | 用户id |
| order_id | 订单id |
| sku_id | skuid |
| refund_type | 退款类型 |
| refund_num | 退货件数 |
| refund_amount | 退款金额 |
| refund_reason_type | 原因类型 |
| refund_reason_txt | 原因内容 |
| refund_status | 退款状态(0:待审批 1:已退款) |
| create_time | 创建时间 |
| 字段名 | 字段说明 |
|---|---|
| id | |
| order_id | |
| order_status | |
| operate_time |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| out_trade_no | 对外业务编号 |
| order_id | 订单编号 |
| user_id | |
| payment_type | 支付类型(微信 支付宝) |
| trade_no | 交易编号 |
| total_amount | 支付金额 |
| subject | 交易内容 |
| payment_status | 支付状态 |
| create_time | 创建时间 |
| callback_time | 回调时间 |
| callback_content | 回调信息 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| out_trade_no | 对外业务编号 |
| order_id | 订单编号 |
| sku_id | |
| payment_type | 支付类型(微信 支付宝) |
| trade_no | 交易编号 |
| total_amount | 退款金额 |
| subject | 交易内容 |
| refund_status | 退款状态 |
| create_time | 创建时间 |
| callback_time | 回调时间 |
| callback_content | 回调信息 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| attr_id | 属性id(冗余) |
| value_id | 属性值id |
| sku_id | skuid |
| attr_name | 属性名称 |
| value_name | 属性值名称 |
| 字段名 | 字段说明 |
|---|---|
| id | 库存id(itemID) |
| spu_id | 商品id |
| price | 价格 |
| sku_name | sku名称 |
| sku_desc | 商品规格描述 |
| weight | 重量 |
| tm_id | 品牌(冗余) |
| category3_id | 三级分类id(冗余) |
| sku_default_img | 默认显示图片(冗余) |
| is_sale | 是否销售(1:是 0:否) |
| create_time | 创建时间 |
| 字段名 | 字段说明 |
|---|---|
| id | id |
| sku_id | 库存单元id |
| spu_id | spu_id(冗余) |
| sale_attr_value_id | 销售属性值id |
| sale_attr_id | |
| sale_attr_name | |
| sale_attr_value_name |
| 字段名 | 字段说明 |
|---|---|
| id | 商品id |
| spu_name | 商品名称 |
| description | 商品描述(后台简述) |
| category3_id | 三级分类id |
| tm_id | 品牌id |
| 字段名 | 字段说明 |
|---|---|
| id | 编号(业务中无关联) |
| spu_id | 商品id |
| base_sale_attr_id | 销售属性id |
| sale_attr_name | 销售属性名称(冗余) |
| 字段名 | 字段说明 |
|---|---|
| id | 销售属性值编号 |
| spu_id | 商品id |
| base_sale_attr_id | 销售属性id |
| sale_attr_value_name | 销售属性值名称 |
| sale_attr_name | 销售属性名称(冗余) |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| user_id | 用户id |
| province_id | 省份id |
| user_address | 用户地址 |
| consignee | 收件人 |
| phone_num | 联系方式 |
| is_default | 是否是默认 |
| 字段名 | 字段说明 |
|---|---|
| id | 编号 |
| login_name | 用户名称 |
| nick_name | 用户昵称 |
| passwd | 用户密码 |
| name | 用户姓名 |
| phone_num | 手机号 |
| 邮箱 | |
| head_img | 头像 |
| user_level | 用户级别 |
| birthday | 用户生日 |
| gender | 性别 M男,F女 |
| create_time | 创建时间 |
| operate_time | 修改时间 |
| status | 状态 |
在 cpucode101 的 /opt/module/db_log 文件夹
mkdir db_log/
把 gmall2020-mock-db-2021-11-14.jar 和 application.properties 上传到 cpucode101 的 /opt/module/db_log 路径上
根据需求修改 application.properties 相关配置
logging.level.root=info spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://cpucode103:3306/gmall?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8 spring.datasource.username=root spring.datasource.password=123456 logging.pattern.console=%m%n mybatis-plus.global-config.db-config.field-strategy=not_null mybatis.mapperLocations=classpath:mapper/*.xml #业务日期 mock.date=2020-06-14 #是否重置,首日须设置为1 mock.clear=1 #是否重置用户,首日须设置为1 mock.clear.user=1 #生成新用户数量 mock.user.count=200 #男性比例 mock.user.male-rate=20 #用户数据变化概率 mock.user.update-rate:20 #收藏取消比例 mock.favor.cancel-rate=10 #收藏数量 mock.favor.count=100 #每个用户添加购物车的概率 mock.cart.user-rate=10 #每次每个用户最多添加多少种商品进购物车 mock.cart.max-sku-count=8 #每个商品最多买几个 mock.cart.max-sku-num=3 #购物车来源 用户查询,商品推广,智能推荐, 促销活动 mock.cart.source-type-rate=60:20:10:10 #用户下单比例 mock.order.user-rate=30 #用户从购物中购买商品比例 mock.order.sku-rate=50 #是否参加活动 mock.order.join-activity=1 #是否使用购物券 mock.order.use-coupon=1 #购物券领取人数 mock.coupon.user-count=100 #支付比例 mock.payment.rate=70 #支付方式 支付宝:微信 :银联 mock.payment.payment-type=30:60:10 #评价比例 好:中:差:自动 mock.comment.appraise-rate=30:10:10:50 #退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他 mock.refund.reason-rate=30:10:20:5:15:5:5 logging.level.com.atguigu.gmall2020.mock.db.mapper=debug
生成 2020-06-14 日期数据:
java -jar gmall2020-mock-db-2021-11-14.jar
查看 gmall 数据库,观察是否有 2020-06-14 的数据出现
业务数据梳理工具 业务数据采集模块 业务数据同步概述 数据同步策略概述每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计
为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步,离线数仓的计算周期通常为天,所以数据同步周期为天 ( 每天同步一次 )
数据的同步策略 :
全量同步增量同步
全量同步 : 每天都将业务数据库中的全部数据同步一份到数据仓库,保证两侧数据同步的最简单的方式
增量同步 : 每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表 ( 首日一次全量同步 )
数据同步策略选择两种策略对比 :
| 同步策略 | 优点 | 缺点 |
|---|---|---|
| 全量同步 | 逻辑简单 | 在某些情况下效率较低。例如某张表数据量较大,但是每天数据的变化比例很低,若对其采用每日全量同步,则会重复同步和存储大量相同的数据。 |
| 增量同步 | 效率高,无需同步和存储重复数据 | 逻辑复杂,需要将每日的新增及变化数据同原来的数据进行整合,才能使用 |
结论:业务表数据量大,且每天数据变化低 ( 增量同步 ) ,否则 全量同步
各表同步策略:
数据同步工具概述数据同步工具 :
离线、批量同步 : 基于Select查询 , DataX、Sqoop实时流式同步 : 基于数据库数据变更日志 , Maxwell、Canal
| 增量同步方案 | DataX/Sqoop | Maxwell/Canal |
|---|---|---|
| 对数据库的要求 | 数据表中存在create_time、update_time等字段,然后根据这些字段获取变更数据 | 要求数据库记录变更操作,如 : MySQL开启 binlog |
| 数据的中间状态 | 获取最后一个状态,中间状态无法获取 | 获取变更数据的所有中间状态 |
全量同步 : DataX
增量同步 : Maxwell
数据同步工具部署DataX
Maxwell
全量表数据同步 数据通道全量表数据由 DataX 从 MySQL 业务数据库直接同步到 HDFS
DataX 配置文件目标路径中表名须包含后缀 full , 表示该表为全量同步
目标路径中包含一层日期 , 用以对不同天的数据进行区分
每张全量表编写一个 DataX 的 json 配置文件
栗子 : activity_info
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"activity_name",
"activity_type",
"activity_desc",
"start_time",
"end_time",
"create_time"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://cpucode101:3306/gmall"
],
"table": [
"activity_info"
]
}
],
"password": "123456",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "activity_name",
"type": "string"
},
{
"name": "activity_type",
"type": "string"
},
{
"name": "activity_desc",
"type": "string"
},
{
"name": "start_time",
"type": "string"
},
{
"name": "end_time",
"type": "string"
},
{
"name": "create_time",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://cpucode101:8020",
"fieldDelimiter": "t",
"fileName": "activity_info",
"fileType": "text",
"path": "${targetdir}",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
DataX 配置文件生成脚本由于目标路径包含一层日期,用于对不同天的数据加以区分,故 path 参数并未写死,需在提交任务时通过参数动态传入,参数名称为 targetdir
DataX配置文件批量生成脚本
gen_import_config.py 脚本
vim gen_import_config.py
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb
#MySQL相关配置,需根据实际情况作出修改
mysql_host = "cpucode101"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "123456"
#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "cpucode101"
hdfs_nn_port = "8020"
#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/import"
#获取mysql连接
def get_connection():
return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)
#获取表格的元数据 包含列名和数据类型
def get_mysql_meta(database, table):
connection = get_connection()
cursor = connection.cursor()
sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERe TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
cursor.execute(sql, [database, table])
fetchall = cursor.fetchall()
cursor.close()
connection.close()
return fetchall
#获取mysql表的列名
def get_mysql_columns(database, table):
return map(lambda x: x[0], get_mysql_meta(database, table))
#将获取的元数据中 mysql 的数据类型转换为 hive 的数据类型 写入到 hdfswriter 中
def get_hive_columns(database, table):
def type_mapping(mysql_type):
mappings = {
"bigint": "bigint",
"int": "bigint",
"smallint": "bigint",
"tinyint": "bigint",
"decimal": "string",
"double": "double",
"float": "float",
"binary": "string",
"char": "string",
"varchar": "string",
"datetime": "string",
"time": "string",
"timestamp": "string",
"date": "string",
"text": "string"
}
return mappings[mysql_type]
meta = get_mysql_meta(database, table)
return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)
#生成json文件
def generate_json(source_database, source_table):
job = {
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": mysql_user,
"password": mysql_passwd,
"column": get_mysql_columns(source_database, source_table),
"splitPk": "",
"connection": [{
"table": [source_table],
"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
}]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
"fileType": "text",
"path": "${targetdir}",
"fileName": source_table,
"column": get_hive_columns(source_database, source_table),
"writeMode": "append",
"fieldDelimiter": "t",
"compress": "gzip"
}
}
}]
}
}
if not os.path.exists(output_path):
os.makedirs(output_path)
with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
json.dump(job, f)
def main(args):
source_database = ""
source_table = ""
options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
for opt_name, opt_value in options:
if opt_name in ('-d', '--sourcedb'):
source_database = opt_value
if opt_name in ('-t', '--sourcetbl'):
source_table = opt_value
generate_json(source_database, source_table)
if __name__ == '__main__':
main(sys.argv[1:])
安装 Python Mysql 驱动
sudo yum install -y MySQL-python
脚本使用说明
python gen_import_config.py -d database -t table
-d : 数据库名-t : 表名
创建 gen_import_config.sh 脚本
vim gen_import_config.sh
#!/bin/bash python ~/bin/gen_import_config.py -d gmall -t activity_info python ~/bin/gen_import_config.py -d gmall -t activity_rule python ~/bin/gen_import_config.py -d gmall -t base_category1 python ~/bin/gen_import_config.py -d gmall -t base_category2 python ~/bin/gen_import_config.py -d gmall -t base_category3 python ~/bin/gen_import_config.py -d gmall -t base_dic python ~/bin/gen_import_config.py -d gmall -t base_province python ~/bin/gen_import_config.py -d gmall -t base_region python ~/bin/gen_import_config.py -d gmall -t base_trademark python ~/bin/gen_import_config.py -d gmall -t cart_info python ~/bin/gen_import_config.py -d gmall -t coupon_info python ~/bin/gen_import_config.py -d gmall -t sku_attr_value python ~/bin/gen_import_config.py -d gmall -t sku_info python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value python ~/bin/gen_import_config.py -d gmall -t spu_info
gen_import_config.sh 脚本增加执行权限
chmod 777 gen_import_config.sh
执行 gen_import_config.sh 脚本,生成配置文件
gen_import_config.sh
配置文件 :
ll /opt/module/datax/job/import/测试生成的 DataX 配置文件 全量表数据同步脚本
全量表数据同步脚本 mysql_to_hdfs_full.sh
vim mysql_to_hdfs_full.sh
#!/bin/bash
DATAX_HOME=/opt/module/datax
# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
#处理目标路径,此处的处理逻辑是,
#如果目标路径不存在,则创建;
#若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {
hadoop fs -test -e $1
if [[ $? -eq 1 ]]; then
echo "路径$1不存在,正在创建......"
hadoop fs -mkdir -p $1
else
echo "路径$1已经存在"
fs_count=$(hadoop fs -count $1)
content_size=$(echo $fs_count | awk '{print $3}')
if [[ $content_size -eq 0 ]]; then
echo "路径$1为空"
else
echo "路径$1不为空,正在清空......"
hadoop fs -rm -r -f $1/*
fi
fi
}
#数据同步
import_data() {
datax_config=$1
target_dir=$2
handle_targetdir $target_dir
python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}
case $1 in
"activity_info")
import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
;;
"activity_rule")
import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
;;
"base_category1")
import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
;;
"base_category2")
import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
;;
"base_category3")
import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
;;
"base_dic")
import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
;;
"base_province")
import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
;;
"base_region")
import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
;;
"base_trademark")
import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
;;
"cart_info")
import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
;;
"coupon_info")
import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
;;
"sku_attr_value")
import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
;;
"sku_info")
import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
;;
"sku_sale_attr_value")
import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
;;
"spu_info")
import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
;;
"all")
import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
;;
esac
mysql_to_hdfs_full.sh 增加执行权限
chmod 777 mysql_to_hdfs_full.sh
测试同步脚本
mysql_to_hdfs_full.sh all 2020-06-14
检查同步结果
查看HDFS目表路径是否出现全量表数据,全量表共15张
全量表同步总结全量表同步逻辑比较简单,只需每日执行全量表数据同步脚本 mysql_to_hdfs_full.sh
增量表数据同步 数据通道Maxwell 配置目标路径中表名须包含后缀 inc,为增量同步
目标路径中包含一层日期,用以对不同天的数据进行区分
有 cart_info 、comment_info 等共计13张表需进行增量同步,Maxwell 同步 binlog 中的所有表的数据变更记录
为方便下游使用数据, Maxwell 将不同表的数据发往不同的 Kafka Topic
修改 Maxwell 配置文件 config.properties
vim /opt/module/maxwell/config.properties
log_level=info
producer=kafka
kafka.bootstrap.servers=cpucode101:9092,cpucode102:9092
#kafka topic动态配置
kafka_topic=%{table}
# mysql login info
host=cpucode101
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
#表过滤,只同步特定的13张表
filter= include:gmall.cart_info,include:gmall.comment_info,include:gmall.coupon_use,include:gmall.favor_info,include:gmall.order_detail,include:gmall.order_detail_activity,include:gmall.order_detail_coupon,include:gmall.order_info,include:gmall.order_refund_info,include:gmall.order_status_log,include:gmall.payment_info,include:gmall.refund_payment,include:gmall.user_info
重新启动 Maxwell
mxw.sh restartFlume 配置
Flume 需要将 Kafka 中各 topic 的数据传输到 HDFS,故其需选用 KafkaSource 以及 HDFSSink ,Channe 选用 FileChanne
KafkaSource 需订阅 Kafka 中的 13 个 topic,HDFSSink 需要将不同 topic 的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.db.TimestampInterceptor$Builder
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1123456
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
增量表首日全量同步org.apache.flume flume-ng-core 1.9.0 provided com.alibaba fastjson 1.2.62 maven-compiler-plugin 2.3.2 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single
增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用 Maxwell 的 bootstrap 功能
mysql_to_kafka_inc_init.sh
vim mysql_to_kafka_inc_init.sh
#!/bin/bash
# 该脚本的作用是初始化所有的增量表,只需执行一次
MAXWELL_HOME=/opt/module/maxwell
import_data() {
$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}
case $1 in
"cart_info")
import_data cart_info
;;
"comment_info")
import_data comment_info
;;
"coupon_use")
import_data coupon_use
;;
"favor_info")
import_data favor_info
;;
"order_detail")
import_data order_detail
;;
"order_detail_activity")
import_data order_detail_activity
;;
"order_detail_coupon")
import_data order_detail_coupon
;;
"order_info")
import_data order_info
;;
"order_refund_info")
import_data order_refund_info
;;
"order_status_log")
import_data order_status_log
;;
"payment_info")
import_data payment_info
;;
"refund_payment")
import_data refund_payment
;;
"user_info")
import_data user_info
;;
"all")
import_data cart_info
import_data comment_info
import_data coupon_use
import_data favor_info
import_data order_detail
import_data order_detail_activity
import_data order_detail_coupon
import_data order_info
import_data order_refund_info
import_data order_status_log
import_data payment_info
import_data refund_payment
import_data user_info
;;
esac
mysql_to_kafka_inc_init.sh 增加执行权限
chmod 777 mysql_to_kafka_inc_init.sh
清理历史数据
hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f
执行同步脚本
mysql_to_kafka_inc_init.sh all
观察HDFS上是否重新出现增量表数据
增量表同步总结增量表同步,需要在首日进行一次全量同步,后续每日才是增量同步。首日进行全量同步时,需先启动数据通道,包括 Maxwell、Kafka、Flume,然后执行增量表首日同步脚本 mysql_to_kafka_inc_init.sh 进行同步。后续每日只需保证采集通道正常运行即可,Maxwell 便会实时将变动数据发往 Kafka
数仓环境准备


