由于我我们现在所作的项目有有很多的外放接口供代理商调用,但是有些接口的响应并不是实时返回的,此时我们就需要以回调接口的方式,将信息响应给代理商。在这期间可能会出网络不稳定等其他情况,导致回调接口调用失败。所以需要特定的回调重试机制。这个机制参考了支付宝的通知模式。
思考过程: 回调本身很简单,只需代理商按要求实现我们的接口便可,但是重试机制比较麻烦,他的重试时间间隔不固定,而且到后面时间的间隔过长,使用定时任务达到的效果也并不理想,最后,通过使用rabbitmq,死信队列的特性,完美的实现了这个重试机制(我认为的,不知道有没有更好的办法),解决方案见下。
解决方案: 首先,rabbitmq的消息如果设置了ttl(ttl对应我们重试的间隔时间),在ttl时间内没有被消费,就自动进入死信队列,此时我们就消费死信队列里面的消息,这样正好达到消息延迟消费的效果,那么,如何实现不同的重试间隔:我们把七次重试的间隔,保存到数组里面,并用redis记录消息的重试次数取对应的数组下标,当我们重试失败的话,就通过redis拿到次数,在数组中取出时间,设置到消息里面,重新发送到队列中。到此,重试机制就完成了。下面是代码实现。
代码实现:
这里是,队列的配置
@Configuration
public class RabbitGatewayCallbackConfig {
@Bean
public DirectExchange gatewayCallbackDelayExchange() {
return new DirectExchange(ExchangeConst.GATEWAY_CALLBACK_DELAY_EXCHANGE);
}
@Bean
public Queue gatewayCallbackQueue() {
return new Queue(QueueConst.GATEWAY_CALLBACK_QUEUE,true,false,false);
}
@Bean
public Binding gatewayCallbackBinding() {
return BindingBuilder.bind(gatewayCallbackQueue()).to(gatewayCallbackDelayExchange()).with(RoutingConstant.GATEWAY_CALLBACK_ROUTING);
}
@Bean
public Queue gatewayCallBufferQueue() {
Map args = new HashMap<>();
//args.put("x-message-ttl", "10000");由于延迟回调时间不固定,所以禁用此配置
args.put("x-dead-letter-exchange", ExchangeConst.GATEWAY_CALLBACK_DELAY_EXCHANGE);
args.put("x-dead-letter-routing-key", RoutingConstant.GATEWAY_CALLBACK_ROUTING);
return new Queue(QueueConst.GATEWAY_CALLBACK_BUFFER_QUEUE, true, false, false, args);
}
}
如何为消息设置ttl
public static Message getMessage(String JsonStringData, int index) {
if (index > CALLBACK_INTERVAL.length - 1) {
throw new RuntimeException("index 不可大于数组的长度");
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration(CALLBACK_INTERVAL[index]);//设置失效时间,取对应的数组下标
return new Message(JsonStringData.getBytes(StandardCharsets.UTF_8), messageProperties);
}
具体业务的代码,由于涉及到公司的业务就不贴出来了,基本的思路就是,当我们回调失败的话,就会对回调的内容进行封装,并保存到消息里面,设置一个成员数组,保存重试之间间隔,然后在redis上设置记录该消息重试次数的key值,每次重试都会,取数组的中的值设置到消息里面,再加一直到第七次不再继续进行重试(记得为redis设置ttl,防止业务异常,而key一直存在没删掉)。



