栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

简单队列模型

简单队列模型

"Hello World!"

The simplest thing that does something 

消息发送者:

package com.tech.rabbitmq.normal;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;


public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.50.134");
        factory.setPort(5672);
        factory.setVirtualHost("/dev");
        factory.setUsername("tech");
        factory.setPassword("tech");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //队列名称、是否持久化、是否独占(只能有一个消费者)、是否自动删除(在没有消费者时自动删除)、其他参数
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //交换机名称(不写为默认交换机,路由键名称与队列名称一致才能被路由)
            //路由键名称
            //配置信息
            //字节数组
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

 消息接收者:

package com.tech.rabbitmq.normal;

import com.rabbitmq.client.*;

import java.io.IOException;


public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.50.134");
        factory.setPort(5672);
        factory.setVirtualHost("/dev");
        factory.setUsername("tech");
        factory.setPassword("tech");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //一般是固定的,可以作为会话的名称
                System.out.println("consumerTag="+consumerTag);
                //可以获取交换机、路由键等信息
                System.out.println("envelope="+envelope);
                System.out.println("properties="+properties);
                System.out.println("body="+new String("utf-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
//        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//            String message = new String(delivery.getBody(), "UTF-8");
//            System.out.println(" [x] Received '" + message + "'");
//        };
//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

接收端打印内容:

 [*] Waiting for messages. To exit press CTRL+C
consumerTag=amq.ctag-CJUdS84aaSe-qJgmZNTfRA
envelope=Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=hello)
properties=#contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body=utf-8

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350601.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号