栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

业务数据采集平台

业务数据采集平台

业务数据采集平台

电商业务简介

电商业务流程电商常识

SKU和SPU平台属性和销售属性 电商业务数据

电商系统表结构

活动信息表(activity_info)活动规则表(activity_rule)活动商品关联表(activity_sku)平台属性表(base_attr_info)平台属性值表(base_attr_value)一级分类表(base_category1)二级分类表(base_category2)三级分类表(base_category3)字典表(base_dic)省份表(base_province)地区表(base_region)品牌表(base_trademark)购物车表(cart_info)评价表(comment_info)优惠券信息表(coupon_info)优惠券优惠范围表(coupon_range)优惠券领用表(coupon_use)收藏表(favor_info)订单明细表(order_detail)订单明细活动关联表(order_detail_activity)订单明细优惠券关联表(order_detail_coupon)订单表(order_info)退单表(order_refund_info)订单状态流水表(order_status_log)支付表(payment_info)退款表(refund_payment)SKU平台属性值表(sku_attr_value)SKU信息表(sku_info)SKU销售属性表(sku_sale_attr_value)SPU信息表(spu_info)SPU销售属性表(spu_sale_attr)SPU销售属性值表(spu_sale_attr_value)用户地址表(user_address)用户信息表(user_info) 模拟生成业务数据

MySQL安装业务数据生成业务数据梳理工具 业务数据采集模块

业务数据同步概述

数据同步策略概述数据同步策略选择数据同步工具概述数据同步工具部署 全量表数据同步

数据通道DataX 配置文件DataX 配置文件生成脚本测试生成的 DataX 配置文件全量表数据同步脚本全量表同步总结 增量表数据同步

数据通道Maxwell 配置Flume 配置增量表首日全量同步增量表同步总结 数仓环境准备

电商业务简介 电商业务流程

以用户的浏览足迹为例说明

用户点开电商首页开始浏览,通过分类查询或通过全文搜索寻找自己中意的商品 , 将商品添加到购物车后,对商品进行结算,这时候购物车的管理和商品订单信息的生成都会对业务数据库产生影响,会生成相应的订单数据和支付数据

订单正式生成之后,还会对订单进行跟踪处理,直到订单全部完成

电商的业务流程 : 用户前台浏览商品时的商品详情的管理,用户商品加入购物车进行支付时用户个人中心&支付服务的管理,用户支付完成后订单后台服务的管理

电商常识 SKU和SPU

SKU (Stock Keeping Unit)(库存量基本单位): 产品统一编号 , 每种产品均对应有唯一的SKU号 , 如 : 一台银色、128G内存的、支持联通网络的iPhoneX

SPU( Standard Product Unit ):是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息集合 , 如 : iPhoneX手机

SPU : 一类商品

同一SPU的商品可以共用 :

商品图片海报销售属性

平台属性和销售属性

平台属性 :

销售属性 :

电商业务数据 电商系统表结构

中心 :

订单表 :用户表 : 用户的详细信息SKU商品表 : 商品的详细信息活动表优惠券表

延伸 :

优惠券领用表支付流水表 : 该订单的支付详情活动订单表订单详情表 : 订单的商品数量订单状态表商品评论表编码字典表退单表SPU商品表

电商业务表 :

后台管理系统 :

活动信息表(activity_info)
字段名字段说明类型
id活动idbigint(20)
activity_name活动名称varchar(200)
activity_type活动类型(1:满减,2:折扣)varchar(10)
activity_desc活动描述varchar(2000)
start_time开始时间datetime(0)
end_time结束时间datetime(0)
create_time创建时间datetime(0)
活动规则表(activity_rule)

字段名

