栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq各种模式java实现

rabbitmq各种模式java实现

pom.xml:



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.5.6
         
    
    com.zoomy
    rabbitmq
    0.0.1-SNAPSHOT
    rabbitmq
    Demo project for Spring Boot
    
        1.8
    
    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
        
            com.rabbitmq
            amqp-client
            5.10.0
        


    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    



一、简单模式simple

Producer:

package com.zoomy.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {


    public static void main(String[] args) {
        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("******");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息
            String queueName = "queue1";
            
            channel.queueDeclare(queueName, false, false, false, null);
            // 5.准备消息内容
            String message = "Hello rabbitmq!!!";
            // 6.发送消息给队列queue
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送成功!!!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 7.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 8.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }


}

Consumer:

package com.zoomy.rabbitmq.simple;


import com.rabbitmq.client.*;

import java.io.IOException;


public class Consumer {

    public static void main(String[] args) {
        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("*******");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息



            channel.basicConsume("queue1", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery message) throws IOException {
                            System.out.println("收到信息是" + new String(message.getBody(), "utf-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String consumerTag) throws IOException {
                            System.out.println("接受消息失败");
                        }
                    }
            );
            System.out.println("开始接收消息");
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 7.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 8.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }


}

首先执行生产者producer得main方法:

登录rabbitmq:看到queue1生产了一条队列信息


然后使用消费者消费掉:

看到ready变成0,被成功消费掉

二、fanout

1.复制simple模式的producer,consumer

2.rabbitmq添加交换机,创建队列,绑定关系



3.修改producer

