1. Received a log exception alert: a Flink job writing messages to Kafka threw the following exception
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The message is 1602187 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:766)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
Cause analysis
1. From the stack trace, the serialized message is about 1.6 MB, too large for a single message and beyond Kafka's configured limit; check the Kafka client's default configuration
The client default for max.request.size is 1 MB (1048576 bytes), so this message exceeds the default
- Set the producer client configuration; here we raise it to 2 MB (2097152 bytes)
Properties properties = new Properties();
properties.setProperty("max.request.size","2097152");
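The numbers line up: the 1602187-byte message from the exception exceeds the 1 MB client default but fits under the 2 MB override. A minimal sketch checking this (plain Java, no Flink dependency; the constants come from the exception above and the Kafka client defaults):

```java
import java.util.Properties;

public class MaxRequestSizeCheck {
    public static void main(String[] args) {
        int defaultMax = 1048576;   // Kafka client default max.request.size (1 MB)
        int messageSize = 1602187;  // serialized size reported in the exception

        Properties properties = new Properties();
        properties.setProperty("max.request.size", "2097152"); // raised to 2 MB

        int newMax = Integer.parseInt(properties.getProperty("max.request.size"));
        System.out.println(messageSize > defaultMax); // true: rejected under the default
        System.out.println(messageSize <= newMax);    // true: fits after the override
    }
}
```

These same `Properties` are what get passed to the `FlinkKafkaProducer` constructor in the job.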
2. Test and verify
In the local test environment the send still failed with the same error.
Since the client-side change had no effect, we suspected the broker enforces its own limit; after some research, it turned out the server side needs configuring as well.
3. Kafka broker-side configuration
- Topic-level configuration: max.message.bytes. It applies only to the given topic and can be set directly with the kafka-configs.sh tool; it takes effect immediately, with no broker restart needed. For example (the topic name and ZooKeeper address below are placeholders):
kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config max.message.bytes=2097152
- Global configuration: message.max.bytes in the broker's server.properties. It applies to all topics and requires a broker restart.
4. With max.message.bytes set, verify again:
The message was sent successfully.
5. Consumers
No change is required on the consumer side; consumers can read the larger messages normally. However, if most messages are this large, the fetch settings may need a small adjustment, otherwise each batch fetch will return only a handful of messages.
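On the 0.10-era consumer, the relevant knob is max.partition.fetch.bytes, which defaults to 1 MB per partition per fetch, so a fetch may carry only a single ~1.6 MB message. A hedged sketch of the consumer properties (the bootstrap address is a placeholder; only the fetch setting is the point):

```java
import java.util.Properties;

public class ConsumerFetchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        // Default is 1048576 (1 MB); raise it so one oversized message
        // does not fill an entire per-partition fetch.
        props.setProperty("max.partition.fetch.bytes", String.valueOf(2 * 1024 * 1024));
        System.out.println(props.getProperty("max.partition.fetch.bytes"));
    }
}
```

The printed value (2097152) matches the broker-side max.message.bytes chosen above, which keeps producer, broker, and consumer limits consistent.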
Kafka broker 0.10.11
Kafka client 0.10.11



