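Step 1: Add the following dependencies:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>3.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>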
Step 2: Configure application.yml as follows:

    server:
      port: 8080
    spring:
      application:
        name: dum-stream
      cloud:
        stream:
          kafka:
            binder:
              brokers: 192.168.1.202:9092
              auto-create-topics: true
          bindings:
            testStreamOut:
              destination: test-stream
              contentType: application/json
            testStreamInput:
              destination: test-stream
              contentType: application/json
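
Both bindings in this configuration point at the same destination, test-stream, which is why a message sent on testStreamOut later arrives on testStreamInput. The plain-Java sketch below imitates that round trip in memory. It is only an analogy: it uses no Spring or Kafka classes, and the names RoundTripDemo and InMemoryTopic are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory analogy of one destination shared by an output and an input binding.
// Not Spring Cloud Stream code; all names here are made up for illustration.
class RoundTripDemo {

    // Hypothetical stand-in for the Kafka topic "test-stream".
    static class InMemoryTopic {
        private final String name;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        InMemoryTopic(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }

        // Plays the role of the input binding: register a handler for incoming payloads.
        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        // Plays the role of the output binding: hand the payload to every subscriber.
        boolean send(String payload) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(payload);
            }
            return true;
        }
    }

    public static void main(String[] args) {
        InMemoryTopic testStream = new InMemoryTopic("test-stream");
        List<String> received = new ArrayList<>();

        testStream.subscribe(received::add);                        // consumer side
        boolean sent = testStream.send("{\"content\":\"hello\"}");  // producer side

        System.out.println("topic=" + testStream.name() + ", sent=" + sent + ", received=" + received);
    }
}
```

In the real application the two sides are decoupled by the Kafka broker, so delivery is asynchronous and the producer and consumer may run in different processes.
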
Step 3: Write the code for the springcloud-stream module. Define a StreamMessageService in that module as follows:

    import com.test.dum.stream.service.AppsStreams;
    import com.test.dum.stream.param.MessageStream;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;

    import javax.annotation.Resource;

    @Service
    public class StreamMessageService {

        @Resource
        private AppsStreams appsStreams;

        // Wraps the payload in a Message and sends it on the testStreamOut channel.
        // Returns true if the channel accepted the message.
        public boolean sendMessage(MessageStream messageStream) {
            return appsStreams.testStreamOut().send(MessageBuilder
                    .withPayload(messageStream)
                    .build());
        }
    }
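
The service sends a MessageStream payload, but that class is not shown in this post. Because the bindings use contentType application/json, it is presumably a plain Java bean that can be serialized to JSON. A minimal sketch of what such a class might look like follows; the fields id and content are assumptions chosen for illustration, not the author's actual definition.

```java
// Hypothetical payload class. The real com.test.dum.stream.param.MessageStream
// is not shown in the post, so the fields below are assumed.
class MessageStream {

    private String id;
    private String content;

    // No-argument constructor, which JSON mappers commonly rely on when deserializing.
    public MessageStream() {
    }

    public MessageStream(String id, String content) {
        this.id = id;
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    // Readable representation, useful when the payload is written to a log.
    @Override
    public String toString() {
        return "MessageStream{id='" + id + "', content='" + content + "'}";
    }
}
```

With a toString() like this, a log line that prints the payload shows the field values rather than an object hash.
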
Step 4: The message channel configuration interface is as follows:

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;

    public interface AppsStreams {

        // Binding names; they must match the keys under spring.cloud.stream.bindings.
        String TEST_STREAM_OUT = "testStreamOut";
        String TEST_STREAM_INPUT = "testStreamInput";

        @Output(TEST_STREAM_OUT)
        MessageChannel testStreamOut();

        @Input(TEST_STREAM_INPUT)
        MessageChannel testStreamInput();
    }
Step 5: Bind the message channels:

    import com.test.dum.stream.service.AppsStreams;
    import org.springframework.cloud.stream.annotation.EnableBinding;

    @EnableBinding(AppsStreams.class)
    public class StreamsConfig {
    }
Step 6: Consume messages:
```java
import com.test.dum.stream.param.MessageStream;
import com.test.dum.stream.service.AppsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(value = AppsStreams.class)
@Slf4j
public class StreamMessageListener {

    // Invoked for each message that arrives on the testStreamInput channel.
    @StreamListener(AppsStreams.TEST_STREAM_INPUT)
    public void handleStreamCity(MessageStream payload) {
        log.info("Message received: {}", payload);
    }
}
```
Demo project
Download link for the spring cloud stream kafka demo



