栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

TIDB - 使用 TICDC 将数据同步至下游 Kafka 中

TIDB - 使用 TICDC 将数据同步至下游 Kafka 中

一、TICDC

在上篇文章中,我们介绍了使用TICDC 将数据同步至 Mysql 中,从上个任务就可以看出,TiCDC相比于Tidb binlog 在配制上就简化了很多,而且我们也知道TICDC的性能也是优于 tidb binlog的,今天我们学习下使用TiCDC怎么将数据同步至下游Kafka中,以实现TIDB 到 ES、MongoDB、Redis等 NoSql 数据库的同步。

上篇博客地址:

https://blog.csdn.net/qq_43692950/article/details/121731278

注意:使用TiCDC ,需将TIDB版本上级至 v4.0.6 以上。

二、TICDC 配制数据同步Kafka

本篇文章接着上篇文章继续讲解,先看下现在的集群状况:


还是上篇文章中我们扩容出的CDC-server。

在上篇文章中,我们已经创建了TIDB 到 mysql 数据同步的任务,现在我们再创建一个到Kafka的同步任务:

./cdc cli changefeed create --pd=http://192.168.40.160:2379 --sink-uri='kafka://192.168.40.1:9092/tidb-cdc?kafka-version=2.6.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json' --changefeed-id="replication-task-2"

tidb-cdc:表示topic
kafka-version:下游 Kafka 版本号(可选,默认值 2.4.0,目前支持的最低版本为 0.11.0.2
kafka-client-id:指定同步任务的 Kafka 客户端的 ID(可选,默认值为 TiCDC_sarama_producer_同步任务的 ID
partition-num:下游 Kafka partition 数量(可选,不能大于实际 partition 数量。如果不填会自动获取 partition 数量。
protocol:表示输出到 kafka 消息协议,可选值有 default、canal、avro、maxwell、canal-json(默认值为 default
max-message-bytes:每次向 Kafka broker 发送消息的最大数据量(可选,默认值 64MB
replication-factor:kafka 消息保存副本数(可选,默认值 1
ca:连接下游 Kafka 实例所需的 CA 证书文件路径(可选)
cert:连接下游 Kafka 实例所需的证书文件路径(可选)
key:连接下游 Kafka 实例所需的证书密钥文件路径(可选)


已经创建成功。

使用下面命令就可以看到,所有的任务:

./cdc cli changefeed list --pd=http://192.168.40.160:2379

或者查看我们任务的详细情况:

./cdc cli changefeed query --pd=http://192.168.40.160:2379 --changefeed-id=replication-task-2

三、SpringBoot Kafka监听

添加POM依赖


    org.springframework.kafka
    spring-kafka

application

server:
  port: 8081

spring:
  kafka:
    # kafka服务器地址(可以多个)
#    bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
    bootstrap-servers: 192.168.40.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: kafkaGroup
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest

      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
      #失败重试次数
      retries: 3
      # 服务器地址
#      bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092

消费者监听事件

@Slf4j
@Component
public class Jms_Consumer {

    @KafkaListener(topics = "tidb-cdc")
    public void receive4(ConsumerRecord consumer) throws Exception {
        System.out.println("tidb tidb-cdc  Listener >> ");
        JSonObject jsonObject = JSONObject.parseObject(new String(consumer.value()));
        String type = jsonObject.getString("type");
        String db = jsonObject.getString("database");
        String table = jsonObject.getString("table");
        String data = jsonObject.getString("data");
        log.info("操作类型:{}",type);
        log.info("数据库:{}",db);
        log.info("数据表:{}",table);
        log.info("更新后数据:{}",data);
    }

}

四、测试数据同步

向TIDB中插入数据:

insert into user(name,age) value('bxc','25');

kafka接受JSON

{
	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "INSERT",
	"es": 1638698748819,
	"ts": 0,
	"sql": "",
	"sqlType": {
		"age": -5,
		"id": -5,
		"name": 12
	},
	"mysqlType": {
		"age": "int",
		"id": "int",
		"name": "varchar"
	},
	"data": [{
		"age": "25",
		"id": "242219",
		"name": "bxc"
	}],
	"old": [null]
}


更新数据:

update user set age=24 where name = 'bxc';

Kafka接受JSON

{
	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "UPDATE",
	"es": 1638699660093,
	"ts": 0,
	"sql": "",
	"sqlType": {
		"age": -5,
		"id": -5,
		"name": 12
	},
	"mysqlType": {
		"age": "int",
		"id": "int",
		"name": "varchar"
	},
	"data": [{
		"age": "24",
		"id": "242216",
		"name": "bxc"
	}],
	"old": [{
		"age": "23",
		"id": "242216",
		"name": "bxc"
	}]
}


删除数据:

delete from user where name = 'bxc';

Kafka接受JSON

{
	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "DELETE",
	"es": 1638699773943,
	"ts": 0,
	"sql": "",
	"sqlType": {
		"age": -5,
		"id": -5,
		"name": 12
	},
	"mysqlType": {
		"age": "int",
		"id": "int",
		"name": "varchar"
	},
	"data": [{
		"age": "25",
		"id": "242218",
		"name": "bxc"
	}],
	"old": [{
		"age": "25",
		"id": "242218",
		"name": "bxc"
	}]
}

五、扩展

停止同步任务:

./cdc cli changefeed pause --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2

删除同步任务

./cdc cli changefeed remove --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2


喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/652404.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号