栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringCloud Stream 整合kafka

SpringCloud Stream 整合kafka

一、引入依赖包

org.springframework.cloud
spring-cloud-stream


org.springframework.cloud
spring-cloud-stream-binder-kafka

二、自定义信息通道
官方提供了Sink(输入通道)、Source(输出通道)、Processor(集成Sink和Source通道),我们也可以自定义我们自己的信息通道。
@Input注解标识一个输入通道
@Output注解标识一个输出通道
通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。
如下我们自定义信息通道EsChannel


String ES_DEFAULT_OUTPUT = “es_default_output”;


String ES_DEFAULT_INPUT = “es_default_input”;


String ES_ALARM_OUTPUT = “es_alarm_output”;


String ES_ALARM_INPUT = “es_alarm_input”;


@Output(ES_DEFAULT_OUTPUT)
MessageChannel sendEsDefaultMessage();


@Output(ES_ALARM_OUTPUT)
MessageChannel sendEsAlarmMessage();


@Input(ES_DEFAULT_INPUT)
MessageChannel recieveEsDefaultMessage();


@Input(ES_ALARM_INPUT)
MessageChannel recieveEsAlarmMessage();
}

三、@EnableBinding使应用程序连接到消息代理

@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients
@EnableHystrix
@MapperScan(basePackages = “com.es.mapper”)
@EnableBinding(EsChannel.class)
public class EsonenetApplication {

public static void main(String[] args) {
    SpringApplication.run(EsOnenetApplication.class, args);
}

}

四、SpringCloudStream及kafka配置

#==============================================================
#spring-cloud-stream-Kafka配置 开始
#==============================================================
#是否开启kafka(非spring-cloud-stream配置)
spring.kafka.enabled=false
#缺省的输入、输出通道
spring.cloud.stream.bindings.es_default_input.destination=es_default_topic
spring.cloud.stream.bindings.es_default_input.binder=kafka
spring.cloud.stream.bindings.es_default_input.group=es_default_group

spring.cloud.stream.bindings.es_default_output.destination=es_default_topic
spring.cloud.stream.bindings.es_default_output.binder=kafka

#入站消费者的并发性
spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2

#告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义)
spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_input.binder=kafka
spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group

spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_output.binder=kafka

#kafka配置
spring.cloud.stream.kafka.binder.brokers=172...6:9092,172...7:9092,172...8:9092
spring.cloud.stream.kafka.binder.zkNodes=172...6:2181,172...7:2181,172...8:2181
spring.cloud.stream.kafka.binder.requiredAcks=1
#==============================================================
#spring-cloud-stream-Kafka配置 结束
#==============================================================

五、发送消息到输出通道

@Component
public class EsKafkaMessageSender {
@Autowired
private EsChannel channel;


public void sendToDefaultChannel(String message){
channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
}


public void sendToAlarmChannel(String message){
channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
}
}
注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。

六、从输入通道订阅消息
@EnableBinding(value = EsChannel.class)
public class EsStreamListener {

@StreamListener(EsChannel.ES_DEFAULT_INPUT)
public void receive(Message message){
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message);
    try {
        Thread.sleep(1000*10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息");
}


@StreamListener(EsChannel.ES_ALARM_INPUT)
public void receiveAlarm(Message message){
    System.out.println("订阅告警消息:" + message);
}

}
从不同的通道实现消息的订阅。

七、这样完整的消息系统就搭建好了,定义Controller发送消息测试

@ApiOperation(value = “test1”, httpMethod = “POST”)
@PostMapping(value = “/test1”, produces = “application/json;charset=UTF-8”)
public void test1(String message, HttpServletRequest request,
HttpServletResponse response) {
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
}

@ApiOperation(value = "test", httpMethod = "POST")
@PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
public void test2(String message, HttpServletRequest request,
                  HttpServletResponse response) {
    sender.sendToAlarmChannel(message);
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583459.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号