一套完整的java实操RocketMQ,收发服务,拿走就能用。都是自己学习的时候,搭建测试完成的。
RocketMQ 搭建发送服务:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import com.cetcnav.lbs.base.util.CommonUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RocketMQPubService {
// host port
private String namesrvAddr = RocketMQPropertiesUtils.getString("rocketmq.namesrvAddr");
// 生成者组名
private String producerGroupName = RocketMQPropertiesUtils.getString("rocketmq.producerGroupName");
private DefaultMQProducer producer;
private static RocketMQPubService service;
private RocketMQPubService() {
init();
}
public static RocketMQPubService getInstance() {
if (service == null) {
service = new RocketMQPubService();
}
return service;
}
private void init() {
try {
producer = new DefaultMQProducer(producerGroupName);
// 设置重试次数(默认2次)
producer.setRetryTimesWhenSendFailed(10);
// 绑定name server
producer.setNamesrvAddr(namesrvAddr);
// 发送超时时间,默认3000 单位ms
producer.setSendMsgTimeout(5000);
// producer.
producer.start();
} catch (MQClientException e) {
log.error(CommonUtils.printStackTrack(e));
}
}
public Boolean sendMessage(Message msg) {
SendResult send;
try {
send = producer.send(msg);
if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
log.info("信息发送成功:[{}]", msg);
return true;
} else {
// 发送失败处理
log.info("信息发送失败:[{}]", msg);
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
实际业务中的使用示例:
Message message = new Message("Topic", "Tag",byte[]);
RocketMQPubService.getInstance().sendMessage(message);
RockectMQ 接收服务:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import com.cetcnav.lbs.rocketMQ.worker.ListenerServiceWorkerInf;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RocketMQReceiveThread extends Thread {
// host port
private String namesrvAddr = RocketMQPropertiesUtils.getString("rocketmq.namesrvAddr");
// 指定消费者订阅的主题和标签
private final String consumerProupName;
private final String topic;
private final String tag;
private final ListenerServiceWorkerInf worker;
private DefaultMQPushConsumer consumer;
public RocketMQReceiveThread(String consumerProupName, String topic, String tag, ListenerServiceWorkerInf worker) {
super();
this.consumerProupName = consumerProupName;
this.topic = topic;
this.tag = tag;
this.worker = worker;
}
public void run() {
try {
// 1 创建消费者,指定所属的消费者组名
consumer = new DefaultMQPushConsumer(consumerProupName);
// 2 指定NameServer的地址
consumer.setNamesrvAddr(namesrvAddr);
// 3 指定消费者订阅的主题和标签 订阅topic和 tags( * 代表所有标签)下信息
consumer.subscribe(topic, tag);
// 一个GroupName第一次消费时的位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
// 一次最大消费的条数
consumer.setConsumeMessageBatchMaxSize(10);
// 消费模式,广播或者集群,默认集群。
consumer.setMessageModel(MessageModel.CLUSTERING);
// 在同一jvm中 需要启动两个同一GroupName的情况需要这个参数不一样。
// consumer.setInstanceName("InstanceName");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
worker.handle(msgs);
log.info("Receive New Messages: [{}]", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info("Consumer Started.");
} catch (MQClientException e) {
e.printStackTrace();
}
}
private DefaultLitePullConsumer defaultLitePullConsumer;
private static boolean runFlag = true;
public void pull() throws Exception {
// 1 创建消费者,指定所属的消费者组名
defaultLitePullConsumer = new DefaultLitePullConsumer(consumerProupName);
// 2 指定NameServer的地址
defaultLitePullConsumer.setNamesrvAddr(namesrvAddr);
// 3 指定消费者订阅的主题和标签
defaultLitePullConsumer.subscribe(topic, tag);
// 一次最大消费的条数
defaultLitePullConsumer.setPullBatchSize(100);
defaultLitePullConsumer.start();
while (runFlag) {
try {
// 拉取消息,无消息时会阻塞
List msgs = defaultLitePullConsumer.poll();
if (CollUtil.isEmpty(msgs)) {
continue;
}
// 业务处理
worker.handle(msgs);
// 同步消费位置。不执行该方法,应用重启会存在重复消费。
defaultLitePullConsumer.commitSync();
} catch (Exception e) {
e.printStackTrace();
}
}
defaultLitePullConsumer.shutdown();
}
}
抽象work:
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
public abstract class ListenerServiceWorkerInf {
public abstract void work(List msgs);
public void handle(List msgs){
this.work(msgs);
}
}
业务实操类:
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class CMDFeedback2PISWork extends ListenerServiceWorkerInf {
@Override
public void work(List msgs) {
log.info("CMDFeedback2PISReceiver receive a ack.");
try {
for (MessageExt msg : msgs) {
// do something……
}
} catch (Exception e) {
log.error(e.toString());
}
}
}
监听任务实例:
public class DataListener {
private static DataListener listener;
public static DataListener getInstance(){
if (listener == null){
listener = new DataListener();
}
return listener;
}
public void handle(){
CMDFeedback2PISWork feedbackWork = new CMDFeedback2PISWork();
//String consumerProupName, String topic, String tag, ListenerServiceWorkerInf worker
new RocketMQReceiveThread("CMDFeedback2PIS_CONSUMER", "CMDFeedback2PIS", "*", feedbackWork).start();
}
项目启动监听:
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
public class PisListener implements ServletContextListener {
@Override
public void contextDestroyed(ServletContextEvent arg0) {
}
@Override
public void contextInitialized(ServletContextEvent arg0) {
DataListener.getInstance().handle();
}
}



