栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

使用Java连接Rabbitmq

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用Java连接Rabbitmq

rabbitmq

rabbitmq入门简介 : https://blog.csdn.net/kavito/article/details/91403659

rabbitmq安装教程: https://blog.csdn.net/weixin_42673046/article/details/118442323

rabbitmq入门介绍以及安装教程网上有过很好的讲解,这就不重复造轮子了,直奔主题。记录使用java进行连接rabbitmq以及springboot集成rabbitmq中遇到的问题

使用Java连接rabbitmq

导入依赖:



    com.rabbitmq
    amqp-client
    5.9.0



    commons-io
    commons-io
    2.6

编写rabbitmqconfigutils配置类,把mq的连接操作提取出来,代码解耦

public class RabbitmqUtils {

    private static Channel channel = null;

    private static Connection connection = null;

    public static Channel getChannel(){
		//定义连接池
        ConnectionFactory factory = new ConnectionFactory();
		//设置主机地址
        factory.setHost("192.168.81.131");
		//设置端口
        factory.setPort(5672);
		//设置用户名
        factory.setUsername("admin");
		//密码
        factory.setPassword("admin");
		//虚拟机路径
        factory.setVirtualHost("/");

        try {
            connection = factory.newConnection();
			//创建信道
            channel = connection.createChannel();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return channel;
    }
	//关闭连接
    public static void closeConnection(){
        try {
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }
}

rabbitmq有六种工作模式分别是:简单队列、work模式、发布/订阅模式、路由模式、主题模式还有个rpc模式,

下面主要记录发布订阅工作模式、路由工作模式、topic主题工作模式

发布订阅模式

无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者。生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息

消息生产者
//利用写好工具类获取信道连接
Channel channel = RabbitmqUtils.getChannel();

try {
    
    channel.exchangeDeclare("text_pubsub", BuiltinExchangeType.FANOUT,false,false,false,null);
    
    channel.queueDeclare("pubsub_queue1",false,false,false,null);
    channel.queueDeclare("pubsub_queue2",false,false,false,null);
	
    channel.queueBind("pubsub_queue1","text_pubsub","");
    channel.queueBind("pubsub_queue2","text_pubsub","");

    String msg = "发布订阅模式!!!";
	
    channel.basicPublish("text_pubsub","",null,msg.getBytes());

} catch (IOException e) {
    e.printStackTrace();
}finally {
    RabbitmqUtils.closeConnection();
}

运行测试对比rabbitmq界面管理(代码若没报错创建成功,可通过ip地址+15672访问web界面查看)

查看队列是否成功创建

查看是否成功创建交换机

消息消费者
Channel channel = RabbitmqUtils.getChannel();
try {
     
    channel.basicConsume("pubsub_queue1",true,(s, delivery) ->{
        System.out.println( new String(delivery.getBody())+"存到数据库");
    } ,consumerTag-> System.out.println("取消消费"));
} catch (IOException e) {
    e.printStackTrace();
}

运行消费者代码再次访问web控制界面看到其中pubsub_queue1还消息已经被消费

路由模式

路由模式生产者指定路由发送数据,消费者绑定路由接受数据。与发布/订阅模式不同的是,发布/订阅模式只要是绑定了交换机的队列都会收到生产者向交换机推送过来的数据。而路由模式下加了一个路由设置,生产者向交换机发送数据时,会声明发送给交换机下的那个路由,并且只有当消费者的队列绑定了交换机并且声明了路由,才会收到数据

消息生产者
Channel channel = RabbitmqUtils.getChannel();
//这些模式的编码都没有很大的差别,只是在原有的基础上去添加配置代码
try {
    //创建交换机 为DIRECT模式
    channel.exchangeDeclare("text_routing", BuiltinExchangeType.DIRECT,false,false,false,null);
	//创建路由
    channel.queueDeclare("routing_queue1",false,false,false,null);
    channel.queueDeclare("routing_queue2",false,false,false,null);
	//将交换机与队列进行绑定设置routerkey
    channel.queueBind("routing_queue1","text_routing","error");
    channel.queueBind("routing_queue2","text_routing","info");
    channel.queueBind("routing_queue2","text_routing","error");
    String msg = "路由模式!!!";
    //向text_routing里发送消息routerkey为error,只有路由key为error的队列方可接收到消息
    channel.basicPublish("text_routing","error",null,msg.getBytes());
} catch (IOException e) {
    e.printStackTrace();
}finally {
    RabbitmqUtils.closeConnection();
}

查看界面消息

消息消费者
Channel channel = RabbitmqUtils.getChannel();
try {
    //路由key error绑定的是routing_queue1 和 routing_queue2我们访问任一个都可以接收到消息
    //但当routerkey为info时在访问queue1时就接收不到数据了,业务队列1绑定的仅有error
    channel.basicConsume("routing_queue1",true,(s, delivery) ->{
        System.out.println( new String(delivery.getBody())+"存到数据库");
    } ,consumerTag-> System.out.println("取消消费"));
} catch (IOException e) {
    e.printStackTrace();
}
主题模式

主题模式可以简单的理解为可以动态路由,*代表一个单词,#可以代替零个或多个单词,单词最多 255 个字节,通过相关的匹配规则后就会将满足条件的消息放到对应的队列中,每个单词之间要用点隔开

生产者
 Channel channel = RabbitmqUtils.getChannel();
try {
    channel.exchangeDeclare("text_topic", BuiltinExchangeType.TOPIC,false,false,false,null);

    channel.queueDeclare("topic_queue1",false,false,false,null);
    channel.queueDeclare("topic_queue2",false,false,false,null);
	//使用通配符去匹配routerkey
    channel.queueBind("topic_queue1","text_topic","#.error");
    channel.queueBind("topic_queue2","text_topic","*.*");

    String msg = "topic主题订阅模式!!!";
	//使用order.info 可以发布到queue2里
    channel.basicPublish("text_topic","order.info",null,msg.getBytes());

} catch (IOException e) {
    e.printStackTrace();
}finally {
    RabbitmqUtils.closeConnection();
}

运行代码访问web界面

访问queue1队列没有消息,访问queue2队列会 获得刚刚发送的消息

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/837629.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号