栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ使用 Java使用案例

RabbitMQ使用 Java使用案例

RabbitMQ Java实例 引入RabbitMQ的jar包

    com.rabbitmq
    amqp-client
    5.1.2

创建消息生产者
    public void publisher() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
		factory.setUsername("test");
		factory.setPassword("testpssword");
		factory.setPort(5201);
		factory.setVirtualHost("mss");
        Connection connection = null;
        Channel channel = null;
        try {
            //1.创建连接和通道
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("EXCHANGE", BuiltinExchangeType.FANOUT);   //不同类型
            //2.为通道声明队列
            channel.queueDeclare("object_pick_data", true, false, false, null);
            
            //3.发布消息
            String msg = "{"+
					""event_id": 1331,"+
					""event_type": 3,"+
					""status": 1,"+
					""event_time": "2021-06-03 10:50:39","+
					""data": [{"+
					""person_id": 908,"+
						""goods_id": "1","+
						""image_uri": "/home/ai/projects/VideoShowHall/pictures/669a28b3-e3f9-536c-8ec3-9b8c3f03ac5b.jpg""+
					"}]"+
				    "}";
            
            channel.basicPublish("", "object_pick_data", null, msg.getBytes());
            System.out.println("provider send a msg: " + msg);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (java.util.concurrent.TimeoutException e) {
            e.printStackTrace();
        } finally {
            //4.关闭连接
            if (channel != null) {
                try {
				    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (java.util.concurrent.TimeoutException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
创建消息消费者
	
	@SuppressWarnings("deprecation")
	public String consumer(String queuename, boolean durable) {
		String Host=this.getSysConfigValueByDB("MQHost");
		String Username=this.getSysConfigValueByDB("MQUsername");
		String Password=this.getSysConfigValueByDB("MQPassword");
		String Port=this.getSysConfigValueByDB("MQPort");
		String VirtualHost=this.getSysConfigValueByDB("MQVirtualHost");//虚拟消息服务器
		String message="";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Host);
		factory.setUsername(Username);
		factory.setPassword(Password);
		factory.setPort(Integer.parseInt(Port));
		factory.setVirtualHost(VirtualHost);
        Connection connection = null;
        Channel channel = null;
         try {
             // 1.创建连接和通道
		     connection = factory.newConnection();
             channel = connection.createChannel();
             channel.exchangeDeclare("EXCHANGE", BuiltinExchangeType.FANOUT);   //不同类型
             // 2.设置交换机类型 MQ上看
             channel.queueBind(queuename,"EXCHANGE","");
             // 3.为通道声明队列   //第二个参数持久化 设置成true 跟对方一致
             channel.queueDeclare(queuename, durable, false, false, null);
             System.out.println(" **** keep alive ,waiting for messages, and then deal them");

             // 4.创建队列消费者
     		 QueueingConsumer consumer = new QueueingConsumer(channel);
     		 boolean ack = false; // 是否自动确认消息被成功消费
     		 channel.basicQos(1, false);
     		 channel.basicConsume(queuename, ack, consumer); // 指定消费队列

     		 //5..消费消息
			 // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
			 try {
				System.out.println("延时阻塞请求");
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				message = new String(delivery.getBody());
			    System.out.println(new Date()+"=====接收消息=====" + message);
			    //消费应答
			    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
			    //关闭连接
			    channel.close();
			    connection.close();
			 } catch (ShutdownSignalException e) {
				e.printStackTrace();
			 } catch (ConsumerCancelledException e) {
			 	e.printStackTrace();
			 } catch (InterruptedException e) {
				e.printStackTrace();
			 }
         } catch (IOException e) {
             e.printStackTrace();
         }catch (java.util.concurrent.TimeoutException e) {
			 e.printStackTrace();
		 }
         return message;
     }
消费者调用
	    //消费者方法调用 
		String  message= this.consumer("object_pick_data",true);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710571.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号