栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

用Java代码操作RabbitMQ(包括创建和绑定)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

用Java代码操作RabbitMQ(包括创建和绑定)

生产者
package com.sky.rabbitmq.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Producer {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;

        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            String message = "order来了";
            String exchange_name = "direct_message_exchange";
            String routeKey = "order";
            String exchange_type = "direct";

            // 声明队列
            
            channel.queueDeclare("queue7",true,false,false,null);
            channel.queueDeclare("queue8",true,false,false,null);
            channel.queueDeclare("queue9",true,false,false,null);

            // 声明交换机
            
            channel.exchangeDeclare(exchange_name,exchange_type,true);

            // 绑定队列
            
            channel.queueBind("queue7",exchange_name,"order");
            channel.queueBind("queue8",exchange_name,"order");
            channel.queueBind("queue9",exchange_name,"goods");
//            channel.exchangeBind();

            channel.basicPublish(exchange_name,routeKey,null,message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("消息发送失败!");
        }finally {
            // 7:关闭连接
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8:关闭通道
            if (connection!= null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  • 声明队列
  • 声明交换机
  • 交换机绑定队列
  • 发送消息
  • 关闭

代码的方式和网页端操作差不多,理解即可。

消费者
package com.sky.rabbitmq.all;

import com.rabbitmq.client.*;

import java.io.IOException;


public class Consumer {
    private static Runnable runnable = new Runnable() {
        public void run() {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            connectionFactory.setVirtualHost("/");

            //获取队列的名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                
                channel.basicConsume( queueName , true , new DeliverCallback() {
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(queueName + ":收到消息是:" +
                                new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    public void handle(String s) throws IOException {
                        System.out.println("异常");
                    }
                });
                System.out.println("接收消息");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(runnable,"queue7").start();
        new Thread(runnable,"queue8").start();
        new Thread(runnable,"queue9").start();
//        new Thread(runnable,"queue2").start();
//        new Thread(runnable,"queue3").start();
    }
}

接收消息的时候一定要记得开启autoAck,不然一直无法回应服务器的消息,也就是无法删除消息,如果这一栏有消息变化,那么就是没开启autoAck。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/885922.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号