栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

canal Failed to update metadata after 60000 ms 等问题解决

canal Failed to update metadata after 60000 ms 等问题解决

问题排查了一天,很长

问题描述: 需求

需要将mysql数据通过canal传入kafka
测试将正则匹配的表按照字段动态存入不同topic中
例如存在两类表
A_1 , A_2 , A_3 , B_1 , B_2 , B_3
A类表 -> topic_1
B类表 -> topic_2


canal最初对应配置如下
canal.instance.filter.regex = test.A,test.B
# kafka topic对应partition数据量 不然会将数据都发到一个partition中 
canal.mq.partitionsNum=3
#canal.mq.partition=0 即 将数据都发到partition_0中
#kafka topic 
# 使用partitionsNum 必须要设置表字段hash 不然不会将数据分发至不同partition中, ^为拼接两个字段取hash
canal.mq.partitionHash = test.A:FPQQLSH,test.B:FPQQLSH^SEQUENCE_NR
# 动态topic 每个topic及其后面对应的表正则,用逗号分隔 test1对用test库A表  test2 topic对应B表    
canal.mq.dynamicTopic = test1:test.A,test2:test.B

执行一段时间后就会报错如下

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms

由于处于测试阶段,将canal 删除zookeeper元数据及重启后并没有效果。


问题分析 查看日志example.log
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2022-02-07 00:01:48.113 [pool-4-thread-1] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_282]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_282]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_282]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.(KafkaProducer.java:1150) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:846) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671) ~[kafka-clients-1.1.1.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:199) ~[canal.server-1.1.4.jar:na]
	... 8 common frames omitted

看不出问题原因

查看canal.log
WARN [Producer clientId=producer-8] Error while fetching metadata with correlation id xxx: {=UNKNOWN_TOPIC_OR_PARTITION}

最初显示 空格 = UNKNOWN_TOPIC_OR_PARTITION
后来突然想到是kafka配置关闭了自动创建topic,修改配置为true

在将配置改为true后还是同样报错

WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id XXX: {=INVALID_TOPIC_EXCE}

想到空值肯定是不能作为topic名啊,为什么会有空值?

重新查看canal配置
想到mysql ddl 以及dml
我只需要dml INSERT UPDATE DELETE相关记录
由于使用动态分区,指定hash字段,而ddl读取数据格式为以下格式,data为null。

{
  "data": null,
  "database": "test",
  "es": 1636952896000,
  "id": 2389,
  "isDdl": true,
  "mysqlType": null,
  "old": null,
  "pkNames": null,
  "sql": "ALTER TABLE `test`.`demo10` rnCHANGE COLUMN `addr` `address` varchar(255) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL AFTER `age`",
  "sqlType": null,
  "table": "demo10",
  "ts": 1636952896772,
  "type": "ALTER"
}

于是添加 canal.instance.filter.query.ddl= true 想要过滤掉ddl相关数据

canal.instance.filter.regex = test.A,test.B
# kafka topic对应partition数据量 不然会将数据都发到一个partition中 
canal.mq.partitionsNum = 3
#canal.mq.partition=0 即 将数据都发到partition_0中
#kafka topic 
# 使用partitionsNum 必须要设置表字段hash 不然不会将数据分发至不同partition中, ^为拼接两个字段取hash
canal.mq.partitionHash = test.A:FPQQLSH,test.B:FPQQLSH^SEQUENCE_NR
# 动态topic 每个topic及其后面对应的表正则,用逗号分隔 test1对用test库A表  test2 topic对应B表    
canal.mq.dynamicTopic = test1:test.A,test2:test.B
canal.instance.filter.query.ddl = true

但重启canal后执行查看还是报错如下:

