在发送kafak 消息时,用的kafkaTemplate.send(), 返回的是个Future对象,没有get()去阻塞,发送方法是异步的。但是有次在切内网测试时发现这个方法被阻塞了。经过不断调试,发现当第一次启动发送消息时,如果有网络问题,获取卡夫卡服务器不可达,会造成阻塞。
经查阅文档,
发现第一次发送消息时生产者回请求kafak服务端,获取该主题的元数据 metadata,metadata 内容包括了主题相关分区 Leader 所在节点信息、副本所在节点信息、ISR 列表等,Kafka Producer 获取 metadata 后,便会根据 metadata 内容将消息发送到指定的分区 Leader,Kafka Producer 在发送消息之前,会检查主题的 metadata 是否需要更新,如果需要更新,则会唤醒 Sender 线程并发送 metatadata 更新请求,此时 Kafka Producer 主线程则会阻塞等待 metadata 的更新。
我当时没有正确的切换网络导致第一次请求不到metatadata ,从而导致阻塞的。
ListenableFuture> future = kafkaTemplate.send(topic,obj2String); future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable throwable) { //发送失败的处理 } @Override public void onSuccess(SendResult stringObjectSendResult) { //成功的处理 } });



