栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rabbitmq消息模式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rabbitmq消息模式

1.简单模式:

P : 生产者

Queue(hello) : 队列

C : 消费者

 

步骤:

1.创建工程

2.分别添加RabbitMQ依赖

3.编写生产者发送消息

4.编写消费者获取消息

pom.xml


    
        com.rabbitmq
        amqp-client
        5.10.0
    


    
        
            org.apache.maven.plugins
            maven-compiler-plugin
            3.8.0
            
                1.8
                1.8
            
        
    

 producer(生产者):

public class Producer_helloworld {
    private static final String QUEUE_NAME="queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2设置参数
        factory.setHost("127.0.0.1");//主机号
        factory.setPort(5672);//端口号
        factory.setVirtualHost("/test");//队列虚拟机
        factory.setUsername("test");//用户名
        factory.setPassword("test");//密码
        //3创建连接Connection
        Connection connection = factory.newConnection();
        //4创建Channel
        Channel channel = connection.createChannel();
        //5创建队列
        
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        
        String body="傻逼java面试官";
        //6发送消息
        channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
        //7释放资源
       
    }

}

consumer(消费者):

public class Hello01 {
    private static final String QUEUE_NAME="queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2设置参数
        factory.setHost("127.0.0.1");//主机号
        factory.setPort(5672);//端口号
        factory.setVirtualHost("/test");//队列虚拟机
        factory.setUsername("test");//用户名
        factory.setPassword("test");//密码
        //3创建连接Connection
        Connection connection = factory.newConnection();
        //4创建Channel
        Channel channel = connection.createChannel();
        //5创建队列
        
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
2.work queues工作队列模式:

p: 生产者

queue': 队列

C1: 消费者1

C2: 消费者2

 

work queues:与入门的简单模式相比,多了一个消费者,多个消费者共同消费同一个队列,消费者之间为竞争关系

应用场景:对于任务过重或任务较多的工作队列可以提高任务处理的速度.

producer:

public class Producer_helloworld {
    private static final String QUEUE_NAME="queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2设置参数
        factory.setHost("127.0.0.1");//主机号
        factory.setPort(5672);//端口号
        factory.setVirtualHost("/test");//队列虚拟机
        factory.setUsername("test");//用户名
        factory.setPassword("test");//密码
        //3创建连接Connection
        Connection connection = factory.newConnection();
        //4创建Channel
        Channel channel = connection.createChannel();
        //5创建队列
        
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        
        String body="傻逼java面试官";
        //6发送消息
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
        }
        //7释放资源
        channel.close();
        connection.close();
    }
}

consumer1/consumer2:

public class Hello01  {/// Hello02

    private static final String QUEUE_NAME="queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //1获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2设置参数
        factory.setHost("127.0.0.1");//主机号
        factory.setPort(5672);//端口号
        factory.setVirtualHost("/test");//队列虚拟机
        factory.setUsername("test");//用户名
        factory.setPassword("test");//密码
        //3创建连接Connection
        Connection connection = factory.newConnection();
        //4创建Channel
        Channel channel = connection.createChannel();
        //5创建队列
        
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        
        Consumer consumer = new DefaultConsumer(channel){
            int number=0;
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,                         AMQP.BasicProperties properties, byte[] body) throws IOException {
                number++;
                System.out.println(number+(new String(body)));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

 

运行过程及结果:

首先开启两个consumer消费者,此时该队列无消息,启动完成后再启动producer生产者发送消息,

此时两个消费者竞争获取消息

3.Pub/Sub订阅模式

p(producer): 生产者

x(exchange): 交换机,接受生产者发送的消息,知道如何处理消息,例如交给某个特定的消息队列,递交给所有队列......

exchange三种模式:

        Fanout: 广播,将消息发送到所有绑定交换机的队列

        Direct: 定向,把消息发哦是那个到指定的routing key的队列

        Topic: 通配符,把消息交给符合routing pattern(路由模式)的队列

queue: 消息对列

c1/c2(consumer): 消费者

 

producer(生产者):

public class Producer_PubSub {
    private static final String QUEUE_NAME1="pubsub_queue1";
    private static final String QUEUE_NAME2="pubsub_queue2";
    private static final String EXCHANGE_NAME="fanout_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        //获取新连接
        Connection connection = factory.newConnection();
        //创建channel通道
        Channel channel = connection.createChannel();
        //创建Exchange交换机
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, false, null);
        //创建队列
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
        
        //队列绑定交换机
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
        String body="日志:张三调用了findAll方法,日志级别:info....";
        //发送消息
        channel.basicPublish(EXCHANGE_NAME,"",null,body.getBytes());
        //释放连接
        channel.close();
        connection.close();
    }
}

