由于
onMessage()不允许抛出已检查的异常,因此可以将异常包装在中
RuntimeException并重新抛出。
try { testService.process(message);} catch (BusinessException e) { throw new RuntimeException(e);}但是请注意,这可能导致消息无限期地重新发送。这是这样的:
RabbitMQ支持拒绝消息并要求代理重新排队。这显示在这里。但是RabbitMQ本身没有重试策略的机制,例如设置最大重试次数,延迟等。
使用Spring
AMQP时,“拒绝时重新排队”是默认选项。
SimpleMessageListenerContainer如果有未处理的异常,Spring
将默认执行此操作。因此,在您的情况下,您只需要重新引发捕获的异常即可。但是请注意,如果您无法处理消息并且总是抛出异常,则它将无限期地重新发送,并导致无限循环。
您可以通过引发
AmqpRejectAndDontRequeueException异常来覆盖每个消息的此行为,在这种情况下,不会重新排队该消息。
您也可以
SimpleMessageListenerContainer通过设置完全关闭“拒绝时重新排队”行为
container.setDefaultRequeueRejected(false)
如果在RabbitMQ中设置了一条消息,则该消息被拒绝但不重新排队将丢失或转移到DLQ。
如果您需要具有最大尝试次数,延迟次数等的重试策略,最简单的方法是设置一个弹簧“无状态”
RetryOperationsInterceptor,它将在线程内进行所有重试(使用
Thread.sleep()),而不会拒绝每次重试的消息(因此无需为每个重试返回RabbitMQ重试)。重试用尽时,默认情况下将记录一条警告,并且该消息将被使用。如果要发送到DLQ,则需要一个
RepublishMessageRecoverer或
MessageRecoverer不接受消息而无需重新排队的自定义(在后一种情况下,您还应该在队列上设置
RabbitMQ DLQ)。默认消息恢复程序的示例:
container.setAdviceChain(new Advice[] { org.springframework.amqp.rabbit.config.RetryInterceptorBuilder .stateless() .maxAttempts(5) .backOffOptions(1000, 2, 5000) .build()});显然,这样做的缺点是您将在重试的整个过程中占用线程。您还可以选择使用“有状态”
RetryOperationsInterceptor,该状态将在每次重试时将消息发送回RabbitMQ,但是延迟仍将
Thread.sleep()在应用程序内实现,此外,设置有状态拦截器会更加复杂。
因此,如果您希望在不占用任何延迟的情况下重试,
Thread则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案。如果您不希望出现指数退缩(因此每次重试不会增加延迟),则要简单一些。要实现这种解决方案,您基本上可以在RabbitMQ上创建另一个带有参数的队列:
"x-message-ttl": <delay time in milliseconds>和
"x-dead-letter-exchange":"<name of theoriginal queue>"。然后在您设置的主队列上
"x-dead-letter-exchange":"<name of the queuewith theTTL>"。因此,现在当您拒绝但不重新排队时,RabbitMQ会将其重定向到第二个队列。TTL过期时,它将被重定向到原始队列,然后重新传递到应用程序。因此,现在您需要一个重试拦截器,该拦截器在每次失败后拒绝发送给RabbitMQ的消息,并跟踪重试计数。为了避免需要在应用程序中保留状态(因为如果您的应用程序是集群的,则需要复制状态),则可以根据
x-deathRabbitMQ设置的标头计算重试计数。在此处查看有关此标头的更多信息。因此,在这一点上,实现自定义拦截器比通过这种行为自定义Spring有状态拦截器要容易得多。
另请参阅Spring AMQP参考中有关重试的部分。



