这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异常(例如,由于网络故障或kafka代理已死),则默认情况下流将死亡。在kafka-streams
1.1.0版中,您可以通过
ProductionExceptionHandler以下方式实现覆盖默认行为:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler { @Override public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, final Exception exception) { log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", new String(record.value()), record.topic(), exception); return ProductionExceptionHandlerResponse.CONTINUE; } @Override public void configure(final Map<String, ?> configs) { }}从句柄方法中,
CONTINUE如果您不希望流因异常而死,则可以返回
FAIL;如果您希望流停止,则可以返回(FAIL是默认值之一)。并且您需要在流配置中指定此类:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
另外要注意的是
ProductionExceptionHandler手柄只在生产异常,以及与流方法处理邮件时不会处理异常
mapValues(..),
filter(..),
branch(..)等你需要用与try
/ catch块,这些方法的逻辑(把所有的方法逻辑到try块来保证您将处理所有例外情况):
.filter((key, value) -> { try {..} catch (Exception e) {..} })据我所知,我们不需要在用户端显式处理异常,因为kafka流将在稍后自动重试使用(因为偏移量不会改变,直到消息被使用和处理为止);例如,如果kafka代理在一段时间内无法访问,您将从kafka流中获取异常,并且当断开时,kafka流将消耗所有消息。因此,在这种情况下,我们将只是延迟而没有损坏/丢失。
使用,
setUncaughtExceptionHandler您将无法更改默认行为,例如使用
ProductionExceptionHandler,则只能记录错误或将消息发送到失败主题。



