Message message = connector.getWithoutAck(BATCH_SIZE);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//处理数据为sql
dataHandle(message.getEntries());
}
//设置队列sql语句执行最大值
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
System.out.println("连接canal end");
}
/**
 * Drains the SQL queue, handing every queued statement to {@link #execute(String)}
 * in FIFO order. Snapshot the size first so statements enqueued while draining
 * are left for the next round.
 */
public void executeQueueSql() {
    final int pending = SQL_QUEUE.size();
    for (int handled = 0; handled < pending; handled++) {
        String statement = SQL_QUEUE.poll();
        this.execute(statement);
    }
}
/**
 * Dispatches each canal binlog entry to the SQL builder matching its event type
 * (INSERT / UPDATE / DELETE). Non-ROWDATA entries (e.g. transaction begin/end)
 * are ignored.
 *
 * @param entrys batch of entries pulled from the canal server
 * @throws InvalidProtocolBufferException if an entry's store value cannot be parsed
 */
private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
    for (Entry entry : entrys) {
        if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
            // Fixed garbled identifiers: getStoreValue (was getStorevalue), DELETE (was DELETe)
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            if (eventType == CanalEntry.EventType.DELETE) {
                saveDeleteSql(entry);
            } else if (eventType == CanalEntry.EventType.UPDATE) {
                saveUpdateSql(entry);
            } else if (eventType == CanalEntry.EventType.INSERT) {
                saveInsertSql(entry);
            }
        }
    }
}
private void saveUpdateSql(Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStorevalue());
List
for (CanalEntry.RowData rowdata: rowDatasList) {
List
StringBuffer sql = new StringBuffer("update " +
entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName()
- " = ‘" + newColumnList.get(i).getValue() + "’");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List
for (CanalEntry.Column column : oldColumnList) {
//暂时只支持单一主键
if (column.getIsKey()) {
sql.append(column.getName() + “= '” + column.getValue() + “’”);
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
private void saveDeleteSql(Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStorevalue());
List
for (CanalEntry.RowData rowdata: rowDatasList) {
List columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where "); for (CanalEntry.Column column : columnList) { if (column.getIsKey()) { //暂时只支持单一主键 sql.append(column.getName() + “= '” + column.getValue() + “’”); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } private void saveInsertSql(Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStorevalue()); List for (CanalEntry.RowData rowdata: rowDatasList) { List StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " ("); for (int i = 0; i < columnList.size(); i++) { sql.append(columnList.get(i).getName()); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(") VALUES ("); for (int i = 0; i < columnList.size(); i++) { sql.append("’" + columnList.get(i).getValue() + “’”); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(")"); SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } public void execute(String sql) { kafkaProducer.send(sql); System.out.println(“SQL=”+sql); } } KafkaProducer.java Kafka消息发送 package com.zks.canal.producer; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @Component @Slf4j public class KafkaProducer { @Autowired private KafkaTemplate //自定义topic public static final String TOPIC_TEST = “topic.test”; public void send(Object obj) { String obj2String = JSONObject.toJSonString(obj); log.info(“准备发送消息为:{},监控到变动时间:{}”, 
obj2String,System.currentTimeMillis()); //发送消息 ListenableFuture future.addCallback(new ListenableFutureCallback @Override public void onFailure(Throwable throwable) { //发送失败的处理 log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult //成功的处理 log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString()); log.info(TOPIC_TEST + " - 成功反馈时间:{}", System.currentTimeMillis()); } }); } } 到这里 canal监听以及kafka发送消息就完成了。后面新建两个项目去监听就可以了 新建一个项目 kafka,结构如下 xml只需要引入kafka即可,yml同理 KafkaConsumer.java package com.zks.kafka.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Optional; @Component @Slf4j public class KafkaConsumer { @Autowired private JdbcTemplate jdbcTemplate; //自定义topic public static final String TOPIC_TEST = “topic.test”; public static final String TOPIC_GROUP1 = “topic.group1”; @KafkaListener(topics = TOPIC_TEST, groupId = TOPIC_GROUP1) public void topic_test(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { log.info(“项目kafka监听触发时间:{}”,System.currentTimeMillis()); Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { String sql = (String)message.get(); jdbcTemplate.update(sql); log.info(“项目kafka入库成功时间:{}”,System.currentTimeMillis()); ack.acknowledge(); } } } 然后再建立一个项目模拟子模块,如kafka2,结构与这个一样 测试结构展示 启动zookeeper、Kafka、conal 启动canal项目,以及其他子模块 下图是我的一个测试demo,canal绑定的也就是这台数据库 canal监听处理并发送 
kafka消费并入库 查看数据库,三台数据库表中都插入了数据,修改、删除同理 测试结果、结论 Canal结合kafka可以满足需求,新增、修改、删除的操作主库同步到各个子库的时间间隔本地测试基本在一秒内。其中修改、删除基本在100毫秒内或者更快。 **注: 本次测试主库为阿里云服务器远程mysql; 子库一为我电脑window本地mysql; 子库二与主库在同一个mysql服务,不过是不同数据库而已。 Zk、kafka、canal均为本地环境。 开发量:除base服务中需要书写canal工具类以及日志转sql的工具类,子服务基本没有什么开发量。直接使用spring boot的JdbcTemple 服务直接执行sql即可。**【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取



