栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Java-主流框架—(14)Spring-ElasticSearch

Java-主流框架—(14)Spring-ElasticSearch

1.RocketMQ简介 1.1MQ简介

MQ(Message Queue)消息队列,是一种用来保存消息数据的队列

队列:数据结构的一种,特征为 “先进先出”

 1.2何为消息

服务器间的业务请求

原始架构: 服务器中的 A 功能需要调用 B 、 C 模块才能完成 微服务架构: 服务器 A 向服务器 B 发送要执行的操作(视为消息) 服务器 A 向 服务器 C 发送 要执行的 操作 (视为消息 )  1.3MQ作用

1.应用解耦(异步消息发送)

MQ基本工作模式

2.快速应用变更维护(异步消息发送)

3.流量削锋(异步消息发送)

 1.4MQ优缺点分析

优点(作用)

应用 解耦 快速应用变更 维护 流量削 锋

缺点

系统 可用性 降低 系统 复杂度 提高 异步消息机制 消息 顺序性 消息丢失 消息 一致性 消息 重复 使用

1.5MQ产品介绍

ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高

RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,

RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强

kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术metaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)

解决所有缺点

 2.环境搭建 2.1基础概念

 2.2安装

下载:https://www.apache.org/

 

 

 3.消息发送

3.1消息发送与接收开发流程 1. 谁 来发? 2. 发给 谁? 3. 怎么 发 ? 4. 发什么? 5. 发的结果是什么? 6. 打扫 战场 3.2单生产者单消费者消息发送 

导入RocketMQ客户端坐标


    org.apache.rocketmq
    rocketmq-client
    4.5.2

 生产者

//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.175.130:9876");
//3.1启动发送的服务
producer.start();
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
//3.2发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
//5.关闭连接
producer.shutdown();

注意:关闭服务器防火墙

systemctl stop firewalld.service

 消费者

