栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka的多数据源监听相关操作

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka的多数据源监听相关操作

我们经常会遇到一个微服务里面想要对多态服务器的kafka进行监听(非集群)这时候平常在application.properties可能就没办法支撑了,我们就需要通过原始方式进行配置

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Value("${brokerAddress}") // kafka所在地址ip:port
    private String brokerAddress;

    @Value("${groupId}") // kafka的分组Id
    private String groupId;
    
// 这里的工厂类配置可以配置多个,想要今天多少台服务器的kafka就可以写多少个
@Bean
KafkaListenerContainerFactory> consumerFactory(){
        // 创建kafka的工厂类
ConcurrentKafkaListenerContainerFactory factory = 
		new ConcurrentKafkaListenerContainerFactory();
        // 创建kafka的消费工厂类 这里也可以使kafka的提供者工厂类
ConsumerFactory consumerFactory = 
 		new DefaultKafkaConsumerFactory(factoryProperties(groupId,brokerAddress));
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        // 将当前kafka的工厂对象返回装载到ioc
        return factory;
    }

    private Map factoryProperties(String groupId, String brokerAddress) {
        // 这里就是配置kafka的相关参数 比如ip地址和分组这些参数
        Map properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        return properties;
    }
}

	// 监听类
	// 这里的topic名字就是当前配置kafka里面的topic
    // containerFactory就是配置的kafka工厂对象在ioc的名字
    @KafkaListener(topics = "topicName",containerFactory = 
    											"consumerFactory")
    public void kafkaTest(String message){
        // 对监听到的消息进行操作
        ...
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354223.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号