
SpringCloudAlibaba Study Notes 06: Getting Started with Spring Cloud and Kafka (Part 2)

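This post continues SpringCloudAlibaba Study Notes 06: Getting Started with Spring Cloud and Kafka (Part 1).
The main additions are consumer groups and custom channels.
https://blog.csdn.net/xy3233/article/details/122084419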
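Since consumer groups are one of the two additions in this part, the delivery rule they imply can be modelled without a broker. The sketch below is a simplified illustration, not Kafka's implementation: every group receives each message once, and inside a group only one member receives it. The class `ConsumerGroupSketch`, its methods, and the per-message round-robin choice are all assumptions made for this example (Kafka itself assigns partitions to group members rather than rotating per message).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Broker-free model of consumer-group delivery; hypothetical, for illustration only. */
public class ConsumerGroupSketch {

    /** One consumer instance; it records every message it receives. */
    static final class Consumer {
        final String name;
        final List<String> received = new ArrayList<>();

        Consumer(String name) {
            this.name = name;
        }
    }

    // group name -> members of that group, in subscription order
    private final Map<String, List<Consumer>> groups = new LinkedHashMap<>();

    // number of messages published so far, used for the round-robin choice
    private long sequence = 0;

    /** Adds a new consumer to the given group and returns it. */
    Consumer subscribe(String group, String consumerName) {
        Consumer consumer = new Consumer(consumerName);
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
        return consumer;
    }

    /** Delivers the message once per group; round-robin picks the member. */
    void publish(String message) {
        for (List<Consumer> members : groups.values()) {
            int index = (int) (sequence % members.size());
            members.get(index).received.add(message);
        }
        sequence++;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        Consumer a1 = topic.subscribe("group-a", "a1");
        Consumer a2 = topic.subscribe("group-a", "a2");
        Consumer b1 = topic.subscribe("group-b", "b1");

        for (String message : new String[] {"m1", "m2", "m3", "m4"}) {
            topic.publish(message);
        }

        // group-a splits the messages between its two members
        System.out.println(a1.name + " " + a1.received);
        System.out.println(a2.name + " " + a2.received);
        // group-b has a single member, so it sees every message
        System.out.println(b1.name + " " + b1.received);
    }
}
```

In practice this is why two instances of the same service should share one group name (each message is handled once), while two different services that both need every message should use different group names.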

Project structure

pom file: unchanged from the previous chapter
main class: unchanged from the previous chapter

Custom channels: MyKafkaChannel.java

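import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MyKafkaChannel {
    /** Binding name of the default output channel. */
    String MY_DEFAULT_OUTPUT = "my_default_output";

    /** Binding name of the default input channel. */
    String MY_DEFAULT_INPUT = "my_default_input";

    /** Binding name of the alarm output channel. */
    String MY_ALARM_OUTPUT = "my_alarm_output";

    /** Binding name of the alarm input channel. */
    String MY_ALARM_INPUT = "my_alarm_input";

    /** Channel used to publish default messages. */
    @Output(MY_DEFAULT_OUTPUT)
    MessageChannel sendMyDefaultMessage();

    /** Channel used to publish alarm messages. */
    @Output(MY_ALARM_OUTPUT)
    MessageChannel sendMyAlarmMessage();

    /** Channel for consuming default messages; SubscribableChannel is the conventional type for @Input. */
    @Input(MY_DEFAULT_INPUT)
    SubscribableChannel receiveMyDefaultMessage();

    /** Channel for consuming alarm messages. */
    @Input(MY_ALARM_INPUT)
    SubscribableChannel receiveMyAlarmMessage();

}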
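
The four channel names above are only binding names; which Kafka topic and consumer group each one uses is decided in the binding configuration, which this post does not show. A minimal application.yml sketch follows, assuming a local broker. The broker address, topic names, and group names are hypothetical placeholders chosen for illustration; only the `spring.cloud.stream.*` property keys come from Spring Cloud Stream.

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092          # assumption: single local broker
      bindings:
        my_default_output:
          destination: my_default_topic    # hypothetical topic name
        my_default_input:
          destination: my_default_topic    # same topic, so the listener sees what the sender publishes
          group: my_default_group          # hypothetical consumer group
        my_alarm_output:
          destination: my_alarm_topic      # hypothetical topic name
        my_alarm_input:
          destination: my_alarm_topic
          group: my_alarm_group            # hypothetical consumer group
```

Pointing the output and input bindings of each pair at the same `destination` is what lets the sender and listener in this project talk to each other; the `group` value on an input binding determines which instances share the messages.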

Receiving data from the custom channels: KafkaMessageReceiveListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

import java.util.Date;

/** Consumes messages from the custom input channels and logs them. */
@Slf4j
@EnableBinding(value = MyKafkaChannel.class)
public class KafkaMessageReceiveListener {

    /** Handles messages arriving on the default input channel. */
    @StreamListener(MyKafkaChannel.MY_DEFAULT_INPUT)
    public void receive(Message<?> message){
        log.info("{} default message received: channel = {}, data = {}",
                new Date(), MyKafkaChannel.MY_DEFAULT_INPUT, message);
    }

    /** Handles messages arriving on the alarm input channel. */
    @StreamListener(MyKafkaChannel.MY_ALARM_INPUT)
    public void receiveAlarm(Message<?> message){
        log.info("{} alarm message received: channel = {}, data = {}",
                new Date(), MyKafkaChannel.MY_ALARM_INPUT, message);
    }

}

Sending messages: KafkaMessageSender.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/** Publishes messages to the custom output channels. */
@Slf4j
@Component
@EnableBinding(MyKafkaChannel.class)
public class KafkaMessageSender {
    @Autowired
    private MyKafkaChannel channel;

    /** Sends the payload to the default output channel. */
    public void sendToDefaultChannel(String message) {
        channel.sendMyDefaultMessage().send(MessageBuilder.withPayload(message).build());
    }

    /** Sends the payload to the alarm output channel. */
    public void sendToAlarmChannel(String message) {
        channel.sendMyAlarmMessage().send(MessageBuilder.withPayload(message).build());
    }

}

KafkaSenderController.java: unchanged from the previous chapter