//1.创建一个接收消息的对象Comsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接受的命名服务器地址
        consumer.setNamesrvAddr("192.168.175.130:9876");
        //3.设定接收消息对应的topic
        consumer.subscribe("topic1","*");
        //3.开启监听 用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for (MessageExt messageExt : list) {
                    System.out.println("收到消息:"+messageExt);
                    System.out.println("消息:"+new String(messageExt.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接受消息服务已开启运行");

 3.3单生产者多消费者消息发送

消费者

//1.创建一个接收消息的对象Comsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接受的命名服务器地址
        consumer.setNamesrvAddr("192.168.175.130:9876");
        //3.设定接收消息对应的topic
        consumer.subscribe("topic1","*");

        //设置当前消费者的消费模式(默认模式 负载均衡)
//        consumer.setMessageModel(MessageModel.CLUSTERING);
        //设置当前消费者的消费模式为广播模式(所有客户端接收到的消息都是一样的)
        consumer.setMessageModel(MessageModel.BROADCASTING);

        //3.开启监听 用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for (MessageExt messageExt : list) {
                    System.out.println("收到消息:"+messageExt);
                    System.out.println("消息:"+new String(messageExt.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接受消息服务已开启运行");

 均衡负载

 广播模式

 3.4多生产者多消费者消息发送

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

 3.5消息类别 3.5.1同步消息

特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

public class Producer {
    public static void main(String[] args) throws Exception {



        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        for (int i=1;i<=5;i++) {
            同步消息发送
            Message msg = new Message("topic2",("同步消息:hello rocketmq"+i).getBytes("UTF-8"));
            SendResult send = producer.send(msg);
            System.out.println("返回结果"+send);
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}
 3.5.2异步消息

特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息

(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)

public class Producer {
    public static void main(String[] args) throws Exception {



        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        for (int i=1;i<=5;i++) {
            //异步消息发送
            Message msg = new Message("topic2",("异步消息:hello rocketmq"+i).getBytes("UTF-8"));
            producer.send(msg,new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable);
                }
            });
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}
3.5.3单项消息

特征:不需要有回执的消息,例如日志类消息 

public class Producer {
    public static void main(String[] args) throws Exception {



        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        for (int i=1;i<=5;i++) {
            //单向消息发送
            Message msg = new Message("topic2",("单项消息:hello rocketmq"+i).getBytes("UTF-8"));
            producer.sendoneway(msg);
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}
3.5.4延时消息

消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

public class Producer {
    public static void main(String[] args) throws Exception {



        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        for (int i=1;i<=5;i++) {
            //同步消息发送
            Message msg = new Message("topic2",("非延时消息:hello rocketmq"+i).getBytes("UTF-8"));
            //设置当前消息的延时效果
            msg.setDelayTimeLevel(3);

            SendResult send = producer.send(msg);
            System.out.println("返回结果"+send);
        }
        producer.shutdown();
    }
}

目前支持的消息时间

u 秒级: 1 , 5 , 10 , 30 u 分级: 1~10 , 20 , 30 u 时级: 1 , 2

                1s  5s  10s  30s  1m  2m  3m  4m  5m  6m  7m  8m  9m  10m  20m  30m  1h  2h

3.5.5批量消息
public class Producer {
    public static void main(String[] args) throws Exception {
        
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        //创建一个集合保存多个消息
        List msgList = new ArrayList();
        Message msg1 = new Message("topic3",("批量消息:hello rocketmq111").getBytes("UTF-8"));
        Message msg2 = new Message("topic3",("批量消息:hello rocketmq222").getBytes("UTF-8"));
        Message msg3 = new Message("topic3",("批量消息:hello rocketmq333").getBytes("UTF-8"));
        msgList.add(msg1);
        msgList.add(msg2);
        msgList.add(msg3);

        //批量消息发送(每次发送的消息总量不得超过4M)
        //消息的总长度包含4个信息:topic,body,消息的属性,日志(20字节)
        SendResult send = producer.send(msgList);

        System.out.println("返回结果"+send);
        producer.shutdown();
    }
}
3.5.6消息过滤

分类过滤

生产者

Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));

消费者

//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");

消息过滤

生产者

//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

消费者

//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));

注意:SQL过滤需要依赖服务器的功能支持,在broker配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

启动服务器使启用对应配置文件

sh mqbroker -n localhost:9876 -c ../conf/broker.conf

 3.5.7错乱的消息顺序

 3.5.8顺序消息

发送消息

//设置消息进入到指定的消息队列中
for(final Order order : orderList){
    Message msg = new Message("orderTopic",order.toString().getBytes());
    //发送时要指定对应的消息队列选择器
   
    SendResult result = producer.send(msg, new MessageQueueSelector() {
        //设置当前消息发送时使用哪一个消息队列
      
          public MessageQueue select(List list, Message message, Object o) {
            //根据发送的信息不同,选择不同的消息队列
     
                   //根据id来选择一个消息队列的对象,并返回->id得到int
           
             int mqIndex = order.getId().hashCode() % list.size();
           return list.get(mqIndex);
        }
    }, null);
    System.out.println(result);
}

接收消息

//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
    //使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务

        / /转化为一个消息队列一个线程服务
   
    public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) {
        for(MessageExt msg : list){
            System.out.println("消息:"+new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

 3.5.9事务消息

 事务消息状态

提交状态:允许进入队列,此消息与非事务消息无区别

回滚状态:不允许进入队列,此消息等同于未发送过

中间状态:完成了half消息的发送,未对MQ进行二次状态确认

注意:事务消息仅与生产者有关,与消费者无关

public static void main1(String[] args) throws Exception {
        //事务消息使用的生产者TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        //添加本地事务的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常的事务过程
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            //事务补偿过程
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();

        Message msg = new Message("topic7",("事务消息:hello rocketmq").getBytes("UTF-8"));
        SendResult send = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果"+send);
        producer.shutdown();
    }
public static void main2(String[] args) throws Exception {
        //事务消息使用的生产者TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        //添加本地事务的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常的事务过程
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            //事务补偿过程
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();

        Message msg = new Message("topic8",("事务消息:hello rocketmq").getBytes("UTF-8"));
        SendResult send = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果"+send);
        producer.shutdown();
    }
public static void main(String[] args) throws Exception {
        //事务消息使用的生产者TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        //添加本地事务的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常的事务过程
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return LocalTransactionState.UNKNOW;
            }
            //事务补偿过程
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("事务补偿过程执行");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();

        Message msg = new Message("topic9",("事务消息:hello rocketmq").getBytes("UTF-8"));
        SendResult send = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果"+send);
        //实物补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿
//        producer.shutdown();
    }
4.集群搭建 4.1RocketMQ集群分类

 4.2RocketMQ集群特征

 4.3RocketMQ集群工作流程

步骤1:NameServer启动,开启监听,等待broker、producer与consumer连接

步骤2:broker启动,根据配置信息,连接所有的NameServer,并保持长连接

步骤2补充:如果broker中有现存数据, NameServer将保存topic与broker关系

步骤3:producer发信息,连接某个NameServer,并建立长连接

步骤4:producer发消息

u 步骤 4.1 若果 topic 存在,由 NameServer 直接分配 u 步骤 4.2 如果 topic 不存在,由 NameServer 创建 topic 与 broker 关系,并分配

步骤5:producer在broker的topic选择一个消息队列(从列表中选择)

步骤6:producer与broker建立长连接,用于发送消息

步骤7:producer发送消息

comsumer工作流程同producer

 4.4双主双从集群搭建

5.高级特性 5.1消息的存储 ① 消息 生成者发送 消息到 MQ ② MQ 返回 ACK给 生产者 ③ MQ push 消息给对应的 消费者 ④ 消息消费者返回 ACK 给 MQ

说明:ACK(Acknowledge character)

 

① 消息 生成者发送消息到 MQ ② MQ 收到消息,将消息进行持久化 ,存储该消息 ③ MQ 返回ACK给 生产者 ④ MQ push 消息给对应的消费者 ⑤ 消息消费者返回 ACK 给 MQ ⑥ MQ 删除消息

注意:

① 第⑤步 MQ 在指定 时间 内接到消息消费者返回 ACK ,MQ认定消息 消费成功 ,执行⑥ ② 第⑤步 MQ 在指定时间 内未接到 消息消费者返回 ACK ,MQ认定消息 消费失败,重新执行④⑤⑥

 

数据库

ActiveMQ 缺点:数据库瓶颈将成为 MQ 瓶颈

文件系统

RocketMQ /Kafka/RabbitMQ 解决方案:采用消息 刷 盘机制进行数据存储 缺点:硬盘损坏的问题无法避免

 

 5.2高效的消息存储妤读写方式

 

 5.3消息存储结构

MQ数据存储区域包含如下内容

消息数据存储区域 topic queueId message 消费逻辑队列 minOffset maxOffset consumerOffset 索引 key 索引 创建时间索引 ……

 

 5.4刷盘机制

 

同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)

异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)

 配置方式

#刷盘方式

#- ASYNC_FLUSH 异步刷盘

#- SYNC_FLUSH 同步刷盘

flushDiskType=SYNC_FLUSH

 5.5高可用性

nameserver

无状态 + 全服务器注册

消息服务器

主从架构( 2M-2S )

消息生产

生产者将相同的 topic 绑定到多个 group 组,保障 master 挂掉后,其他 master 仍可正常进行消息接收

消息消费

RocketMQ 自身会根据 master 的压力确认是否由 master 承担消息读取的功能,当 master 繁忙时候,自动切换由 slave 承担数据读取的工作 5.6主从数据复制

同步复制

master 接到消息后,先复制到 slave ,然后反馈给生产者写操作成功 优点:数据安全,不丢数据,出现故障容易恢复 缺点:影响数据吞吐量,整体性能低

异步复制

master 接到消息后,立即返回给 生产者写操作 成功,当消息达到一定 量 后再异步复制到 slave 优点 :数据吞吐量大,操作延迟低,性能高 缺点:数据不安全,会出现数据丢失的现象,一旦 master 出现故障,从上次数据同步到故障时间的数据将丢失

配置方式

#Broker 的角色

#- ASYNC_MASTER 异步复制Master

#- SYNC_MASTER 同步双写Master

#- SLAVE

brokerRole=SYNC_MASTER

 5.7负载均衡

Producer负载均衡

内部实现了不同 broker 集群中对同一 topic 对应消息队列的负载均衡

 

 

Consumer负载均衡

平均分配 循环平均分配  5.8消息重试

当消息消费后未正常返回消费成功的信息将启动消息重试机制

消息重试机制

顺序消息         当 消费者消费消息失败后 , RocketMQ 会自动进行 消息重试(每次间隔时间为 1 秒 )         注意:应用 会出现消息消费被阻塞的 情况,因此,要对顺序消息的消费情况进行监控,避免阻塞 现象的 发生 无序消息

 死信队列

       当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)

        死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)

死信队列特征

归属某一个组( Gourp Id ),而不归属 Topic ,也不归属消费者 一个死信队列中可以包含同一个组下的多个 Topic 中的死信消息 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化

死信队列中消息特征

不会被再次重复消费 死信队列中的消息有效期为 3 天,达到时限后将被 清除

死信处理:在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费

5.9消息重复消费

 

---------------------------------------------------------------------------------------------------------------------------------

内容有部分存在书籍、课堂、网络记录,如有雷同纯属巧合

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746455.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号