栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq-笔记

RabbitMq-笔记

RabbitMq-笔记 1. hello word

封装公共方法

public class RabbitMqCommons {
    private static   ConnectionFactory connectionFactory;
    static{
        //创建连接工厂对象
        connectionFactory = new ConnectionFactory();
        //设置连接主机
        connectionFactory.setHost("127.0.0.1");
        //设置端口
        connectionFactory.setPort(5672);
        //设置连接虚拟机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");
    }

    public Connection getConnectInfo() throws IOException, TimeoutException {

        //设置连接对象
        Connection connection = connectionFactory.newConnection();
        return  connection;
    }

}

1.1 消息提供者
 
    @Test
    void privated() throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();
        //获取连接中通道
        Channel channel = connection.createChannel();
         //通道绑定对应消息队列
        
        channel.queueDeclare("hello",true,false,false,null);
       //发布消息
        
        
channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
        channel.close();
        connection.close();

    }
1.2 消费者
 
    @Test
    void consumer() throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();

        //获取连接中通道
        Channel channel = connection.createChannel();
        //通道绑定对应消息队列
        
        channel.queueDeclare("hello",true,false,false,null);
        //消费消息  第二个参数:自动确认收到消息
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override // body:xiaoxi
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("执行顺1");
                System.out.println("String.valueOf(body) = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
   //一直打开 一直监听
  
        System.out.println("2");

        //执行结果  2 执行顺1  先执行主线程,后执行回调函数


    }
2.work

消费者交替消费消息 ->轮询

2.1 消息提供者
 
    @Test
    void privated() throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();
        //获取连接中通道
        Channel channel = connection.createChannel();
        //通道绑定对应消息队列
        
        channel.queueDeclare("hello-work",true,false,false,null);
        //发布消息
        
        

        for (int i = 0; i <20 ; i++) {
            channel.basicPublish("","hello-work", MessageProperties.PERSISTENT_TEXT_PLAIN,("hello rabbitmq"+i).getBytes());

        }


        channel.close();
        connection.close();

    }
2.2 消费者 2.2.1 消费者1
 
    @Test
    void consumer1() throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();

        //获取连接中通道
        Channel channel = connection.createChannel();
        //通道绑定对应消息队列
        
        channel.queueDeclare("hello-work",true,false,false,null);
        //消费消息
        channel.basicConsume("hello-work",true,new DefaultConsumer(channel){
            @Override // body:xiaoxi
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("String.valueOf(body) = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
        //一直打开 一直监听
  



    }
2.2.2 消费者2
 
    @Test
    void consumer2() throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();

        //获取连接中通道
        Channel channel = connection.createChannel();
        //通道绑定对应消息队列
        
        channel.queueDeclare("hello-work",true,false,false,null);
        //消费消息
        channel.basicConsume("hello-work",true,new DefaultConsumer(channel){
            @Override // body:xiaoxi
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("String.valueOf(body) = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
        //一直打开 一直监听
  
    }
2.3修改轮询算法

*修改轮询算法 basicConsume第二个参数设为false 通道一次只能消费一个消息。轮询是把消息全部放入通道慢慢消费

2.3.1 消费者1
public class Consumer1 {  //默认轮询
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();

        //获取连接中通道
        Channel channel = connection.createChannel();
        //通道绑定对应消息队列
        

        
    channel.basicQos(1);//每次消费一个

        channel.queueDeclare("hello-work",true,false,false,null);
        //消费消息
        channel.basicConsume("hello-work",false,new DefaultConsumer(channel){
            @Override // body:消息 第二个参数:自动确认收到消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("String.valueOf(body) = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
                //手动确认 参数1 手动确认消息 确认当前 envelope消息 参数2  false 每次确认一个 true 多个
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
        //一直打开 一直监听
  


    }
}

2.3.2 消费者2
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();

        //获取连接中通道
        Channel channel = connection.createChannel();
        //通道绑定对应消息队列
        

        
        channel.basicQos(1);//每次消费一个
        channel.queueDeclare("hello-work",true,false,false,null);
        //消费消息 第二个参数:自动确认收到消息
        channel.basicConsume("hello-work",false,new DefaultConsumer(channel){
            @Override // body:xiaoxi
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("String.valueOf(body) = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
                //手动确认 参数1 手动确认消息 参数2 false 每次确认一个
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
        //一直打开 一直监听
  

    }
}

3.fanout

扇出

3.1 消息提供者
public class Privider {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();
        //获取连接中通道
        Channel channel = connection.createChannel();
         //将通道声明指定交换机
         //参数1 交换机名称 参数2 交换机类型 fanout 广播类型
        channel.exchangeDeclare("logs","fanout");
        //发送消息
        channel.basicPublish("logs","",null,"fanout type message".getBytes());

        channel.close();
        connection.close();
    }
}

3.2 消费者 3.2.1 消费者1
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connectInfo = rabbitMqCommons.getConnectInfo();
        Channel channel = connectInfo.createChannel();

        //将通道声明指定交换机
        channel.exchangeDeclare("logs","fanout");
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs","");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
    }
}

3.2.2 消费者2
public class Comsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connectInfo = rabbitMqCommons.getConnectInfo();
        Channel channel = connectInfo.createChannel();

        //将通道声明指定交换机
        channel.exchangeDeclare("logs","fanout");
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs","");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2 = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
    }

}

4.routing

可以指定哪个消费者消费

4.1 消息提供者
public class Privider {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();
        //获取连接中通道
        Channel channel = connection.createChannel();
        //将通道声明指定交换机
        //参数1 交换机名称 参数2 交换机类型 fanout 广播类型
        channel.exchangeDeclare("logs-direct","direct");
        //发送消息 参数2 routeKey
        channel.basicPublish("logs-direct","info",null,"fanout type message".getBytes());

        channel.close();
        connection.close();
    }
}

