MQ(Message Queue)消息队列,是一种用来保存消息数据的队列
队列:数据结构的一种,特征为 “先进先出”
2.MQ 的作用-
应用解耦(技术上必须弄好才能使用MQ )
-
快速应用变更维护
-
流量削锋(削峰填谷)
1系统可用性降低: 集群
2系统复杂度提高:(程序员提升水平)
3异步消息机制(都有解决方案)
消息顺序性
消息丢失
消息一致性
消息重复使用
ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
1.提供方
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
public class Provider {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {
// 创建发送消息的提供方
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置发送命名的地址
producer.setNamesrvAddr("192.168.23.143:9876");
// 启动发送消息的服务
producer.start();
// 创建消息内容,指定topic,指定内容body
Message msg = new Message("topic","hello rq".getBytes(StandardCharsets.UTF_8));
// 发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
// 关闭连接
producer.shutdown();
}
}
2.消费方
package com.itheima.rock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {
// 开启接收消息的对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定消费消息的地址
consumer.setNamesrvAddr("192.168.23.143:9876");
// 指定 接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic","*");
// 开启监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 遍历消息
for (MessageExt msg : list) {
System.out.println("消息"+new String(msg.getBody()));
}
// 返回成功消息的通知,不会再发送这个消息给消费方
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费的服务
consumer.start();
}
}
6.广播模式
生产方
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
public class Provider {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {
// 创建发送消息的提供方
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置发送命名的地址
producer.setNamesrvAddr("192.168.23.143:9876");
// 启动发送消息的服务
producer.start();
// 创建消息内容,指定topic,指定内容body
Message msg = new Message("topic","hello rq".getBytes(StandardCharsets.UTF_8));
// 发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
// 关闭连接
producer.shutdown();
}
}
消费方
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
System.out.println(consumer.getInstanceName());
//consumer.setInstanceName("instance01");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.31.80:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1","*");
//设置当前消费者的消费模式(默认模式:负载均衡)
// consumer.setMessageModel(MessageModel.CLUSTERING);
//设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
consumer.setMessageModel(MessageModel.BROADCASTING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for(MessageExt msg : list){
// System.out.println("收到消息:"+msg);
System.out.println("消费者1:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接收消息服务已开启运行");
广播模式的现象
1) 如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次
2) 如果多个消费者先启动(广播模式),后发消息,才有广播的效果
结论:
必须先启动消费者再启动发送者才有广播的效果
同步消息发送
异步消息发送
单向消息
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
//同步消息发送
Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
//异步消息发送
Message msg2 = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.send(msg, new SendCallback() {
//表示成功返回结果
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
//表示发送消息失败
public void onException(Throwable t) {
System.out.println(t);
}
});
//单向消息
Message msg3 = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.sendoneway(msg);
}
//添加一个休眠操作,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
8.延时消息
立刻发送, 只是 告诉MQ ,消息隐藏一段时间再暴露
应用场景
下订单时 网mq 发一个取消订单消息 (订单号 30分钟演示)
30分钟后,消费者能看到这个消息,开始处理取消订单(如果没付费)
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("topic3",("非延时消息:hello rocketmq "+i).getBytes("UTF-8"));
// 30秒后再发送,而是先发送,但是通知mq , 30s 才对外暴露数据
//设置当前消息的延时效果(比如订单,下订单后,20分钟后,决定这个订单是否删除,)
msg.setDelayTimeLevel(3);
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
}
producer.shutdown();
}
9.Tag
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
//创建消息的时候除了制定topic,还可以指定tag
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
SendResult send = producer.send(msg);
System.out.println(send);
producer.shutdown();
}
消费者
*代表任意tag
sql//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");
消费者
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
注意:SQL过滤需要依赖服务器的功能支持,在broker配置文件中添加对应的功能项,并开启对应功能
enablePropertyFilter=true
启动服务器
10 顺序消息sh mqbroker -n localhost:9876 -c ../conf/broker.conf
默认情况下,MQ 开启了多个队列, 同时发送多个消息的的话,发送给那个队列是不确定的,同时消息的消费者读取消息,每读取一个消息开启一个线程,也不能保证消息的顺序性,
想要保证消息的有序性,需要指定消息的队列,同时 消息的消费者应该一个队列开启一个线程进行接收而不是一个消息一个线程)
发送者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
//创建要执行的业务队列
List orderList = new ArrayList();
Order order11 = new Order();
order11.setId("a");
order11.setMsg("主单-1");
orderList.add(order11);
Order order12 = new Order();
order12.setId("a");
order12.setMsg("子单-2");
orderList.add(order12);
Order order13 = new Order();
order13.setId("a");
order13.setMsg("支付-3");
orderList.add(order13);
Order order14 = new Order();
order14.setId("a");
order14.setMsg("推送-4");
orderList.add(order14);
Order order21 = new Order();
order21.setId("b");
order21.setMsg("主单-1");
orderList.add(order21);
Order order22 = new Order();
order22.setId("b");
order22.setMsg("子单-2");
orderList.add(order22);
Order order31 = new Order();
order31.setId("c");
order31.setMsg("主单-1");
orderList.add(order31);
Order order32 = new Order();
order32.setId("c");
order32.setMsg("子单-2");
orderList.add(order32);
Order order33 = new Order();
order33.setId("c");
order33.setMsg("支付-3");
orderList.add(order33);
//设置消息进入到指定的消息队列中
for(final Order order : orderList){
Message msg = new Message("orderTopic",order.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//设置当前消息发送时使用哪一个消息队列
public MessageQueue select(List list, Message message, Object o) {
System.out.println(list.size());// 数量只能通过修改 mq 的配置 改变(阿里开发团队认为,这个是敏感资源需要服务器管理员控制,而不是编码人员控制)
//根据发送的信息不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
producer.shutdown();
}
接受者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.31.80:9876");
consumer.subscribe("orderTopic","*");
//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : list){
System.out.println(Thread.currentThread().getName()+" 消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("接收消息服务已开启运行");
}



