栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

消息中间件rabbitMQ之第一种消息模型(直连)

消息中间件rabbitMQ之第一种消息模型(直连)

AMQP协议的回顾

RabbitMQ支持的消息模型

 

引入依赖

    com.rabbitmq
    amqp-client
    5.7.2
第一种模型(直连)

 

在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

开发生产者

 

//创建连接工厂
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("10.15.0.9");
  connectionFactory.setPort(5672);
  connectionFactory.setUsername("ems");
  connectionFactory.setPassword("123");
  connectionFactory.setVirtualHost("/ems");
  Connection connection = connectionFactory.newConnection();
  //创建通道
  Channel channel = connection.createChannel();
  //参数1: 是否持久化  参数2:是否独占队列 参数3:是否自动删除  参数4:其他属性
  channel.queueDeclare("hello",true,false,false,null);
  channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());
  channel.close();
  connection.close();
开发消费者
//创建连接工厂
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("10.15.0.9");
  connectionFactory.setPort(5672);
  connectionFactory.setUsername("ems");
  connectionFactory.setPassword("123");
  connectionFactory.setVirtualHost("/ems");
  Connection connection = connectionFactory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare("hello", true, false, false, null);
  channel.basicConsume("hello",true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      System.out.println(new String(body));
    }
  });
参数的说明
  channel.queueDeclare("hello",true,false,false,null);
	'参数1':用来声明通道对应的队列
  '参数2':用来指定是否持久化队列
  '参数3':用来指定是否独占队列
  '参数4':用来指定是否自动删除队列
  '参数5':对队列的额外配置
编写rabbitmq工具类

     可以明显发现生产者和消费者的代码有冗余,所以构建出工具类编写获取连接和关闭连接的方法

package com.demo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class RabbitMQUtils {

    public static Connection getRabbitConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.15.0.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/ems");
        Connection connection = connectionFactory.newConnection();
        return connection;
    }

    public static void closeRabbitConnection(Channel channel , Connection connection){
        try {
        if (channel!=null) {
            channel.close();
        }
        if (connection!=null) {
            connection.close();
        }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/707156.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号