栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【RocketMQ】

【RocketMQ】

该博文里总结的内容源自于 https://www.bilibili.com/video/BV1uM4y1g7wN

什么是MQ?

MessageQueue,消息队列。是一种FIFO(先进先出)的数据结构。消息由生产者发送到MQ进行排队,然后按照原来的顺序交由信息的消费者进行处理。

MQ的作用

优点:

  • 异步:提高系统的响应速度、吞吐量
  • 解耦:
    1)减少服务之间的影响。提高系统整体的稳定性、扩展性。
    2)实现数据分发(一个生产者,多个消费者)
  • 削峰:以稳定的系统资源应对突发的流量冲击

缺点:

  • 系统可用性降低:系统引入外部依赖增多,系统的稳定性就会变差。如果MQ挂了会影响系统。
  • 系统复杂度提高:带来一些问题,例如保证消息不会丢失、不会被重复消费、消息的顺序性等。
  • 消息一致性问题:分布式事务问题

RocketMQ结构


RocketMQ安装

虚拟机:centos 7
jdk:1.8
RocketMQ:4.7.1
下载地址:https://rocketmq.apache.org/release_notes/release-notes-4.7.1/

1、将下好的压缩包上传到服务器里,并放到 /app/rocketmq/ 里

2、解压 zip 压缩包

unzip rocketmq-all-4.7.1-bin-release.zip

3、修改配置文件
1)修改 /app/rocketmq/rocketmq-all-4.7.1-bin-release/bin/runserver.sh 配置文件,根据服务器的配置修改,我这里把内存修改为512m
找到其中的

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"

修改成

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"

2)修改 /app/rocketmq/rocketmq-all-4.7.1-bin-release/bin/runbroker.sh 配置文件
找到其中的

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

修改成

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

4、添加环境变量

cd ~
vim .bash_profile
source .bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs

export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.7.1-bin-release
export NAMESRV_ADDR=localhost:9876

PATH=$PATH:$HOME/bin:$ROCKETMQ_HOME/bin

export PATH


5、启动 mqnamesrv(以下设计的命令基本都是在目录/app/rocketma/rocketmq-all-4.7.1-bin-release下使用)

nohup bin/mqnamesrv &

查看 nohup.out

tail nohup.out
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

查看java进程

ps -ef | grep java
root       2034   2031  1 16:35 pts/0    00:00:02 /bin/java -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m -XX:-OmitStackTraceInFastThrow -XX:-UseLargePages -Djava.ext.dirs=/jre/lib/ext:/app/rocketmq/rocketmq-all-4.7.1-bin-release/bin/../lib:/lib/ext -cp .:/app/rocketmq/rocketmq-all-4.7.1-bin-release/bin/../conf: org.apache.rocketmq.namesrv.NamesrvStartup
root       2059   1849  0 16:39 pts/0    00:00:00 grep --color=auto java

nameServe 启动成功!

关闭mqnamesrv
sh bin/mqshutdown namesrv

6、启动 mqbroker
1)修改 /app/rocketmq/rocketmq-all-4.7.1-bin-release/conf/broker.conf,在末尾添加

autoCreateTopicEnable=true

2)启动 mqbroker

nohup bin/mqbroker -c conf/broker.conf &

同样的,可以查看 nohup.out 和 java进程确认是否成功启动,这里就不演示了

关闭 mqbroker
sh bin/mqshutdown broker

7、测试接口
调用生产者接口

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

调用消费者接口

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

RocketMQ样例代码

到RocketMQ官网去下载样例代码

用IDEA单独打开样例代码中的example项目

如果想要其他客户端连接到服务器的MQ,则需要配置 broker.conf ,添加服务器本机的IP地址。配置好后要重启 broker

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
# 添加服务器本机IP
brokerIP1 = 192.168.188.130

在样例代码中,无论是生产者还是消费者,请求MQ时,都要配置MQ服务的地址,样例代码用的是配置环境变量的方式取IP地址的,这里我们需要手动添加上 setNamesrvAddr 的代码,下面贴部分代码

 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.188.130:9876");
        producer.start();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.188.130:9876");

修改好后,可以按序运行项目中的Consumer和Producer进行验证

运行结果(部分)