字段名字段说明类型
id编号int(11)
activity_id活动IDint(11)
activity_type活动类型varchar(20)
condition_amount满减金额decimal(16, 2)
condition_num满减件数bigint(20)
benefit_amount优惠金额decimal(16, 2)
benefit_discount优惠折扣decimal(10, 2)
benefit_level优惠级别bigint(20)
活动商品关联表(activity_sku)
字段名字段说明类型
id编号bigint(20)
activity_id活动 idbigint(20)
sku_idsku_idbigint(20)
create_time创建时间datetime(0)
平台属性表(base_attr_info)
字段名字段说明类型
id编号bigint(20)
attr_name属性名称varchar(100)
category_id分类idbigint(20)
category_level分类层级int(11)
平台属性值表(base_attr_value)
字段名字段说明类型
id编号bigint(20)
value_name属性值名称varchar(100)
attr_id属性idbigint(20)
一级分类表(base_category1)
字段名字段说明类型
id编号bigint(20)
name分类名称varchar(10)
二级分类表(base_category2)
字段名字段说明类型
id编号bigint(20)
name二级分类名称varchar(200)
category1_id一级分类编号bigint(20)
三级分类表(base_category3)
字段名字段说明类型
id编号bigint(20)
name三级分类名称varchar(200)
category2_id二级分类编号bigint(20)
字典表(base_dic)
字段名字段说明类型
dic_code编号varchar(10)
dic_name编码名称varchar(100)
parent_code父编号varchar(10)
create_time创建日期datetime(0)
operate_time修改日期datetime(0)
省份表(base_province)
字段名字段说明类型
ididbigint(20)
name省名称varchar(20)
region_id大区idvarchar(20)
area_code行政区位码varchar(20)
iso_code国际编码varchar(20)
iso_3166_2ISO3166编码varchar(20)
地区表(base_region)
字段名字段说明类型
id大区idvarchar(20)
region_name大区名称varchar(20)
品牌表(base_trademark)
字段名字段说明类型
id编号bigint(20)
tm_name属性值
logo_url品牌logo的图片路径varchar(20)
购物车表(cart_info)
字段名字段说明
id编号
user_id用户id
sku_idskuid
cart_price放入购物车时价格
sku_num数量
img_url图片文件
sku_namesku名称 (冗余)
is_checked
create_time创建时间
operate_time修改时间
is_ordered是否已经下单
order_time下单时间
source_type来源类型
source_id来源编号
评价表(comment_info)
字段名字段说明
id编号
user_id用户id
nick_name用户昵称
head_img
sku_idskuid
spu_id商品id
order_id订单编号
appraise评价 1 好评 2 中评 3 差评
comment_txt评价内容
create_time创建时间
operate_time修改时间
优惠券信息表(coupon_info)
字段名字段说明
id购物券编号
coupon_name购物券名称
coupon_type购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券
condition_amount满额数(3)
condition_num满件数(4)
activity_id活动编号
benefit_amount减金额(1 3)
benefit_discount折扣(2 4)
create_time创建时间
range_type范围类型 1、商品(spuid) 2、品类(三级分类id) 3、品牌
limit_num最多领用次数
taken_count已领用次数
start_time可以领取的开始日期
end_time可以领取的结束日期
operate_time修改时间
expire_time过期时间
range_desc范围描述
优惠券优惠范围表(coupon_range)
字段名字段说明
id购物券编号
coupon_id优惠券id
range_type范围类型 1、商品(spuid) 2、品类(三级分类id) 3、品牌
range_id
优惠券领用表(coupon_use)
字段名字段说明
id编号
coupon_id购物券ID
user_id用户ID
order_id订单ID
coupon_status购物券状态(1:未使用 2:已使用)
get_time获取时间
using_time使用时间
used_time支付时间
expire_time过期时间
收藏表(favor_info)
字段名字段说明
id编号
user_id用户名称
sku_idskuid
spu_id商品id
is_cancel是否已取消 0 正常 1 已取消
create_time创建时间
cancel_time修改时间
订单明细表(order_detail)
字段名字段说明
id编号
order_id订单编号
sku_idsku_id
sku_namesku名称(冗余)
img_url图片名称(冗余)
order_price购买价格(下单时sku价格)
sku_num购买个数
create_time创建时间
source_type来源类型
source_id来源编号
split_total_amount分摊总金额
split_activity_amount分摊活动减免金额
split_coupon_amount分摊优惠券减免金额
订单明细活动关联表(order_detail_activity)
字段名字段说明
id编号
order_id订单id
order_detail_id订单明细id
activity_id活动ID
activity_rule_id活动规则
sku_idskuID
create_time获取时间
订单明细优惠券关联表(order_detail_coupon)
字段名字段说明
id编号
order_id订单id
order_detail_id订单明细id
coupon_id购物券ID
coupon_use_id购物券领用id
sku_idskuID
create_time获取时间
订单表(order_info)
字段名字段说明
id编号
consignee收货人
consignee_tel收件人电话
total_amount总金额
order_status订单状态
user_id用户id
payment_way付款方式
delivery_address送货地址
order_comment订单备注
out_trade_no订单交易编号(第三方支付用)
trade_body订单描述(第三方支付用)
create_time创建时间
operate_time操作时间
expire_time失效时间
process_status进度状态
tracking_no物流单编号
parent_order_id父订单编号
img_url图片路径
province_id地区
activity_reduce_amount促销金额
coupon_reduce_amount优惠券
original_total_amount原价金额
freight_fee运费
freight_fee_reduce运费减免
refundable_time可退款日期(签收后30天)
退单表(order_refund_info)
字段名字段说明
id编号
user_id用户id
order_id订单id
sku_idskuid
refund_type退款类型
refund_num退货件数
refund_amount退款金额
refund_reason_type原因类型
refund_reason_txt原因内容
refund_status退款状态(0:待审批 1:已退款)
create_time创建时间
订单状态流水表(order_status_log)
字段名字段说明
id
order_id
order_status
operate_time
支付表(payment_info)
字段名字段说明
id编号
out_trade_no对外业务编号
order_id订单编号
user_id
payment_type支付类型(微信 支付宝)
trade_no交易编号
total_amount支付金额
subject交易内容
payment_status支付状态
create_time创建时间
callback_time回调时间
callback_content回调信息
退款表(refund_payment)
字段名字段说明
id编号
out_trade_no对外业务编号
order_id订单编号
sku_id
payment_type支付类型(微信 支付宝)
trade_no交易编号
total_amount退款金额
subject交易内容
refund_status退款状态
create_time创建时间
callback_time回调时间
callback_content回调信息
SKU平台属性值表(sku_attr_value)
字段名字段说明
id编号
attr_id属性id(冗余)
value_id属性值id
sku_idskuid
attr_name属性名称
value_name属性值名称
SKU信息表(sku_info)
字段名字段说明
id库存id(itemID)
spu_id商品id
price价格
sku_namesku名称
sku_desc商品规格描述
weight重量
tm_id品牌(冗余)
category3_id三级分类id(冗余)
sku_default_img默认显示图片(冗余)
is_sale是否销售(1:是 0:否)
create_time创建时间
SKU销售属性表(sku_sale_attr_value)
字段名字段说明
idid
sku_id库存单元id
spu_idspu_id(冗余)
sale_attr_value_id销售属性值id
sale_attr_id
sale_attr_name
sale_attr_value_name
SPU信息表(spu_info)
字段名字段说明
id商品id
spu_name商品名称
description商品描述(后台简述)
category3_id三级分类id
tm_id品牌id
SPU销售属性表(spu_sale_attr)
字段名字段说明
id编号(业务中无关联)
spu_id商品id
base_sale_attr_id销售属性id
sale_attr_name销售属性名称(冗余)
SPU销售属性值表(spu_sale_attr_value)
字段名字段说明
id销售属性值编号
spu_id商品id
base_sale_attr_id销售属性id
sale_attr_value_name销售属性值名称
sale_attr_name销售属性名称(冗余)
用户地址表(user_address)
字段名字段说明
id编号
user_id用户id
province_id省份id
user_address用户地址
consignee收件人
phone_num联系方式
is_default是否是默认
用户信息表(user_info)
字段名字段说明
id编号
login_name用户名称
nick_name用户昵称
passwd用户密码
name用户姓名
phone_num手机号
email邮箱
head_img头像
user_level用户级别
birthday用户生日
gender性别 M男,F女
create_time创建时间
operate_time修改时间
status状态
模拟生成业务数据 MySQL安装 业务数据生成

