栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

同一服务Kafka多数据源配置

同一服务Kafka多数据源配置

同一服务Kafka多数据源配置 一个微服务项目,业务需要,引入两套kafka集群的配置

1.xml文件加入新老Kafka集群配置信息
2.添加KafkaTemplateConfig bean文件
3.注解@Qualifier(“kafkaTemplateNew”)使用

@EnableKafka
@Configuration
public class KafkaTemplateConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka-new.bootstrap-servers}")
    private String newBootstrapServers;

    
    @Bean
    @Primary
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    
    @Bean(name="kafkaTemplateNew")
    public KafkaTemplate kafkaTemplateNew() {
        return new KafkaTemplate<>(newProducerFactory());
    }

    
    @Bean
    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(true));
    }

    
    @Bean
    public ProducerFactory newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(false));
    }

    
    public Map producerConfigs(boolean isPrimary) {
        Map props = new HashMap<>();
        // 指定多个kafka集群多个地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, isPrimary?bootstrapServers:newBootstrapServers);

        // 重试次数,0为不启用重试机制
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //同步到副本, 默认为1
        // acks=0 把消息发送到kafka就认为发送成功
        // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
        // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // 生产者空间不足时,send()被阻塞的时间,默认60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 控制批处理大小,单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
        // 键的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
        // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
        return props;
    }
}
    @Autowired
    @Qualifier("kafkaTemplateNew")
    private KafkaTemplate kafkaTemplateNew;
    @Autowired
    private KafkaTemplate kafkaTemplate;
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742695.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号