FlinkX: Syncing MySQL binlog to Kafka

1. Prerequisites
Install Maven and Java 8, and configure GitHub access.

2. Clone the project
git clone https://github.com/liukunyuan/flinkx.git

3. Install the extra jar packages
1) cd flinkx/bin
2) Run sh ./install_jars.sh (on Windows, run the install_jars.bat script)

4. Build
1) Return to the flinkx directory: cd ..
2) Run the build command: mvn clean package -Dmaven.test.skip=true

1. Configure the Flink conf file
1) Enter the flinkconf directory:
cd flinkconf
2) Edit flink-conf.yaml and add one line:
rest.bind-port: 8888

2. Configure the binlog-to-Kafka job JSON (path: binlog2kafka.json)

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "table": ["test.user"],
            "password": "YzJ3gM5sPoxavN_3",
            "database": "test",
            "port": 3306,
            "cat": "insert,update,delete",
            "host": "192.168.178.28",
            "jdbcUrl": "jdbc:mysql://192.168.178.28:3306/test",
            "pavingData": true,
            "username": "root"
          },
          "name": "binlogreader"
        },
        "writer": {
          "parameter": {
            "producerSettings": {
              "bootstrap.servers": "192.168.178.27:9092,192.168.178.28:9092,192.168.178.29:9092"
            },
            "topic": "binlog"
          },
          "name": "kafkawriter"
        }
      }
    ],
    "setting": {
      "restore": {
        "isRestore": true,
        "isStream": true
      },
      "speed": {
        "channel": 1
      }
    }
  }
}
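With `"pavingData": true`, each column of a row's before/after image is flattened into top-level `before_<col>` / `after_<col>` fields in the emitted message, which is exactly the shape of the sample Kafka messages shown later. A minimal Python sketch of that flattening (illustrative only; the helper `pave` is made up here, and field order in real FlinkX output may differ):

```python
import json

def pave(event_type, before=None, after=None, schema="test", table="user", ts=0):
    """Flatten a row-change event the way pavingData does: each column of the
    before/after row images becomes a top-level 'before_<col>'/'after_<col>' key."""
    msg = {"schema": schema, "table": table, "type": event_type, "ts": ts}
    for col, val in (before or {}).items():
        msg["before_" + col] = val
    for col, val in (after or {}).items():
        msg["after_" + col] = val
    return msg

# An INSERT has only an after-image; an UPDATE carries both images.
print(json.dumps(pave("INSERT", after={"id": "1", "name": "aa"})))
```

Without paving, the before/after images would stay as nested objects; paving to flat keys makes the messages easier to consume with schema-less sinks.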

Create the topic

sh kafka-topics.sh --zookeeper 192.168.178.27:2181,192.168.178.28:2181,192.168.178.29:2181 --create --topic binlog --partitions 3 --replication-factor 2

Create a console consumer

sh kafka-console-consumer.sh --bootstrap-server vm61:9092 --topic binlog

Run the job

nohup ./flinkx -mode local -job ../flinkconf/binlog2kafka.json -pluginRoot ../syncplugins -flinkconf ../flinkconf/ -confProp "{\"flink.checkpoint.interval\":60000}" > log.txt 2>&1 &
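The inner quotes of the `-confProp` JSON must be escaped, otherwise the shell strips them and FlinkX receives invalid JSON. A small Python sketch (a hypothetical launcher helper, not part of FlinkX) that builds a correctly quoted command string:

```python
import json
import shlex

# Serialize the conf properties and let shlex.quote handle shell escaping,
# so the inner double quotes survive the shell intact.
conf_prop = json.dumps({"flink.checkpoint.interval": 60000})
cmd = (
    "nohup ./flinkx -mode local -job ../flinkconf/binlog2kafka.json "
    "-pluginRoot ../syncplugins -flinkconf ../flinkconf/ "
    "-confProp " + shlex.quote(conf_prop) + " > log.txt 2>&1 &"
)
print(cmd)
```

Generating the argument this way avoids hand-counting backslashes when more checkpoint or restart properties are added later.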

Enable binlog on the MySQL server (add to my.cnf and restart MySQL)

[mysqld]
character-set-server=utf8
init_connect='SET NAMES utf8'
socket=/var/lib/mysql/mysql.sock
server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test
[client]
default-character-set=utf8

Create the test table

create table user(
  id int,
  name varchar(255)
);

Grant access privileges

mysql> grant all privileges on *.* to 'root'@'%';

Insert a row

insert into user values(1,"aa");

Kafka message for the insert

{"schema":"test","after_name":"aa","after_id":"1","type":"INSERT","table":"user","ts":6848537755409059840}

Update the row

update user set name='bb' where id =1;

Kafka message for the update

{"schema":"test","before_name":"aa","after_name":"bb","after_id":"1","type":"UPDATE","table":"user","ts":6848538609692315648,"before_id":"1"}

Delete the row

delete from user where id =1;

Kafka message for the delete

{"schema":"test","before_name":"bb","type":"DELETE","table":"user","ts":6848538936046915584,"before_id":"1"}
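A consumer can decode these paved messages back into structured change events by splitting the `before_*`/`after_*` keys from the metadata. A minimal sketch (the function name `split_row_images` is made up for illustration), run against the UPDATE message above:

```python
import json

def split_row_images(msg):
    """Split a paved binlog message into (metadata, before-image, after-image).
    Keys without a before_/after_ prefix are treated as event metadata."""
    before, after, meta = {}, {}, {}
    for key, val in msg.items():
        if key.startswith("before_"):
            before[key[len("before_"):]] = val
        elif key.startswith("after_"):
            after[key[len("after_"):]] = val
        else:
            meta[key] = val
    return meta, before, after

update = json.loads(
    '{"schema":"test","before_name":"aa","after_name":"bb","after_id":"1",'
    '"type":"UPDATE","table":"user","ts":6848538609692315648,"before_id":"1"}'
)
meta, before, after = split_row_images(update)
print(meta["type"], before, after)
```

Note that an INSERT yields an empty before-image and a DELETE an empty after-image, matching the insert and delete samples above.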