1、前置
需要安装maven、java8、配置好github相关参数
2、Clone项目到本地
git clone https://github.com/liukunyuan/flinkx.git
3、安装额外的jar包
1)、cd flinkx/bin
2)、执行sh ./install_jars.sh(windows执行install_jars.bat脚本)
4、打包
1)、回到flinkx目录:cd …
2)、执行打包命令:mvn clean package -Dmaven.test.skip=true
1、配置flink conf文件
1)、进入flinkconf目录
cd flinkconf
2)、修改flink-conf.yaml文件添加一行
rest.bind-port: 8888
2、配置mysqltomysql的json文件,路径:binlog2kafka.json
{
"job": {
"content": [
{
"reader": {
"parameter": {
"table": ["test.user"],
"password": "YzJ3gM5sPoxavN_3",
"database": "test",
"port": 3306,
"cat": "insert,update,delete",
"host": "192.168.178.28",
"jdbcUrl": "jdbc:mysql://192.168.178.28:3306/test",
"pavingData": true,
"username": "root"
},
"name": "binlogreader"
},
"writer": {
"parameter" : {
"producerSettings" : {
"bootstrap.servers" : "192.168.178.27:9092,192.168.178.28:9092,192.168.178.29:9092"
},
"topic" : "binlog"
},
"name" : "kafkawriter"
}
}
],
"setting": {
"restore": {
"isRestore": true,
"isStream": true
},
"speed": {
"channel": 1
}
}
}
}
创建主题
sh kafka-topics.sh --zookeeper 192.168.178.27:2181,192.168.178.28:2181,192.168.178.29:2181 --create --topic binlog--partitions 3 --replication-factor 2
创建消费者
sh kafka-console-consumer.sh --bootstrap-server vm61:9092 --topic binlog
执行
nohup ./flinkx -mode local -job ../flinkconf/binlog2kafka.json -pluginRoot ../syncplugins -flinkconf ../flinkconf/ -confProp "{"flink.checkpoint.interval":60000}" > log.txt 2>&1 &
开启数据库的binlog日志
[mysqld] character-set-server=utf8 init_connect='SET NAMES utf8' socket=/var/lib/mysql/mysql.sock server-id= 1 log-bin=mysql-bin binlog_format=row binlog-do-db=test [client] default-character-set=utf8
创建表
create table user( id int, name varchar(255) );
设置访问权限
mysql>grant all privileges on *.* to 'root'@'%' ;
插入数据
insert into user values(1,"aa");
插入数据时的kafka日志
{"schema":"test","after_name":"aa","after_id":"1","type":"INSERT","table":"user","ts":6848537755409059840}
更新数据
update user set name='bb' where id =1;
更新数据时的kafka日志
{"schema":"test","before_name":"aa","after_name":"bb","after_id":"1","type":"UPDATE","table":"user","ts":6848538609692315648,"before_id":"1"}
删除数据
delete from user where id =1;
删除数据时的kafka日志
{"schema":"test","before_name":"bb","type":"DELETE","table":"user","ts":6848538936046915584,"before_id":"1"}