parse faield : CREATE DEFINER = `root`@`%` PROCEDURE 'getWsTotal'(
	......

com.alibaba.fastsql.sql.parser.ParserException: syntax error, error in : 'cur ;
	read_loop : loop
	......


不是已经过滤了ddl 难道dml里面也有东西?
想到将匹配不到的数据放入默认topic中 于是创建topic test
(看到下面成功后test topic中的数据 “isDdl”: false,

最终配置更改为

canal.instance.filter.regex = test.A,test.B
# kafka topic对应partition数据量 不然会将数据都发到一个partition中 
canal.mq.partitionsNum = 3
#canal.mq.partition=0 即 将数据都发到partition_0中
#kafka topic 
# 使用partitionsNum 必须要设置表字段hash 不然不会将数据分发至不同partition中, ^为拼接两个字段取hash
canal.mq.partitionHash = test.A:FPQQLSH,test.B:FPQQLSH^SEQUENCE_NR
# 动态topic 每个topic及其后面对应的表正则,用逗号分隔 test1对用test库A表  test2 topic对应B表    
canal.mq.dynamicTopic = test1:test.A,test2:test.B
canal.instance.filter.query.ddl = true
canal.mq.topic= test

可以查看到数据
果然存在数据传入默认的test topic中 ,data还是null 导致的

{
  "data": null,
  "database": "",
  "es": 1644301624000,
  "id": 800,
  "isDdl": false,
  "mysqlType": null,
  "old": null,
  "pkNames": null,
  "sql": "CREATE DEFINERu003d`root`@`%` PROCEDURE `_Navicat_Temp_Stored_Proc`(IN iqsri varchar(20),IN iedri varchar(20))nBEGINrntdeclare c varchar(20);rn  declare total int default 0;   rn  declare done int default false;   rn  declare cur cursor for select DISTINCT CONCAt(u0027tbl_ec_document_u0027,ssb) as tablename from tbl_jtxx;   rn  declare continue HANDLER for not found set done u003d true;rntset @iqsriu003diqsri;rntset @iedriu003diedri;rntset @tempsql u003du0027u0027;rntset @tempsqlend u003du0027u0027;rntIF (iqsri is not null)rntTHENrnttSET @tempsqlendu003dCONCAt(@tempsqlend," and CREATE_DATEu003eu003du0027",@iqsri,"u0027 ");rntend IF;rntIF (iedri is not null)rntTHENrnttSET @tempsqlendu003dCONCAt(@tempsqlend," and CREATE_DATEu003cu003du0027",@iedri,"u0027 ");rntend IF;rntopen cur;   rntread_loop:loop   rntfetch cur into c;    rntif done then  rntttleave read_loop; rntend if;    rntset @tempsqlu003dCONCAt(@tempsql," select nsrsbh,nsrmc,lrfs,count(fpqqlsh) as sl,issue_err_msgrnttttt  from ",c," where issued u003d u00279u0027 ",@tempsqlend,"group by nsrsbh, issue_err_msg  UNIOn ALL"); rntend loop;    rntclose cur;rntset @tempsqlu003dSUBSTr(@tempsql,1,(CHAR_LENGTH(@tempsql)-9));rnrntset @tempsqlu003dCONCAt(@tempsql," order by issue_err_msg  desc");rn   #select @tempsql;rntprepare stmt from @tempsql;rntEXECUTE stmt;rntdeallocate prepare stmt; rnEND",
  "sqlType": null,
  "table": "",
  "ts": 1644301624400,
  "type": "QUERY"
}

解决办法:

canal动态根据表传入不同topic配置 类似如下

canal.instance.filter.regex = test.A,test.B
# kafka topic对应partition数据量 不然会将数据都发到一个partition中 
canal.mq.partitionsNum = 3
#canal.mq.partition=0 即 将数据都发到partition_0中
#kafka topic 
# 使用partitionsNum 必须要设置表字段hash 不然不会将数据分发至不同partition中, ^为拼接两个字段取hash
canal.mq.partitionHash = test.A:FPQQLSH,test.B:FPQQLSH^SEQUENCE_NR
# 动态topic 每个topic及其后面对应的表正则,用逗号分隔 test1对用test库A表  test2 topic对应B表    
canal.mq.dynamicTopic = test1:test.A,test2:test.B
# ddl dcl 
canal.instance.filter.query.ddl = true
canal.mq.topic= test
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/730336.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号