栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ

1、基础准备

 下载maven :wget http://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

 编译: mvn clean package -Dmaven.test.skip=true

 启动NameServer
   nohup sh bin/mqnamesrv -n "公网IP:9876" &   #后台运行 nameserver
   tail -f ~/logs/rocketmqlogs/namesrv.log #监听nameserver日志文件

 启动broker
   nohup sh bin/mqbroker -n 公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &
   tail -f ~/logs/rocketmqlogs/broker.log

 发送消息
   export NAMESRV_ADDR=localhost:9876 #设置环境变量
   sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

 接收消息
   sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

 关闭Server
   sh bin/mqshutdown broker
   sh bin/mqshutdown namesrv

在阿里云服务器上部署时遇到的坑,修改conf/broker.conf文件

  namesrvAddr = 自己云服务器的公网IP:9876

  brokerIP1 = 自己云服务器的公网IP

2、Rocket角色

  broker

  • Brocker面向producer和consumer接收和发送消息
  • 向nameserver提交自己的信息
  • 是消息中间件的消息存储、收发服务器
  • 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报

 broker集群

  • Broker高可用,可以配置Master / Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave
    • 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
    • Master与Slave的对应关系通过制定相同的BrokerName,不同的BrokerId来定义为0表示Master,非0表示Slave
  • Master多机负载,可以部署多个broker
    • 每个Broker与nameserver集群中的所有节点建立长链接,定时注册Topic信息到nameserver

 producer

  • 消息的生产者
  • 通过nameserver集群中的其中一个节点(随机选择)建立长连接,获取Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
  • 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳

 consumer

  • 消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接

 nameserver

  • 底层有netty实现,提供了路由管理、服务注册、服务发现,是一个无状态节点
  • nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把他从列表中剔除
  • nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用
  • nameserver集群键互不通信,没有主备的概念
  • nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
  • 为什么不用zookeeper? rockermq希望为了提高性能,CAP定理,客户端负载均衡

  对比JSM中的Topic和Queue

   Topic是一个逻辑上的概念,实际上Message是在每个Broker上以Queue的形式记录。

      

3、发送方式  同步发送

  消息发送中进入同步等待状态,可以保证消息投递一定到达

 异步消息

  想要快速发送消息,又不想丢失的时候可以使用异步消息

producer.send(message, new SendCallback() {
   public void onSuccess(SendResult sendResult) {
      System.out.println("消息发送成功");
      System.out.println("sendResult" + sendResult);
   }

   public void onException(Throwable throwable) {
      //如果发生异常 case 异常,尝试重投  或者调整业务逻辑
      throwable.printStackTrace();
      System.out.println("发送异常");
    }
});
单向消息

  只发送消息,不等待服务器响应,只发送请求不等待应答,此方式发送消息的过程耗时非常短,一般在微秒级别   peoducer.sendoneway(message);

延迟消息

4、消息  消息消费模式

  消息消费模式由消费者决定,可以由消费者设置MessageModel来决定消息模式

 消息模式默认为集群消费模式

  consumer.setMessageModel(MessageModel.BROADCASTING);

  consumer.setMessageModel(MessageModel.CLUSTERING);

 集群消息

    

  集群消息是指集群化部署消费者

    当使用集群消费模式时,MQ认为任意一条消息只需要被集群内的任意一个消费者处理即可

  特点

  •   每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
  • 在消息重投时不能保证路由到同一台机器上
  • 消费状态由broker维护
 广播消息

  

    当使用广播消费模式时,MQ会将每条消息推送给集群内所有注册过的客户端,保证至少被每台机器消费一次

  特点

  • 消费进度由consumer维护
  • 保证每个消费者消费一次消息
  • 消费失败的消息不会重投
4、过滤

 一个group中消费者的tag selector 都不能随便变,要保持统一

 TAG

   在Producer中使用Tag: Message msg = new Message("Topic001","TAG-A","KEY-A","tag".getBytes());

   在Consumer中订阅Tag: consumer.subscribe("Topic001", "TAG-A");

 SQL表达式过滤

 实例:MessageSelector selector = MessageSelector.bySql("order > 5");

            consumer.subscribe("xxxxxx",selector);

 语法:

  RocketMQ只定义了一些基本的语法来支持这个功能,也可以扩展

  1. 数字比较  >  ,  >= , < , <= , between , =

  2.字符比较  = ,<> ,  IN

  3. is null 或者 is not null

  4.逻辑运算  and  , or , not

 常量类型是:数字,字符串('abc',必须使用单引号),  NULL , TRUE , FALSE

5、消息事务

   

  事务执行流程  

 RocketMQ实现方式

  Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中

  检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调

  超时:如果超过回查次数,默认回滚消息

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/667425.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号