栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JAVA spring Boot集成kafka的使用 第一章 配置

JAVA spring Boot集成kafka的使用 第一章 配置

JAVA spring Boot集成kafka的使用 - 配置

因为是java开发,相关zookeeper环境搭建 与kafka搭建就不详细做说明了,网上有很多的搭建方法,这里只叙述JAVA中的使用方法跟相关问题,纯手工啊兄弟萌
kafkaApi

1,配置和依赖 1.1依赖
    
        org.apache.kafka
        kafka_2.12
        1.0.0
        provided
    

    
        org.apache.kafka
        kafka-clients
        1.0.0
    

    
        org.apache.kafka
        kafka-streams
        1.0.0
    
    
    
        org.springframework.kafka
        spring-kafka
    
1.2 yml配置
  kafka:
    bootstrap-servers: 172.16.7.56:1996
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  listener:
    #listner负责ack,每调用一次,就立即commit,如果是手动消费此项必须配置不然会报错
    ack-mode: manual_immediate
    missing-topics-fatal: false
      

注意配置好了消费业务 手动消费时 ack-mode: manual_immediate必须配置 不然会报错spring boot集成配合kafka的话只需要在yml中做好参数配置就可以了,简单配置一下,详细的可以在官方文档中查看-官方文档

1.3 确保数据安全配置
    给 topic 设置
    replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2
    个副本。这个是可以在yml中配置的在 Kafka 服务端设置
    min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader
    至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。在 producer 端设置
    acks=all:这个是要求每条数据,必须是写入所有 replica
    之后,才能认为是写成功了。这个是可以在yml中配置的在 producer 端设置
    retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。这个是可以在yml中配置的
2.JAVA手动创建主题

JAVA是可以通过配置文件直接增加主题的,当然生产者send数据的时候如果没有指定的主题kafka是会默认创建这个主题的,我们也可以通过配置类进行主题的增加。
具体是通过kafka.clients.admin包实现的以下是代码:

@Configuration
@Slf4j
@EnableAsync
public class kafkaConfiguration {
    
    @Bean
    NewTopic initialTopic(){
        log.info("【kafka 已创建主题partitions 分区数量为5 副本数量为2】");
        //创建主题名为partitionsOne,分区数量 5 副本数量 2
        return new NewTopic("partitionsOne",5, (short) 2);
    }
}
2.1 JAVA手动增加主题分区

因为不知道怎么把yml中的kafka相关配置直接取出来,我这里只能手动创建properties类加入map中使用AdminClient去增加主题的分区以下有几点注意
1.分区只能增大不能减小
2.确定好客户端跟服务器的kafka版本是否支持并确定好版本是否一致

public void createPartition(){
        Properties properties = new Properties();
        //不知道怎么直接把yml中kafka相关配置放过来只能手动加入
        //加入kafka地址的key跟地址值 bootstrap.servers 是写死的 
        properties.put("bootstrap.servers", "192.168.0.0:9092");
        try (AdminClient client = AdminClient.create(properties)) {
            Map newPartitions = new HashMap<>();
            newPartitions.put("topic1", NewPartitions.increaseTo(2));
            CreatePartitionsResult rs = client.createPartitions(newPartitions);
            try {
                rs.all().get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
        
    }

未完待续…后续会慢慢补充

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/702034.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号