consumer01/consumer02:

public class consumer01 {
    private static final String QUEUE_NAME1="pubsub_queue1";//private static final String QUEUE_NAME2="pubsub_queue2";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();

        //获取channel
        Channel channel = connection.createChannel();
        //获取消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.
                           BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME1,true,consumer);//channel.basicConsume(QUEUE_NAME2,true,consumer);

    }
}

4.Routing模式(路由模式)

p(produmer): 生产者

x(exchange): 交换机

        type=direct 交换机类型

Q1/Q2(queue): 队列

C1/C2(consumer): 消费者

 

producer(生产者):

public class RoutingProducer {
    private static final String QUEUE_ROUTING1 = "queue_routing1";
    private static final String QUEUE_ROUTING2 = "queue_routing2";
    private static final String EXCHANGE_NAME = "exchange_Routing";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("test");
        factory.setVirtualHost("/test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        //获取连接connection
        Channel channel = connection.createChannel();
        //创建队列queue
        channel.queueDeclare(QUEUE_ROUTING1,true,false,false,null);
        channel.queueDeclare(QUEUE_ROUTING2,true,false,false,null);
        //创建交换机exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null);
        //消息主体
        String err_body="error:  数据插入错误error...";
        String info_body="info: 初始化.....";
        String warning_body="warning: 加载中....";
        //队列与交换机绑定,queueBind方法的第三个参数为routing key,指定路由模式
        channel.queueBind(QUEUE_ROUTING1,EXCHANGE_NAME,"error",null);
        channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"info",null);
        channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"warning",null);
        channel.queueBind(QUEUE_ROUTING2,EXCHANGE_NAME,"error",null);
        //发送消息
        channel.basicPublish(EXCHANGE_NAME,"info",null,info_body.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"error",null,err_body.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"warning",null,warning_body.getBytes());
        //关闭资源
        channel.close();
        connection.close();
    }
}

consumer01(消费者):

public class Consumer_Routing_Err {
    private static final String QUEUE_ROUTING1 = "queue_routing1";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(QUEUE_ROUTING1,true,consumer);
    }
}

consumer02(消费者):

public class Consumer_Routing_Err {
    private static final String QUEUE_ROUTING1 = "queue_routing1";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume(QUEUE_ROUTING1,true,consumer);
    }
}

5.Topics通配符模式

p(producer): 生产者

X(exchange): 交换机

        type: topics

Q1/Q2(queue): 队列

C1/C2(consumer): 消费者

        topics中:

                *通配符: 一个单词

                #通配符: 一个或多个单词

 

producer:

public class Producer_Topics {
    private static final String QUEUE1="Queue_Topics1";
    private static final String QUEUE2="Queue_Topics2";
    private static final String EXCHANGE="exchange_topics";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        //获取连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(QUEUE1,true,false,false,null);
        channel.queueDeclare(QUEUE2,true,false,false,null);
        //创建交换机
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
        //交换机绑定队列
        channel.queueBind(QUEUE1,EXCHANGE,"*.orange.*");
        channel.queueBind(QUEUE2,EXCHANGE,"*.*.rabbite");
        channel.queueBind(QUEUE2,EXCHANGE,"lazy.#");
        //发送消息
        String orange_body="orange_body: orange_body.....";
        String rabbite_body="rabbite:   rabbite.....";
        String lazy_body="lazy:     lazy......";
        channel.basicPublish(EXCHANGE,"xxx.orange.xxx",null,orange_body.getBytes());
        channel.basicPublish(EXCHANGE,"xxx.xxx.rabbite",null,rabbite_body.getBytes());
        channel.basicPublish(EXCHANGE,"lazy.xxx",null,lazy_body.getBytes());
        //关闭资源
        channel.close();
        connection.close();
    }
}

consumer01:

public class Consumer01 {
    private static final String QUEUE1="Queue_Topics1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        //获取连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建consumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        //获取消息
        channel.basicConsume(QUEUE1,true,consumer);
    }
}

consumer02:

 public class Consumer02 {
    private static final String QUEUE2="Queue_Topics2";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        //获取连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建consumer
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        //获取消息
        channel.basicConsume(QUEUE2,true,consumer);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/349130.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号