栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Kafka-使用高级使用者的延迟队列实施

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka-使用高级使用者的延迟队列实施

解决此问题的一种方法是使用不同的主题,在其中推送所有将要延迟的消息。如果所有延迟的消息都应在相同的时间延迟之后进行处理,这将很简单:

while(it.hasNext()) {    val message = it.next().message()    if(shouldBeDelayed(message)) {        val delay = 24 hours        val delayTo = getCurrentTime() + delay        putMessageonDelayedQueue(message, delay, delayTo)    }    else {       process(message)    }    consumer.commitOffset()}

现在将尽快处理所有常规消息,而需要延迟的消息将放在另一个主题上。

令人高兴的是,我们知道延迟主题开头的消息是应该首先处理的消息,因为其delayTo值将是最小的。因此,我们可以设置另一个读取头消息的使用者,检查时间戳是否在过去,如果是,则处理该消息并提交偏移量。如果不是,它不会提交偏移量,而是一直休眠直到那个时间:

while(it.hasNext()) {    val delayedMessage = it.peek().message()    if(delayedMessage.delayTo < getCurrentTime()) {        val readMessage = it.next().message        process(readMessage.originalMessage)        consumer.commitOffset()    } else {        delayProcessingUntil(delayedMessage.delayTo)    }}

如果存在不同的延迟时间,则可以按延迟划分主题(例如24小时,12小时,6小时)。如果延迟时间比该时间更动态,那么它将变得更加复杂。您可以通过引入两个延迟主题来解决。从延迟主题中读取所有消息,

A
并处理所有
delayTo
值都为过去的消息。除其他外,您只需找到最接近的一个,
delayTo
然后将它们放在主题上即可
B
。休眠直到应该处理最接近的消息为止,然后相反地进行所有处理,即处理来自主题的消息,
B
然后将尚未处理的消息放回主题
A

回答您的特定问题(在您的问题的注释中已经解决了一些问题)

  1. 提交每个偏移量可能会使ZK变慢

您可以考虑切换到在Kafka中存储偏移量(自0.8.2起可用的功能,请

offsets.storage
在消费者配置中检出属性)

  1. Consumer.commitOffsets是否可以引发异常?如果是,我将使用同一条消息两次(可以解决幂等消息)

我认为,例如,如果它不能与偏移存储进行通信,则可以。正如您所说,使用幂等消息可以解决此问题。

  1. 等待较长时间而不提交偏移量的问题,例如延迟时间为24小时,将从迭代器中获取下一个,睡眠24小时,进行处理并提交(ZK会话超时?)

除非消息本身的处理花费的时间超过会话超时,否则上述解决方案不会有问题。

4.
ZK会话如何在不提交新偏移的情况下保持活动?(设置一个配置单元zookeeper.session.timeout.ms可以解决死掉的消费者而又不认识它)

同样,使用上述方法,您无需设置长时间的会话超时。

  1. 我还有其他问题吗?

总是有;)



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/570284.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号