栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ入门案例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ入门案例

1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程

1、导入依赖

    com.rabbitmq
    amqp-client
    5.10.0

2、RabbitMQ服务
docker start myrabbit
3、定义生产者
package com.sky.rabbitmq.simple;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Producer {
    public static void main(String[] args){
        // 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过RabbitMQ遵循的是amqp
        // ip port

        // 1:创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // ip
        connectionFactory.setHost("47.112.138.176");
        // 端口不是管理界面的端口
        connectionFactory.setPort(5672);
        // 登录账号
        connectionFactory.setUsername("admin");
        // 登录密码
        connectionFactory.setPassword("admin");
        // 访问路径根节点
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        // 2:创建连接connection
        try {
            connection = connectionFactory.newConnection("生产者");
            // 3:通过连接获取通道channel
            channel = connection.createChannel();
            // 4:通过创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息
            String queueName = "queue1";
            
            channel.queueDeclare(queueName,false,false,true,null);
            // 5:准备消息内容
            String message = "HelloWorld";
            // 6:发送消息给队列queue 这里空字符串,是一个交换机
            channel.basicPublish("",queueName,null,message.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            // 7:关闭连接
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8:关闭通道
            if (connection!= null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
4、消费者
package com.sky.rabbitmq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Consumer {
    public static void main(String[] args) {
        // 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过RabbitMQ遵循的是amqp
        // ip port

        // 1:创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // ip
        connectionFactory.setHost("47.112.138.176");
        // 端口不是管理界面的端口
        connectionFactory.setPort(5672);
        // 登录账号
        connectionFactory.setUsername("admin");
        // 登录密码
        connectionFactory.setPassword("admin");
        // 访问路径根节点
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        // 2:创建连接connection
        try {
            connection = connectionFactory.newConnection("消费者");
            // 3:通过连接获取通道channel
            channel = connection.createChannel();
            // 4:通过创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息
            String queueName = "queue1";
            
            channel.queueDeclare(queueName,false,false,true,null);
            // 5:准备消息内容
            String message = "HelloWorld";
            // 6:发送消息给队列queue
            channel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("收到消息是" + new String(delivery.getBody(),"UTF-8"));
                }
            }, new CancelCallback() {
                public void handle(String s) throws IOException {
                    System.out.println("接收消息失败。。。");
                }
            });

            System.out.println("开始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            // 7:关闭连接
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8:关闭通道
            if (connection!= null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

5、测试 执行发送

运行生产者

接收

关闭consumer后队列删除

6、图解



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/854777.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号