Producer只要把消息往Mq里面一推,Producer就不管了,至于消息是否成功到达MQ,Producer不管,这种吞吐量是最高的,
package org.apache.rocketmq.example.simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
for (int i = 0; i < 20; i++)
try {
{
Message msg = new Message("TopicTest", // 发送的topic
"TagA", //tags
"OrderID188", // keys3
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
//同步传递消息,消息会发给集群中的一个Broker节点。
// SendResult sendResult = producer.send(msg);
// System.out.printf("%s%n", sendResult);
//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
//不知道消息是否发送成功,反正Producer发送完了就不管了 .
producer.sendOneway(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
同步发送
Producer 往MQ发送消息之后,会等待MQ给他自己响应,然后Producer那里的代码再往下继续执行自己的逻辑.
好处就是 Producer发送消息有没有发送成功,Producer自己是知道的,如果发送失败了话,Producer内部会有代码进行重试.
SendResult sendResult = producer.send(msg); 就是同步发送
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
try {
{
Message msg = new Message("TopicTest", // 发送的topic
"TagA", //tags
"OrderID188", // keys3
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
//同步传递消息,消息会发给集群中的一个Broker节点。
//如果发送失败了 ,这里会有重试机制
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
打印结果
SendResult [sendStatus=SEND_OK, msgId=AC100A010AD818B4AAC255F0E3B50000, offsetMsgId=AC100A6600002A9F00000000000974CF, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=380]
其中就能通过sendStatus字段来判断是否发送成功了
Producer发送完了消息给MQ之后就继续执行下面的逻辑了,然后Producer会给MQ一个回调函数,
MQ在完成消息之后会回过来请求Producer的回调方法,在回调方法去执行逻辑, 但是这个逻辑什么时候执行Producer就不管了.
异步模式就是一个双向的交互,Producer既发消息也会接收MQ传过来的回调结果
MQ既接收消息也会发送 处理结果给Producer
需要注意一点就是如果Producer服务停了,就无法接收到MQ发过来的回调了.
package org.apache.rocketmq.example.simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
//重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
//由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) { // 发送100条消息
try {
final int index = i;
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback 是回调函数
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
}
}
//保证100条消息的回调都处理完了,再结束producer
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
输出结果:
日志输出太多了,下面只是粘贴了部分的输出结果
消息发送完成 消息发送完成 消息发送完成 18 OK AC100A012F3018B4AAC255F865870011 2 OK AC100A012F3018B4AAC255F865810009 87 OK AC100A012F3018B4AAC255F8658A0056三种方式总结
吞吐量:
1.单向发送模式因为发送过去之后不用接收结果,所以这种方式吞吐量是最高的,
2.同步发送模式是吞吐量最慢的方式,因为它需要等待MQ给它响应结果,它才能继续往下执行
3.异步发送模式吞吐量是比同步发送高比单向发送低的.
安全性:
1.同步发送模式安全性是最高的
2.异步发送模式和单向发送模式都容易丢失消息
消费者代码,三种方式公用这一个消费者发送指的是消息从Producer发送给MQ, 至于消费者是否消费消息,Producer是不管的.
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}



