栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JAVA RocketMQ简单实用实例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JAVA RocketMQ简单实用实例

一套完整的java实操RocketMQ,收发服务,拿走就能用。都是自己学习的时候,搭建测试完成的。

RocketMQ 搭建发送服务:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import com.cetcnav.lbs.base.util.CommonUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RocketMQPubService {

	// host port
	private String namesrvAddr = RocketMQPropertiesUtils.getString("rocketmq.namesrvAddr");
	// 生成者组名
	private String producerGroupName = RocketMQPropertiesUtils.getString("rocketmq.producerGroupName");
	private DefaultMQProducer producer;

	private static RocketMQPubService service;

	private RocketMQPubService() {
		init();
	}

	public static RocketMQPubService getInstance() {
		if (service == null) {
			service = new RocketMQPubService();
		}
		return service;
	}
	
	private void init() {
		try {
			producer = new DefaultMQProducer(producerGroupName);
			// 设置重试次数(默认2次)
			producer.setRetryTimesWhenSendFailed(10);
			// 绑定name server
			producer.setNamesrvAddr(namesrvAddr);
			// 发送超时时间,默认3000 单位ms
			producer.setSendMsgTimeout(5000);
			// producer.
			producer.start();
		} catch (MQClientException e) {
			log.error(CommonUtils.printStackTrack(e));
		}
	}
	
	public Boolean sendMessage(Message msg) {
		SendResult send;
		try {
			send = producer.send(msg);
			if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
				log.info("信息发送成功:[{}]", msg);
				return true;
			} else {
				// 发送失败处理
				log.info("信息发送失败:[{}]", msg);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}

		return null;
	}
}

实际业务中的使用示例:

// Build a message for a topic and tag. NOTE(review): `byte[]` here looks like a
// placeholder for the payload bytes rather than a valid expression — substitute real data.
Message message = new Message("Topic", "Tag",byte[]);
// Send through the shared producer service; the returned Boolean is ignored in this example.
RocketMQPubService.getInstance().sendMessage(message);

RocketMQ 接收服务:

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.cetcnav.lbs.rocketMQ.worker.ListenerServiceWorkerInf;

import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class RocketMQReceiveThread extends Thread {

	// host port
	private String namesrvAddr = RocketMQPropertiesUtils.getString("rocketmq.namesrvAddr");
	// 指定消费者订阅的主题和标签
	private final String consumerProupName;
	private final String topic;
	private final String tag;
	private final ListenerServiceWorkerInf worker;
	private DefaultMQPushConsumer consumer;

	public RocketMQReceiveThread(String consumerProupName, String topic, String tag, ListenerServiceWorkerInf worker) {
		super();
		this.consumerProupName = consumerProupName;
		this.topic = topic;
		this.tag = tag;
		this.worker = worker;
	}

	
	public void run() {
		try {
			// 1 创建消费者,指定所属的消费者组名
			consumer = new DefaultMQPushConsumer(consumerProupName);
			// 2 指定NameServer的地址
			consumer.setNamesrvAddr(namesrvAddr);
			// 3 指定消费者订阅的主题和标签 订阅topic和 tags( * 代表所有标签)下信息
			consumer.subscribe(topic, tag);
			// 一个GroupName第一次消费时的位置
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
			consumer.setConsumeThreadMin(20);
			consumer.setConsumeThreadMax(20);
			// 一次最大消费的条数
			consumer.setConsumeMessageBatchMaxSize(10);
			// 消费模式,广播或者集群,默认集群。
			consumer.setMessageModel(MessageModel.CLUSTERING);
			// 在同一jvm中 需要启动两个同一GroupName的情况需要这个参数不一样。
			// consumer.setInstanceName("InstanceName");

			consumer.registerMessageListener(new MessageListenerConcurrently() {
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List msgs,
						ConsumeConcurrentlyContext context) {
					worker.handle(msgs);
					log.info("Receive New Messages: [{}]", msgs);
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
			log.info("Consumer Started.");
		} catch (MQClientException e) {
			e.printStackTrace();
		}
	}

	private DefaultLitePullConsumer defaultLitePullConsumer;
	private static boolean runFlag = true;

	
	public void pull() throws Exception {
		// 1 创建消费者,指定所属的消费者组名
		defaultLitePullConsumer = new DefaultLitePullConsumer(consumerProupName);
		// 2 指定NameServer的地址
		defaultLitePullConsumer.setNamesrvAddr(namesrvAddr);
		// 3 指定消费者订阅的主题和标签
		defaultLitePullConsumer.subscribe(topic, tag);
		// 一次最大消费的条数
		defaultLitePullConsumer.setPullBatchSize(100);

		defaultLitePullConsumer.start();
		while (runFlag) {
			try {
				// 拉取消息,无消息时会阻塞
				List msgs = defaultLitePullConsumer.poll();
				if (CollUtil.isEmpty(msgs)) {
					continue;
				}
				// 业务处理
				worker.handle(msgs);
				
				// 同步消费位置。不执行该方法,应用重启会存在重复消费。
				defaultLitePullConsumer.commitSync();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		defaultLitePullConsumer.shutdown();
	}
}

抽象work:

import java.util.List;

import org.apache.rocketmq.common.message.MessageExt;


public abstract class ListenerServiceWorkerInf {
	
	public abstract void work(List msgs);
	
	public void handle(List msgs){
		this.work(msgs);
	}
}

业务实操类:

import java.util.List;

import org.apache.rocketmq.common.message.MessageExt;

import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CMDFeedback2PISWork extends ListenerServiceWorkerInf {
	@Override
	public void work(List msgs) {
		log.info("CMDFeedback2PISReceiver receive a ack.");
		try {
			for (MessageExt msg : msgs) {
				// do something……
			}
		} catch (Exception e) {
			log.error(e.toString());
		}
	}
}

监听任务实例:

public class DataListener {
	
	private static DataListener listener;
	
	public static DataListener getInstance(){
		if (listener == null){
			listener = new DataListener();
		}
		return listener;
	}

	public void handle(){		
		CMDFeedback2PISWork feedbackWork = new CMDFeedback2PISWork();
		//String consumerProupName, String topic, String tag, ListenerServiceWorkerInf worker
		new RocketMQReceiveThread("CMDFeedback2PIS_CONSUMER", "CMDFeedback2PIS", "*", feedbackWork).start();
}

项目启动监听:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

/**
 * Servlet context listener that starts message listening when the web application
 * context is initialized.
 */
public class PisListener implements ServletContextListener {

	/**
	 * Obtains the shared {@code DataListener} and invokes its {@code handle()} method.
	 *
	 * @param arg0 the context event (not used)
	 */
	@Override
	public void contextInitialized(ServletContextEvent arg0) {
		DataListener dataListener = DataListener.getInstance();
		dataListener.handle();
	}

	/**
	 * No action is taken when the context is destroyed.
	 *
	 * @param arg0 the context event (not used)
	 */
	@Override
	public void contextDestroyed(ServletContextEvent arg0) {
		// nothing to do
	}
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/848173.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号