栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Rabbitmq入门(一)

Rabbitmq入门(一)

rabbitmq

目录

rabbitmq

mq的优势

mq的劣势

rabbitmq基于AMQP协议

官网给的六种模式实现

1、用到的工具类

2、第一种:"helloworld"实现


​​​​​​​

 

官网:Messaging that just works — RabbitMQ

mq的优势
  1. 应用解耦

  2. 异步提速 

  3. 削峰填谷

mq的劣势
  1. 降低系统的可用性

    系统引入的外部依赖越多,系统稳定性越差,一旦mq宕机,整个系统就会瘫痪

  2. 系统的复杂度提高

    保证消息的同步消息、避免重复消费、消息丢失等一系列问题

  3. 一致性问题

    如何保证多个服务写操作一致

rabbitmq基于AMQP协议

AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间设计。

官网给的六种模式实现

1、用到的工具类

RabbitConstant.class和RabbitUtils.class

public class RabbitConstant {
    public static final String QUEUE_HELLOWORD = "helloworld";
    public static final String QUEUE_SMS = "queuesms";
    public static final String EXCHANGE_WEATHER = "weather";
    public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
    public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_SINA = "sina";
}
​
​
package com.liheng.rabbitmq.util;
​
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​

public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();
​
    static {
        connectionFactory.setHost("xxx.xxx.xx.xx");//主机地址
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("liheng123");
        connectionFactory.setPassword("liheng123");
        connectionFactory.setVirtualHost("liheng123Virtual");
    }
    public static Connection getConnection(){
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            return connection;
        } catch (Exception e) {
            throw new RuntimeException();
        }
    }
}

2、第一种:"helloworld"实现

Consumer.class 消费者代码

package com.liheng.rabbitmq.helloword;
​
import com.liheng.rabbitmq.util.RabbitConstant;
import com.liheng.rabbitmq.util.RabbitUtils;
import com.rabbitmq.client.*;
​
import java.io.IOException;
​

public class Consumer {
    public static void main(String[] args) {
        //获取tcp长链接
        try {
            Connection connection = RabbitUtils.getConnection();
            //创建通信 通道
            Channel channel = connection.createChannel();
          
            channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORD,false,false,false,null);
​
            //从mq中去消费
            channel.basicConsume(RabbitConstant.QUEUE_HELLOWORD,false,new Reciver(channel));
​
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
class Reciver extends DefaultConsumer{
​
    private Channel channel;
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }
​
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("接受到的消息:"+ message);
        System.out.println("消息的tagId:"+ envelope.getDeliveryTag());
        //false 代表只前后当前的消息,true代表签收所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

Producer.class 消费者

package com.liheng.rabbitmq.helloword;
​
import com.liheng.rabbitmq.util.RabbitConstant;
import com.liheng.rabbitmq.util.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​

public class Producer {
    public static void main(String[] args) {
        //获取tcp长链接
        try {
            Connection connection = RabbitUtils.getConnection();
            //创建通信 通道
            Channel channel = connection.createChannel();
            //RabbitConstant.QUEUE_HELLOWORD 如果没有当前队列 会自动生成队列
            
            channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORD,false,false,false,null);
​
            String message = "李恒666";
          
            channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORD,null,message.getBytes());
            channel.close();
            connection.close();
            System.out.println("数据发送成功!");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
​

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/335199.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号