log-bin = mysql-bin
# 设置服务id,主从不能一致
server-id = 1
# BINLOG模式为row
binlog-format = ROW

(3)非必填内容
# 设置需要同步的数据库
binlog-do-db=ds_01
# 屏蔽系统库同步
binlog-ignore-db=mysql
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema

(4)配置完成后重启服务

(5)验证是否成功
# 查看binlog是否开启,ON为开启
show variables like 'log_bin';
# 查看binlog格式是否为 ROW 模式
show variables like 'binlog_format';
# 查询可以获取到 binlog 记录日志文件名 mysql-bin.000001、position 4040、Binlog_Do_DB 以及其他信息
show master status;
# 创建账号,分配账号权限
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'sync_db'@'localhost';
# 查看密码插件,可能影响 canal 连不上数据库
select host,user,plugin from mysql.user;

二、canal安装配置

1、下载canal.deployer-1.1.5.tar.gz,解压
官方下载地址
2、进入conf文件夹,复制example文件夹,改为自定义的实例名称

3、主要改动文件:canal.properties、instance.properties

(1)canal.properties:去掉mq位置无用代码,追加
#模式设置为rabbitMq模式
# canal.serverMode = rabbitMQ
#地址,不需要端口
# rabbitmq.host = 127.0.0.1
#当前Vhost
# rabbitmq.virtual.host = /
#刚才配置的交换机
# rabbitmq.exchange = cannal-exchange
#刚才配置的账号密码
# rabbitmq.username = cannal
# rabbitmq.password = cannal
# rabbitmq.deliveryMode = 2

(2)instance.properties
#数据库地址
# canal.instance.master.address=127.0.0.1:3306
#binlog文件名
# canal.instance.master.journal.name=mysql-bin.000003
# username/password 连接,其实已经配好了
# canal.instance.dbUsername=sync_db
# canal.instance.dbPassword=sync_db
# canal.instance.connectionCharset = UTF-8
# mq config,指定 rabbitmq 设置绑定的路由
# canal.mq.topic=cannal-exchange-routing
# canal.instance.slaveId=2

三、rabbitMq安装
略。。。
四、代码

1、application.properties 文件
rabbitmq.canal=canal
# 主题名称
rabbitmq.topic=""
# 交换机
rabbitmq.exchange=canal-exchange
# 路由
rabbitmq.routing=canal-exchange-routing
# 队列持久化
rabbitmq.queue.durable=true
# 队列私有
rabbitmq.queue.exclusive=false
# 临时队列
rabbitmq.queue.autoDelete=false
rabbitmq.exchange.durable=true
rabbitmq.exchange.autoDelete=false
# ip
spring.rabbitmq.host=localhost
# 端口号
spring.rabbitmq.port=5672
# 用户名
spring.rabbitmq.username=canalcustomer
# 密码
spring.rabbitmq.password=canalcustomer

2、
package com.example.cannal.cannal;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * RabbitMQ listener that receives Canal change messages as JSON, parses them into a
 * {@link CanalBean} and prints every changed row for INSERT, UPDATE and DELETE events.
 *
 * <p>DDL messages, messages without row data and messages of any other type are skipped.
 */
@Component
public class CanalConsumer {

    private static final Logger log = LoggerFactory.getLogger(CanalConsumer.class);

