栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

03 事务消息

03 事务消息

  1. 当事务消息发送至broker的时候,如果没有提交,那么消息对于consumer来说是不可见的,mqserver接受到消息后,会通知producer,已经接受到消息.那么此时的producer会先执行一次本地事务,本地事务执行之后,才会向mqserver发送commit或者rollback;
    当mqserver接受到commit的时候,consumer才可以消费消息.如果是rollback的时候,那么mqserver会删除broker中的消息.
    如果本地消息执行失败或者其他原因没有向mqserver进行确认,那么mqserver会调用回调方法进行回查,回查之后,会调用producer的方法进行commit或者rollback.


2 事务消息的状态:
(1)commit: 提交事务,允许消费者消费此消息
(2)rollback: 回滚事务,表示该消息将被删除,不允许被消费
(3)unknown: 中间状态,表示需要检查消息队列来确定状态,此时不对consumer可用

  1. 事务消息的实现:

    Message msg = new Message(“TransactionTopic”, tags[i], (“Hello World” + i).getBytes());
    //5.发送消息
    SendResult result = producer.sendMessageInTransaction(msg, null);
    //发送状态
    SendStatus status = result.getSendStatus();
    System.out.println(“发送结果:” + result);
    //线程睡1秒
    TimeUnit.SECONDS.sleep(2);
    }
    //6.关闭生产者producer
    //producer.shutdown();
    }
    }
  2. public class Consumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者Consumer,制定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
    //2.指定Nameserver地址
    consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.订阅主题Topic和Tag
    consumer.subscribe(“TransactionTopic”, “*”);
    //4.设置回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    //接受消息内容
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt msg : msgs) {
    System.out.println(“consumeThread=” + Thread.currentThread().getName() + “,” + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //5.启动消费者consumer
    consumer.start();
    System.out.println(“生产者启动”);
    }
    }

    1. 消息存储方式介绍
      分布式队列因为有高可靠性的要求,所以消息要进行持久化的存储

      (1) 首先是producer发送消息发送至MQ
      (2) MQ接收到消息后先进行存盘行为
      (3) 存盘之后向producer进行ack确认
      (4) Consumer上线之后,MQ将消息push给了consumer,等待consumer ack
      (5) 如果Consumer在指定范围内成功返回ack,则mq认为消息被成功消费,在存储中删除消息.如果在指定范围内没有收到consumer的ack,那么MQ则会重新push
      (6) MQ删除已消费的消息


      14.4 RocketMQ的消息存储结构:
      在消息数据存盘的时候,mq的存盘文件的路径下,保存了消息存放的几个数据文件.如下:

      commitLog: 存储消息的元数据,发送的所有的消息都会存入到commitLog中.包含了消息的topic,queueid,message消息等,如果文件已经大于1G,那么则会自动生成新的文件.
      新创建的文件大小仍然是1G
      consumerQueue: 消息逻辑队列,里面存放的是commitlog的索引.用于快速从1G的文件中读取数据.先从这个索引文件中找到消息的索引,再根据索引去commitlog中检索数据.
      consumerQueue专为快速检索消息存在
      如果consumerQueue丢失,那么可以通过commitLog进行恢复,commitLog不仅存放了消息数据,还存放了consumerQueue全部数据
      IndexFile: 为消息查询提供了一种通过key或者时间区间来查询消息的方法,这种通过indexFile来查询消息的方法不影响发送与消费消息的主流程.

    15 刷盘机制:
    RocketMQ的消息是存储到磁盘上的,这样既能保证断电后使用,又可以让存储的消息超出内存的限制.为了保证性能,rocketmq会尽量保证磁盘的顺序写,
    15.1 两种磁盘写入方式:


    16 RocketMQ高可用:















转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663280.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号