栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

65.RocketMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

65.RocketMQ

RocketMQ 1.简介

首先要先了解一下为什么要引入RocketMQ消息队列是什么?

​ 比如公司本身的业务体量很小,所以直接单机一把梭哈都能搞定了,但是后面业务体量不断扩大,采用微服务的设计思想,分布式的部署方式,所以拆分了很多的服务,随着体量的增加以及业务场景越来越复杂了,很多场景单机的技术栈和中间件以及不够用了,而且对系统的友好性也下降了,最后做了很多技术选型的工作,我们决定引入消息队列中间件。

主要功能:异步、削峰、解耦

RocketMQ:

它是一款分布式、队列模型(queue)的消息中间件,是Alibaba自主研发的专业消息中间件,实现了业务消峰、分布式事务的优秀框架。

2.安装

工欲善其事必先利其器

  1. 官网:https://rocketmq.apache.org/




image-20211207192901447.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WSxb0pD4-1638880091963)(C:Users王元元AppDataRoamingTyporatypora-user-imagesimage-20211207193256601.png)]

简单来讲,binary是编译好的可以直接使用,source是还没编译过的源代码,需要自行编译。

4.解压

5.配置环境变量

6

ROCKETMQ_HOME="D:rocketmq"
NAMESRV_ADDR="localhost:9876"
# 启动 nameserver
.binmqnamesrv.cmd
# 启动broker
.binmqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
3.使用 3.1案例一
  1. 创建maven quickstart项目

  2. 添加依赖

    
        org.apache.rocketmq
        rocketmq-client
        4.9.1
    
    
  3. 发送

    package com.woniuxy.cloud.simple;
    
    import com.woniuxy.cloud.AppConstants;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    import java.util.Scanner;
    
    public class Sender {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
    
            //(1)创建生产者
            
            DefaultMQProducer producer = new DefaultMQProducer("TestSender");
            producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
            //(2)启动producer
            producer.start();
    
            //(3)构建消息并发送
            Scanner scanner = new Scanner(System.in);
            while (true) {
                System.out.println("请输入要发送的消息");
                String smsContent = scanner.next();
                if (smsContent.equals("exit")) {
                    //(4)关闭producer
                    producer.shutdown();
                }
                Message msg = new Message(AppConstants.SMS_TOPIC, "user_register", smsContent.getBytes("UTF-8"));
    
                //同步发送到RocketMQ
                SendResult sendResult = producer.send(msg);
                System.out.println("sendResult:" + sendResult);
            }
        }
    }
    
    
  4. 接收

package com.woniuxy.cloud.simple;

import com.woniuxy.cloud.AppConstants;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class Receiver {

    public static void main(String[] args) throws MQClientException {
        //(1)创建消费者实例
        //消费者分组,同一个名字的消费者组成一个集群
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestReceive");

        consumer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);


        //(2)订阅某个主题,收到特定的消息
        consumer.subscribe(AppConstants.SMS_TOPIC,"*");

        //(3)向MQ注册一个监听器
        
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msgExt:msgs){

                try {
                    System.out.println("消息内容:"+new String(msgExt.getBody(),"utf-8"));

                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }

            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // (4)启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");

    }
}
3.2发送的三种模式 1.发送-同步确认发送结果

同步发送是指消息发送方发出一条消息后,会在收到服务端响应后才发吓一条的通讯方式。

  • 应用场景:此场景应用非常广泛,ex:重要的通知邮件、报名短信通知、营销短信系统等。
2.发送-异步确认发送结果

3.发送-结束 oneway

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

  • 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

对比

他的优缺点是啥 RocketMQ优点:

单机吞吐量:十万级

可用性:非常高,分布式架构

消息可靠性:经过参数优化配置,消息可以做到0丢失

功能支持:MQ功能较为完善,还是分布式的,扩展性好

支持10亿级别的消息堆积,不会因为堆积导致性能下降

源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况

RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ

RocketMQ缺点:

支持的客户端语言不多,目前是java及c++,其中c++不成熟

社区活跃度不是特别活跃那种

没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

消息类型

分类:普通消息、顺序消息、延时消息、事务消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java

付款就取消订单释放库存。

private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/642617.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号