    /**
     * Handles one message delivered to the bound queue.
     *
     * <p>The queue name is read from the {@code rabbitmq.canal} property and the exchange name
     * from {@code rabbitmq.exchange}. Any exception raised while processing is logged and not
     * rethrown, so a malformed message never propagates out of the listener.
     *
     * <p>NOTE(review): the binding declares no routing key even though the sample configuration
     * defines {@code rabbitmq.routing} — confirm that messages actually reach this queue.
     *
     * @param message the raw AMQP message; its body is decoded as UTF-8 JSON
     * @param channel the channel the message arrived on (not used by this handler)
     * @throws IOException declared for signature compatibility; this body does not throw it
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("${rabbitmq.canal}"),
            exchange = @Exchange("${rabbitmq.exchange}")
    ))
    public void receive(Message message, Channel channel) throws IOException {
        try {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String value = new String(message.getBody(), StandardCharsets.UTF_8);
            CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
            if (canalBean == null) {
                log.warn("Ignoring canal message with an empty payload");
                return;
            }
            // Only row-level (non-DDL) changes are processed.
            if (canalBean.getIsDdl()) {
                return;
            }
            List<?> rows = canalBean.getData();
            if (rows == null || rows.isEmpty()) {
                return;
            }
            String type = canalBean.getType();
            boolean supported =
                    "INSERT".equals(type) || "UPDATE".equals(type) || "DELETE".equals(type);
            if (!supported) {
                // Previously every non-INSERT/UPDATE type was silently handled as a delete.
                log.warn("Ignoring unsupported canal event type: {}", type);
                return;
            }
            for (Object row : rows) {
                if (row == null) {
                    continue;
                }
                TbCommodityInfo tbCommodityInfo = toCommodityInfo(row);
                // Placeholder for the cache update hinted at by the original author:
                // INSERT/UPDATE would store the row under its id with a 10-minute expiry,
                // DELETE would remove the key.
                System.out.println(type + ":" + JSONObject.toJSONString(tbCommodityInfo));
            }
        } catch (Exception e) {
            log.error("error", e);
        }
    }

    /**
     * Converts one element of {@link CanalBean#getData()} into a {@code TbCommodityInfo}.
     * Elements that are not already of that type are re-serialized to JSON and parsed again.
     */
    private static TbCommodityInfo toCommodityInfo(Object row) {
        if (row instanceof TbCommodityInfo) {
            return (TbCommodityInfo) row;
        }
        return JSONObject.parseObject(JSONObject.toJSONString(row), TbCommodityInfo.class);
    }
}
package com.example.cannal.cannal;
import java.util.List;
/**
 * Java bean that a Canal change message (JSON) is deserialized into.
 *
 * <p>All properties are plain read/write bean properties. The DDL flag is exposed through two
 * accessor pairs ({@code isDdl}/{@code setDdl} and {@code getIsDdl}/{@code setIsDdl}) that
 * share the same backing field.
 */
public class CanalBean {
    // Row data. NOTE(review): left as a raw List because the element type depends on the
    // table being synchronized and is not visible in this class.
    private List data;
    // Database name
    private String database;
    // NOTE(review): presumably an event timestamp — confirm against the Canal documentation
    private long es;
    // Incrementing, starts from 1
    private int id;
    // Whether the statement is a DDL statement
    private boolean isDdl;
    // Column types of the table structure
    private MysqlType mysqlType;
    // For UPDATE statements: the old data
    private String old;
    // Primary key names
    private List<String> pkNames;
    // SQL statement
    private String sql;
    private SqlType sqlType;
    // Table name
    private String table;
    // NOTE(review): presumably a processing timestamp — confirm against the Canal documentation
    private long ts;
    // INSERT (insert), UPDATE (update), DELETE (delete), ERASE (drop table), and so on
    private String type;

    /** Returns the row data list; may be {@code null} if it was never set. */
    public List getData() {
        return data;
    }

    /** Sets the row data list. */
    public void setData(List data) {
        this.data = data;
    }

    /** Returns the DDL flag; same value as {@link #getIsDdl()}. */
    public boolean isDdl() {
        return isDdl;
    }

    /** Sets the DDL flag; same effect as {@link #setIsDdl(boolean)}. */
    public void setDdl(boolean ddl) {
        isDdl = ddl;
    }

    /** Sets the database name. */
    public void setDatabase(String database) {
        this.database = database;
    }

    /** Returns the database name. */
    public String getDatabase() {
        return database;
    }

    /** Sets the {@code es} value. */
    public void setEs(long es) {
        this.es = es;
    }

    /** Returns the {@code es} value. */
    public long getEs() {
        return es;
    }

    /** Sets the message id. */
    public void setId(int id) {
        this.id = id;
    }

    /** Returns the message id. */
    public int getId() {
        return id;
    }

    /** Sets the DDL flag. */
    public void setIsDdl(boolean isDdl) {
        this.isDdl = isDdl;
    }

    /** Returns the DDL flag. */
    public boolean getIsDdl() {
        return isDdl;
    }

    /** Sets the column type description. */
    public void setMysqlType(MysqlType mysqlType) {
        this.mysqlType = mysqlType;
    }

    /** Returns the column type description. */
    public MysqlType getMysqlType() {
        return mysqlType;
    }

    /** Sets the old data of an UPDATE statement. */
    public void setOld(String old) {
        this.old = old;
    }

    /** Returns the old data of an UPDATE statement. */
    public String getOld() {
        return old;
    }

    /** Sets the primary key names. */
    public void setPkNames(List<String> pkNames) {
        this.pkNames = pkNames;
    }

