栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

在Kafka流中处理异常

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

在Kafka流中处理异常

这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异常(例如,由于网络故障或kafka代理已死),则默认情况下流将死亡。在kafka-streams
1.1.0版中,您可以通过

ProductionExceptionHandler
以下方式实现覆盖默认行为:

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {    @Override    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,         final Exception exception) {        log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]",  new String(record.value()), record.topic(), exception);        return ProductionExceptionHandlerResponse.CONTINUE;    }    @Override    public void configure(final Map<String, ?> configs) {    }}

从句柄方法中,

CONTINUE
如果您不希望流因异常而死,则可以返回
FAIL
;如果您希望流停止,则可以返回(FAIL是默认值之一)。并且您需要在流配置中指定此类:

default.production.exception.handler=com.example.CustomProductionExceptionHandler

另外要注意的是

ProductionExceptionHandler
手柄只在生产异常,以及与流方法处理邮件时不会处理异常
mapValues(..)
filter(..)
branch(..)
等你需要用与try
/ catch块,这些方法的逻辑(把所有的方法逻辑到try块来保证您将处理所有例外情况):

.filter((key, value) -> { try {..} catch (Exception e) {..} })

据我所知,我们不需要在用户端显式处理异常,因为kafka流将在稍后自动重试使用(因为偏移量不会改变,直到消息被使用和处理为止);例如,如果kafka代理在一段时间内无法访问,您将从kafka流中获取异常,并且当断开时,kafka流将消耗所有消息。因此,在这种情况下,我们将只是延迟而没有损坏/丢失。

使用,

setUncaughtExceptionHandler
您将无法更改默认行为,例如使用
ProductionExceptionHandler
,则只能记录错误或将消息发送到失败主题。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/469962.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号