SendResult [sendStatus=SEND_OK, msgId=0A84682A202418B4AAC2078BB109007E, offsetMsgId=C0A8BC8200002A9F000000000007DA25, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=632]
SendResult [sendStatus=SEND_OK, msgId=0A84682A202418B4AAC2078BB114007F, offsetMsgId=C0A8BC8200002A9F000000000007DAF9, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=632]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=212, queueOffset=632, sysFlag=0, bornTimestamp=1635822595337, bornHost=/192.168.188.1:52495, storeTimestamp=1635822595958, storeHost=/192.168.188.130:10911, msgId=C0A8BC8200002A9F000000000007DA25, commitLogOffset=514597, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=633, KEYS=OrderID188, CONSUME_START_TIME=1635822595353, UNIQ_KEY=0A84682A202418B4AAC2078BB109007E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]] 
ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=212, queueOffset=632, sysFlag=0, bornTimestamp=1635822595348, bornHost=/192.168.188.1:52495, storeTimestamp=1635822595963, storeHost=/192.168.188.130:10911, msgId=C0A8BC8200002A9F000000000007DAF9, commitLogOffset=514809, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=633, KEYS=OrderID188, CONSUME_START_TIME=1635822595364, UNIQ_KEY=0A84682A202418B4AAC2078BB114007F, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]] 


RocketMQ消息样例 1、基本样例

生产者有三种发送消息的方式:

  • 同步发送:发送消息之后,等返回响应,可检查broker是否收到消息
  • 异步发送:发送消息之后,不关注响应,可检查broker是否收到消息
  • 单向发送:发送消息之后,不关注响应,不检查broker是否收到消息

消费者有两种消费方式:

  • 消息推送:等待Broker把消息推送过来
  • 主动拉取:主动去Broker上拉消息

每个生产者/消费者在请求Broker时都会声明自己属于哪个生产者组/消费者组

DefaultMQProducer producer = new DefaultMQProducer("ProduceGroup");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
1)消息推送

生产者发送的消息体中,由3部分组成

  • Topic
  • Tag
  • Message body
Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
                );

消费者订阅消息的时候,会声明接收消息的类型:

  • Topic
  • Tag
/,
				"TagA" ,
				("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
			  );
			  
//在 apache rocketmq 开源版中,提供了18个级别的延迟等级(阿里的rocketmq商业版中能比较自由地设置延迟时间)
//messageDelayLevel= 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);  //10s之后再消费

5、批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网联IO,提升吞吐量。

DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();

//If you just send messages of no more than 1MiB at a time, it is easy to use batch(注意:批量的消息不要大于1m,最新版本rocketmq放宽到4m)
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support(注意:批量的消息的topic要一致)
String topic = "BatchTest";
List messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

producer.send(messages);

如果批量的消息实在是过大了,那么可以将批量的消息拆分,再逐个发送

DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();

//large batch
String topic = "BatchTest";
List messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
	messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}

//split the large batch into small ones:
//1、拆分
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
	List listItem = splitter.next();
	//2、发送
	producer.send(listItem);
}
class ListSplitter implements Iterator> {
    private int sizeLimit = 10 * 1000;
    private final List messages;
    private int currIndex;

    public ListSplitter(List messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map properties = message.getProperties();
            for (Map.Entry entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            //计算出总的大小
            tmpSize = tmpSize + 20; //for log overhead
            //进行拆分
            if (tmpSize > sizeLimit) {
                //it is unexpected that single message exceeds the sizeLimit
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}

6、过滤消息

消费者接收消息时,可以过滤出自己需要的消息,这里利用broker来对消息进行过滤,然后再发送给消费者。
过滤的方式有两种:
1)Tag 过滤
2)Sql 过滤

案例代码: 1)Tag 过滤

生产者:

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();

		//发送
        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                tags[i % tags.length],
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();

消费者:

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

		//过滤出需要的 TagA、TagC
        consumer.subscribe("TagFilterTest", "TagA || TagC");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
2)Sql 过滤

生产者:

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("SqlFilterTest",
                tags[i % tags.length],
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();

消费者:

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Don't forget to set enablePropertyFilter=true in broker
        consumer.subscribe("SqlFilterTest",
            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                "and (a is not null and a between 0 and 3)"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");



7、事务消息
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699757.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号