rabbitmq
目录
rabbitmq
mq的优势
mq的劣势
rabbitmq基于AMQP协议
官网给的六种模式实现
1、用到的工具类
2、第一种:"helloworld"实现
官网:Messaging that just works — RabbitMQ
mq的优势
-
应用解耦
-
异步提速
-
削峰填谷
mq的劣势
-
降低系统的可用性
系统引入的外部依赖越多,系统稳定性越差,一旦mq宕机,整个系统就会瘫痪
-
系统的复杂度提高
保证消息的同步消息、避免重复消费、消息丢失等一系列问题
-
一致性问题
如何保证多个服务写操作一致
rabbitmq基于AMQP协议
应用解耦
异步提速
削峰填谷
-
降低系统的可用性
系统引入的外部依赖越多,系统稳定性越差,一旦mq宕机,整个系统就会瘫痪
-
系统的复杂度提高
保证消息的同步消息、避免重复消费、消息丢失等一系列问题
-
一致性问题
如何保证多个服务写操作一致
rabbitmq基于AMQP协议
AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间设计。
官网给的六种模式实现
1、用到的工具类
RabbitConstant.class和RabbitUtils.class
public class RabbitConstant {
public static final String QUEUE_HELLOWORD = "helloworld";
public static final String QUEUE_SMS = "queuesms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
}
package com.liheng.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
connectionFactory.setHost("xxx.xxx.xx.xx");//主机地址
connectionFactory.setPort(5672);
connectionFactory.setUsername("liheng123");
connectionFactory.setPassword("liheng123");
connectionFactory.setVirtualHost("liheng123Virtual");
}
public static Connection getConnection(){
Connection connection = null;
try {
connection = connectionFactory.newConnection();
return connection;
} catch (Exception e) {
throw new RuntimeException();
}
}
}
2、第一种:"helloworld"实现
Consumer.class 消费者代码
package com.liheng.rabbitmq.helloword;
import com.liheng.rabbitmq.util.RabbitConstant;
import com.liheng.rabbitmq.util.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
//获取tcp长链接
try {
Connection connection = RabbitUtils.getConnection();
//创建通信 通道
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORD,false,false,false,null);
//从mq中去消费
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORD,false,new Reciver(channel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
class Reciver extends DefaultConsumer{
private Channel channel;
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("接受到的消息:"+ message);
System.out.println("消息的tagId:"+ envelope.getDeliveryTag());
//false 代表只前后当前的消息,true代表签收所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
Producer.class 消费者
package com.liheng.rabbitmq.helloword;
import com.liheng.rabbitmq.util.RabbitConstant;
import com.liheng.rabbitmq.util.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) {
//获取tcp长链接
try {
Connection connection = RabbitUtils.getConnection();
//创建通信 通道
Channel channel = connection.createChannel();
//RabbitConstant.QUEUE_HELLOWORD 如果没有当前队列 会自动生成队列
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORD,false,false,false,null);
String message = "李恒666";
channel.basicPublish("",RabbitConstant.QUEUE_HELLOWORD,null,message.getBytes());
channel.close();
connection.close();
System.out.println("数据发送成功!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}



