栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot集成kafka:二、手动装配

springboot集成kafka:二、手动装配

前言

单位内网spring-kafka依赖只有1.x的,springboot集成kafka过程中发现配置文件不认一些项,比如spring.kafka.producer.properties和spring.kafka.consumer.properties,配上去也修改不了,日志显示依然维持默认配置,于是决定手动装配各项参数,主要是为了和线上项目的kafka相关配置统一,不管你在生产集群生不生效,反正我配了

另外在自己玩的环境上看的spring kafka 2.x就啥都有了,下面的都不用看,直接用上一篇文章用配置文件自动装配就可以,没有什么不同

另外下面代码都是在内网上先写完在手敲到这,不保证正确性,内网开发环境可以用

另外相较于上一篇文章配置文件前缀,我去掉了spring前缀,为了跟Spring kafka auto-config的配置相区分,食用时只需稍加注意即可

1.producer和consumer配置类
/**
 * Property holder for the producer side, annotated with the {@code kafka.producer} prefix.
 *
 * <p>Every setting is stored as a plain {@code String} and handed back unchanged by its getter;
 * no parsing or validation happens in this class.
 */
@ConfigurationProperties(prefix = "kafka.producer")
public class KafkaProducerPropsConfig {

    /** Topic setting. */
    private String topic;

    /** Retries setting, kept as text. */
    private String retries;

    /** Acks setting, kept as text. */
    private String acks;

    /** Batch size setting, kept as text. */
    private String batchSize;

    /** Linger setting, kept as text. */
    private String lingerMs;

    /** Buffer memory setting, kept as text. */
    private String bufferMemory;

    /** Key serializer setting. */
    private String keySerializer;

    /** Value serializer setting. */
    private String valueSerializer;

    // ---------------------------------------------------------------- getters

    /** @return the stored topic value */
    public String getTopic() {
        return this.topic;
    }

    /** @return the stored retries value */
    public String getRetries() {
        return this.retries;
    }

    /** @return the stored acks value */
    public String getAcks() {
        return this.acks;
    }

    /** @return the stored batch size value */
    public String getBatchSize() {
        return this.batchSize;
    }

    /** @return the stored linger value */
    public String getLingerMs() {
        return this.lingerMs;
    }

    /** @return the stored buffer memory value */
    public String getBufferMemory() {
        return this.bufferMemory;
    }

    /** @return the stored key serializer value */
    public String getKeySerializer() {
        return this.keySerializer;
    }

    /** @return the stored value serializer value */
    public String getValueSerializer() {
        return this.valueSerializer;
    }

    // ---------------------------------------------------------------- setters

    /** @param topic new topic value */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @param retries new retries value */
    public void setRetries(String retries) {
        this.retries = retries;
    }

    /** @param acks new acks value */
    public void setAcks(String acks) {
        this.acks = acks;
    }

    /** @param batchSize new batch size value */
    public void setBatchSize(String batchSize) {
        this.batchSize = batchSize;
    }

    /** @param lingerMs new linger value */
    public void setLingerMs(String lingerMs) {
        this.lingerMs = lingerMs;
    }

    /** @param bufferMemory new buffer memory value */
    public void setBufferMemory(String bufferMemory) {
        this.bufferMemory = bufferMemory;
    }

    /** @param keySerializer new key serializer value */
    public void setKeySerializer(String keySerializer) {
        this.keySerializer = keySerializer;
    }

    /** @param valueSerializer new value serializer value */
    public void setValueSerializer(String valueSerializer) {
        this.valueSerializer = valueSerializer;
    }
}
/**
 * Property holder for the consumer side, annotated with the {@code kafka.consumer} prefix.
 *
 * <p>Every setting is stored as a plain {@code String} and handed back unchanged by its getter;
 * no parsing or validation happens in this class.
 */
@ConfigurationProperties(prefix = "kafka.consumer")
public class KafkaConsumerPropsConfig {

    /** Topic setting. */
    private String topic;

    /** Consumer group id setting. */
    private String groupId;

    /** Auto-commit switch, kept as text. */
    private String enableAutoCommit;

    /** Auto-commit interval setting, kept as text. */
    private String autoCommitInterval;

    /** Auto offset reset setting. */
    private String autoOffsetReset;

    /** Key deserializer setting. */
    private String keyDeserializer;

    /** Value deserializer setting. */
    private String valueDeserializer;

    /** Session timeout setting, kept as text. */
    private String sessionTimeoutMs;

    // ---------------------------------------------------------------- getters

    /** @return the stored topic value */
    public String getTopic() {
        return this.topic;
    }

    /** @return the stored group id value */
    public String getGroupId() {
        return this.groupId;
    }

    /** @return the stored auto-commit switch value */
    public String getEnableAutoCommit() {
        return this.enableAutoCommit;
    }

    /** @return the stored auto-commit interval value */
    public String getAutoCommitInterval() {
        return this.autoCommitInterval;
    }

    /** @return the stored auto offset reset value */
    public String getAutoOffsetReset() {
        return this.autoOffsetReset;
    }

