栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ Channel实操篇05

RabbitMQ Channel实操篇05

功能

使用Channel创建队列、交换机,发送消息以及消费消息

1 github: 2 rabbitmq04


    4.0.0

    
        com.yzm
        rabbitmq
        0.0.1-SNAPSHOT
        ../pom.xml 
    

    rabbitmq04
    0.0.1-SNAPSHOT
    jar
    rabbitmq04
    Demo project for Spring Boot

    

    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


项目结构

application.yml
不配置

3 简单模式
package com.yzm.rabbitmq04.config;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.stereotype.Component;

@Component
public class RabbitConfig {

    public static final String QUEUE = "queue-a";
    
    // 获取RabbitMQ服务器连接
    public static Connection getConnection() {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
}

生产者

package com.yzm.rabbitmq04.service;

import com.rabbitmq.client.*;
import com.yzm.rabbitmq04.config.RabbitConfig;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Component
public class SenderService {

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void producerA() throws IOException, TimeoutException {
        //1、获取连接
        Connection connection = RabbitConfig.getConnection();
        //2、创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(RabbitConfig.QUEUE, true, false, false, null);
        
        for (int i = 1; i <= 10; i++) {
            String message = "Hello World!...... " + i;
            System.out.println(" [ Sent ] 消息内容 " + message);
            channel.basicPublish("", RabbitConfig.QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        }

        // 5、释放资源
        channel.close();
        connection.close();
    }
}

消费者,自动确认消息

package com.yzm.rabbitmq04.service;

import com.rabbitmq.client.*;
import com.yzm.rabbitmq04.config.RabbitConfig;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Component
public class ReceiverService {

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerA() throws IOException {
        // 1、获取连接
        Connection connection = RabbitConfig.getConnection();
        // 2、创建通道
        Channel channel = connection.createChannel();
        
        channel.basicConsume(RabbitConfig.QUEUE, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(" [ received@A_1 ] 消息内容 : " + new String(body, StandardCharsets.UTF_8) + "!");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}    

运行结果:

修改消费者,手动确认消息

@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerA_2() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();

        //取消自动ack
        channel.basicConsume(RabbitConfig.QUEUE, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(" [ received@A_2 ] 消息内容 : " + new String(body, StandardCharsets.UTF_8) + "!");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //消息确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

注释

运行结果:

4 fanout 交换机
	public static final String FANOUT_EXCHANGE = "fanout-exchange-a";
    public static final String FANOUT_QUEUE_A = "fanout-queue-a";
    public static final String FANOUT_QUEUE_B = "fanout-queue-b";

生产者

	@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void producerB() throws IOException, TimeoutException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(RabbitConfig.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
        // 声明队列
        channel.queueDeclare(RabbitConfig.FANOUT_QUEUE_A, true, false, false, null);
        channel.queueDeclare(RabbitConfig.FANOUT_QUEUE_B, true, false, false, null);
        // 队列绑定交换机,不需要路由键,用空字符串表示
        channel.queueBind(RabbitConfig.FANOUT_QUEUE_A, RabbitConfig.FANOUT_EXCHANGE, "");
        channel.queueBind(RabbitConfig.FANOUT_QUEUE_B, RabbitConfig.FANOUT_EXCHANGE, "");

        for (int i = 1; i <= 10; i++) {
            String message = "Hello World!...... " + i;
            System.out.println(" [ Sent ] 消息内容 " + message);
            channel.basicPublish(RabbitConfig.FANOUT_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        }

        channel.close();
        connection.close();
    }

消费者B_1、B_2

	@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerB_1() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume(RabbitConfig.FANOUT_QUEUE_A, true, getConsumer(channel, " [ received@B_1 ] 消息内容 : ", 500));
    }

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerB_2() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume(RabbitConfig.FANOUT_QUEUE_B, true, getConsumer(channel, " [ received@B_2 ] 消息内容 : ", 1000));
    }

    private DefaultConsumer getConsumer(Channel channel, String s, int i) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(s + new String(body, StandardCharsets.UTF_8) + "!");
                try {
                    Thread.sleep(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    }

运行结果:

5 direct 交换机
    public static final String DIRECT_EXCHANGE = "direct-exchange-a";
    public static final String DIRECT_QUEUE_A = "direct-queue-a";
    public static final String DIRECT_QUEUE_B = "direct-queue-b";

生产者

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void producerC() throws IOException, TimeoutException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(RabbitConfig.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
        channel.queueDeclare(RabbitConfig.DIRECT_QUEUE_A, true, false, false, null);
        channel.queueDeclare(RabbitConfig.DIRECT_QUEUE_B, true, false, false, null);
        channel.queueBind(RabbitConfig.DIRECT_QUEUE_A, RabbitConfig.DIRECT_EXCHANGE, "direct.a");
        channel.queueBind(RabbitConfig.DIRECT_QUEUE_B, RabbitConfig.DIRECT_EXCHANGE, "direct.a.b");

        for (int i = 1; i <= 10; i++) {
            String message = "Hello World! " + i;
            if (i % 2 == 0) {
                channel.basicPublish(RabbitConfig.DIRECT_EXCHANGE, "direct.a", null, message.getBytes());
            } else {
                channel.basicPublish(RabbitConfig.DIRECT_EXCHANGE, "direct.a.b", null, message.getBytes());
            }
            System.out.println(" [ Sent ] 消息内容 " + message);
        }

        channel.close();
        connection.close();
    }

消费者C_1、C_2

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerC_1() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume(RabbitConfig.DIRECT_QUEUE_A, true, getConsumer(channel, " [ received@C_1 ] 消息内容 : ", 500));
    }

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerC_2() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume(RabbitConfig.DIRECT_QUEUE_B, true, getConsumer(channel, " [ received@C_2 ] 消息内容 : ", 1000));
    }

