配置文件创建Sink创建Source创建消费者创建生产者启动服务后RabbitMq页面显示交换机情况队列情况调用效果
快速根据代码进行上手,后面补充原理。。。。
配置文件spring.rabbitmq.host=****** spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ #消费者开启延时队列支持 spring.cloud.stream.rabbit.bindings.input-channe.consumer.delayed-exchange=true #生产者开启延时队列支持 spring.cloud.stream.rabbit.bindings.onput-channe.producer.delayed-exchange=true #指定消息所属exchange spring.cloud.stream.bindings.input_channel.destination=my-exchange #指定消费者分组,在多实例的时候必需指定,防止重复消费 spring.cloud.stream.bindings.input_channel.group=my-mq #指定消息所属exchange spring.cloud.stream.bindings.output_channel.destination=my-exchange spring.cloud.stream.bindings.output_channel.group=my-mq创建Sink
public interface MySink {
String input_channel = "input_channel";
@Input(input_channel)
SubscribableChannel in();
}
创建Source
public interface MySource {
String output_channel = "output_channel";
@Output(output_channel)
MessageChannel out();
}
创建消费者
@Component
@EnableBinding(MySink.class)
@Log4j2
public class MyConsumer {
@StreamListener(MySink.input_channel)
public void input(Message
创建生产者
@EnableBinding(MySource.class)
@Component
@Log4j2
public class MySender {
@Autowired
private MySource source;
public String sendDelayedMessage(String body,
Integer seconds) {
Map message = new HashMap<>();
message.put("body", body);
source.out().send(
MessageBuilder.withPayload(message)
.setHeader("x-delay", seconds * 1000)
.build()
);
log.info("发送延迟消息成功");
return "SUCCESS";
}
}
启动服务后RabbitMq页面显示交换机情况
队列情况
调用
@RestController
@RequestMapping("/delayed")
@Log4j2
public class DelayedCtrl {
@Autowired
private MySender sender;
@GetMapping("/delayed")
public String sendDelayedMessage(@RequestParam("body") String body,
@RequestParam("seconds") Integer seconds) {
sender.sendDelayedMessage(body,seconds);
return "SUCCESS";
}
}
效果



