Spring Cloud Stream Kafka Integration

Integrating Spring Cloud Stream with Kafka, with a complete demo.

Step 1: Add the following dependencies:

        
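        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>3.0.2.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.kafka</groupId>
                    <artifactId>spring-kafka</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework.integration</groupId>
                    <artifactId>spring-integration-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>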

Step 2: Configure application.yml as follows:

server:
  port: 8080
spring:
  application:
    name: dum-stream
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.1.202:9092
          auto-create-topics: true
      bindings:
        testStreamOut:
          destination: test-stream
          contentType: application/json
        testStreamInput:
          destination: test-stream
          contentType: application/json


Step 3: Write the code for the springcloud-stream module. Define a StreamMessageService in that module, as follows:

import com.test.dum.stream.service.AppsStreams;
import com.test.dum.stream.param.MessageStream;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;


@Service
public class StreamMessageService {

    @Resource
    private AppsStreams appsStreams;


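    /**
     * Wraps the payload in a Message and sends it to the testStreamOut channel.
     *
     * @return true if the channel accepted the message
     */
    public boolean sendMessage(MessageStream messageStream) {
        return appsStreams.testStreamOut().send(MessageBuilder
                .withPayload(messageStream)
                .build());
    }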
}
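
The service above imports com.test.dum.stream.param.MessageStream, but the article never shows that class. A minimal sketch of what such a payload class might look like is given below. The field names id and content are assumptions made purely for illustration; the real demo may use different fields. The class would live in the com.test.dum.stream.param package (the package declaration is omitted here so the snippet compiles on its own).

```java
import java.util.Objects;

/**
 * Hypothetical payload class. The article imports MessageStream but does not
 * show it, so the fields below (id, content) are assumptions for illustration.
 */
public class MessageStream {

    private String id;       // assumed field
    private String content;  // assumed field

    // No-argument constructor: JSON mappers typically need one to rebuild the object
    public MessageStream() {
    }

    public MessageStream(String id, String content) {
        this.id = id;
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof MessageStream)) {
            return false;
        }
        MessageStream that = (MessageStream) other;
        return Objects.equals(id, that.id) && Objects.equals(content, that.content);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, content);
    }

    // Readable output for log statements that print the payload
    @Override
    public String toString() {
        return "MessageStream{id='" + id + "', content='" + content + "'}";
    }
}
```

With a class like this, a caller could invoke sendMessage(new MessageStream("1", "hello")). Because the bindings declare contentType application/json, the getters and the no-argument constructor matter: they are what a JSON mapper would typically rely on to write and read the payload.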



Step 4: The message configuration interface is as follows:

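import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


public interface AppsStreams {

    // Binding names; they must match the keys under spring.cloud.stream.bindings
    String TEST_STREAM_OUT = "testStreamOut";
    String TEST_STREAM_INPUT = "testStreamInput";

    // Output channel used to publish messages
    @Output(TEST_STREAM_OUT)
    MessageChannel testStreamOut();

    // Input channel that receives messages; SubscribableChannel is the conventional type for @Input
    @Input(TEST_STREAM_INPUT)
    SubscribableChannel testStreamInput();
}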


Step 5: Bind the message channels:

import com.test.dum.stream.service.AppsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;


@EnableBinding(AppsStreams.class)
public class StreamsConfig {
}
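
In the application.yml from step 2, both binding names point at the same destination, test-stream, which is why this single application can receive the messages it sends. The sketch below imitates that wiring with plain in-memory maps so the data flow is easier to follow. It is an illustration only: the class BindingFlowSketch and its methods are hypothetical and are not part of Spring Cloud Stream or Kafka, and a real broker delivers messages asynchronously rather than in the caller's thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * In-memory illustration only (hypothetical class, not a Spring API):
 * two binding names that share one destination connect a sender to a listener.
 */
final class BindingFlowSketch {

    // binding name -> destination (topic), mirroring spring.cloud.stream.bindings
    private final Map<String, String> bindings = new HashMap<>();
    // destination -> listeners subscribed through an input binding
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    public void bind(String bindingName, String destination) {
        bindings.put(bindingName, destination);
    }

    public void subscribe(String inputBinding, Consumer<String> listener) {
        String destination = requireDestination(inputBinding);
        listeners.computeIfAbsent(destination, d -> new ArrayList<>()).add(listener);
    }

    /** Hands the payload to every listener on the binding's destination. */
    public boolean send(String outputBinding, String payload) {
        String destination = requireDestination(outputBinding);
        List<Consumer<String>> targets =
                listeners.getOrDefault(destination, Collections.emptyList());
        for (Consumer<String> listener : targets) {
            listener.accept(payload);
        }
        return true;
    }

    private String requireDestination(String bindingName) {
        String destination = bindings.get(bindingName);
        if (destination == null) {
            throw new IllegalArgumentException("Unknown binding: " + bindingName);
        }
        return destination;
    }

    public static void main(String[] args) {
        BindingFlowSketch flow = new BindingFlowSketch();
        // Same two bindings and destination as in application.yml
        flow.bind("testStreamOut", "test-stream");
        flow.bind("testStreamInput", "test-stream");

        List<String> received = new ArrayList<>();
        flow.subscribe("testStreamInput", received::add);

        boolean sent = flow.send("testStreamOut", "{\"content\":\"hello\"}");
        System.out.println(sent + " " + received);
    }
}
```

If the two bindings pointed at different destinations, the listener in this sketch would receive nothing, which is a useful first thing to check when messages appear to be sent but never consumed.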

Step 6: Consume the messages:

import com.test.dum.stream.param.MessageStream;
import com.test.dum.stream.service.AppsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;


@EnableBinding(value = AppsStreams.class)
@Slf4j
public class StreamMessageListener {

    
    @StreamListener(AppsStreams.TEST_STREAM_INPUT)
    public void handleStreamCity(MessageStream payload) {
        log.info("Message received: {}", payload);
    }

}

Project demo

Download link for the Spring Cloud Stream Kafka demo