    /** Returns the primary key names; may be {@code null} if never set. */
    public List<String> getPkNames() {
        return pkNames;
    }

    /** Sets the SQL statement. */
    public void setSql(String sql) {
        this.sql = sql;
    }

    /** Returns the SQL statement. */
    public String getSql() {
        return sql;
    }

    /** Sets the {@code sqlType} value. */
    public void setSqlType(SqlType sqlType) {
        this.sqlType = sqlType;
    }

    /** Returns the {@code sqlType} value. */
    public SqlType getSqlType() {
        return sqlType;
    }

    /** Sets the table name. */
    public void setTable(String table) {
        this.table = table;
    }

    /** Returns the table name. */
    public String getTable() {
        return table;
    }

    /** Sets the {@code ts} value. */
    public void setTs(long ts) {
        this.ts = ts;
    }

    /** Returns the {@code ts} value. */
    public long getTs() {
        return ts;
    }

    /** Sets the event type. */
    public void setType(String type) {
        this.type = type;
    }

    /** Returns the event type, e.g. {@code INSERT}, {@code UPDATE} or {@code DELETE}. */
    public String getType() {
        return type;
    }
}
package com.example.cannal.cannal;
/**
 * Bean with five String properties: {@code id}, {@code commodity_name},
 * {@code commodity_price}, {@code number} and {@code description}.
 *
 * <p>Property names intentionally keep their underscore form; they are part of the bean's
 * accessor names. Every property is {@code null} until its setter is called.
 */
public class MysqlType {

    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;

    // ---- id ----

    /** Returns the {@code id} value. */
    public String getId() {
        return this.id;
    }

    /** Stores the {@code id} value. */
    public void setId(String id) {
        this.id = id;
    }

    // ---- commodity_name ----

    /** Returns the {@code commodity_name} value. */
    public String getCommodity_name() {
        return this.commodity_name;
    }

    /** Stores the {@code commodity_name} value. */
    public void setCommodity_name(String commodity_name) {
        this.commodity_name = commodity_name;
    }

    // ---- commodity_price ----

    /** Returns the {@code commodity_price} value. */
    public String getCommodity_price() {
        return this.commodity_price;
    }

    /** Stores the {@code commodity_price} value. */
    public void setCommodity_price(String commodity_price) {
        this.commodity_price = commodity_price;
    }

    // ---- number ----

    /** Returns the {@code number} value. */
    public String getNumber() {
        return this.number;
    }

    /** Stores the {@code number} value. */
    public void setNumber(String number) {
        this.number = number;
    }

    // ---- description ----

    /** Returns the {@code description} value. */
    public String getDescription() {
        return this.description;
    }

    /** Stores the {@code description} value. */
    public void setDescription(String description) {
        this.description = description;
    }
}
package com.example.cannal.cannal;
/**
 * Bean with five {@code int} properties: {@code id}, {@code commodity_name},
 * {@code commodity_price}, {@code number} and {@code description}.
 *
 * <p>Property names intentionally keep their underscore form; they are part of the bean's
 * accessor names. Every property is {@code 0} until its setter is called.
 */
public class SqlType {

    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;

    // ---- id ----

    /** Returns the {@code id} value. */
    public int getId() {
        return this.id;
    }

    /** Stores the {@code id} value. */
    public void setId(int id) {
        this.id = id;
    }

    // ---- commodity_name ----

    /** Returns the {@code commodity_name} value. */
    public int getCommodity_name() {
        return this.commodity_name;
    }

    /** Stores the {@code commodity_name} value. */
    public void setCommodity_name(int commodity_name) {
        this.commodity_name = commodity_name;
    }

    // ---- commodity_price ----

    /** Returns the {@code commodity_price} value. */
    public int getCommodity_price() {
        return this.commodity_price;
    }

    /** Stores the {@code commodity_price} value. */
    public void setCommodity_price(int commodity_price) {
        this.commodity_price = commodity_price;
    }

    // ---- number ----

    /** Returns the {@code number} value. */
    public int getNumber() {
        return this.number;
    }

    /** Stores the {@code number} value. */
    public void setNumber(int number) {
        this.number = number;
    }

    // ---- description ----

    /** Returns the {@code description} value. */
    public int getDescription() {
        return this.description;
    }

    /** Stores the {@code description} value. */
    public void setDescription(int description) {
        this.description = description;
    }
}



