栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

SpringBoot整合阿里RocketMQ

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot整合阿里RocketMQ

什么是RocketMQ

阿里消息队列 RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,同时是收费的产品。

应用场景 削峰填谷

诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列 RocketMQ 版可提供削峰填谷的服务来解决该问题。

异步解耦

交易系统作为淘宝/天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列 RocketMQ 版可实现异步通信和应用解耦,确保主站业务的连续性。

顺序收发

细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出(First In First Out,缩写 FIFO)原理类似,消息队列 RocketMQ 版提供的顺序消息即保证消息 FIFO。

分布式事务一致性

交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

大数据分析

数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列 RocketMQ 版与流式计算引擎相结合,可以很方便的实现将业务数据进行实时分析。

分布式缓存同步

天猫双 11 大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过消息队列 RocketMQ 版构建分布式缓存,实时通知商品数据的变化。

1、配置pom.xml

    org.springframework.boot
    spring-boot-starter-web




    org.projectlombok
    lombok
    true




    com.aliyun.openservices
    ons-client
    1.8.0.Final

2、配置application.properties
server.port=8888
#rocketmq配置
#鉴权用AccessKeyId在阿里云服务器管理控制台创建
rocketmq.accessKey=accessKey
#鉴权用AccessKeySecret在阿里云服务器管理控制台创建
rocketmq.secretKey=secretKey
#tcp长连接,设置TCP接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
rocketmq.namesrvAddr=http://MQ_INST_15namesrvAddr7I.cn-hangzhou.mq-internal.aliyuncs.com:8080
#mq主题,,您在控制台创建的topic
rocketmq.topic=topic
#mq组名,您在控制台创建的 Group ID
rocketmq.groupId=groupId

以上参数均可在阿里控制台中找到

3、配置类
package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Properties;


@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.accessKey}")
    public String accessKey;
    public static String ACCESS_KEY;

    @Value("${rocketmq.secretKey}")
    public String secretKey;
    public static String SECRET_KEY;

    @Value("${rocketmq.namesrvAddr}")
    public String namesrvAddr;
    public static String NAMESRV_ADDR;

    @Value("${rocketmq.groupId}")
    public String groupId;
    public static String GROUP_ID;

    @Value("${rocketmq.topic}")
    public String topic;
    public static String TOPIC;

    
    public Properties getProperties() {
 Properties properties = new Properties();
 //您在控制台创建的GroupID
 properties.put(PropertyKeyConst.GROUP_ID, groupId);
 // 鉴权用AccessKeyId在阿里云服务器管理控制台创建
 properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
 // 鉴权用AccessKeySecret在阿里云服务器管理控制台创建
 properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
 //延时时间
 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
 // 顺序消息消费失败进行重试前的等待时间单位(毫秒)
 properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
 // 消息消费失败时的最大重试次数
 properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
 // 设置TCP接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
 properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
 return properties;
    }

    
    @PostConstruct
    public void init(){
 ACCESS_KEY = this.accessKey;
 SECRET_KEY = this.secretKey;
 NAMESRV_ADDR = this.namesrvAddr;
 GROUP_ID = this.groupId;
 TOPIC = this.topic;
    }
}
4、RocketMQ工具
package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;


@Component
@Slf4j
public class MQUtil {

    @Autowired
    private RocketMQConfig rocketMQConfig;

    
    public void sendMessage(String content,String tag){
 Message message = new Message();
 message.setBody(content.getBytes());
 message.setTopic(RocketMQConfig.TOPIC);
 message.setTag(tag);
 this.sendCustomerMessage(message);
    }

    
    public void sendDelayMessage(String content,String tag,long delayTime){
 Message message = new Message();
 message.setBody(content.getBytes());
 message.setTopic(RocketMQConfig.TOPIC);
 message.setTag(tag);
 
 message.setStartDeliverTime(System.currentTimeMillis()+delayTime);
 this.sendCustomerMessage(message);
    }

    
    private void sendCustomerMessage(Message message) {
 Properties properties=rocketMQConfig.getProperties();
 Producer producer = ONSFactory.createProducer(properties);
 //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
 producer.start();
 try {
     SendResult sendResult = producer.send(message);
     // 同步发送消息,只要不抛异常就是成功
     if (sendResult != null) {
  log.info("消息发送成功:messageID:"+sendResult.getMessageId());
     }
 } catch (Exception e) {
     // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
     e.printStackTrace();
 }
 //在应用退出前,销毁Producer对象
 producer.shutdown();
    }
}

