栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

大数据之MaxWell

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

大数据之MaxWell

文章目录

第1章 Maxwell简介

1.1 Maxwell概述1.2 Maxwell输出数据格式 第2章 Maxwell原理

2.1 MySQL二进制日志2.2 MySQL主从复制2.3 Maxwell原理 第3章 Maxwell部署

3.1 安装Maxwell3.2配置MySQL

3.2.1启用MySQLBinlog3.2.2 创建Maxwell所需数据库和用户 3.3 配置Maxwell 第4章 Maxwell使用

4.1 启动Kafka集群4.2 Maxwell启停**4.2 增量数据同步4.3 历史数据全量同步

4.3.1 Maxwell-bootstrap4.3.2 boostrap数据格式


第1章 Maxwell简介 1.1 Maxwell概述

​ Maxwell 是由美国Zendesk公司开源,用Java编写的MySQL变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台。

​ 官网地址:http://maxwells-daemon.io/

1.2 Maxwell输出数据格式

注:Maxwell输出的json字段说明:

字段解释
database变更数据所属的数据库
table表更数据所属的表
type数据变更类型
ts数据变更发生的时间
xid事务id
commit事务提交标志,可用于重新组装事务
data对于insert类型,表示插入的数据;对于update类型,标识修改之后的数据;对于delete类型,表示删除的数据
old对于update类型,表示修改之前的数据,只包含变更字段
第2章 Maxwell原理

Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。

2.1 MySQL二进制日志

二进制日志(Binlog)是MySQL服务端非常重要的一种日志,它会保存MySQL数据库的所有数据变更记录。Binlog的主要作用包括主从复制和数据恢复。Maxwell的工作原理和主从复制密切相关。

2.2 MySQL主从复制

MySQL的主从复制,就是用来建立一个和主数据库完全一样的数据库环境,这个数据库称为从数据库。

1)主从复制的应用场景如下:

(1)做数据库的热备:主数据库服务器故障后,可切换到从数据库继续工作。

(2)读写分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。

2)主从复制的工作原理如下:

(1)Master主库将数据变更记录,写到二进制日志(binary log)中

(2)Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)

(3)Slave从库读取并回放中继日志中的事件,将改变的数据同步到自己的数据库。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9HrAAKGU-1647394817600)(file:///C:UsersLIUMIN~1AppDataLocalTempksohtmlwps4879.tmp.jpg)]

2.3 Maxwell原理

很简单,就是将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。

第3章 Maxwell部署 3.1 安装Maxwell

1)下载安装包

(1)地址:https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

注:Maxwell-1.30.0及以上版本不再支持JDK1.8。

(2)将安装包上传到hadoop102节点的/opt/software目录

注:此处使用教学版安装包,教学版对原版进行了改造,增加了自定义Maxwell输出数据中ts时间戳的参数,生产环境请使用原版。

2)将安装包解压至 /opt/module

[liumingze@hadoop102 maxwell]$ tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/

3)修改名称

[liumingze@hadoop102 module]$ mv maxwell-1.29.2/ maxwell

3.2配置MySQL 3.2.1启用MySQLBinlog

MySQL服务器的Binlog默认是未开启的,如需进行同步,需要先进行开启。

1)修改MySQL配置文件/etc/my.cnf

[liumingze@hadoop102 ~]$ sudo vim /etc/my.cnf

2)增加如下配置

[mysqld]

#数据库id
server-id = 1

#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin

#binlog类型,maxwell要求为row类型
binlog_format=row

#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall

注:MySQL Binlog模式

Statement-based:基于语句,Binlog会记录所有写操作的SQL语句,包括insert、update、delete等。

优点: 节省空间

缺点: 有可能造成数据不一致,例如insert语句中包含now()函数。

Row-based:基于行,Binlog会记录每次写操作后被操作行记录的变化。

优点:保持数据的绝对一致性。

缺点:占用较大空间。

mixed:混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based。

Maxwell要求Binlog采用Row-based模式。