    /** @return the stored key deserializer value */
    public String getKeyDeserializer() {
        return this.keyDeserializer;
    }

    /** @return the stored value deserializer value */
    public String getValueDeserializer() {
        return this.valueDeserializer;
    }

    /** @return the stored session timeout value */
    public String getSessionTimeoutMs() {
        return this.sessionTimeoutMs;
    }

    // ---------------------------------------------------------------- setters

    /** @param topic new topic value */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @param groupId new group id value */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    /** @param enableAutoCommit new auto-commit switch value */
    public void setEnableAutoCommit(String enableAutoCommit) {
        this.enableAutoCommit = enableAutoCommit;
    }

    /** @param autoCommitInterval new auto-commit interval value */
    public void setAutoCommitInterval(String autoCommitInterval) {
        this.autoCommitInterval = autoCommitInterval;
    }

    /** @param autoOffsetReset new auto offset reset value */
    public void setAutoOffsetReset(String autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }

    /** @param keyDeserializer new key deserializer value */
    public void setKeyDeserializer(String keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }

    /** @param valueDeserializer new value deserializer value */
    public void setValueDeserializer(String valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    /** @param sessionTimeoutMs new session timeout value */
    public void setSessionTimeoutMs(String sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }
}

入口类上添加如下注解,启用上面两个配置属性类(使其被注册并完成属性绑定)

@EnableConfigurationProperties({KafkaProducerPropsConfig.class, KafkaConsumerPropsConfig.class})
2.KafkaTemplate配置类
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

/**
 * Builds the Kafka producer and consumer beans by hand from the injected property holders
 * instead of relying on auto-configuration.
 *
 * <p>Producer side: {@link #producerProp()} -> {@link #producerFactory()} ->
 * {@link #kafkaTemplate()}. Consumer side: {@link #consumerProp()} ->
 * {@link #consumerFactory()} -> {@link #kafkaListenerContainerFactory()}.
 *
 * <p>The bean methods keep their raw return types so existing injection points are unaffected.
 */
@Configuration
public class KafkaTemplateConfig {

    // NOTE(review): this key still carries the "spring." prefix while the property holders use
    // the "kafka.producer" / "kafka.consumer" prefixes — confirm this is the intended key.
    @Value("${spring.kafka.bootStrap}")
    private String bootstrapServers;

    @Autowired
    KafkaProducerPropsConfig producerPropsConfig;

    @Autowired
    KafkaConsumerPropsConfig consumerPropsConfig;

    /**
     * Collects the producer settings into a map keyed by the {@link ProducerConfig} constants.
     *
     * @return a new mutable map holding the bootstrap servers and the values read from
     *     {@code producerPropsConfig}
     */
    @Bean
    public Map producerProp() {
        Map<String, Object> prop = new HashMap<>();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Fixed: the retries entry must come from getRetries(); it previously reused getAcks().
        prop.put(ProducerConfig.RETRIES_CONFIG, producerPropsConfig.getRetries());
        prop.put(ProducerConfig.ACKS_CONFIG, producerPropsConfig.getAcks());
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG, producerPropsConfig.getBatchSize());
        prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerPropsConfig.getBufferMemory());
        prop.put(ProducerConfig.LINGER_MS_CONFIG, producerPropsConfig.getLingerMs());
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerPropsConfig.getKeySerializer());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producerPropsConfig.getValueSerializer());
        // Add further producer settings here as needed.

        return prop;
    }


    /**
     * Creates the producer factory from the map built by {@link #producerProp()}.
     *
     * @return a {@link DefaultKafkaProducerFactory} configured with the producer settings
     */
    @Bean
    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProp());
    }

    /**
     * Creates the template used for sending, backed by {@link #producerFactory()}.
     *
     * @return a {@link KafkaTemplate} wrapping the producer factory
     */
    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Collects the consumer settings into a map keyed by the {@link ConsumerConfig} constants.
     *
     * @return a new mutable map holding the bootstrap servers and the values read from
     *     {@code consumerPropsConfig}
     */
    @Bean
    public Map consumerProp() {
        Map<String, Object> prop = new HashMap<>();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, consumerPropsConfig.getGroupId());
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerPropsConfig.getEnableAutoCommit());
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerPropsConfig.getAutoCommitInterval());
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerPropsConfig.getAutoOffsetReset());
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerPropsConfig.getKeyDeserializer());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerPropsConfig.getValueDeserializer());
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerPropsConfig.getSessionTimeoutMs());
        // Add further consumer settings here as needed.

        return prop;
    }

    /**
     * Creates the consumer factory from the map built by {@link #consumerProp()}.
     *
     * @return a {@link DefaultKafkaConsumerFactory} configured with the consumer settings
     */
    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProp());
    }


    /**
     * Creates the listener container factory with concurrency 1 and the consumer factory from
     * {@link #consumerFactory()}.
     *
     * @return the configured {@link ConcurrentKafkaListenerContainerFactory}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

3.使用 与上一篇文章相同

注意ConcurrentKafkaListenerContainerFactory可以通过
factory.getContainerProperties().setXXX设置很多属性

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745487.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号