栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

DataX

DataX

文章目录

DataX:异构数据源离线同步工具MySQLReader TableMode和HDFSWriter同步案例MysqlReader之QuerySQLModeDatax传参HDFS同步到MySQLDataX的速度控制和内存优化

DataX:异构数据源离线同步工具
源码地址:https://github.com/alibaba/DataX

下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
·安装过后自检
	·python /bin/datax.py /job/job.json
·dataX的使用:只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer
	·查看dataX配置文件模板
		·python /bin/datax.py -r mysqlreader -w hdfswriter
·Reader和Writer的具体参数可参考官方文档,地址如下:
	·https://github.com/alibaba/DataX/blob/master/README.md 
	·https://gitee.com/mirrors/DataX/blob/master/README.md		
·查看hdfs的gzip压缩文件
	·hadoop fs -cat / |zcat
MySQLReader TableMode和HDFSWriter同步案例
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                   		 //对应mysql表字段
                        "column": [ 
                            "id",
                            "activity_name",
                            "activity_type",
                            "activity_desc",
                            "start_time",
                            "end_time",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop101:3306/gmall"], 
                                "table": [
                                    "gmall"
                                ]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "activity_name",
                                "type": "string"
                            },
                            {
                                "name": "activity_type",
                                "type": "string"
                            },
                            {
                                "name": "activity_desc",
                                "type": "string"
                            },
                            {
                                "name": "start_time",
                                "type": "string"
                            },
                            {
                                "name": "end_time",
                                "type": "string"
                            },
                            {
                                "name": "create_time",
                                "type": "string"
                            }
                        ], 
                        "compress": "gzip", 
                        //HDFS文件系统namenode节点地址hdfs://xxx:8020
                        "defaultFS": "hdfs://hadoop101:8020", 
                         //列的分隔符
                        "fieldDelimiter": "t", 
                        "fileName": "activity_info", 
                        "fileType": "text", 
                        "path": "/origin_data/gmall/db/activity_info_full/${dt}",
                        "writeMode": "append" 
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
MysqlReader之QuerySQLMode
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop101:3306/gmall"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop101:8020",
                        "fieldDelimiter": "t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
Datax传参
·"path": "/base_province/${dt}"
·命令行
	·python bin/datax.py -p "-Ddt=2020-06-04" job/job.json
HDFS同步到MySQL
·HDFSReader
	·要求hdfs文件内容是一张逻辑意义上的二维表
	·支持列裁剪(表格中有五个字段可以随意读任意个)、支持列常量(如果有五列,可以在第六列加一个常量字段)
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop101:8020",
                        "path": "/base_province",
                        "column": [// "*" :代表全部
                             {
                                "index":"0",
                                "type":"long" //为java的类型-->framework-->用于校验
                            }, {
                                "index":"1",
                                "type":"string"
                            },{
                                "index":"string",
                                "type":"常量" //可以写常量
                            },{
                                "index":"2",
                                "type":"string"
                            },{
                                "index":"3",
                                "type":"string"
                            },{
                                "index":"4",
                                "type":"string"
                            },{
                                "index":"5",
                                "type":"string"
                            }                         
                        ],
                        "fileType": "text", //跟hdfs的文件格式必须一样
                        "compress": "gzip",
                        "encoding": "UTF-8",
                        "nullFormat": "\N", //Datax将N的hdfs字符视为null
                        "fieldDelimiter": "t",
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "table": [
                                    "test_province"
                                ],
                                "jdbcUrl": "jdbc:mysql://hadoop101:3306/gmall?useUnicode=true&characterEncoding=utf-8"
                            }
                        ],
                        "column": [ //必须和mysql中字段对应
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2"
                        ],
                        "writeMode": "replace" //insert为添加,updata是修改某个字段的值,replace是删除再添加
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
DataX的速度控制和内存优化
job.setting.speed.channel	总并发数
job.setting.speed.record	总record限速
job.setting.speed.byte		总byte限速
core.transport.channel.speed.record	单个channel的record限速,默认值为10000(10000条/s)
core.transport.channel.speed.byte	单个channel的byte限速,默认值1024*1024(1M/s)
注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745366.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号