栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQProducer三种发送方式以及入门Demo

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQProducer三种发送方式以及入门Demo

单向(OneWay)发送

Producer只要把消息往Mq里面一推,Producer就不管了,至于消息是否成功到达MQ,Producer不管,这种吞吐量是最高的,

package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;


public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        //NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();

        for (int i = 0; i < 20; i++)
            try {
                {
                    Message msg = new Message("TopicTest", // 发送的topic
                            "TagA",  //tags
                            "OrderID188", // keys3
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
                    );
                    //同步传递消息,消息会发给集群中的一个Broker节点。
//                    SendResult sendResult = producer.send(msg);
//                    System.out.printf("%s%n", sendResult);
                    //这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
                    //不知道消息是否发送成功,反正Producer发送完了就不管了 .
                    producer.sendOneway(msg);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

同步发送

Producer 往MQ发送消息之后,会等待MQ给他自己响应,然后Producer那里的代码再往下继续执行自己的逻辑.
好处就是 Producer发送消息有没有发送成功,Producer自己是知道的,如果发送失败了话,Producer内部会有代码进行重试.


SendResult sendResult = producer.send(msg); 就是同步发送

public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        //NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();

        try {
            {
                Message msg = new Message("TopicTest", // 发送的topic
                        "TagA",  //tags
                        "OrderID188", // keys3
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
                );
                //同步传递消息,消息会发给集群中的一个Broker节点。
                //如果发送失败了 ,这里会有重试机制
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);

            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        producer.shutdown();
    }

打印结果

SendResult [sendStatus=SEND_OK, msgId=AC100A010AD818B4AAC255F0E3B50000, offsetMsgId=AC100A6600002A9F00000000000974CF, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=380]

其中就能通过sendStatus字段来判断是否发送成功了

异步发送

Producer发送完了消息给MQ之后就继续执行下面的逻辑了,然后Producer会给MQ一个回调函数,
MQ在完成消息之后会回过来请求Producer的回调方法,在回调方法去执行逻辑, 但是这个逻辑什么时候执行Producer就不管了.


异步模式就是一个双向的交互,Producer既发消息也会接收MQ传过来的回调结果
MQ既接收消息也会发送 处理结果给Producer

需要注意一点就是如果Producer服务停了,就无法接收到MQ发过来的回调了.

package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


public class AsyncProducer {
    public static void main(
            String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {

        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();
        //重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        //由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) { // 发送100条消息
            try {
                final int index = i;
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback 是回调函数
                producer.send(msg, new SendCallback() {
                    
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
                System.out.println("消息发送完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //保证100条消息的回调都处理完了,再结束producer
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

输出结果:

日志输出太多了,下面只是粘贴了部分的输出结果

消息发送完成
消息发送完成
消息发送完成
18         OK AC100A012F3018B4AAC255F865870011 
2          OK AC100A012F3018B4AAC255F865810009 
87         OK AC100A012F3018B4AAC255F8658A0056 
三种方式总结

吞吐量:
1.单向发送模式因为发送过去之后不用接收结果,所以这种方式吞吐量是最高的,
2.同步发送模式是吞吐量最慢的方式,因为它需要等待MQ给它响应结果,它才能继续往下执行
3.异步发送模式吞吐量是比同步发送高比单向发送低的.

安全性:
1.同步发送模式安全性是最高的
2.异步发送模式和单向发送模式都容易丢失消息

发送指的是消息从Producer发送给MQ, 至于消费者是否消费消息,Producer是不管的.

消费者代码,三种方式公用这一个消费者
package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;


public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        
        consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        
        consumer.subscribe("TopicTest", "*");

        
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/332878.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号