4.2 消费者 4.2.1 消费者1
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connectInfo = rabbitMqCommons.getConnectInfo();
        Channel channel = connectInfo.createChannel();

        //将通道声明指定交换机
        channel.exchangeDeclare("logs-direct","direct");
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs-direct","error");
        channel.queueBind(queue,"logs-direct","dev");

        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
    }
}

4.2.2 消费者2
public class Comsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connectInfo = rabbitMqCommons.getConnectInfo();
        Channel channel = connectInfo.createChannel();

        //将通道声明指定交换机
        channel.exchangeDeclare("logs-direct","direct");
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs-direct","info");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2 = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
    }

}

5.topic

通过 * # 动态指定消费者消费

5.1 消息提供者
public class Privider {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connection = rabbitMqCommons.getConnectInfo();
        //获取连接中通道
        Channel channel = connection.createChannel();
        //将通道声明指定交换机
        //参数1 交换机名称 参数2 交换机类型 fanout 广播类型
        channel.exchangeDeclare("logs-topic","topic");
        //发送消息 参数2 routeKey
        channel.basicPublish("logs-topic","user.add",null,"fanout type message".getBytes());
        channel.basicPublish("logs-topic","user.add.info",null,"fanout type message".getBytes());

        channel.close();
        connection.close();
    }
}

5.2 消费者 5.2.1 消费者1
public class Consumer1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connectInfo = rabbitMqCommons.getConnectInfo();
        Channel channel = connectInfo.createChannel();

        //将通道声明指定交换机
        channel.exchangeDeclare("logs-topic","topic");
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs-topic","user.*");


        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
    }
}

5.2.2消费者2
public class Comsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();
        Connection connectInfo = rabbitMqCommons.getConnectInfo();
        Channel channel = connectInfo.createChannel();

        //将通道声明指定交换机
        channel.exchangeDeclare("logs-topic","topic");
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs-topic","user.*");
        channel.queueBind(queue,"logs-topic","user.#");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2 = " + new String(body));
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        });
    }

}

6.整合springboot 6.1 消息提供者
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    void test1() {
        rabbitTemplate.convertAndSend("spring-hello","hello");
    }
    
    @Test
    void work() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("spring-work","hello-work"+i);

        }
    }

    @Test
    void fanout() {
        rabbitTemplate.convertAndSend("spring-fanout","","hello-fanout");
    }

    @Test
    void routekey() {
        rabbitTemplate.convertAndSend("spring-route","info","hello-route");
    }

    
    @Test
    void topic() {
        rabbitTemplate.convertAndSend("spring-topic","info.add","hello-route");
    }

}

6.2 消费者 6.2.1 helloword
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "spring-hello",declare = "false"))
public class Consumer {
@RabbitHandler
    public  void receivel(String message){
    System.out.println("message = " + message);
}
}

6.2.2 work
@Component
public class ConsumerWork {
    @RabbitListener(queuesToDeclare = @Queue(value = "spring-work"))
    public void receivel(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue(value = "spring-work"))
    public void receivel1(String message) {
        System.out.println("message2 = " + message);
    }
}

6.2.3 fanout
@Component
public class ConsumerFanout {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时对列
                    exchange = @Exchange(value = "spring-fanout",type = ExchangeTypes.FANOUT)
            )
    })
    public  void receivel(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时对列
                    exchange = @Exchange(value = "spring-fanout",type = ExchangeTypes.FANOUT)
            )
    })
    public  void receivel1(String message){
        System.out.println("message2 = " + message);
    }
}

6.2.4 routing
@Component
public class ConsumerRoute {



  @RabbitListener(bindings = {
          @QueueBinding(
                  value = @Queue,//创建临时队列
                  exchange = @Exchange(value = "spring-route",type = "direct"),//定义交换机名称和类型
                  key={"info","error","warn"}
          )
  })
    public void recivel(String message)
  {
      System.out.println("message1 = " + message);
  }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "spring-route",type = "direct"),//定义交换机名称和类型
                    key={"error","warn"}
            )
    })
    public void recivel1(String message)
    {
        System.out.println("message2 = " + message);
    }



}

6.2.5 topic
@Component
public class ConsumerTopic {



  @RabbitListener(bindings = {
          @QueueBinding(
                  value = @Queue,//创建临时队列
                  exchange = @Exchange(value = "spring-topic",type = "topic"),//定义交换机名称和类型
                  key={"info.*"}
          )
  })
    public void recivel(String message)
  {
      System.out.println("message1 = " + message);
  }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "spring-topic",type = "topic"),//定义交换机名称和类型
                    key={"error.*"}
            )
    })
    public void recivel1(String message)
    {
        System.out.println("message2 = " + message);
    }



}

7.rabbitmq集群
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774307.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号