    private DefaultConsumer getConsumer(Channel channel, String s, int i) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(s + new String(body, StandardCharsets.UTF_8) + "!");
                try {
                    Thread.sleep(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    }

运行结果:

6 topic 交换机
    public static final String TOPIC_EXCHANGE = "topic-exchange-a";
    public static final String TOPIC_QUEUE_A = "topic-queue-a";
    public static final String TOPIC_QUEUE_B = "topic-queue-b";
    public static final String TOPIC_QUEUE_C = "topic-queue-c";

生产消息

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void producerD() throws IOException, TimeoutException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(RabbitConfig.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
        channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_A, true, false, false, null);
        channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_B, true, false, false, null);
        channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_C, true, false, false, null);
        channel.queueBind(RabbitConfig.TOPIC_QUEUE_A, RabbitConfig.TOPIC_EXCHANGE, "topic.*.a");
        channel.queueBind(RabbitConfig.TOPIC_QUEUE_B, RabbitConfig.TOPIC_EXCHANGE, "topic.#");
        channel.queueBind(RabbitConfig.TOPIC_QUEUE_C, RabbitConfig.TOPIC_EXCHANGE, "topic.a.*");

        for (int i = 1; i <= 30; i++) {
            String message = "Hello World! " + i;
            if (i % 3 == 0) {
                channel.basicPublish(RabbitConfig.TOPIC_EXCHANGE, "topic.a.a", null, message.getBytes());
            } else if (i % 3 == 1) {
                channel.basicPublish(RabbitConfig.TOPIC_EXCHANGE, "topic.b.b", null, message.getBytes());
            } else {
                channel.basicPublish(RabbitConfig.TOPIC_EXCHANGE, "topic.a.c", null, message.getBytes());
            }
            System.out.println(" [ Sent ] 消息内容 " + message);
        }

        channel.close();
        connection.close();
    }

消费消息

	@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerD_1() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume(RabbitConfig.TOPIC_QUEUE_A, true, consumer(channel, " [ received@D_1 ] 消息内容 : "));
    }

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerD_2() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume(RabbitConfig.TOPIC_QUEUE_B, true, consumer(channel, " [ received@D_2 ] 消息内容 : "));
    }

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 10000)
    public void consumerD_3() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = connection.createChannel();
        channel.basicConsume(RabbitConfig.TOPIC_QUEUE_C, true, consumer(channel, " [ received@D_3 ] 消息内容 : "));
    }

    private DefaultConsumer consumer(Channel channel, String s) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(s + new String(body, StandardCharsets.UTF_8) + "!");
            }
        };
    }

分析

运行结果:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/443613.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号