栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 初步学习

RabbitMQ 初步学习

功能

流量削峰应用解耦异步处理 组成 4大部分

生成者交换机队列消费者信道Channel 用于 复用一个连接(Connection–抽象socket连接,实现协议转换认证等) 使用

服务启动/停止:service rabbitmq-server start/stop安装管理插件:rabbitmq-plugins enable rabbitmq_management 默认访问端口:15672

rabbitmqctl add_user 添加用户rabbitmq set_user_tags administrator 设置管理员角色rabbitmq [-p ] 设置用户对 某host的访问权 Hello World

Producer --> RabbitMQ --> Consumer导入amqp-clientProducer

public class Producer {
    private static final String QUEUE_NAME = "Test_Queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.141.151.176");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword(" ");
        Connection connection = connectionFactory.newConnection();
        System.out.println("Connection...");

        //创建信道
        Channel channel = connection.createChannel();

        //声明队列
        //声明队列是幂等的
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //发送消息
        for(int i = 0; i<10000000; i++){
            String message = "Hello!!!!!!! " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Send: " + message);
        }

        channel.close();
        connection.close();
    }
}

Consumer

public class Consumer {
    private static final String QUEUE_NAME = "Test_Queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        //......

        //创建信道
        //......

        //声明队列
        //声明队列是幂等的
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery)->{
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received: " + message);
        };

        //注册回调函数,开始监听
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

queueDeclare(…)

若不存在,则创建队列。若已存在,则获取已存在队列(不允许参数不一致)— 幂等 工作队列 Work Queue

使用消息队列分发耗时的任务,避免立即执行资源密集型任务,并等待其完成

默认使用 Round-robin(轮询)分派消息

消息确认(Message Acknowlegment) 机制

消息队列收到Ack后,认为消息已经送达并被处理消费者终止或断开后,若没有收到Ack则快速将消息发送给下一个消费者超时时间(默认设为30s)autoAck = true自动Ackchannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);手动Ack

消息持久性

queueDeclare的durable参数设为true: RabbitMQ关闭或崩溃后, 标记为persistent消息不会从该队列中丢失(通过MessageProperties设置)并不保证完全不会丢失(写入磁盘缓存区的数据可能未刷新 fsync(2) )

channel.basicQos(prefetchCount); – 限制同一时间一个消费者可获取的消息的数量

发布/订阅 Publish/Subscribe

一个消息传递给多个消费者 – 发布/订阅模式

创建临时队列 channel.queueDeclare() – 无参数

路由 Routing

控制exchange将消息派送到 匹配该消息的routingKey的 消息队列将消息队列绑定至exchange时,设置其binding key(routingKey),以匹配消息

queueBind(String queue, String exchange, String routingKey)

一个消息队列 可以 以不同的binding key多次绑定到同一个exchange

多个消息队列 可以 以相同的binding key同时绑定到同一个exchange

发送消息时,设置routingKey以匹配队列

basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) exchange

绑定一个或多个channel

将消息分派到指定的channel

类型:

direct:将消息的routingKey 与 队列绑定到exchange的bindingKey 精确匹配topic:

消息和队列绑定所用 routing key 应为以’.'分割的一组单词。exchange转发时进行匹配‘*’:表示任意一个词‘#’:表示任意多个词 headers:fanout: 将消息送至所有与其绑定的队列。 将忽略routingKey

声明exchange channel.exchangeDeclare();

channel.basicPublish("", "hello", null, message.getBytes())使用匿名exchange分派消息到hello队列;

绑定exchange 和 channel channel.queueBind(queueName, exchangeName, "");

若无队列绑定到exchange上,则送至该exchange的消息将被丢弃

远程过程调用 Remote Procedure Call (RPC)

请求端:

设置请求消息的correlationId属性,以标识一个request消息设置消息的replyTo属性,用于通知接收方用该队列返回response消息发送request消息等待reponse消息

Channel channel = RabbitMQUtils.getChannel();

//声明一个匿名的消息队列,用于服务端传回响应
String callbackQueueName = channel.queueDeclare().getQueue();
//correlationId 携带在request上,再由response带回,以确定response对应的request
String myCorrelationId = "misakamikoto";
//配置
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties props = builder.replyTo(callbackQueueName).correlationId(myCorrelationId).build();


//发送携带着 correlationId 和 replyTo 属性的消息
channel.basicPublish("", "RPCQueue", props, "hello.".getBytes(StandardCharsets.UTF_8));
System.out.println("request...");

//等待response
System.out.println("wait response...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    System.out.println("response: " + new String(delivery.getBody()));
    System.out.println("message correlationId: " + delivery.getProperties().getCorrelationId());
};
channel.basicConsume(callbackQueueName, deliverCallback, consumerTag -> {});

服务端

等待接收request消息处理request消息使用request消息的replyTo属性指定的消息队列返回response消息(设置reponse消息的correlationId属性为request消息的correlationId以标识其回应的请求)

Channel channel = RabbitMQUtils.getChannel();

channel.queueDeclare("RPCQueue", false, false, true, null);

DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
    String request = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("receive: " + request);

    //处理请求
    work();

    String callbackQueueName = delivery.getProperties().getReplyTo();           //获取用于返回的队列
    String myCorrelationId = delivery.getProperties().getCorrelationId();       //获取该消息的correlationId,并将其返回,以标识对那个request进行response

    //发送response
    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    AMQP.BasicProperties props = builder.correlationId(myCorrelationId).build();
    channel.basicPublish("", callbackQueueName, props, "work out.".getBytes(StandardCharsets.UTF_8));
});

channel.basicConsume("RPCQueue", deliverCallback, consumerTag -> {});

消息的常用属性(AMQP 0-9-1协议)

deliveryMode: 是否持久contentType: 消息内容的类型 (application/json …)replyTo: 用于callback的队列correlationId:用于标识一个消息(用于response和request匹配)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775853.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号