栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【RabbitMQ】6、Simple 简单模式完整实现步骤

【RabbitMQ】6、Simple 简单模式完整实现步骤

1、实现步骤

jdk1.8构建一个maven工程导入rabbitmq的maven依赖启动rabbitmq-server服务定义生产者定义消费者观察消息的在rabbitmq-server服务中的过程 2、构建一个maven工程

3、导入rabbitmq依赖 1)Java原生依赖

    com.rabbitmq
    amqp-client
    5.14.2

2)spring依赖

    org.springframework.amqp
    spring-amqp
    2.4.2



    org.springframework.amqp
    spring-rabbit
    2.4.2

3)springboot依赖

    org.springframework.boot
    spring-boot-starter-amqp
    2.6.4

可以根据自已的项目环境选择上面三种依赖形式

4、启动rabbitmq-server服务
systemctl start rabbitmq-server

# 或者 docker启动
docker start myrabbit
5、定义生产者
package com.tuwer.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Producer {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.19.101");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        // 2、创建连接
        Connection connection = null;
        // 3、获取通道
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("生产者");
            channel = connection.createChannel();

            // 4、通过通道声明队列queue存储消息
            String queueName = "queue1";
            
            channel.queueDeclare(queueName, false, false, false, null);

            // 5、准备消息
            String message = "Hello World! " + LocalDateTime.now();
            // 6、发送消息
            channel.basicPublish("", queueName, null, message.getBytes());

            System.out.println("消息发送成功!");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 8、关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

6、定义消费者
package com.tuwer.rabbitmq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Consumer {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.19.101");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        // 2、创建连接
        Connection connection = null;
        // 3、获取通道
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者");
            channel = connection.createChannel();

            // 4、通过通道接收消息
            String queueName = "queue1";
            
            channel.basicConsume(
                    queueName,
                    true,
                    new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery message) throws IOException {
                            System.out.println("收到的消息:" + new String(message.getBody()));
                        }
                    },
                    new CancelCallback() {
                        @Override
                        public void handle(String consumerTag) throws IOException {
                            System.out.println("消息接收失败!");
                        }
                    }
            );

            // 循环读取
            System.out.println("开始接收消息....");
            System.in.read();
            
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 5、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 6、关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

lambada表达式简便写法

// 省略...
// 4、通过通道接收消息
 String queueName = "queue1";
 // 接收处理
 DeliverCallback deliverCallback = (consumerTag, message) -> {
     System.out.println("收到的消息:" + new String(message.getBody()));
 };
 // 接收失败处理
 CancelCallback cancelCallback = consumerTag -> {
     System.out.println("消息接收失败!");
 };
 
 channel.basicConsume(
         queueName,
         true,
         deliverCallback,
         cancelCallback
 );
// 省略... 
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774859.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号