springboot集成rabbitmq消息中间件
- 生产者
-
- 配置交换机,队列和关系
-
- 消费者
- 引入依赖
- rabbitmq连接配置
- 配置队列监听器
- 运行结果
生产者
引入依赖
rabbitmq依赖
org.springframework.boot
spring-boot-starter-amqp
测试依赖
org.springframework.boot
spring-boot-starter-test
rabbitmq连接配置
spring:
rabbitmq: # rabbitmq配置
host: 192.168.0.106
port: 5672
username: admin
password: admin
virtual-host: /
application:
name: boot-producer
配置交换机,队列和关系
package com.zhw.producter.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
private static final String EXCHANGE_NAME = "boot_topic_exchange";
private static final String QUEUE_NAME = "boot_topic_queue_a";
//配置交换机
@Bean("boot_exchange")
public Exchange bootExchange(){
return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).build();
}
//配置队列
@Bean("boot_queue")
public Queue aQueqe(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//配置绑定关系
@Bean
public Binding bindQueueExchange(@Qualifier("boot_queue") Queue queue,@Qualifier("boot_exchange") Exchange exchange){
//with用于配置routingKey
return BindingBuilder.bind(queue).to(exchange).with("#.a").noargs();
}
}
测试
package com.zhw.producter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducterTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送3000个消息
@Test
public void test1() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
for (int i = 0; i < 3000; i++) {
rabbitTemplate.convertAndSend("boot_topic_exchange", "aaaa.a", "springboot整合rabbitmq");
}
long end = System.currentTimeMillis();
System.out.println("总共耗时:" + (end - start));
}
}
测试结果
消费者
引入依赖
org.springframework.boot
spring-boot-starter-amqp
rabbitmq连接配置
spring:
rabbitmq: # rabbitmq配置
host: 192.168.0.106
port: 5672
username: admin
password: admin
virtual-host: /
application:
name: boot-consumer
配置队列监听器
//表明监听的队列
@RabbitListener(queues = "boot_topic_queue_a")
public void messageListener(Message message) {
System.out.println(message.getBody());
}
运行结果