import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.zks.canal.producer.KafkaProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
@Slf4j
public class CanalClient implements ApplicationRunner {
@Resource
private KafkaProducer kafkaProducer;
private final static int BATCH_SIZE = 1000;
private final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
public static String SUBSCRIBE_DB_TABLE = "kaishun.tb_user";
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("Starting the canal client, connecting to the canal server on port 11111");
// Create a connection to the canal server (destination "example", no username/password)
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
try {
// Open the connection
connector.connect();
// Subscribe to tables. Filter syntax: all databases and tables: .*\\..*  /  all tables in one database: dbName\\..*  /  a single table: dbName.tableName  /  multiple rules combined, comma-separated: db1\\..*,db2.table1,db3.table2
connector.subscribe(SUBSCRIBE_DB_TABLE);
// Roll back to the last position that was not acked, so the next fetch resumes from the first unacknowledged entry
connector.rollback();
while (true) {
// Pull up to BATCH_SIZE records from the server without acknowledging them; returns whatever is available
Message message = connector.getWithoutAck(BATCH_SIZE);
// Get the batch id
long batchId = message.getId();
// Number of entries in this batch
int size = message.getEntries().size();
// No data in this batch
if (batchId == -1 || size == 0) {
try {
// Sleep for 2 seconds before polling again
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
// Convert the binlog entries into SQL statements
dataHandle(message.getEntries());
}
// Flush the queued SQL statements whenever the queue is non-empty
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
// Acknowledge the batch id; every message with an id less than or equal to batchId is confirmed
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
System.out.println("canal connection closed");
}
public void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
this.execute(sql);
}
}
private void dataHandle(List<Entry> entries) throws InvalidProtocolBufferException {
for (Entry entry : entries) {
if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == CanalEntry.EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == CanalEntry.EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
private void saveUpdateSql(Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowdata : rowDatasList) {
List<CanalEntry.Column> newColumnList = rowdata.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " +
entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName()
+ " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<CanalEntry.Column> oldColumnList = rowdata.getBeforeColumnsList();
for (CanalEntry.Column column : oldColumnList) {
// Only a single primary key is supported for now
if (column.getIsKey()) {
sql.append(column.getName() + " = '" + column.getValue() + "'");
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
private void saveDeleteSql(Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowdata : rowDatasList) {
List<CanalEntry.Column> columnList = rowdata.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " +
entry.getHeader().getTableName() + " where ");
for (CanalEntry.Column column : columnList) {
if (column.getIsKey()) {
// Only a single primary key is supported for now
sql.append(column.getName() + " = '" + column.getValue() + "'");
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
private void saveInsertSql(Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowdata : rowDatasList) {
List<CanalEntry.Column> columnList = rowdata.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " +
entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
public void execute(String sql) {
kafkaProducer.send(sql);
System.out.println("SQL=" + sql);
}
}
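For a concrete picture of what ends up in SQL_QUEUE (the column names id and name below are only an assumed example, not the actual kaishun.tb_user schema), the three save*Sql methods queue statements of this shape:
insert into tb_user (id,name) VALUES ('1','zhangsan')
update tb_user set id = '1', name = 'zhangsan2' where id = '1'
delete from tb_user where id = '1'
Every value is quoted as a string and only a single primary key column is used in the where clause, which matches the simplified string concatenation above.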
KafkaProducer.java (sends the SQL messages to Kafka)
package com.zks.canal.producer;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// Custom topic name
public static final String TOPIC_TEST = "topic.test";
public void send(Object obj) {
String obj2String = JSONObject.toJSONString(obj);
log.info("Message about to be sent: {}, change detected at: {}", obj2String, System.currentTimeMillis());
// Send the message asynchronously
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj2String);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
// Handle a failed send
log.info(TOPIC_TEST + " - producer failed to send the message: " + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
// Handle a successful send
log.info(TOPIC_TEST + " - producer sent the message successfully: " + stringObjectSendResult.toString());
log.info(TOPIC_TEST + " - success callback time: {}", System.currentTimeMillis());
}
});
}
}
At this point the canal listener and the Kafka message producer are complete. Next, create two new projects that consume these messages.
Create a new kafka project; its structure is as follows.
The pom.xml only needs the Kafka dependency, and the yml likewise only needs the Kafka configuration.
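As a minimal sketch of that yml (not the article's original file): assuming the spring-kafka starter is on the classpath and a local broker on localhost:9092 (both assumptions, adjust to your environment), the consumer needs manual acknowledgment enabled because the listener below takes an Acknowledgment parameter:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual_immediate
The group id can live either here or in the @KafkaListener annotation; the code below sets it in the annotation.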
KafkaConsumer.java
package com.zks.kafka.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Optional;
@Component
@Slf4j
public class KafkaConsumer {
@Autowired
private JdbcTemplate jdbcTemplate;
// Topic and consumer group names
public static final String TOPIC_TEST = "topic.test";
public static final String TOPIC_GROUP1 = "topic.group1";
@KafkaListener(topics = TOPIC_TEST, groupId = TOPIC_GROUP1)
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("kafka project listener triggered at: {}", System.currentTimeMillis());
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
String sql = (String)message.get();
jdbcTemplate.update(sql);
}
// Manually acknowledge the offset once the SQL has been applied
ack.acknowledge();
}
}