栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringCloudAlibaba学习-07-SpringCloud整合RocketMQ

SpringCloudAlibaba学习-07-SpringCloud整合RocketMQ

部署Rocket首先要部署一个 NameServer
相关介绍参考: https://www.jianshu.com/p/3d8d594d9161
rocket 部署: docker部署
docker-compose

version: "3.7" 
# https://github.com/apache/rocketmq-docker
services:
    namesrv:
        image: apacherocketmq/rocketmq:4.5.0
        container_name: namesrv
        ports:
          - 9876:9876
        volumes:
          - ./data/namesrv/logs:/home/rocketmq/logs
        command: sh mqnamesrv
    mqbroker:
        image: apacherocketmq/rocketmq:4.5.0
        container_name: mqbroker
        ports:
          - 10909:10909
          - 10911:10911
          - 10912:10912
        volumes:
            - ./data/broker/logs:/home/rocketmq/logs
            - ./data/broker/store:/home/rocketmq/store
            - ./data/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.5.0/conf/broker.conf
        command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
        depends_on:
          - namesrv

broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#set `brokerIP1` if you want to set physical IP as broker IP.
#change you own physical IP Address
brokerIP1 = localhost
# listenPort = 28081
namesrvAddr= localhost:9876
autoCreateTopicEnable=true

然后就是项目文件
消息生产者架构:

pom.xml 文件(生产者和消费者用同一个pom文件)



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.3.2.RELEASE
         
    
    com.demo.springcloud_06_rocketmq
    mq_producer
    0.0.1-SNAPSHOT
    mq_producer
    生产者
    
        Hoxton.SR9
        2.3.2.RELEASE
        2.2.6.RELEASE
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.projectlombok
            lombok
            1.18.0
            true
        

        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
            com.alibaba.cloud
            spring-cloud-starter-stream-rocketmq
        
    

    
        
            
            
                org.springframework.boot
                spring-boot-dependencies
                ${spring.boot.version}
                pom
                import
            
            
            
                org.springframework.cloud
                spring-cloud-dependencies
                ${spring.cloud.version}
                pom
                import
            
            
            
                com.alibaba.cloud
                spring-cloud-alibaba-dependencies
                ${spring.cloud.alibaba.version}
                pom
                import
            
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    



生产者配置文件
application.yml

server:
  port: 8062

spring:
  cloud:
    stream:
      bindings:
        topic_channel:
          destination: TOPIC_CHANNEL_0
          content-type: application/json
          # group: topic_group
      rocketmq:
        binder:
          name-server: localhost:9876 #,localhost:9877
        bindings:
          topic_channel:
            producer:
              group: topic_group # 生产者分组
              # sync: true # 是否同步发送消息,默认为 false 异步。

MqProducerApplication,java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.demo.springcloud_06_rocketmq.mq_producer.producer.ProducerChannel;

@SpringBootApplication
@EnableBinding(ProducerChannel.class)
public class MqProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqProducerApplication.class, args);
    }

}

ProducerController.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@Slf4j
@RestController
public class ProducerController {

    @Autowired
    private MessageSender sender;

    @GetMapping("/send")
    public void testKafkaMessageSend(String message) {
        log.info("message:{}",message);
        sender.sendToTopicChannel(message);
    }


}

ProducerChannel.java

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;


public interface ProducerChannel {
    
    String TOPIC_CHANNEL = "topic_channel";



    
    @Output(TOPIC_CHANNEL)
    MessageChannel sendTopicChannelMessage();
}

MessageSender.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageSender {
    @Autowired
    private ProducerChannel channel;

    
    public void sendToTopicChannel(String message) {
        channel.sendTopicChannelMessage().send(MessageBuilder.withPayload(message).build());
    }
}

消息消费者项目架构:


pom.xml文件 同上

application.yml 文件如下

server:
  port: 8063

spring:
  application:
    name: mq_consumer
  cloud:
    stream:
      bindings:
        topic_channel:
          destination: TOPIC_CHANNEL_0
          content-type: application/json
          group: topic_group_1
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          topic_channel:
            consumer:
              enabled: true # 是否开启消费,默认为 true
              broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费

MqConsumerApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.demo.springcloud_06_rocketmq.mq_consumer.consumer.ConsumerChannel;

@SpringBootApplication
@EnableBinding(ConsumerChannel.class)
public class MqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqConsumerApplication.class, args);
    }

}

ConsumerListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Date;

@Slf4j
@Component
public class ConsumerListener {



    
    @StreamListener(ConsumerChannel.TOPIC_CHANNEL)
    public void receive(@Payload Message message){
        log.info("{}订阅缺省消息:通道 = {},data = {}", new Date(),"topic_channel", message);
        log.info("接收数据: {}",message.getPayload());
    }
}

ConsumerChannel.java

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ConsumerChannel {
    String TOPIC_CHANNEL = "topic_channel";
    @Input(TOPIC_CHANNEL)
    SubscribableChannel demo01Input();
}

mq的坑点主要在mq服务器的搭建上,如果配置不正确,消息服务器启动成功,但是接收和发送还会出问题,常见的就是发送成功,但是无法接收

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/680971.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号