栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rocketmq

rocketmq

消息生产者

package com.rocketmqpro.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class RMQPro {
    public static void main(String[] args) {
        final boolean[] close = {false};
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        // 必须要设置nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        try {
            producer.setClientIP("**");
            //设置实例名称
            producer.setInstanceName("rocketMQ");
            //设置重试的次数
            producer.setRetryTimesWhenSendFailed(3);
            //开启生产者
            producer.start();
            //创建一条消息
            Message msg = new Message("PushTopic", "PushTopic", "PushTopic_01", "内容一".getBytes());
            //发送,并触发回调函数
            producer.send(msg, new SendCallback() {
                @Override
                //成功的回调函数
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult.getSendStatus());
                    State.sta = true;
                }

                @Override
                //出现异常的回调函数
                public void onException(Throwable e) {
                    System.out.println("SEND_EXCEPTION:" + e.getMessage());
                    State.sta = true;
                }
            });
            //获取某个主题的消息队列
            //List messageQueues = producer.fetchPublishMessageQueues("PushTopic");
            //System.out.println("消息队列数量:" + messageQueues.size());
            //for (int i = 0; i < messageQueues.size(); i++) {
            //    System.out.println(messageQueues.get(i).toString());
            //}
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            while (!State.sta) {
            }
            producer.shutdown();
            System.out.println("producer shutdown......");
        }
    }
}
package com.rocketmqpro.producer;

public class State {
    public volatile static boolean sta=false;
}

消息消费者(用的springboot)

server:
  port: 8080
  server-header:
rocketmq:
  name-server: 127.0.0.1:9876 #私有云消费mq地址


    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.4.10
         
    

    com.example
    rocketmqpro
    0.0.1-SNAPSHOT
    rocketmqpro

    
        1.8
    
    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-web
        

        
        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.1.0
        

        
            org.apache.rocketmq
            rocketmq-client
            4.6.0
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


package com.rocketmqpro;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketmqproApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketmqproApplication.class, args);
    }

}
package com.rocketmqpro.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;


@Component
@RocketMQMessageListener(topic = "PushTopic", consumerGroup = "jhc")
public class RMQCon implements RocketMQListener, RocketMQPushConsumerLifecycleListener {


    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {

        //从消息队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //设置广播消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

    }
}
广播消费 一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group ,消息也会 被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。 集群消费 一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其 中一个 Consumer Group 有 3 个实例 ( 可能是 3 个进程,或者 3 台机器 ) ,那举每个实例只消费其中的 3条消息。 标签(Tag) 为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不 同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。 生产者发送消息发生返回状态(SendResult#SendStatus)有如下四种: FLUSH_DISK_TIMEOUT FLUSH_SLAVE_TIMEOUT SLAVE_NOT_AVAILABLE SEND_OK 不同状态在不同的刷盘策略和同步策略的配置下含义是不同的: 1. FLUSH_DISK_TIMEOUT: 表示没有在规定时间内完成刷盘(需要 Broker 的刷盘策略被设置成 SYNC_FLUSH 才会报这个错误)。 2. FLUSH_SLAVE_TIMEOUT: 表示在主备方式下,并且 Broker 被设置成 SYNC_MASTER 方式, 没有在设定时间内完成主从同步。 3. SLAVE_NOT_AVAILABLE: 这个状态产生的场景和 FLUSH_SLAVE_TIMEOUT 类似,表示在主 备方式下,并且 Broker 被设置成 SYNC_MASTER ,但是没有找到被配置成 Slave 的 Broker 。 4. SEND_OK: 表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是 否被同步到了 Slave 上?消息在 Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策 略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是 SEND_OK 。 在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。 消费上的优化 提高消费并行度 在 同一个 ConsumerGroup 下( Clustering 方式),可以通过 增加 Consumer 实例 的数量来提 高并行度。通过加机器,或者在已有机器中启动多个 Consumer 进程都可以增加 Consumer 实例数。 注意:总的 Consumer 数量不要超过 Topic 下 Read Queue 数量,超过的 Consumer 实例接收 不到消息。此外,通过提高单个 Consumer 实例中的 并行处理的线程数 ,可以在同一个 Consumer 内增加 并行度来提高吞吐量(设置方法是修改 consumeThreadMin 和 consumeThreadMax )。   以批量方式进行消费 某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中 涉及 update 某个数据库,一次 update10 条的时间会大大小于十次 update1 条数据的时间。 可以通过批量方式消费来提高消费的吞吐量。实现方法是设置 Consumer 的 consumeMessageBatchMaxSize 这个参数,默认是 1 ,如果设置为 N ,在消息多的时候每次 收到的是个长度为 N 的 消息链表 。 检测延时情况,跳过非重要消息 Consumer 在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆 积,这个时候可以选择丢弃不重要的消息,使 Consumer 尽快追上 Producer 的进度。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735734.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号