栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringCloudStream整合RocketMQ以及入门demo

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringCloudStream整合RocketMQ以及入门demo

出自 图灵学院 ,我自己学了一下,然后自己做了个笔记,再结合老师的讲义,整理了一下,写了个博客

概述

SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。

SpringCloudStream更牛逼的事情就是解耦,假如说我以后换MQ了,我把RocketMQ换成RabbitMQ了,或者Kafka了,我代码不需要任何改动,只需要换Maven依赖和properties配置文件即可.

使用SpringCloudStream可以让我们更多的经历关注我们的业务而不是各种MQ产品配置

注意点
  • 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。
  • SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。
  • 总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。
代码 pom依赖
 
        
            org.apache.rocketmq
            rocketmq-client
            4.7.1
        
        
            org.apache.rocketmq
            rocketmq-acl
            4.7.1
        
        
        
            com.alibaba.cloud
            spring-cloud-starter-stream-rocketmq
            2.2.3.RELEASE
            
                
                    org.apache.rocketmq
                    rocketmq-client
                
                
                    org.apache.rocketmq
                    rocketmq-acl
                
            
        
        
            org.springframework.boot
            spring-boot-starter-web
            2.3.3.RELEASE
        
    
application.properties配置文件
#消息的生产者   input来自sink.class
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
# 消息的发送者  output 是source.class
spring.cloud.stream.bindings.output.destination=TestTopic
# NameServer的地址
spring.cloud.stream.rocketmq.binder.name-server=zjj101:9876;zjj102:9876;zjj103:9876

Controller
package com.roy.scrocket.controller;

import com.roy.scrocket.basic.ScProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/MQTest")
public class MQTestController {

    @Resource
    private ScProducer producer;

    
    @RequestMapping("/sendMessage")
    public String sendMessage(String message) {
        producer.sendMessage(message);
        return "消息发送完成";
    }
}

启动类
package com.roy.scrocket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

//EnableBinding 的意思是
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScRocketMQApplication.class, args);
    }
}

消费者
package com.roy.scrocket.basic;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
public class ScConsumer {

    @StreamListener(Sink.INPUT)
    public void onMessage(String messsage) {
        System.out.println("received message:" + messsage + " from binding:" +
                Sink.INPUT);
    }
}

生产者
package com.roy.scrocket.basic;

import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;


@Component
public class ScProducer {

    @Resource
    private Source source;

    public void sendMessage(String msg) {
        Map headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message message = MessageBuilder.createMessage(msg, messageHeaders);
        //发送消息
        this.source.output().send(message);
    }
}

启动测试
启动ScRocketMQApplication,然后执行

get请求:  http://localhost:8080/MQTest/sendMessage?message=demoData
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/349497.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号