5、标签
package com.ifilldream.rocketmq_lean.mq;


public class MqTag {
    
    //测试1
    public final static String ROCKETMQTEST1 = "ROCKETMQ_TEST1";
    //测试2
    public final static String ROCKETMQTEST2 = "ROCKETMQ_TEST2";
}
6、消费者
package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;


@Component
@Slf4j
public class RocketMQConsumer {

    @Autowired
    private RocketMQConfig rocketMQConfig;

    
    public void normalSubscribe() {
 Properties properties = rocketMQConfig.getProperties();
 Consumer consumer = ONSFactory.createConsumer(properties);
 consumer.subscribe(RocketMQConfig.TOPIC, "", new MessageListener() {
     @Override
     public Action consume(Message message, ConsumeContext context) {
  try {
      //接收到的消息内容
      String msg = new String(message.getBody(), "UTF-8");
      String tag = message.getTag();
      switch (tag) {
   case MqTag.ROCKETMQTEST1:
log.info("收到消息messageID:" + message.getMsgID() + " msg:" + msg);
//TODO do something
break;
   case  MqTag.ROCKETMQTEST2:
log.info("收到消息messageID:" + message.getMsgID() + " msg:" + msg);
//TODO do something
break;
      }
      return Action.CommitMessage;
  } catch (Exception e) {
      log.info("消费失败:messageID:" + message.getMsgID());
      e.printStackTrace();
      return Action.ReconsumeLater;
  }
     }
 });
 consumer.start();
    }
}
7、消费者启动监听
package com.ifilldream.rocketmq_lean.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;


@Component
public class RocketConsumerListener implements CommandLineRunner {

    @Autowired
    private RocketMQConsumer rocketMQConsumer;

    @Override
    public void run(String... args) {
 System.out.println("========rocketMQ消费者启动==========");
 rocketMQConsumer.normalSubscribe();
    }
}
8、接口
package com.ifilldream.rocketmq_lean.controller;
import com.ifilldream.rocketmq_lean.mq.MQUtil;
import com.ifilldream.rocketmq_lean.mq.MqTag;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;


@RestController
@RequestMapping("/ifilldream/rocketmq")
public class RocketController {

    @Resource
    private MQUtil mqUtil;

    @GetMapping("/test")
    public String test(String content) {
 return content;
    }

    @GetMapping("/test1")
    public String test1(String content) {
 mqUtil.sendMessage(content, MqTag.ROCKETMQTEST1);
 mqUtil.sendDelayMessage("测试", MqTag.ROCKETMQTEST1, 1000L);
 return "success";
    }

    @GetMapping("/test2")
    public String test2(String content) {
 mqUtil.sendMessage(content, MqTag.ROCKETMQTEST2);
 mqUtil.sendDelayMessage("测试", MqTag.ROCKETMQTEST2,3000L);
 return "success";
    }

}

此时代码完毕,在Linux服务器上运行项目Jar包,浏览器中输入:xx.xx.xx.xx:8888/ifilldream/rocketmq/test1?content=nihao即可看到效果;

xx.xx.xx.xx为服务器的IP或域名,运行效果如下:

以上代码亲测可用,更多详情请关注阿里官方文档https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.1a4b7e805ygc75

统一首发平台为微信公众号"轻梦致新",搜索关注公众号,第一时间阅读最新内容。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号