栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQ学习二 【发送消息与接收实例】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ学习二 【发送消息与接收实例】

在上篇中,了解RocketMQ的基本概念,以及安装好RocketMQ,启动后。进行代码实际操作练习

本篇主要练习三种发送方式

一、基础项目创建

需要创建两个项目,来表示消费者和生产者

创建两个SpringBoot项目,并导入maven和配置好application.yaml

maven需要导入rocketmq-common、rocketmq-client、fastjson

    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.apache.rocketmq
            rocketmq-common
            4.3.0
        

        
            org.apache.rocketmq
            rocketmq-client
            4.3.0
        

        
            com.alibaba
            fastjson
            1.2.49
        
    

application.yaml

rocketmq:
  consumer:
    # RocketMQ
    namesrvAddr: your`s ip:9876
   
    groupName: test-demo
    
    instanceName: consumer.demo
  producer:
    
    sendMsgTimeout: 10000
    
    maxMessageSize: 999999999
    
    compressOver: 40000
  topic: test-demo
  # tag , "0||1||2"
  tag: "111"

RocektMQConsumer项目中,添加配置类,用于创建Consumer,并启动该项目

@Configuration
public class RocketMQConfig {
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.instanceName}")
    private String instanceName;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;
    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;
    @Value("${rocketmq.producer.compressOver}")
    private int compressOver;
    @Value("${rocketmq.topic}")
    private String topic;
    @Value("${rocketmq.tag}")
    private String tag;

    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.groupName);
        consumer.setNamesrvAddr(this.namesrvAddr);
        consumer.setInstanceName(this.instanceName);
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
      


        try {
            consumer.subscribe(this.topic, tag);
            consumer.registerMessageListener((MessageListenerConcurrently) (msgList, consumeConcurrentlyContext) -> {
                try {
                    MessageExt msg = null;
                    for (MessageExt aMsgList : msgList) {
                        msg = aMsgList;

                        System.out.println("收到MQ消息:"+new String(msg.getBody()));
                    }

                } catch (JSonException e) {
                    System.out.println(e);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            System.out.println("已启动Conusmer【gruop:" + this.groupName + ",instance:" + this.instanceName
                    + "】,监听TOPIC-{" + this.topic + "},tag-{" + this.tag + "}");
        } catch (MQClientException e) {
            System.out.println(e);
        }
        return consumer;
    }
}

producter项目中,同样创建配置类,用于创建Producter

@Configuration
public class SpringConfig {

    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.instanceName}")
    private String instanceName;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;
    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;
    @Value("${rocketmq.producer.compressOver}")
    private int compressOver;
    @Value("${rocketmq.topic}")
    private String topic;
    @Value("${rocketmq.tag}")
    private String tag;


    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        producer.setInstanceName(instanceName);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        producer.setCompressMsgBodyOverHowmuch(this.compressOver);
        producer.setMaxMessageSize(this.maxMessageSize);

        try {
            System.out.println("Producer start...");
            producer.start();

        } catch (Exception e) {
            System.out.println(e);
        }
        return producer;
    }
二、测试

生产者发送消息的三种方式

发送方式发送TPS发送结果反馈可靠性
同步发送不丢失
异步发送不丢失
单向发送最快可能丢失

将在生产者项目中添加test进行测试

在test目录下对应位置创建或修改已有测试类,添加自动注入生产者实例

@Autowired
DefaultMQProducer producer;
1.同步发送
* 可靠同步发送
* 含义:
*   同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式
* 方式:
*   调用DefaultMQProducer的send方法
* 应用场景:
*   重要消息通知、短信通知等
@Test
void ReliableSynchronousT() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
    for (int i = 0; i < 10; i++)
    {
         final  int index = i;
         Message msg = new Message("test-demo" ,
                                      "111" ,
                                      ("hello rocketMq" + index).getBytes(StandardCharsets.UTF_8) 
         );
         SendResult sendResult = producer.send(msg);
         System.out.println(sendResult.getSendStatus() + "  " + i);
    }
}

启动测试

生产者窗口,发现消息是一条一条接着发送

 在消费者窗口查看

 

2.异步发送
    
    @Test
    void AsyncT() throws RemotingException, InterruptedException, MQClientException {
        producer.setRetryTimesWhenSendAsyncFailed(0);
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (int i = 0; i < 10; i++)
        {
            final  int index = i;
            Message msg = new Message("test-demo" ,
                    "111" ,
                    ("hello rocketMq by Async" + index).getBytes(StandardCharsets.UTF_8) 
            );
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("发送:" + index);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.printf("发送:" + index + "失败 " + throwable);
                }
            });
            //添加线程延迟,否则发送失败
            countDownLatch.await(1, TimeUnit.SECONDS);
        }
    }

生产者界面

 消费者

 3.单向发送
    
    @Test
    void oneWayT() throws RemotingException, InterruptedException, MQClientException {
        for (int i = 0; i < 10; i++)
        {
            final  int index = i;
            Message msg = new Message("test-demo" ,
                    "111" ,
                    ("hello rocketMq by OneWay" + index).getBytes(StandardCharsets.UTF_8) 
            );
           producer.sendoneway(msg);

        }
    }

生产者无输出

 消费者

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/749233.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号