栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

DataX 之旅

DataX 之旅

DataX 之旅

DataX 概述

支持的数据源 DataX 架构原理

DataX设计理念框架设计运行流程调度决策思路DataX与Sqoop对比 Data 安装DataX 使用

DataX任务提交命令

DataX配置文件格式 同步 MySQL 数据到 HDFS

MySQLReader之TableModeMySQLReader之QuerySQLModeDataX传参 同步HDFS数据到MySQL DataX 优化

速度控制内存调整

DataX 概述

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,实现关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、Hbase、FTP等异构数据源之间的数据同步功能

源码地址 :

https://github.com/alibaba/DataX

支持的数据源
类型数据源Reader(读)Writer(写)
RDBMS 关系型数据库MySQL
Oracle
Oceanbase
SQLServer
PostgreSQL
DRDS
通用RDBMS
阿里云数仓数据存储ODPS
ADS
OSS
OCS
NoSQL数据存储OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Hive
Cassandra
无结构化数据存储TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库OpenTSDB
TSDB
DataX 架构原理 DataX设计理念

为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX ,便能跟已有的数据源做到无缝数据同步

框架设计

DataX 本身作为离线数据同步框架,采用 framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader / Writer 插件,纳入到整个同步框架中

Reader:数据采集模块,负责采集数据源的数据,将数据发送给 framework

Writer:数据写入模块,负责不断向 framework 取数据,并将数据写入到目的端

framework:用于连接 reader 和 writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题

运行流程

DataX 作业生命周期的时序图 :

Job:单个数据同步的作业,称为一个 Job ,一个 Job 启动一个进程

Task:根据不同数据源的切分策略,一个 Job 会切分为多个 Task,Task 是 DataX 作业的最小单元,每个 Task 负责一部分数据的同步工作

TaskGroup :Scheduler 调度模块会对 Task 进行分组,每个 Task 组称为一个 Task Group 。每个 TaskGroup 负责以一定的并发度运行其所分得的 Task,单个 Task Group 的并发度为 5

Reader → Channel → Writer:每个 Task 启动后,都会固定启动 Reader → Channel → Writer的线程来完成同步工作

调度决策思路

例子 :

用户提交了一个 DataX 作业,并且配置了总的并发度为 20,目的是对一个有 100 张分表的 mysql 数据源进行同步

DataX 的调度决策思路是:

DataX Job 根据分库分表切分策略,将同步工作分成 100 个Task根据配置的总的并发度20,以及每个 Task Group 的并发度 5,DataX计算共需要分配 4 个TaskGroup4 个 TaskGroup 平分 100 个Task,每一个 TaskGroup 负责运行 25 个Task DataX与Sqoop对比

功能DataXSqoop
运行模式单进程多线程MR
分布式不支持,可以通过调度系统规避支持
流控有流控功能需要定制
统计信息已有一些统计,上报需定制没有,分布式的数据收集不方便
数据校验在core部分有校验功能没有,分布式的数据收集不方便
监控需要定制需要定制
Data 安装

https://blog.csdn.net/qq_44226094/article/details/123261720?spm=1001.2014.3001.5501

DataX 使用 DataX任务提交命令

用户只需根据自己同步数据的数据源和目的地选择相应的 Reader 和 Writer,并将 Reader 和Writer 的信息配置在一个 json 文件中

提交数据同步任务

python bin/datax.py path/to/your/job.json
DataX配置文件格式

查看DataX配置文件模板

python bin/datax.py -r mysqlreader -w hdfswriter

json 最外层是一个 job,job 包含 setting 和 content 两部分,其中 setting 用于对整个 job 进行配置,content 用户配置数据源和目的地

{
    "job": {
        "content": [	数据源和目的地
            {
                "reader": {	配置
                    "name": "mysqlreader",名称, 不可随意
                    "parameter": {	配置参数
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {	writer配置
                    "name": "hdfswriter",	名称, 不可随意
                    "parameter": {	配置参数
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {	job 配置参数, 包括限速配置
            "speed": {
                "channel": ""
            }
        }
    }
}

Reader和Writer的具体参数可参考官方文档 :

https://github.com/alibaba/DataX/blob/master/README.md

https://gitee.com/mirrors/DataX/blob/master/README.md

同步 MySQL 数据到 HDFS

需求 :

同步 gmall 数据库中 base_province 表数据到 HDFS 的 / base_province 目录

需求分析:

要实现该功能,需选用 MySQLReader 和 HDFSWriter

MySQLReader 具有两种模式 :

TableMode : table,column,where 等属性声明需要同步的数据QuerySQLMode : SQL查询语句声明需要同步的数据 MySQLReader之TableMode

创建配置文件 base_province.json

vim /opt/module/datax/job/base_province.json

配置文件内容 :

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2"
                        ],
                        "where": "id>=3",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "table": [
                                    "base_province"
                                ]
                            }
                        ],
                        "password": "123456",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(’’),而Hive默认的null值存储格式为N。所以后期将DataX同步的文件导入Hive表就会出现问题

修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑

在Hive中建表时指定null值存储格式为空字符串(’’)

DROp TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
    `id`         STRING COMMENT '编号',
    `name`       STRING COMMENT '省份名称',
    `region_id`  STRING COMMENT '地区ID',
    `area_code`  STRING COMMENT '地区编码',
    `iso_code`   STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',
    `iso_3166_2` STRING COMMENT '新版IOS-3166-2编码,供可视化使用'
) COMMENT '省份表'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY 't'
    NULL DEFINED AS ''
    LOCATION '/base_province/';

Setting 参数说明


提交任务

在HDFS创建 /base_province 目录

使用 DataX 向 HDFS 同步数据时,需确保目标路径已存在

hadoop fs -mkdir /base_province
MySQLReader之QuerySQLMode

修改配置文件 base_province.json

vim /opt/module/datax/job/base_province.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://cpucode101:3306/gmall"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "xxxxxx",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://cpucode101:8020",
                        "fieldDelimiter": "t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

清空历史数据

hadoop fs -rm -r -f /base_province/*

进入 DataX 根目录

cd /opt/module/datax 
python bin/datax.py job/base_province.json

查看HDFS文件

hadoop fs -cat /base_province/* | zcat
DataX传参

离线数据同步任务需要每日定时重复执行,故 HDFS 上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此 DataX 配置文件中 HDFS Writer 的 path 参数的值应该是动态的。为实现这一效果,就需要使用 DataX 传参的功能

DataX 传参的用法如下,在 JSON 配置文件中使用 ${param} 引用参数,在提交任务时使用 -p"-Dparam=value" 传入参数值

修改配置文件 base_province.json

vim /opt/module/datax/job/base_province.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province/${dt}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
同步HDFS数据到MySQL DataX 优化 速度控制

DataX3.0 三种流控模式 :

通道(并发)记录流字节流

可以随意控制你的作业速度,让作业在数据库可以承受的范围内达到最佳的同步速度

参数说明
job.setting.speed.channel总并发数
job.setting.speed.record总record限速
job.setting.speed.byte总byte限速
core.transport.channel.speed.record单个channel的record限速,默认值为10000(10000条/s)
core.transport.channel.speed.byte单个channel的byte限速,默认值1024*1024(1M/s)

配置了总record限速,则必须配置单个channel的record限速 内存调整

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751759.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号