什么是MQ?该博文里总结的内容源自于 https://www.bilibili.com/video/BV1uM4y1g7wN
MessageQueue,消息队列。是一种FIFO(先进先出)的数据结构。消息由生产者发送到MQ进行排队,然后按照原来的顺序交由信息的消费者进行处理。
优点:
- 异步:提高系统的响应速度、吞吐量
- 解耦:
1)减少服务之间的影响。提高系统整体的稳定性、扩展性。
2)实现数据分发(一个生产者,多个消费者) - 削峰:以稳定的系统资源应对突发的流量冲击
缺点:
- 系统可用性降低:系统引入外部依赖增多,系统的稳定性就会变差。如果MQ挂了会影响系统。
- 系统复杂度提高:带来一些问题,例如保证消息不会丢失、不会被重复消费、消息的顺序性等。
- 消息一致性问题:分布式事务问题
RocketMQ安装
虚拟机:centos 7
jdk:1.8
RocketMQ:4.7.1
下载地址:https://rocketmq.apache.org/release_notes/release-notes-4.7.1/
1、将下好的压缩包上传到服务器里,并放到 /app/rocketmq/ 里
2、解压 zip 压缩包
unzip rocketmq-all-4.7.1-bin-release.zip
3、修改配置文件
1)修改 /app/rocketmq/rocketmq-all-4.7.1-bin-release/bin/runserver.sh 配置文件,根据服务器的配置修改,我这里把内存修改为512m
找到其中的
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"
修改成
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"
2)修改 /app/rocketmq/rocketmq-all-4.7.1-bin-release/bin/runbroker.sh 配置文件
找到其中的
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
修改成
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
4、添加环境变量
cd ~ vim .bash_profile source .bash_profile
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
# User specific environment and startup programs
export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.7.1-bin-release
export NAMESRV_ADDR=localhost:9876
PATH=$PATH:$HOME/bin:$ROCKETMQ_HOME/bin
export PATH
5、启动 mqnamesrv(以下设计的命令基本都是在目录/app/rocketma/rocketmq-all-4.7.1-bin-release下使用)
nohup bin/mqnamesrv &
查看 nohup.out
tail nohup.out
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release. The Name Server boot success. serializeType=JSON
查看java进程
ps -ef | grep java
root 2034 2031 1 16:35 pts/0 00:00:02 /bin/java -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m -XX:-OmitStackTraceInFastThrow -XX:-UseLargePages -Djava.ext.dirs=/jre/lib/ext:/app/rocketmq/rocketmq-all-4.7.1-bin-release/bin/../lib:/lib/ext -cp .:/app/rocketmq/rocketmq-all-4.7.1-bin-release/bin/../conf: org.apache.rocketmq.namesrv.NamesrvStartup root 2059 1849 0 16:39 pts/0 00:00:00 grep --color=auto java
nameServe 启动成功!
关闭mqnamesrv
sh bin/mqshutdown namesrv
6、启动 mqbroker
1)修改 /app/rocketmq/rocketmq-all-4.7.1-bin-release/conf/broker.conf,在末尾添加
autoCreateTopicEnable=true
2)启动 mqbroker
nohup bin/mqbroker -c conf/broker.conf &
同样的,可以查看 nohup.out 和 java进程确认是否成功启动,这里就不演示了
关闭 mqbroker
sh bin/mqshutdown broker
7、测试接口
调用生产者接口
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
调用消费者接口
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
RocketMQ样例代码
到RocketMQ官网去下载样例代码
用IDEA单独打开样例代码中的example项目
如果想要其他客户端连接到服务器的MQ,则需要配置 broker.conf ,添加服务器本机的IP地址。配置好后要重启 broker
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH autoCreateTopicEnable=true # 添加服务器本机IP brokerIP1 = 192.168.188.130
在样例代码中,无论是生产者还是消费者,请求MQ时,都要配置MQ服务的地址,样例代码用的是配置环境变量的方式取IP地址的,这里我们需要手动添加上 setNamesrvAddr 的代码,下面贴部分代码
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.188.130:9876");
producer.start();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.188.130:9876");
修改好后,可以按序运行项目中的Consumer和Producer进行验证
运行结果(部分)
SendResult [sendStatus=SEND_OK, msgId=0A84682A202418B4AAC2078BB109007E, offsetMsgId=C0A8BC8200002A9F000000000007DA25, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=632] SendResult [sendStatus=SEND_OK, msgId=0A84682A202418B4AAC2078BB114007F, offsetMsgId=C0A8BC8200002A9F000000000007DAF9, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=632]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=212, queueOffset=632, sysFlag=0, bornTimestamp=1635822595337, bornHost=/192.168.188.1:52495, storeTimestamp=1635822595958, storeHost=/192.168.188.130:10911, msgId=C0A8BC8200002A9F000000000007DA25, commitLogOffset=514597, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=633, KEYS=OrderID188, CONSUME_START_TIME=1635822595353, UNIQ_KEY=0A84682A202418B4AAC2078BB109007E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=212, queueOffset=632, sysFlag=0, bornTimestamp=1635822595348, bornHost=/192.168.188.1:52495, storeTimestamp=1635822595963, storeHost=/192.168.188.130:10911, msgId=C0A8BC8200002A9F000000000007DAF9, commitLogOffset=514809, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=633, KEYS=OrderID188, CONSUME_START_TIME=1635822595364, UNIQ_KEY=0A84682A202418B4AAC2078BB114007F, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
RocketMQ消息样例 1、基本样例
生产者有三种发送消息的方式:
- 同步发送:发送消息之后,等返回响应,可检查broker是否收到消息
- 异步发送:发送消息之后,不关注响应,可检查broker是否收到消息
- 单向发送:发送消息之后,不关注响应,不检查broker是否收到消息
消费者有两种消费方式:
- 消息推送:等待Broker把消息推送过来
- 主动拉取:主动去Broker上拉消息
每个生产者/消费者在请求Broker时都会声明自己属于哪个生产者组/消费者组
DefaultMQProducer producer = new DefaultMQProducer("ProduceGroup");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
1)消息推送
生产者发送的消息体中,由3部分组成
- Topic
- Tag
- Message body
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
消费者订阅消息的时候,会声明接收消息的类型:
- Topic
- Tag
/,
"TagA" ,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//在 apache rocketmq 开源版中,提供了18个级别的延迟等级(阿里的rocketmq商业版中能比较自由地设置延迟时间)
//messageDelayLevel= 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); //10s之后再消费
5、批量消息
批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网联IO,提升吞吐量。
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch(注意:批量的消息不要大于1m,最新版本rocketmq放宽到4m)
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support(注意:批量的消息的topic要一致)
String topic = "BatchTest";
List messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
如果批量的消息实在是过大了,那么可以将批量的消息拆分,再逐个发送
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//large batch
String topic = "BatchTest";
List messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//split the large batch into small ones:
//1、拆分
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List listItem = splitter.next();
//2、发送
producer.send(listItem);
}
class ListSplitter implements Iterator> { private int sizeLimit = 10 * 1000; private final List
messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } //计算出总的大小 tmpSize = tmpSize + 20; //for log overhead //进行拆分 if (tmpSize > sizeLimit) { //it is unexpected that single message exceeds the sizeLimit //here just let it go, otherwise it will block the splitting process if (nextIndex - currIndex == 0) { //if the next sublist has no element, add this one and then break, otherwise just break nextIndex++; } break; } if (tmpSize + totalSize > sizeLimit) { break; } else { totalSize += tmpSize; } } List subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } @Override public void remove() { throw new UnsupportedOperationException("Not allowed to remove"); } }
6、过滤消息
消费者接收消息时,可以过滤出自己需要的消息,这里利用broker来对消息进行过滤,然后再发送给消费者。
过滤的方式有两种:
1)Tag 过滤
2)Sql 过滤
生产者:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
//发送
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
消费者:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//过滤出需要的 TagA、TagC
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
2)Sql 过滤
生产者:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
消费者:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");



