封装公共方法
public class RabbitMqCommons {
private static ConnectionFactory connectionFactory;
static{
//创建连接工厂对象
connectionFactory = new ConnectionFactory();
//设置连接主机
connectionFactory.setHost("127.0.0.1");
//设置端口
connectionFactory.setPort(5672);
//设置连接虚拟机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
}
public Connection getConnectInfo() throws IOException, TimeoutException {
//设置连接对象
Connection connection = connectionFactory.newConnection();
return connection;
}
}
1.1 消息提供者
@Test
void privated() throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
channel.queueDeclare("hello",true,false,false,null);
//发布消息
channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
channel.close();
connection.close();
}
1.2 消费者
@Test
void consumer() throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
channel.queueDeclare("hello",true,false,false,null);
//消费消息 第二个参数:自动确认收到消息
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override // body:xiaoxi
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("执行顺1");
System.out.println("String.valueOf(body) = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
//一直打开 一直监听
System.out.println("2");
//执行结果 2 执行顺1 先执行主线程,后执行回调函数
}
2.work
2.1 消息提供者消费者交替消费消息 ->轮询
@Test
void privated() throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
channel.queueDeclare("hello-work",true,false,false,null);
//发布消息
for (int i = 0; i <20 ; i++) {
channel.basicPublish("","hello-work", MessageProperties.PERSISTENT_TEXT_PLAIN,("hello rabbitmq"+i).getBytes());
}
channel.close();
connection.close();
}
2.2 消费者
2.2.1 消费者1
@Test
void consumer1() throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
channel.queueDeclare("hello-work",true,false,false,null);
//消费消息
channel.basicConsume("hello-work",true,new DefaultConsumer(channel){
@Override // body:xiaoxi
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("String.valueOf(body) = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
//一直打开 一直监听
}
2.2.2 消费者2
@Test
void consumer2() throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
channel.queueDeclare("hello-work",true,false,false,null);
//消费消息
channel.basicConsume("hello-work",true,new DefaultConsumer(channel){
@Override // body:xiaoxi
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("String.valueOf(body) = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
//一直打开 一直监听
}
2.3修改轮询算法
2.3.1 消费者1*修改轮询算法 basicConsume第二个参数设为false 通道一次只能消费一个消息。轮询是把消息全部放入通道慢慢消费
public class Consumer1 { //默认轮询
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
channel.basicQos(1);//每次消费一个
channel.queueDeclare("hello-work",true,false,false,null);
//消费消息
channel.basicConsume("hello-work",false,new DefaultConsumer(channel){
@Override // body:消息 第二个参数:自动确认收到消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("String.valueOf(body) = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
//手动确认 参数1 手动确认消息 确认当前 envelope消息 参数2 false 每次确认一个 true 多个
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
//一直打开 一直监听
}
}
2.3.2 消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
channel.basicQos(1);//每次消费一个
channel.queueDeclare("hello-work",true,false,false,null);
//消费消息 第二个参数:自动确认收到消息
channel.basicConsume("hello-work",false,new DefaultConsumer(channel){
@Override // body:xiaoxi
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("String.valueOf(body) = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
//手动确认 参数1 手动确认消息 参数2 false 每次确认一个
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
//一直打开 一直监听
}
}
3.fanout
3.1 消息提供者扇出
public class Privider {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//将通道声明指定交换机
//参数1 交换机名称 参数2 交换机类型 fanout 广播类型
channel.exchangeDeclare("logs","fanout");
//发送消息
channel.basicPublish("logs","",null,"fanout type message".getBytes());
channel.close();
connection.close();
}
}
3.2 消费者
3.2.1 消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connectInfo = rabbitMqCommons.getConnectInfo();
Channel channel = connectInfo.createChannel();
//将通道声明指定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs","");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
}
}
3.2.2 消费者2
public class Comsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connectInfo = rabbitMqCommons.getConnectInfo();
Channel channel = connectInfo.createChannel();
//将通道声明指定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs","");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
}
}
4.routing
4.1 消息提供者可以指定哪个消费者消费
public class Privider {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//将通道声明指定交换机
//参数1 交换机名称 参数2 交换机类型 fanout 广播类型
channel.exchangeDeclare("logs-direct","direct");
//发送消息 参数2 routeKey
channel.basicPublish("logs-direct","info",null,"fanout type message".getBytes());
channel.close();
connection.close();
}
}
4.2 消费者
4.2.1 消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connectInfo = rabbitMqCommons.getConnectInfo();
Channel channel = connectInfo.createChannel();
//将通道声明指定交换机
channel.exchangeDeclare("logs-direct","direct");
//临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs-direct","error");
channel.queueBind(queue,"logs-direct","dev");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
}
}
4.2.2 消费者2
public class Comsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connectInfo = rabbitMqCommons.getConnectInfo();
Channel channel = connectInfo.createChannel();
//将通道声明指定交换机
channel.exchangeDeclare("logs-direct","direct");
//临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs-direct","info");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
}
}
5.topic
5.1 消息提供者通过 * # 动态指定消费者消费
public class Privider {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connection = rabbitMqCommons.getConnectInfo();
//获取连接中通道
Channel channel = connection.createChannel();
//将通道声明指定交换机
//参数1 交换机名称 参数2 交换机类型 fanout 广播类型
channel.exchangeDeclare("logs-topic","topic");
//发送消息 参数2 routeKey
channel.basicPublish("logs-topic","user.add",null,"fanout type message".getBytes());
channel.basicPublish("logs-topic","user.add.info",null,"fanout type message".getBytes());
channel.close();
connection.close();
}
}
5.2 消费者
5.2.1 消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connectInfo = rabbitMqCommons.getConnectInfo();
Channel channel = connectInfo.createChannel();
//将通道声明指定交换机
channel.exchangeDeclare("logs-topic","topic");
//临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs-topic","user.*");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
}
}
5.2.2消费者2
public class Comsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
Connection connectInfo = rabbitMqCommons.getConnectInfo();
Channel channel = connectInfo.createChannel();
//将通道声明指定交换机
channel.exchangeDeclare("logs-topic","topic");
//临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs-topic","user.*");
channel.queueBind(queue,"logs-topic","user.#");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 = " + new String(body));
super.handleDelivery(consumerTag, envelope, properties, body);
}
});
}
}
6.整合springboot
6.1 消息提供者
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void test1() {
rabbitTemplate.convertAndSend("spring-hello","hello");
}
@Test
void work() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("spring-work","hello-work"+i);
}
}
@Test
void fanout() {
rabbitTemplate.convertAndSend("spring-fanout","","hello-fanout");
}
@Test
void routekey() {
rabbitTemplate.convertAndSend("spring-route","info","hello-route");
}
@Test
void topic() {
rabbitTemplate.convertAndSend("spring-topic","info.add","hello-route");
}
}
6.2 消费者
6.2.1 helloword
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "spring-hello",declare = "false"))
public class Consumer {
@RabbitHandler
public void receivel(String message){
System.out.println("message = " + message);
}
}
6.2.2 work
@Component
public class ConsumerWork {
@RabbitListener(queuesToDeclare = @Queue(value = "spring-work"))
public void receivel(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue(value = "spring-work"))
public void receivel1(String message) {
System.out.println("message2 = " + message);
}
}
6.2.3 fanout
@Component
public class ConsumerFanout {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //创建临时对列
exchange = @Exchange(value = "spring-fanout",type = ExchangeTypes.FANOUT)
)
})
public void receivel(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //创建临时对列
exchange = @Exchange(value = "spring-fanout",type = ExchangeTypes.FANOUT)
)
})
public void receivel1(String message){
System.out.println("message2 = " + message);
}
}
6.2.4 routing
@Component
public class ConsumerRoute {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "spring-route",type = "direct"),//定义交换机名称和类型
key={"info","error","warn"}
)
})
public void recivel(String message)
{
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "spring-route",type = "direct"),//定义交换机名称和类型
key={"error","warn"}
)
})
public void recivel1(String message)
{
System.out.println("message2 = " + message);
}
}
6.2.5 topic
@Component
public class ConsumerTopic {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "spring-topic",type = "topic"),//定义交换机名称和类型
key={"info.*"}
)
})
public void recivel(String message)
{
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "spring-topic",type = "topic"),//定义交换机名称和类型
key={"error.*"}
)
})
public void recivel1(String message)
{
System.out.println("message2 = " + message);
}
}
7.rabbitmq集群


