栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMq

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMq

1、界面管理 2、代码实现点对点的消息发送与消费(也就是一个生产者对应一个消费者。)
public class ProducerDemo01 {
    public static void main(String[] args) throws Exception {
        //创建mq的链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //配置rabbitmq的主机地址
        factory.setHost("127.0.0.1");
        //配置端口号
        factory.setPort(5672);
        //配置虚拟主机
        factory.setVirtualHost("/ems");
        //配置账户和密码
        factory.setUsername("ems");
        factory.setPassword("ems");
        //获取链接
        Connection connection = factory.newConnection();

        //通过链接获取链接中的通道
        Channel channel = connection.createChannel();

        //参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
        channel.queueDeclare("Hello",false,false,false,null);

        //发布消息
        //参数 交换机名称 、队列名称、传递消息的额外设置、消息具体内容
        channel.basicPublish("","Hello",null,"Hello,World".getBytes());

        channel.close();
        connection.close();
    }
}




public class ConsumerDemo01 {
    public static void main(String[] args) throws Exception {
        //创建mq的链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //配置rabbitmq的主机地址
        factory.setHost("127.0.0.1");
        //配置端口号
        factory.setPort(5672);
        //配置虚拟主机
        factory.setVirtualHost("/ems");
        //配置账户和密码
        factory.setUsername("ems");
        factory.setPassword("ems");
        //获取链接
        Connection connection = factory.newConnection();

        //通过链接获取链接中的通道
        Channel channel = connection.createChannel();

        //通道绑定队列
        //参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
        channel.queueDeclare("Hello",false,false,false,null);

        //消费消息
        //参数 队列名称、是否开启消息自动确认机制
        channel.basicConsume("Hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body);
                System.out.println("str = " + str);
            }
        });

        
    }
}
3、工作模式一个生产多个消费(一个生产者对应多个消费者,但是只能有一个消费者获得消息,也称之为竞争消费者模式。

public class WorkProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("workQueue",true,false,false,null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","workQueue",MessageProperties.PERSISTENT_TEXT_PLAIN,(i +"Hello Work Queue").getBytes());
        }
        RabbitMqConfig.closeConnection(channel,connection);
    }
}

// 消费者

public class WorkConsumer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMqConfig.getConnection();
        final Channel channel = connection.createChannel();
        //每次只消费、确认一条消息
        channel.basicQos(1);
        //参数 队列名、队列是否要持久化、队列是否只允许当前通道使用、队列在消费完之后是否自动删除、额外附加参数
        channel.queueDeclare("workQueue",true,false,false,null);
        //参数 队列名称、是否开启消息自动确认机制
        channel.basicConsume("workQueue",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 - :" + new String(body));
                //当消息消费完之后开启手动确认机制
                //参数: 确认队列中那个具体的消息,是否开启多个消息同事确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
4、发布订阅(一个消费者将消息首先发送到交换器,交换器绑定多个队列,然后被监听该队列的消费者所接收并消费。)
public class FanoutProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定的交换机
        //参数 交换机名称、交换机的模式fanout为广播
        channel.exchangeDeclare("logs","fanout");
        channel.basicPublish("logs","",null,"fanout".getBytes());
        channel.close();
        connection.close();
    }
}



public class FanoutConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        //参数 队列名 交换机名 路由key
        channel.queueBind(queueName,"logs","");
        //消费消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1-fanout消费了");
            }
        });
    }
}


public class FanoutConsumer2 {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        //参数 队列名 交换机名 路由key
        channel.queueBind(queueName,"logs","");
        //消费消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer2-fanout消费了");
            }
        });
    }
}
5、路由模式(生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的对列,接着监听该队列的消费者消费消息。)
public class DirectProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定的交换机
        //参数 交换机名称、交换机的模式DIRECT为广播
        channel.exchangeDeclare("logs_Direct", "direct");
        for (int i = 0; i < 10; i++) {
            if (i % 2 == 0){
                channel.basicPublish("logs_Direct","aaa",null,"DIRECT".getBytes());
            }else {
                channel.basicPublish("logs_Direct","bbb",null,"DIRECT".getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}



public class DirectConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机 交换机名称、交换机模式DIRECT(路由)
        channel.exchangeDeclare("logs_Direct",BuiltinExchangeType.DIRECT);
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        //参数 队列名 交换机名 路由key
        channel.queueBind(queueName,"logs_Direct","aaa");
        //消费消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String string = new String(body);
                System.out.println(string);
            }
        });
    }
}

