- 安装rabbitMQ
- 1. 安装erlang的yum仓库
- 2. 安装rabbitMQ的yum仓库
- 3. 启动rabbitMQ服务
- 4. 添加用户,开放端口
- rabbitMQ四种交换机
- 1. Direct Exchange
- 2. Fanout Exchange
- 3. Topic Exchange
- 4. Headers Exchanges
- springboot整合RabbitMQ
- 1. 添加maven依赖
- 2. yml配置对应参数
- 3. 项目中使用其他配置
- 4. 使用示例
需要rabbitMQ版本与erlang版本匹配,最好是从官网下载
1. 安装erlang的yum仓库curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash yum install -y epel-release yum install -y e2. 安装rabbitMQ的yum仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash yum install -y r3. 启动rabbitMQ服务
systemctl start rabbitmq-server rabbitmq-plugins enable rabbitmq_management4. 添加用户,开放端口
rabbitmqctl add_user admin admin #添加用户 rabbitmqctl add_user_tags admin administrator #给用户增加标签 rabbitmqctl set_permissions -p '/' 'admin' '.*' '.*' '.*' #给用户增加访问权限 #开放5672、15672端口rabbitMQ四种交换机 1. Direct Exchange
将队列绑定到交换机上,同一个队列可以用不同的路由键绑定同一个交换机,消息发送需要路由键完全匹配。
2. Fanout Exchange扇形交换机接收到的消息,会分发到所有绑定到该交换机的所有队列,与路由键不相关。
3. Topic Exchange主题交换机,支持路由键通过通配符匹配。
4. Headers Exchanges不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
springboot整合RabbitMQ 1. 添加maven依赖
org.springframework.boot
spring-boot-starter-amqp
2.5.2
2. yml配置对应参数
spring:
rabbitmq:
host: 192.168.0.203
port: 5672
username: admin
password: admin
virtual-host: /test
3. 项目中使用其他配置
public class RabbitConnection extends AbstractConnectionFactory {
public RabbitConnection(ConnectionFactory rabbitConnectionFactory) {
super(rabbitConnectionFactory);
}
@Override
public Connection createConnection() throws AmqpException {
return this.createBareConnection();
}
}
获取rabbitTemplate,可以通过注解获取yml配置的链接,也可以自己配置
@Component
public class RabbitTemplateFactory {
@Autowired
private RabbitTemplate rabbitTemplate;
public RabbitTemplate getRabbitTemplate(){
return this.rabbitTemplate;
}
public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplateMy = new RabbitTemplate();
rabbitTemplateMy.setConnectionFactory(connectionFactory);
rabbitTemplateMy.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplateMy;
}
public RabbitTemplate getRabbitTemplate(String host,int port,String username,String password,String virHost){
com.rabbitmq.client.ConnectionFactory rabbitConnection = new com.rabbitmq.client.ConnectionFactory();
rabbitConnection.setHost(host);
rabbitConnection.setPort(port);
rabbitConnection.setUsername(username);
rabbitConnection.setPassword(password);
rabbitConnection.setVirtualHost(virHost);
RabbitConnection connection = new RabbitConnection(rabbitConnection);
RabbitTemplate rabbitTemplateMy = new RabbitTemplate();
rabbitTemplateMy.setConnectionFactory(connection);
rabbitTemplateMy.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplateMy;
}
public RabbitAdmin getRabbitAdmin(){
return new RabbitAdmin(this.rabbitTemplate);
}
public RabbitAdmin getRabbitAdmin(RabbitTemplate rabbitTemplate){
return new RabbitAdmin(rabbitTemplate);
}
}
4. 使用示例
@RestController
@RequestMapping("mes/rabbitDemo")
@Api(tags="rabbitMQ使用demo")
public class RabbitMQDemo {
//如果使用默认配置,可以直接使用rabbitTemplate,如果有其他不同配置可以使用下面自己配置
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitTemplateFactory rabbitTemplateFactory;
@GetMapping("sendMessageByRoute")
@ApiOperation("通配符匹配路由")
public Result sendMessageByRoute(){
JSONObject o = new JSONObject();
o.put("test1","按鼠标");
o.put("test2","发生了");
o.put("是客服","sdfkj");
RabbitTemplate rabbitTemplate1 = rabbitTemplateFactory.getRabbitTemplate("192.168.0.203",5672,"admin","admin","/test");
rabbitTemplate1.convertAndSend("amq.topic","topic.1",o);
return new Result();
}
@GetMapping("sendMessageByExchange")
@ApiOperation("指定交换机路由")
public Result sendMessageByExchange(){
JSONObject o = new JSONObject();
o.put("test1","按鼠标");
o.put("test2","发生了");
o.put("是客服","sdfkj");
RabbitTemplate rabbitTemplate1 = rabbitTemplateFactory.getRabbitTemplate("192.168.0.203",5672,"admin","admin","/test");
rabbitTemplate1.convertAndSend("test.direct","1",o);
return new Result();
}
@GetMapping("sendMessageWithOutRoute")
@ApiOperation("扇形交换机广播")
public Result sendMessageWithOutRoute(){
JSONObject o = new JSONObject();
o.put("test1","按鼠标");
o.put("test2","发生了");
o.put("是客服","sdfkj");
RabbitTemplate rabbitTemplate1 = rabbitTemplateFactory.getRabbitTemplate("192.168.0.203",5672,"admin","admin","/test");
rabbitTemplate1.convertAndSend("amq.fanout","*",o);
return new Result();
}
@GetMapping("getMessageByQueueName")
@ApiOperation("通过队列名获取消息")
public Result getMessageByQueueName(){
JSONObject o = new JSONObject();
o= JSONObject.parseObject(rabbitTemplate.receiveAndConvert("test.que1").toString());
System.out.println(o.toJSONString());
return new Result();
}
//根据队列监听
@RabbitListener(queues = {"test.que2","test.que3"})
public void listenRabbit(String msg){
JSONObject o = new JSONObject();
o= JSONObject.parseObject(msg);
System.out.println(o.toJSONString());
}
//根据路由监听
@RabbitListener(bindings = {
@QueueBinding(value = @org.springframework.amqp.rabbit.annotation.Queue("test.que1"),
exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "test.direct"),
key = {"9"})})
public void listenRabbit(Object msg){
JSONObject o = new JSONObject();
// o= JSONObject.parseObject(msg);
System.out.println(msg.toString());
}
@RabbitListener(bindings = {
@QueueBinding(value = @org.springframework.amqp.rabbit.annotation.Queue("test.que1"),
exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "test.direct"),
key = {"1"})})
public void listenRabbits(Object msg){
JSONObject o = new JSONObject();
// o= JSONObject.parseObject(msg);
System.out.println("s"+msg.toString());
}
@GetMapping("createExchangeAndQueue")
@ApiOperation("创建交换机并绑定队列")
public Result createExchangeAndQueue(){
DirectExchange directExchange = new DirectExchange("exchangeCreateByCode");
RabbitTemplate rabbitTemplate1 = rabbitTemplateFactory.getRabbitTemplate("192.168.0.203",5672,"admin","admin","/test");
AmqpAdmin amqpAdmin = rabbitTemplateFactory.getRabbitAdmin(rabbitTemplate1);
Queue queue = new Queue("test.que9");
amqpAdmin.declareExchange(directExchange);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareBinding(new Binding("test.que9",Binding.DestinationType.QUEUE,"exchangeCreateByCode","9",null));
return new Result();
}
//需要添加监听器才能自动创建
//创建队列
@Bean("helloQueue")
public Queue helloQueue(){
return new Queue("helloQueue");
}
//创建交换机
@Bean("helloTopic")
public TopicExchange topicExchange(){
return new TopicExchange("helloTopic");
}
//绑定队列交换机
@Bean
public Binding bindingExchangeMessage(@Qualifier("helloQueue") Queue helloQueue,@Qualifier("helloTopic") TopicExchange topicExchange){
return BindingBuilder.bind(helloQueue).to(topicExchange).with("topic.test");
}
}