3)重启MySQL服务

[liumingze@hadoop102 ~]$ sudo systemctl restart mysqld
3.2.2 创建Maxwell所需数据库和用户

Maxwell需要在MySQL中存储其运行过程中的所需的一些数据,包括binlog同步的断点位置(Maxwell支持断点续传)等等,故需要在MySQL为Maxwell创建数据库及用户。

1)创建数据库

msyql> CREATE DATAbase maxwell;

2)调整MySQL数据库密码级别

mysql> set global validate_password_policy=0;

mysql> set global validate_password_length=4;

3)创建Maxwell用户并赋予其必要权限

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
3.3 配置Maxwell

1)修改Maxwell配置文件名称

[liumingze@hadoop102 maxwell]$ cd /opt/module/maxwell

[liumingze@hadoop102 maxwell]$ cp config.properties.example config.properties

2)修改Maxwell配置文件

[liumingze@hadoop102 maxwell]$ vim config.properties

#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=maxwell
#MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
第4章 Maxwell使用 4.1 启动Kafka集群

若Maxwell发送数据的目的地为Kafka集群,则需要先确保Kafka集群为启动状态。

4.2 Maxwell启停**

1)启动Maxwell

[liumingze@hadoop102 ~]$ /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon

2)停止Maxwell

[liumingze@hadoop102 ~]$ ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9

3)Maxwell启停脚本

(1)创建并编辑Maxwell启停脚本

[liumingze@hadoop102 bin]$ vim mxw.sh

(2)脚本内容如下

#!/bin/bash

MAXWELL_HOME=/opt/module/maxwell

status_maxwell(){
    result=`ps -ef | grep maxwell | grep -v grep | wc -l`
    return $result
}


start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}


stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}


case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
       stop_maxwell
       start_maxwell
    ;;
esac
4.2 增量数据同步

1)启动Kafka消费者

[liumingze@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell

2)模拟生成数据

[liumingze@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-01-22.jar

3)观察Kafka消费者

{"database":"gmall","table":"comment_info","type":"insert","ts":1634023510,"xid":1653373,"xoffset":11998,"data":{"id":1447825655672463369,"user_id":289,"nick_name":null,"head_img":null,"sku_id":11,"spu_id":3,"order_id":18440,"appraise":"1204","comment_txt":"评论内容:12897688728191593794966121429786132276125164551411","create_time":"2020-06-16 15:25:09","operate_time":null}}

{"database":"gmall","table":"comment_info","type":"insert","ts":1634023510,"xid":1653373,"xoffset":11999,"data":{"id":1447825655672463370,"user_id":774,"nick_name":null,"head_img":null,"sku_id":25,"spu_id":8,"order_id":18441,"appraise":"1204","comment_txt":"评论内容:67552221621263422568447438734865327666683661982185","create_time":"2020-06-16 15:25:09","operate_time":null}}
4.3 历史数据全量同步

​ 上一节,我们已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时只有增量数据是不够的,我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。

4.3.1 Maxwell-bootstrap

​ Maxwell提供了bootstrap功能来进行历史数据的全量同步,命令如下:

[liumingze@hadoop102 maxwell]$ /opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties

4.3.2 boostrap数据格式

采用bootstrap方式同步的输出数据格式如下:

{

    "database": "fooDB",

    "table": "barTable",

    "type": "bootstrap-start",

    "ts": 1450557744,

    "data": {}

}

{

    "database": "fooDB",

    "table": "barTable",

    "type": "bootstrap-insert",

    "ts": 1450557744,

    "data": {

        "txt": "hello"

    }

}

{

    "database": "fooDB",

    "table": "barTable",

    "type": "bootstrap-insert",

    "ts": 1450557744,

    "data": {

        "txt": "bootstrap!"

    }

}

{

    "database": "fooDB",

    "table": "barTable",

    "type": "bootstrap-complete",

    "ts": 1450557744,

    "data": {}

}

注意事项:

1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。

2)一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/768810.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号