栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

记一次kafak异步发送变同步阻塞的问题

记一次kafak异步发送变同步阻塞的问题

在发送kafak 消息时,用的kafkaTemplate.send(), 返回的是个Future对象,没有get()去阻塞,发送方法是异步的。但是有次在切内网测试时发现这个方法被阻塞了。经过不断调试,发现当第一次启动发送消息时,如果有网络问题,获取卡夫卡服务器不可达,会造成阻塞。
经查阅文档,

发现第一次发送消息时生产者回请求kafak服务端,获取该主题的元数据 metadata,metadata 内容包括了主题相关分区 Leader 所在节点信息、副本所在节点信息、ISR 列表等,Kafka Producer 获取 metadata 后,便会根据 metadata 内容将消息发送到指定的分区 Leader,Kafka Producer 在发送消息之前,会检查主题的 metadata 是否需要更新,如果需要更新,则会唤醒 Sender 线程并发送 metatadata 更新请求,此时 Kafka Producer 主线程则会阻塞等待 metadata 的更新。

我当时没有正确的切换网络导致第一次请求不到metatadata ,从而导致阻塞的。

 ListenableFuture> future = kafkaTemplate.send(topic,obj2String);
        future.addCallback(new ListenableFutureCallback>() {
            @Override
            public void onFailure(Throwable throwable) {

                //发送失败的处理
            
            }

            @Override
            public void onSuccess(SendResult stringObjectSendResult) {
                //成功的处理
             
            }
        });
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761484.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号