栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ入门之常见模式

RabbitMQ入门之常见模式

1.什么是MQ

MQ(Message Queue):消息队列,是一种"先进先出"的数据结构。典型的模型就是我们所说的生产者、消费者模型。生产者不断地向消息队列中生产消息,消费者不断地从消息队列中获取消息,同时消息的生产和消费都是异步的,可以实现系统间的解耦

2.什么是RabbitMQ

RabbitMQ是使用Erlang语言开发的基于高级消息队列协议(Advanced Message Queuing Protocol,AMQP)的开源消息队列。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、数据可靠性、数据安全性

3.安装RabbitMQ

使用cat /etc/os-release查看系统版本号,我这里使用的是Ubuntu 20.04,对应的分支是focal

cat /etc/os-release

NAME="Ubuntu"
VERSION="20.04.2 LTS (Focal Fossa)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 20.04.2 LTS"
VERSION_ID="20.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=focal
UBUNTU_CODENAME=focal
3.1 安装erlang和RabbitMQ

使用以下脚本快速安装:

#!/usr/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Cloudsmith: modern Erlang repository
curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Cloudsmith: RabbitMQ repository
curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list < 
3.2 启动RabbitMQ 
# 启动rabbitmq
systemctl start rabbitmq-server

# 查看rabbitmq运行状态
systemctl status rabbitmq-server
3.3 加载web管理界面插件
# 加载RabbitMQ的插件,这样我们可以使用web界面来管理RabbitMQ,默认使用guest用户登录,且必须使用localhost:15672来访问管理界面
sudo rabbitmq-plugins enable rabbitmq_management

# username:guest
# password:guest
3.4 RabbitMQ配置文件

RabbitMQ给我们提供了一个配置文件模版,我们可以参照这个来配置。

模版文件地址:https://github.com/rabbitmq/rabbitmq-server/blob/v3.8.x/deps/rabbit/docs/rabbitmq.conf.example

在/etc/rabbitmq目录下创建rabbitmq.conf

# 文件名rabbitmq.conf
# 当该值为true时,我们只能通过localhost:15672来访问管理界面
# 当该值为false时,我们可以通过ip:15672来访问管理界面
loopback_users.guest = false
3.5 相关命令
# 查看相关命令的使用
sudo rabbitmqctl help
4. Java整合RabbitMQ 4.1 引入依赖

	com.rabbitmq
	amqp-client
	5.10.0

4.2 第一种模型(直连)

直连模式下,只有一个生产者和消费者,如果消费者处理消息的速度慢,但是生产者在源源不断的生产消息,就会导致消息的挤压

4.2.1 创建生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.19");
        factory.setVirtualHost("test");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
            // 声明与队列相关的参数,boolean durable 如果设置为true的话就是将队列持久化
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            String message = "Hello World!";
            // String exchange, String routingKey, BasicProperties props, byte[] body
            // 发布消息
            // MessageProperties.PERSISTENT_TEXT_PLAIN  将消息持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
4.2.2 创建消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Recv {
    // 这里我们并没有使用try-with-resource语句自动关闭channel和connection,这样可以使程序一直保持运行接收消息

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.19");
        factory.setVirtualHost("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
4.3第二种模型(Work Queue)

在工作队列模式中,默认使用的是轮询调度(Round-robin dispatching),RabbitMQ将会依次将消息发送给每个消费者,每个消费者将获得相同数量的消息

也可以手动设置为公平调度(Fair dispatch),即处理消息快的消费者会获得更多数量的消息来处理,处理消息慢的消费者获得的消息数量相对较少。

// 告诉RabbitMQ一次只给消费者一条消息,在该消费者处理完上一条消息之前,不再给该消费者发送消息
channel.basicQos(1);
4.3.1 生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.104");
        factory.setVirtualHost("test");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 队列持久化
            boolean durable = true;
            channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);

            for (int i = 0; i < 100; i++) {
                String message = "task_message_.";
                message = message + i;
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }

        }
    }
}
4.3.2 消费者1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Worker1 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.104");
        factory.setVirtualHost("test");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
	    // 指示一次性只接收一条消息
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 将消息确认改为收到确认,当消费者处理消息宕机时,可以保证消息不丢失
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(3);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
4.3.3 消费者2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Worker2 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 如果不设置,默认使用guest用户
        factory.setUsername("stone");
        factory.setPassword("123456");
        factory.setHost("192.168.0.104");
        factory.setVirtualHost("test");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 指示一次性只接收一条消息
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 将消息确认改为收到确认,当消费者处理消息宕机时,可以保证消息不丢失
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
4.4 第三种模型(Publish/Subscribe)

在Publish/Subscribe模型中,生产者将消息发布到Exchange中,Exchange将消息推送到队列中,消费者再去队列中获得消息进行消费,这种模式是将一条消息发送给多个消费者。

RabbitMQ中消息传递模型的核心思想是,生产者从不直接向队列发送任何消息。生产者将消息发送到Exchange,Exchange将消息推送到队列中。

Exchange的类型:direct,topic,headers 和fanout

**在这种模式下,我们主要介绍fanout类型的Exchange。**使用fanout类型,不需要设置routingKey,Exchange会将消息广播到与之绑定的所有的队列中。

direct

direct类型的Exchangequeue和Exchange绑定,并设置一个routingKey和routingKey完全匹配的消息将被路由到queue topic

topic类型的交换机queue和Exchange绑定,并设置一个规则的routingKey匹配routingKey规则的消息,将路由到指定的queue路由规则

* 指代一个字符# 指代一个或多个字符 headersfanout

fanout类型的Exchangequeue和Exchange绑定,不设置routingKey将收到的消息广播到与之绑定的所有队列 4.4.1 生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Provider {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.106");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            for (int i = 0; i < 10; i++) {
                String message = "测试fanout模型_" + i;
                // 因为是广播模型,所以不需要指定routingKey,消息将会推送至所有的queue
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}
4.4.2 消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Consumer {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.106");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 获得临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 将交换机和queue绑定起来,无需routingKey
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
4.5 第四种模型(Routing)

与Subscribe/Publish模式不同之处在于,Routing模式的Exchange类型是direct,并且queue和Exchange绑定的时候,设置了routingKey,只有routingKey完全匹配的消息才会路由到queue中。

当消息的routingKey为error时,消息将被路由到q1,当消息的routingKey为warning/info时,消息将被路由到q2。

4.5.1 生产者
public class Provider {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.106");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String routingKey = "info";
            for (int i = 0; i < 10; i++) {
                String message = "direct_routing_message_" + i;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
            }
        }
    }
}
4.5.2 消费者
public class Consumer {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.106");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 获得临时队列
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

该模式下需要routingKey完全匹配的消息才能路由到queue中,在消息类型多的情况下不利于扩展,于是Topics模式诞生了

4.6 第五种模型(Topics)

该模式与Routing模式的区别在于,Routing模式下,routingKey是指定的,Topics模式下,routingKey必须是一个单词列表,用.分割,最多255个字节,例如:"my.routingkey",我们可以使用通配符来指定路由规则。

当然如果我们在topics模式中,不使用特殊字符*和#,其效果和Routing模式是一样的。

*:指代一个字符

#:指代0个或多个字符

4.6.1 生产者
public class Provider {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = "message.a";
            String message ="topic_message" ;

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }
}
4.6.2 消费者
public class Consumer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
		// 绑定队列,并设置路由规则,消费者能消费到routingKey例如:message.a,message.b的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "message.*");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

最后,欢迎关注微信公众号一起交流

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/730631.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号