在 cpucode101 的 /opt/module/db_log 文件夹

mkdir db_log/

把 gmall2020-mock-db-2021-11-14.jar 和 application.properties 上传到 cpucode101 的 /opt/module/db_log 路径上

根据需求修改 application.properties 相关配置

logging.level.root=info

spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://cpucode103:3306/gmall?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456

logging.pattern.console=%m%n

mybatis-plus.global-config.db-config.field-strategy=not_null
mybatis.mapperLocations=classpath:mapper/*.xml

#业务日期
mock.date=2020-06-14
#是否重置,首日须设置为1
mock.clear=1
#是否重置用户,首日须设置为1
mock.clear.user=1

#生成新用户数量
mock.user.count=200
#男性比例
mock.user.male-rate=20
#用户数据变化概率
mock.user.update-rate:20

#收藏取消比例
mock.favor.cancel-rate=10
#收藏数量
mock.favor.count=100

#每个用户添加购物车的概率
mock.cart.user-rate=10
#每次每个用户最多添加多少种商品进购物车
mock.cart.max-sku-count=8 
#每个商品最多买几个
mock.cart.max-sku-num=3 

#购物车来源  用户查询,商品推广,智能推荐, 促销活动
mock.cart.source-type-rate=60:20:10:10

#用户下单比例
mock.order.user-rate=30
#用户从购物中购买商品比例
mock.order.sku-rate=50
#是否参加活动
mock.order.join-activity=1
#是否使用购物券
mock.order.use-coupon=1
#购物券领取人数
mock.coupon.user-count=100

#支付比例
mock.payment.rate=70
#支付方式 支付宝:微信 :银联
mock.payment.payment-type=30:60:10

#评价比例 好:中:差:自动
mock.comment.appraise-rate=30:10:10:50

#退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
mock.refund.reason-rate=30:10:20:5:15:5:5

logging.level.com.atguigu.gmall2020.mock.db.mapper=debug

生成 2020-06-14 日期数据:

java -jar gmall2020-mock-db-2021-11-14.jar

查看 gmall 数据库,观察是否有 2020-06-14 的数据出现

业务数据梳理工具 业务数据采集模块 业务数据同步概述 数据同步策略概述

每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计

为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步,离线数仓的计算周期通常为天,所以数据同步周期为天 ( 每天同步一次 )

数据的同步策略 :

全量同步增量同步

全量同步 : 每天都将业务数据库中的全部数据同步一份到数据仓库,保证两侧数据同步的最简单的方式

增量同步 : 每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表 ( 首日一次全量同步 )

数据同步策略选择

两种策略对比 :

同步策略优点缺点
全量同步逻辑简单在某些情况下效率较低。例如某张表数据量较大,但是每天数据的变化比例很低,若对其采用每日全量同步,则会重复同步和存储大量相同的数据。
增量同步效率高,无需同步和存储重复数据逻辑复杂,需要将每日的新增及变化数据同原来的数据进行整合,才能使用

结论:业务表数据量大,且每天数据变化低 ( 增量同步 ) ,否则 全量同步

各表同步策略:

数据同步工具概述

数据同步工具 :

离线、批量同步 : 基于Select查询 , DataX、Sqoop实时流式同步 : 基于数据库数据变更日志 , Maxwell、Canal

增量同步方案DataX/SqoopMaxwell/Canal
对数据库的要求数据表中存在create_time、update_time等字段,然后根据这些字段获取变更数据要求数据库记录变更操作,如 : MySQL开启 binlog
数据的中间状态获取最后一个状态,中间状态无法获取获取变更数据的所有中间状态

全量同步 : DataX

增量同步 : Maxwell

数据同步工具部署

DataX

Maxwell

全量表数据同步 数据通道

全量表数据由 DataX 从 MySQL 业务数据库直接同步到 HDFS

目标路径中表名须包含后缀 full , 表示该表为全量同步
目标路径中包含一层日期 , 用以对不同天的数据进行区分

DataX 配置文件

每张全量表编写一个 DataX 的 json 配置文件

栗子 : activity_info

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "activity_name",
                            "activity_type",
                            "activity_desc",
                            "start_time",
                            "end_time",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://cpucode101:3306/gmall"
                                ],
                                "table": [
                                    "activity_info"
                                ]
                            }
                        ],
                        "password": "123456",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "activity_name",
                                "type": "string"
                            },
                            {
                                "name": "activity_type",
                                "type": "string"
                            },
                            {
                                "name": "activity_desc",
                                "type": "string"
                            },
                            {
                                "name": "start_time",
                                "type": "string"
                            },
                            {
                                "name": "end_time",
                                "type": "string"
                            },
                            {
                                "name": "create_time",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://cpucode101:8020",
                        "fieldDelimiter": "t",
                        "fileName": "activity_info",
                        "fileType": "text",
                        "path": "${targetdir}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

由于目标路径包含一层日期,用于对不同天的数据加以区分,故 path 参数并未写死,需在提交任务时通过参数动态传入,参数名称为 targetdir

DataX 配置文件生成脚本

DataX配置文件批量生成脚本

gen_import_config.py 脚本

vim gen_import_config.py 
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host = "cpucode101"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "123456"

#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "cpucode101"
hdfs_nn_port = "8020"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/import"

#获取mysql连接
def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)

#获取表格的元数据  包含列名和数据类型
def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERe TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall

#获取mysql表的列名
def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))

#将获取的元数据中 mysql 的数据类型转换为 hive 的数据类型  写入到 hdfswriter 中
def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)

#生成json文件
def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

安装 Python Mysql 驱动

sudo yum install -y MySQL-python

脚本使用说明

python gen_import_config.py -d database -t table

-d : 数据库名-t : 表名

创建 gen_import_config.sh 脚本

vim gen_import_config.sh
#!/bin/bash

python ~/bin/gen_import_config.py -d gmall -t activity_info
python ~/bin/gen_import_config.py -d gmall -t activity_rule
python ~/bin/gen_import_config.py -d gmall -t base_category1
python ~/bin/gen_import_config.py -d gmall -t base_category2
python ~/bin/gen_import_config.py -d gmall -t base_category3
python ~/bin/gen_import_config.py -d gmall -t base_dic
python ~/bin/gen_import_config.py -d gmall -t base_province
python ~/bin/gen_import_config.py -d gmall -t base_region
python ~/bin/gen_import_config.py -d gmall -t base_trademark
python ~/bin/gen_import_config.py -d gmall -t cart_info
python ~/bin/gen_import_config.py -d gmall -t coupon_info
python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config.py -d gmall -t sku_info
python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config.py -d gmall -t spu_info

gen_import_config.sh 脚本增加执行权限

chmod 777 gen_import_config.sh

执行 gen_import_config.sh 脚本,生成配置文件

gen_import_config.sh

配置文件 :

ll /opt/module/datax/job/import/
测试生成的 DataX 配置文件 全量表数据同步脚本

全量表数据同步脚本 mysql_to_hdfs_full.sh

vim mysql_to_hdfs_full.sh 
#!/bin/bash

DATAX_HOME=/opt/module/datax

# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

#处理目标路径,此处的处理逻辑是,
#如果目标路径不存在,则创建;
#若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {
  hadoop fs -test -e $1
  if [[ $? -eq 1 ]]; then
    echo "路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1
  else
    echo "路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$1为空"
    else
      echo "路径$1不为空,正在清空......"
      hadoop fs -rm -r -f $1/*
    fi
  fi
}

#数据同步
import_data() {
  datax_config=$1
  target_dir=$2

  handle_targetdir $target_dir
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}

case $1 in
"activity_info")
  import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
  ;;
"activity_rule")
  import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
  ;;
"base_category1")
  import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
  ;;
"base_category2")
  import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
  ;;
"base_category3")
  import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
  ;;
"base_dic")
  import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
  ;;
"base_province")
  import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
  ;;
"base_region")
  import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
  ;;
"base_trademark")
  import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
  ;;
"cart_info")
  import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
  ;;
"coupon_info")
  import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
  ;;
"sku_attr_value")
  import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
  ;;
"sku_info")
  import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
  ;;
"sku_sale_attr_value")
  import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
  ;;
"spu_info")
  import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
  ;;
"all")
  import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
  import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
  import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
  ;;
esac

mysql_to_hdfs_full.sh 增加执行权限

chmod 777 mysql_to_hdfs_full.sh

测试同步脚本

mysql_to_hdfs_full.sh all 2020-06-14

检查同步结果

查看HDFS目表路径是否出现全量表数据,全量表共15张

全量表同步总结

全量表同步逻辑比较简单,只需每日执行全量表数据同步脚本 mysql_to_hdfs_full.sh

增量表数据同步 数据通道

目标路径中表名须包含后缀 inc,为增量同步
目标路径中包含一层日期,用以对不同天的数据进行区分

Maxwell 配置

有 cart_info 、comment_info 等共计13张表需进行增量同步,Maxwell 同步 binlog 中的所有表的数据变更记录

为方便下游使用数据, Maxwell 将不同表的数据发往不同的 Kafka Topic

修改 Maxwell 配置文件 config.properties

vim /opt/module/maxwell/config.properties
log_level=info

producer=kafka
kafka.bootstrap.servers=cpucode101:9092,cpucode102:9092

#kafka topic动态配置
kafka_topic=%{table}

# mysql login info
host=cpucode101
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

#表过滤,只同步特定的13张表
filter= include:gmall.cart_info,include:gmall.comment_info,include:gmall.coupon_use,include:gmall.favor_info,include:gmall.order_detail,include:gmall.order_detail_activity,include:gmall.order_detail_coupon,include:gmall.order_info,include:gmall.order_refund_info,include:gmall.order_status_log,include:gmall.payment_info,include:gmall.refund_payment,include:gmall.user_info

重新启动 Maxwell

mxw.sh restart
Flume 配置

Flume 需要将 Kafka 中各 topic 的数据传输到 HDFS,故其需选用 KafkaSource 以及 HDFSSink ,Channe 选用 FileChanne

KafkaSource 需订阅 Kafka 中的 13 个 topic,HDFSSink 需要将不同 topic 的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.db.TimestampInterceptor$Builder


a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1123456
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

    
        org.apache.flume
        flume-ng-core
        1.9.0
        provided
    

    
        com.alibaba
        fastjson
        1.2.62
    



    
        
            maven-compiler-plugin
            2.3.2
            
                1.8
                1.8
            
        
        
            maven-assembly-plugin
            
                
                    jar-with-dependencies
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
    

增量表首日全量同步

增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用 Maxwell 的 bootstrap 功能

mysql_to_kafka_inc_init.sh

vim mysql_to_kafka_inc_init.sh
#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac

mysql_to_kafka_inc_init.sh 增加执行权限

chmod 777 mysql_to_kafka_inc_init.sh

清理历史数据

hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

执行同步脚本

mysql_to_kafka_inc_init.sh all 

观察HDFS上是否重新出现增量表数据

增量表同步总结

增量表同步,需要在首日进行一次全量同步,后续每日才是增量同步。首日进行全量同步时,需先启动数据通道,包括 Maxwell、Kafka、Flume,然后执行增量表首日同步脚本 mysql_to_kafka_inc_init.sh 进行同步。后续每日只需保证采集通道正常运行即可,Maxwell 便会实时将变动数据发往 Kafka

数仓环境准备
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741879.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号