ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 12126826 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
第一阶段分析本来以为问题出在kafka接收了过大的消息,导致消费端消费不掉。如果是这样的话,修改kafka的如下参数就能解决。
producer端: max.request.size=5242880(5M) broker: message.max.bytes=6291456(6M) consumer: fetch.max.bytes=7340032(7M)
但是,经过排查kafaka的配置文件,发现用的全都是默认的。默认的参数一般不会出现 生产者参数过大,消费者参数过小,导致消息进入kafka后无法消费的情况。
另外,kafka没有报错说有大消息卡住的情况,说明大消息并没有进入kafka。
第二阶段分析日志很明显生产者无法将大消息放入kafka,而canal恰恰就是生产者,那说明应该是canal端对进入kafka的消息长度做了限制。排查日志,这里果然做了限制。报错日志显示12M的消息无法进入kafka,而目前的配置消息上限只有10M,我们把它修改到15M。于是修改配置,canal.mq.maxRequestSize=15728640
canal.properties 对kafka的完整配置如下:
canal.mq.servers = xxxx1:9099,xxx2:9099,xxx3:9099 canal.mq.retries = 10 canal.mq.batchSize = 16384 #canal.mq.maxRequestSize = 10485760 canal.mq.maxRequestSize = 15728640 canal.mq.lingerMs = 100 canal.mq.bufferMemory = 33554432 canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all #canal.mq.properties. = canal.mq.producerGroup = test # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = local
重启canal集群,报错日志消失,问题解决!



