栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spring AMQP 笔记

Spring AMQP 笔记

这篇博客是博主学习的笔记,可能说的不是很清楚,也可能会有错误,多担待

AMQP协议

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议

概念
  • Publisher:发布者
  • Consumer:消费者
  • Message:消息
  • Queue:消息队列
  • Exchange:交换机,把发送的消息按照规则发送到符合规则的消息队列中,起消息路由作用
    • fanout类型:将消息发送到绑定的所有队列中
    • direct类型:将消息发送到routeKey相符合的队列中(默认)
    • topic类型:将消息发送到routeKey相符合的队列中,支持通配符
  • QueueBinding:绑定关系,指定队列与交换机绑定关系
  • 用户:要有在RabbitMQ注册的用户才能发送消息,有账号和密码
  • 虚拟主机:每个用户可以设置虚拟主机,用户隔离环境
工作过程
  1. 发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
  2. 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
  3. 最后 AMQP 代理会将消息投递给订阅了此队列的消费者(Consumer),或者消费者按照需求自行获取。
SpringAMQP

依赖:


	org.springframework.boot
	spring-boot-starter-amqp
	2.5.5

配置文件:

spring:
  rabbitmq:
    host: 180.76.137.232
    port: 5672
    username: admin
    password: admin
    virtual-host: /

代码:
普通队列模式:
发布者:

@RestController
public class ProducerController {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@GetMapping("sendSimple")
	public String sendSimple(String name, int age) {
		String message = String.format("my name is %s, %d years old!", name, age);
		rabbitTemplate.convertAndSend("simple", message);
		System.err.println("send simple message : " + message);
		return "ok";
	}
	
}

消费者:

@Component
public class ConsumerListener {

	// 需要主动创建队列
	@Bean
	public Queue queue() {
		return new Queue("simple", true);
	}

	@RabbitListener(queues = "simple")
	public void receiveSimpleQueue(String message) {
		System.err.println("receive simple message : " + message);
	}

}

交换机模式:
发布者:

@RestController
public class ProducerController {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@GetMapping("sendDirect")
	public String sendDirect(String key, String name, int age) {
		String message = String.format("my name is %s, %d years old!", name, age);
		rabbitTemplate.convertAndSend("direct.exchange", key, message);
		System.err.println("send direct message [key=" + key + "] : " + message);
		return "ok";
	}

	@GetMapping("sendFanout")
	public String sendFanout(String name, int age) {
		String message = String.format("my name is %s, %d years old!", name, age);
		rabbitTemplate.convertAndSend("fanout.exchange","", message);
		System.err.println("send fanout message : " + message);
		return "ok";
	}

}

消费者:

@Component
public class ConsumerListener {

	// 不需要主动创建队列
	@RabbitListener(bindings = @QueueBinding(
			value = @org.springframework.amqp.rabbit.annotation.Queue("direct.queue"),
			exchange = @Exchange(value = "direct.exchange"),
			key = {"china", "news"}
	))
	public void receiveDirectQueue(String message) {
		System.err.println("receive direct message : " + message);
	}

	@RabbitListener(bindings = @QueueBinding(
			value = @org.springframework.amqp.rabbit.annotation.Queue("fanout1.queue"),
			exchange = @Exchange(value = "fanout.exchange", type = "fanout")
	))
	public void receiveFanoutQueue1(String message) {
		System.err.println("receive fanout1 message : " + message);
	}

	@RabbitListener(bindings = @QueueBinding(
			value = @org.springframework.amqp.rabbit.annotation.Queue("fanout2.queue"),
			exchange = @Exchange(value = "fanout.exchange", type = "fanout")
	))
	public void receiveFanoutQueue2(String message) {
		System.err.println("receive fanout2 message : " + message);
	}

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350314.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号