public class DirectConsumer2 {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机 交换机名称、交换机模式DIRECT(路由)
        channel.exchangeDeclare("logs_Direct","direct");
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        //参数 队列名 交换机名 路由key
        channel.queueBind(queueName,"logs_Direct","bbb");
        //消费消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer1-fanout消费了");
            }
        });
    }
}
6、动态路由
public class TopicProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        //交换机名、交换机类型
        channel.exchangeDeclare("Topic", BuiltinExchangeType.TOPIC);
        String RoutingKey = "user.info";
        channel.basicPublish("Topic",RoutingKey,null,"Topic交换机".getBytes());
        RabbitMqConfig.closeConnection(channel,connection);
    }
}


public class TopicConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMqConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("Topic", BuiltinExchangeType.TOPIC);
        //获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        //参数 队列名 交换机名 路由key
        //通配符*表示只匹配一个、#表示匹配多个
        channel.queueBind(queueName,"Topic","*.info");
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body);
                System.out.println(str);
            }
        });
    }
}
spring boot集成Rabbitmq
// 相关jar包
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
        //相关配置文件
        # rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ems
spring.rabbitmq.password=ems
spring.rabbitmq.virtual-host=/ems
# 手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 一次消费一条消息
spring.rabbitmq.listener.simple.prefetch= 1
spring boot生产者
package com.itheima.demo;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;


@SpringBootTest
public class RabbitmqTest {

    //注入rabbitmq的使用模板
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void  testHello(){
        //点对点模式
        //队列名和消息
        rabbitTemplate.convertAndSend("Boot","HelloRabbitmq");
    }

    @Test
    public void testWork(){
        //工作模式
        //队列名和消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work2","HelloWork");
        }
    }

    @Test
    public void testFanout(){
        //广播模式
        //交换机名、路由、消息
        rabbitTemplate.convertAndSend("FanoutExchange","","FanoutExchange");
    }

    @Test
    public void testRoutingKey(){
        //路由模式
        //交换机名、路由、消息
        rabbitTemplate.convertAndSend("RoutingKey2021","userInfo","RoutingKey2021");
    }

    @Test
    public void testTopic(){
        //动态路由模式
        //交换机名、路由、消息
        rabbitTemplate.convertAndSend("testTopic","user.Info","testTopic");
    }
}

spring boot消费者
@Component
//当不配置持久化和自动删除时默认的是 不排他和非持久化的队列
@RabbitListener(queuesToDeclare = @Queue(value = "Boot",durable = "false",autoDelete = "false"))
public class Consumer {
    @RabbitHandler
    //@RabbitHandler使用此注解接收@RabbitListener指定的队列
    public void receive1(String message){
        System.out.println("message = " + message);
    }
}


@Component
public class WorkConsumer {

    @RabbitListener(queuesToDeclare = @Queue("work2"))
    public void receive1(String msg, Message message, Channel channel){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("message1 = " + msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @RabbitListener(queuesToDeclare = @Queue("work2"))
    public void receive2(String msg, Message message, Channel channel){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("message2 = " + msg);
            channel.basicAck(deliveryTag,false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}



@Component
public class FanoutConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue("fanout2021"),
                    exchange = @Exchange(value = "FanoutExchange",type = "fanout")
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue("fanout2022"),
                    exchange = @Exchange(value = "FanoutExchange",type = "fanout")
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}



@Component
public class RoutingConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "RoutingKey2021",type = "direct"),
                    key = {"userInfo"}
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "RoutingKey2021",type = "direct"),
                    key = {"userInfo"}
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}



@Component
public class TopicConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "testTopic",type = "topic"),
                    key = {"user.*"}
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    //如果不指定队列会创建临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "testTopic",type = "topic"),
                    key = {"user.*"}
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/606344.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号