服务端使用的4.9.1,java客户端使用4.5.0的包
2.消息生产者org.apache.rocketmq rocketmq-client4.5.0
1.创建消息生产者producer,并制定生产者组名 2.指定Nameserver地址 3.启动producer 4.创建消息对象,指定主题Topic、Tag和消息体 5.发送消息 6.关闭生产者producer3.消息消费者
1.创建消费者Consumer,制定消费者组名 2.指定Nameserver地址 3.订阅主题Topic和Tag 4.设置回调函数,处理消息 5.启动消费者consumer消息发送 1.发送同步消息
package com.test.mq.rocketmq.base.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.TimeUnit;
public class SyncProducer {
//1.创建消息生产者producer,并制定生产者组名
//2.指定Nameserver地址
//3.启动producer
//4.创建消息对象,指定主题Topic、Tag和消息体
//5.发送消息
//6.关闭生产者producer
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer的地址
producer.setNamesrvAddr("10.211.55.10:9876;10.211.55.8:9876");
producer.setSendMsgTimeout(6000);
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" ,
"TagB" ,
("Hello RocketMQHJW " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
Thread.sleep(1000);
// TimeUnit.SECONDS.sleep(10);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
其中遇到的问题会有同步发送消息超时,解决方案,设置一下java客户端发送消息超时时间
https://blog.csdn.net/qq_16241519/article/details/103926356
发送消息的返回值如下情况,显示了消息发送在哪个broker下
和发送同步消息基本一样,只是在producer.send的时候写入返回如果成功或者异常的时候如何处理
package com.test.mq.rocketmq.base.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group2");
// 设置NameServer的地址
producer.setNamesrvAddr("10.211.55.10:9876;10.211.55.8:9876");
producer.setSendMsgTimeout(6000);
// 启动Producer实例
producer.start();
// producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagB",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
Thread.sleep(1000);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
发送结果如下
如果生产者不关心消息发送是否成功失败的情况可以使用,例如发送日志
package com.test.mq.rocketmq.base.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class onewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer的地址
producer.setNamesrvAddr("10.211.55.10:9876;10.211.55.8:9876");
producer.setSendMsgTimeout(6000);
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送单向消息,没有任何返回结果
producer.sendoneway(msg);
Thread.sleep(1000);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消息消费
package com.test.mq.rocketmq.base.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer {
//1.创建消费者Consumer,制定消费者组名
//2.指定Nameserver地址
//3.订阅主题Topic和Tag
//4.设置回调函数,处理消息
//5.启动消费者consumer
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("10.211.55.10:9876;10.211.55.8:9876");
// 订阅Topic
consumer.subscribe("TopicTest", "TagB");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
打印结果



