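1. Synchronous send
Suitable when the message is important and the caller is not sensitive to response time. After sending a message, the caller blocks until the send result is returned.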
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_01");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest",
                "TagA",
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send the message to one of the brokers and block until the result is returned.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
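2. Asynchronous send
Suitable when the message is important and the caller is sensitive to response time. After sending a message, the caller receives the send result through a callback.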
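import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_01");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        // Do not retry internally when an asynchronous send fails.
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        // One count per message; each callback counts down exactly once.
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                // Create a message instance, specifying topic, tag, key and message body.
                Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Returns immediately; the result is delivered to the callback.
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Wait up to 5 seconds for all callbacks before shutting down.
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}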
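The timed await on the latch returns a boolean that the example above does not inspect. The following is a minimal sketch, using only the JDK, of how that return value could be checked before shutting down. The class `LatchAwaitSketch` and the helper `fakeAsyncSend` are hypothetical stand-ins for the producer and its asynchronous send; they are not part of RocketMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchAwaitSketch {
    // Hypothetical stand-in for producer.send(msg, callback):
    // the "callback" runs later on a pool thread.
    static void fakeAsyncSend(ExecutorService pool, Runnable onDone) {
        pool.submit(onDone);
    }

    // Returns true only if every callback fired before the timeout elapsed.
    static boolean sendAll(int messageCount, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(messageCount);
        try {
            for (int i = 0; i < messageCount; i++) {
                fakeAsyncSend(pool, latch::countDown);
            }
            // await returns false if the timeout elapsed before the count reached zero.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the wait as incomplete.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean allAcked = sendAll(100, 5000);
        System.out.println(allAcked
            ? "all callbacks received"
            : "timed out waiting for callbacks");
    }
}
```

In a real producer, a false result would suggest that some callbacks had not yet arrived when shutdown was called, so logging it may help when diagnosing missing send results.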
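3. One-way send
One-way send suits scenarios that can tolerate message loss, such as log collection. It offers the highest throughput of the three modes because the producer does not wait for a send result.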
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_01");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send the message to one of the brokers without waiting for a result.
            producer.sendOneway(msg);
        }
        // Wait for sending to complete.
        Thread.sleep(5000);
        producer.shutdown();
    }
}
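The difference between the three modes comes down to what the caller does with the acknowledgment. The following is a rough sketch of that contrast using only the JDK, with no broker involved. Everything in it is hypothetical: `SendModesSketch`, `deliver`, and the `"SEND_OK:"` string are illustrative stand-ins, not RocketMQ APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class SendModesSketch {
    // Hypothetical in-memory "broker": acknowledges every message on another thread.
    static CompletableFuture<String> deliver(String body) {
        return CompletableFuture.supplyAsync(() -> "SEND_OK:" + body);
    }

    // Synchronous: block until the acknowledgment arrives, then return it.
    static String sendSync(String body) {
        return deliver(body).join();
    }

    // Asynchronous: return at once; the acknowledgment reaches the callback later.
    static CompletableFuture<Void> sendAsync(String body, Consumer<String> onSuccess) {
        return deliver(body).thenAccept(onSuccess);
    }

    // One-way: hand the message over and never look at the acknowledgment.
    static void sendOneway(String body) {
        deliver(body);
    }

    public static void main(String[] args) {
        System.out.println("sync   -> " + sendSync("m1"));

        AtomicReference<String> ack = new AtomicReference<>();
        // join() is used here only so the demo can print the callback result.
        sendAsync("m2", ack::set).join();
        System.out.println("async  -> " + ack.get());

        sendOneway("m3");
        System.out.println("oneway -> no result available to the caller");
    }
}
```

The one-way path gives the caller nothing to inspect, which is why it fits data such as logs where an occasional lost message is acceptable.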



