栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

基于dubbo+RocketMQ+springboot的简单埋点系统

基于dubbo+RocketMQ+springboot的简单埋点系统

1:整体架构流程

2:目录详情

consumer 消息消费者项目provider 方法提供者项目mq RocketMQ项目point 打点接口项目 3:关键代码详解 3.1:mq项目

mq.properties
mq.defaultmqgroup=xd-mq-group   //默认消息组名称
mq.namesrvaddr=localhost:9876   //rocketMQ地址,这里我用的是window docker启用的,具体说明在下面
mq.instancename=xd-rmq-instance 
复制代码
PdbaseMqService

用于初始化RocketMQ 服务 和 提供生成消息方法

// 从配置文件中获取
Resource resource = new ClassPathResource("mq.properties");
Properties p = new Properties();

try {
   p.load(resource.getInputStream());
} catch (IOException e) {
   logger.error("配置文件异常");
}

producer = new DefaultMQProducer(String.valueOf(p.getProperty("mq.defaultmqgroup")));
producer.setNamesrvAddr(String.valueOf(p.getProperty("mq.namesrvaddr")));
producer.setInstanceName(String.valueOf(p.getProperty("mq.instancename")));

try {
   producer.start();
   producer.setRetryTimesWhenSendAsyncFailed(2);
   logger.info("MQ生产者,Producer Started...");
} catch (Exception e) {
   logger.error("MQ生产者,异常:" + e.getMessage(), e);
}
复制代码
try {

   Message message = new Message(topic, tags, JSON.toJSonString(object).getBytes());
   logger.info("MQ生产者,【异步】发送消息:topic=" + topic + ",tags=" + tags);

   // 异步发送
   producer.send(message, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {
         //logger.info("消息发送结果:result=" + sendResult.getSendStatus().name() + ",msgId=" + sendResult.getMsgId());
      }

      @Override
      public void onException(Throwable e) {
         logger.error("消息发送结果,异常了:" + e.getMessage(), e);
      }
   }, TIMEOUT);

} catch (Exception e) {
   logger.error("MQ生产者,发送消息,异常:" + e.getMessage(), e);
   return false;
}
复制代码
ProducerMqService

定义 RocketMQ的Topic 和 Tag 及发送消息的方法

// 定义主题,一般一个项目,使用同一个
public static String TOPIC = "XdTopic";

//================================
// 定义Tag,实际业务
//================================

public static String TAG_DOT_APP_EVENT_HIS = "tagDotAppEventHis";

public boolean sendDotAppEventHis(DotAppEventHisVo dotAppEventHisVo);
复制代码
ProducerMqServiceImpl

执发送消息的方法

@Override
public boolean sendDotAppEventHis(DotAppEventHisVo dotAppEventHisVo) {
   try {
      return PdbaseMqService.sendMsg(ProducerMqService.TOPIC, ProducerMqService.TAG_DOT_APP_EVENT_HIS, dotAppEventHisVo);
   } catch (Exception e) {
      logger.error("mq异步化:应用事件打点异常:" + e.getMessage(), e);
      return false;
   }
}
复制代码
3.2:provider方法提供者

ProducerMqServiceImpl

providers.xml dubbo xml方式封装方法




   
   
   

   
   

   
   

   
   
   


复制代码

**注意:zookeeper 我用的是本地环境,换为你们的zookeeper即可 **

3.3:point打点项目

ProducerMqServiceImpl

consumers.xml 注册使用dubbo的方法



       
   
   
   
   
   
   
   
   
   
   

复制代码

3.2:consumer消费者

ConsumerMqServiceImpl 订阅消息

package com.xd.consumer.service.impl;

![image.png](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b7f657ba58ad4eeb981a4471acd14122~tplv-k3u1fbpfcp-watermark.image?)
import com.google.gson.Gson;
import com.xd.consumer.base.CmbaseMqService;
import com.xd.mq.service.ProducerMqService;
import com.xd.mq.vo.DotAppEventHisVo;
import com.xd.consumer.service.ConsumerMqService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.List;


@Service
public class ConsumerMqServiceImpl implements ConsumerMqService {

    protected static Logger logger = LoggerFactory.getLogger(com.xd.consumer.service.impl.ConsumerMqServiceImpl.class);

    @Override
    public boolean consume() {
        try {
            DefaultMQPushConsumer consumer = CmbaseMqService.getConsumer();
            // 订阅所有消息
            consumer.subscribe(ProducerMqService.TOPIC, "*");
            logger.info("消费者,订阅所有消息");

            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                                ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        Gson gson = new Gson();

                        logger.info("消费者,准备消费消息:tag= " + msg.getTags());

                        // 区分不同Tag,不同处理方式
                        switch (msg.getTags()) {
                            // 应用事件打点
                            case ProducerMqService.TAG_DOT_APP_EVENT_HIS:
                                DotAppEventHisVo dotAppEventHisVo = gson.fromJson(new String(msg.getBody()),
                                        DotAppEventHisVo.class);
                                logger.info("消费消息:得到内容为:" + dotAppEventHisVo.toString());
                                break;
                            default:
                                logger.error("无处理类型,请检查。tag=" + msg.getTags(), new RuntimeException("未知Tag类型"));
                                break;
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            consumer.start();
            logger.info("消费者,启动成功...");

        } catch (Exception e) {
            logger.error("消费者,启动异常:" + e.getMessage(), e);
        }
        return true;
    }
}
复制代码

注意事项: 需要自己备好 RocketMQ 和 zookeeper的调用地址,我是window用docker运行的打包镜像运行的,这两个地址需要更换成自己的 本次工程也已上传到 github,看这里:dus-system

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758118.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号