
[Windows Platform Development - Basic Services] Alibaba Canal Installation and Configuration

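Canal is database synchronization software.

MySQL configuration

1. Enable binlog

Locate the MySQL configuration file with mysql --help|grep my.cnf, then edit my.cnf (on Windows, where grep is not available by default, the file is usually named my.ini).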

[mysqld]
# Enable binlog
log-bin = mysql-bin

# Use ROW format
binlog-format = ROW

# Required for MySQL replication; must not be the same as Canal's slaveId
server_id = 1
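
The three settings above can be checked mechanically before starting Canal. The following is a minimal sketch, not part of MySQL or Canal: the class BinlogConfigCheck and its methods are hypothetical names, and it assumes the file uses plain key = value lines inside a [mysqld] section.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper: checks the [mysqld] settings that the steps above ask for.
class BinlogConfigCheck {

    // Collects key/value pairs from the [mysqld] section only.
    // '-' and '_' in option names are treated alike, as MySQL accepts both spellings.
    static Map<String, String> parseMysqld(List<String> lines) {
        Map<String, String> settings = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // blank line or comment
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            settings.put(key.replace('-', '_').toLowerCase(Locale.ROOT), value);
        }
        return settings;
    }

    // True when binlog is enabled, the format is ROW, and a server_id is set.
    static boolean readyForCanal(Map<String, String> s) {
        String serverId = s.get("server_id");
        return s.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(s.get("binlog_format"))
                && serverId != null && !serverId.isEmpty();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "[mysqld]",
                "log-bin = mysql-bin",
                "binlog-format = ROW",
                "server_id = 1");
        System.out.println("ready for Canal: " + readyForCanal(parseMysqld(lines)));
    }
}
```

This only inspects the file text; the running server still has to be restarted for the settings to take effect.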

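Check whether binlog is enabled

Run the following SQL. If it shows OFF, binlog is not enabled. Before MySQL 8 this feature was disabled by default and had to be turned on manually.

show variables like 'log_bin';

Alternatively, open the session_variables table.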
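
The same check can be run from Java over JDBC. This is a rough sketch under assumptions: BinlogStatus is a hypothetical class name, the JDBC URL and credentials are supplied on the command line, and a MySQL JDBC driver is assumed to be on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper: runs the log_bin check shown above through JDBC.
class BinlogStatus {

    // Interprets the Value column of the result row ("ON" or "OFF").
    static boolean isOn(String value) {
        return value != null && value.trim().equalsIgnoreCase("ON");
    }

    // Executes the query on an open connection; a missing row counts as disabled.
    static boolean binlogEnabled(Connection con) throws SQLException {
        try (Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("show variables like 'log_bin'")) {
            // The result has two columns: Variable_name and Value.
            return rs.next() && isOn(rs.getString(2));
        }
    }

    public static void main(String[] args) throws SQLException {
        if (args.length < 3) {
            System.out.println("usage: BinlogStatus <jdbc-url> <user> <password>");
            return;
        }
        try (Connection con = DriverManager.getConnection(args[0], args[1], args[2])) {
            System.out.println("binlog enabled: " + binlogEnabled(con));
        }
    }
}
```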

  • Create the canal account
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Configure Canal

D:\canal\canal.deployer-1.1.5\conf\example

In the deployer package, edit conf\example\instance.properties with your own settings:

# position info
# Replace this with the address of your own master database
canal.instance.master.address = 192.168.11.14:3306

# Fill in your own database account and password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
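
As a quick sanity check of the edited file, the properties can be loaded with java.util.Properties and the address split into host and port. This is a sketch only: InstanceProps is a hypothetical helper, not something Canal ships, and it assumes the address value is a bare host:port with no trailing note on the same line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.util.Properties;

// Hypothetical helper: reads the instance.properties keys used above.
class InstanceProps {

    // Loads properties from text; a real caller would read the file instead.
    static Properties load(String text) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(text));
        return p;
    }

    // Splits canal.instance.master.address into host and port without a DNS lookup.
    static InetSocketAddress masterAddress(Properties p) {
        String addr = p.getProperty("canal.instance.master.address", "").trim();
        int colon = addr.lastIndexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("expected host:port, got '" + addr + "'");
        }
        // Integer.parseInt rejects anything that is not a plain number after the colon.
        int port = Integer.parseInt(addr.substring(colon + 1));
        return InetSocketAddress.createUnresolved(addr.substring(0, colon), port);
    }

    public static void main(String[] args) throws IOException {
        Properties p = load(
                "canal.instance.master.address = 192.168.11.14:3306\n"
                + "canal.instance.dbUsername=canal\n"
                + "canal.instance.dbPassword=canal\n");
        InetSocketAddress master = masterAddress(p);
        System.out.println(master.getHostString() + " port " + master.getPort()
                + " user " + p.getProperty("canal.instance.dbUsername"));
    }
}
```

Any extra text left after the port on the address line would make the port fail to parse here, which is a reason to keep notes on their own comment lines.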

Leave the other settings at their defaults, then start Canal once the changes are saved.

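Java client configuration
  • pom dependencies
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.4</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>commons-dbutils</groupId>
      <artifactId>commons-dbutils</artifactId>
      <version>1.2</version>
    </dependency>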
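  • Spring Boot entry point
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

// javax.annotation on Spring Boot 2.x; jakarta.annotation on 3.x
import javax.annotation.Resource;

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class SyncApplication implements CommandLineRunner {
    public static void main(String[] args) {
        // Pass the command-line arguments through to Spring Boot
        SpringApplication.run(SyncApplication.class, args);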
    }

    @Resource
    CanalClient canalClient;

    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }
}
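  • CanalClient connection setup
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
public class CanalClient {
    // Queue of SQL statements waiting to be executed
    private final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();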
    @Resource
    private DataSource dataSource;
    
    public void run() {

        // Canal's default port is 11111
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1",11111),
                "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            // Subscribe to every table in every schema; the backslash must be escaped in a Java string
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    // Try to pull up to batchSize entries from the master; take however many are available
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);

                    // Once the number of queued SQL statements reaches the threshold, execute them
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);
            this.execute(sql.toString());
        }
    }

    
    private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            // Only row-level changes carry data to replay
            if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                if (eventType == CanalEntry.EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    
    private void saveUpdateSql(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
                for (CanalEntry.Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        // Only a single-column primary key is supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    
    private void saveDeleteSql(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (CanalEntry.Column column : columnList) {
                    if (column.getIsKey()) {
                        // Only a single-column primary key is supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    
    private void saveInsertSql(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    
    public void execute(String sql) {
        Connection con = null;
        try {
            if(null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
//            int row = qr.execute(con, sql);
            int row = qr.update(con, sql);
            System.out.println("update: "+ row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}
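
The client above builds SQL by concatenating column values, so a value containing a single quote (for example O'Brien) produces an invalid statement, and only the first key column is used in the where clause. One possible alternative, sketched here under assumptions, is to emit ? placeholders and keep the values in a separate parameter list. ParamSqlSketch, Col, and BoundSql are hypothetical names; Col stands in for the name, value, and key flag that the client reads from CanalEntry.Column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: builds an UPDATE with placeholders instead of inlined values.
class ParamSqlSketch {

    // Stand-in for the fields read from CanalEntry.Column in the client above.
    static final class Col {
        final String name;
        final String value;
        final boolean isKey;

        Col(String name, String value, boolean isKey) {
            this.name = name;
            this.value = value;
            this.isKey = isKey;
        }
    }

    // SQL text plus the values to bind, in placeholder order.
    static final class BoundSql {
        final String sql;
        final List<Object> params;

        BoundSql(String sql, List<Object> params) {
            this.sql = sql;
            this.params = params;
        }
    }

    // SET uses the after-image; WHERE uses every key column of the before-image.
    static BoundSql update(String table, List<Col> after, List<Col> before) {
        StringBuilder sql = new StringBuilder("update ").append(table).append(" set ");
        List<Object> params = new ArrayList<>();
        for (int i = 0; i < after.size(); i++) {
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(after.get(i).name).append(" = ?");
            params.add(after.get(i).value);
        }
        sql.append(" where ");
        boolean first = true;
        for (Col c : before) {
            if (!c.isKey) {
                continue;
            }
            if (!first) {
                sql.append(" and ");
            }
            sql.append(c.name).append(" = ?");
            params.add(c.value);
            first = false;
        }
        if (first) {
            throw new IllegalArgumentException("no primary key column in the before-image");
        }
        return new BoundSql(sql.toString(), params);
    }

    public static void main(String[] args) {
        List<Col> before = Arrays.asList(new Col("id", "1", true), new Col("name", "Obrien", false));
        List<Col> after = Arrays.asList(new Col("id", "1", true), new Col("name", "O'Brien", false));
        BoundSql bound = update("user", after, before);
        System.out.println(bound.sql + " " + bound.params);
    }
}
```

With commons-dbutils, the statement and bound.params.toArray() could then be handed to a QueryRunner update overload that accepts parameters, instead of the single-string call used in execute. Table and column names are still concatenated, so this sketch assumes they come only from the binlog and not from user input.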
References & Acknowledgements

[1] Canal: Getting-Started Example, Download and Installation (Part 1)