package com.zoomy.rabbitmq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {


    public static void main(String[] args) {
        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("******");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();

            // 4.准备消息内容
            String message = "Hello rabbitmq!!!";

            //5.准备交换机
            String exchange = "fanout-exchange";

            // 6.定义routing key
            String routekey = "";

            // 7.指定交换机类型
            String type = "fanout";

            //8.发送消息给队列queue
            
            channel.basicPublish(exchange, routekey, null, message.getBytes());
            System.out.println("消息发送成功!!!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 9.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 10.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }


}


4.修改consumer

package com.zoomy.rabbitmq.routing;


import com.rabbitmq.client.*;

import java.io.IOException;


public class Consumer {

    public static Runnable runnable = new Runnable() {
        @Override
        public void run() {

            // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
            // ip port

            // 1.创建连接工程
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("********");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            connectionFactory.setVirtualHost("/");
            //获取队列名称
            final String queueName = Thread.currentThread().getName();

            Connection connection = null;
            Channel channel = null;
            try {
                // 2.创建连接Connection
                connection = connectionFactory.newConnection("生产者");
                // 3.通过连接获取通道Channel
                channel = connection.createChannel();
                // 4.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息


                channel.basicConsume(queueName, true, new DeliverCallback() {
                            @Override
                            public void handle(String consumerTag, Delivery message) throws IOException {
                                System.out.println(queueName + "收到信息是" + new String(message.getBody(), "utf-8"));
                            }
                        }, new CancelCallback() {
                            @Override
                            public void handle(String consumerTag) throws IOException {
                                System.out.println("接受消息失败");
                            }
                        }
                );
                System.in.read();

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 7.关闭连接
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                // 8.关闭通道
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    //创建三个消费者
    public static void main(String[] args) {
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
    }
}

5.测试:
运行producer

队列信息都增加一条

运行consumer,发现队列123都进行了消费

三、direct

1、创建交换机,绑定关系


2.复制fanout的producer,和consumer,给producer做修改

package com.zoomy.rabbitmq.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {


    public static void main(String[] args) {
        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("*****");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();

            // 4.准备消息内容
            String message = "Hello rabbitmq!!!";

            //5.准备交换机
            String exchange = "direct-exchange";

            // 6.定义routing key
            String routekey = "email";

            // 7.指定交换机类型
            String type = "direct";

            //8.发送消息给队列queue
            
            channel.basicPublish(exchange, routekey, null, message.getBytes());
            System.out.println("消息发送成功!!!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 9.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 10.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }


}

运行producer

运行consumer,看到routing key是 email的队列1 ,3 进行了消费

四、topic模式

复制direct代码

1、创建交换机,并绑定关系

2.修改producer

package com.zoomy.rabbitmq.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {


    public static void main(String[] args) {
        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("121.41.211.173");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();

            // 4.准备消息内容
            String message = "Hello rabbitmq!!!";

            //5.准备交换机
            String exchange = "topic-exchange";

            // 6.定义routing key
            String routekey = "com.order.test.xxxx";

            // 7.指定交换机类型
            String type = "topic";

            //8.发送消息给队列queue
            
            channel.basicPublish(exchange, routekey, null, message.getBytes());
            System.out.println("消息发送成功!!!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 9.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 10.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }


}

3.运行producer

4.运行consumer,队列1 3成功消费

五、上面都是通过图形化界面进行创建交换机,下面是纯代码实现

1.创建producer

package com.zoomy.rabbitmq.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {


    public static void main(String[] args) {
        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("121.41.211.173");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();

            // 4.准备消息内容
            String message = "Hello rabbitmq!!!";

            //5.准备交换机
            String exchange = "direct_message_exchange";

            // 6.定义routing key
            String routekey = "order";

            // 7.指定交换机类型
            String type = "direct";

            //8.声明交换机,true是 持久化,交换机不会随着服务器重启造成丢失
            channel.exchangeDeclare(exchange,type,true);

            //9.声明队列
            channel.queueDeclare("queue5",true,false,false,null);
            channel.queueDeclare("queue6",true,false,false,null);
            channel.queueDeclare("queue7",true,false,false,null);

            //10.绑定队列和交换机关系
            channel.queueBind("queue5",exchange,"order");
            channel.queueBind("queue6",exchange,"order");
            channel.queueBind("queue7",exchange,"course");

            //8.发送消息给队列queue
            
            channel.basicPublish(exchange, routekey, null, message.getBytes());
            System.out.println("消息发送成功!!!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 9.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 10.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }


}




2.运行consumer

六、工作模式work(轮询分发模式,公共分发模式)


1.procuder

package com.zoomy.rabbitmq.work.lunxun;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {


    public static void main(String[] args) {
        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("121.41.211.173");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();

            //4.准备发送消息的内容
            for (int i = 0; i < 20; i++) {
                //消息内容
                String msg = "rabbitmq" + i;

                channel.basicPublish("", "queue1", null, msg.getBytes());
            }


            System.out.println("消息发送成功!!!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 9.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 10.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }


}

2.Work1,Work2

package com.zoomy.rabbitmq.work.lunxun;


import com.rabbitmq.client.*;

import java.io.IOException;


public class Work1 {

    public static void main(String[] args) {

        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("121.41.211.173");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //获取队列名称
        final String queueName = Thread.currentThread().getName();

        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("消费者-Work1");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();


            //4定义接受消息的回调
            Channel finalChannel = channel;
//                finalChannel.basicQos(1);
            // 5.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息


            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery message) throws IOException {
                            try {
                                System.out.println("Work1-收到信息是" + new String(message.getBody(), "utf-8"));
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String consumerTag) throws IOException {
                            System.out.println("接受消息失败");
                        }
                    }
            );
            System.out.println("Work1-开始接受消息");
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 6.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 7.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

package com.zoomy.rabbitmq.work.lunxun;


import com.rabbitmq.client.*;

import java.io.IOException;


public class Work2 {

    public static void main(String[] args) {

        // 所有的中间件技术都是基于htp/ip协议之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("121.41.211.173");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //获取队列名称
        final String queueName = Thread.currentThread().getName();

        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建连接Connection
            connection = connectionFactory.newConnection("消费者-Work2");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();


            //4定义接受消息的回调
            Channel finalChannel = channel;
//                finalChannel.basicQos(1);
            // 5.通过创建交换机,声明队列,绑定关系,路由Key,发送消息,和接收消息


            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery message) throws IOException {
                            try {
                                System.out.println("Work2-收到信息是" + new String(message.getBody(), "utf-8"));
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String consumerTag) throws IOException {
                            System.out.println("接受消息失败");
                        }
                    }
            );
            System.out.println("Work2-开始接受消息");
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 6.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 7.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3.分别运行work1,work2

4.运行producer


5.看到是轮询

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/355098.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号