栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka客户端操作之Admin API

kafka客户端操作之Admin API

背景:我使用docker-compose 搭建的kafka服务
kafka的简单介绍以及docker-compose部署单主机Kafka集群

Kafka API简单介绍

kafka除了用于管理和管理任务的命令行工具,Kafka还有5个用于Java和Scala的核心API

他们分别是

The Admin API : 用于管理和inspect topics, brokers和其他 Kafka 对象The Producer API: 将事件流发布(写入)到一个或多个 Kafka topicsThe Consumer API: 订阅(读取)一个或多个topics并处理它们生成的事件流The Kafka Streams API: 用于实现流处理应用程序和微服务,它提供了更高级别的方法来处理事件流,包括转换、聚合和连接等有状态操作、窗口化、基于事件时间的处理等等。从一个或多个topics读取输入以生成一个或多个topics的输出,有效地将输入流转换为输出流。The Kafka Connect API:用于构建和运行可重用 的数据导入/导出connectors,这些connectors从外部系统和应用程序消费(读取)或产生(写入)事件流,以便它们可以与 Kafka 集成。例如,与 PostgreSQL 等关系数据库的连接器可能会捕获表的每次更改。但是,在实践中,您通常不需要实现自己的connectors,因为 Kafka 社区已经提供了数百个即用型connectors。

我使用的wurstmeister/kafka镜像的kafka是2.8.1版本的,通过docker inspect命令可以查看

新建项目kafkademo用以测试

我使用IDEA 的Spring Initializr创建项目,引入web依赖。
我使用的spring boot 版本是2.6.3,kafka-clients 是2.8.1,和kafka server是对应的

pom.xml文件



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.6.3
         
    
    com.xt
    kafkademo
    0.0.1-SNAPSHOT
    kafkademo
    kafkademo
    
        1.8
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            org.apache.kafka
            kafka-clients
            2.8.1
        
    
    
    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


创建连接客户端

首先设置一个连接kafka的客户端类AdminClient,他是一个抽象类,只有一个KafkaAdminClient实现,如下代码所示,他提供了两种创建方式

//Admin的默认实现。 通过调用AdminClient中的create()方法之一来创建此类的实例。 用户不应直接引用此类。
//这个类是线程安全的。
public abstract class AdminClient implements Admin {

    
    public static AdminClient create(Properties props) {
        return (AdminClient) Admin.create(props);
    }

    
    public static AdminClient create(Map conf) {
        return (AdminClient) Admin.create(conf);
    }
}

通过使用Properties来创建连接kafka server的客户端

//设置AdminClient
public static AdminClient adminClient(){
    //Properties类表示一组持久的属性。 Properties可以保存到流中或从流中加载。属性列表中的每个键及其对应的值都是一个字符串。继承自Hashtable
    Properties properties = new Properties();
    //AdminClientConfig是AdminClient配置类,它还包含用于配置条目名称的常量。
    properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka服务器IP:9092");
    properties.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
    //内置管理客户端的基类 ,是一个抽象类,下面有一个实现 为KafkaAdminClient,此类是保证了线程安全
    AdminClient adminClient = AdminClient.create(properties);
    return adminClient;
}
创建topic
 //创建Topic实例
public static void createTopic() {

    AdminClient adminClient = adminClient();
    // 副本因子
    Short rs = 1;
    //创建具有指定副本因子和分区数的新topic。
    NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs);
    //创建一批新主题。
    //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
    //CreateTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已创建。在此期间, listTopics()和describeTopics(Collection)可能不会返回有关新主题的信息。
    CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
    System.out.println("创建topic成功 : "+ topics.toString());
    System.out.println("---------------------------------------------------------------");

}
获取Topic列表
//获取Topic列表
public static void topicLists() throws Exception {
    AdminClient adminClient = adminClient();
    // 是否查看internal选项
    ListTopicsOptions options = new ListTopicsOptions();
    //设置我们是否应该列出内部topic。
    options.listInternal(true);

    //列出集群中可用的topic。
    ListTopicsResult listTopicsResult = adminClient.listTopics(options);
    //返回一个topic名称集合的future(这里是KafkaFuture)
    Set names = listTopicsResult.names().get();
    //返回一个KafkaFuture,它产生一个 TopicListing 对象的集合
    Collection topicListings = listTopicsResult.listings().get();
    //返回一个KafkaFuture,它产生一个topic名称到 TopicListing 对象的映射。
    KafkaFuture> mapKafkaFuture = listTopicsResult.namesToListings();
    // 打印names
    names.stream().forEach(System.out::println);
    System.out.println("---------------------------topic列表-------------------------");
    // 打印topicListings
    topicListings.stream().forEach((topicList)->{
        System.out.println(topicList);
    });
    System.out.println("---------------------------topic列表-------------------------");
}
获取topic的描述信息
// 获取描述topic的信息
public static void describeTopics() throws Exception {
    AdminClient adminClient = adminClient();
    //描述集群中的一些topic。
    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));

    Map stringTopicDescriptionMap = describeTopicsResult.all().get();

    Set> entries = stringTopicDescriptionMap.entrySet();
    System.out.println("----------------------------topic信息-----------------------------");
    entries.stream().forEach((entry)->{
        System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
    });
    System.out.println("----------------------------topic信息-----------------------------");
}
修改配置信息
//修改Config信息
public static void alterConfig() throws Exception{
    AdminClient adminClient = adminClient();

    Map> configMaps = new HashMap<>();
    //具有配置的资源的类,需要提供type和名称  Type是他内部维护的枚举类,共有四种类型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0)
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
    //包含名称、值和操作类型的更改配置条目的类  ,需要注入ConfigEntry,和操作类型,同样OpType是个枚举类
    AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
    configMaps.put(configResource,Arrays.asList(alterConfigOp));
    //逐步更新指定资源的配置
    AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
    alterConfigsResult.all().get();
}
获取配置的描述信息
//获取描述配置的信息
public static void describeConfig() throws Exception{
    AdminClient adminClient = adminClient();

    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
    //获取指定资源的配置
    DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
    Map configResourceConfigMap = describeConfigsResult.all().get();
    System.out.println("----------------------------配置信息-----------------------------");
    configResourceConfigMap.entrySet().stream().forEach((entry)->{
        System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
    });
    System.out.println("----------------------------配置信息-----------------------------");
}
增加topic的partition数量

topic和partition的概念如果不是很清楚的话,可以去这篇博客的指定章节查看
kafka的简单介绍以及docker-compose部署单主机Kafka集群

//增加partition数量
public static void incrPartitions(int partitions) throws Exception{
   AdminClient adminClient = adminClient();
   Map partitionsMap = new HashMap<>();

   NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
   partitionsMap.put(TOPIC_NAME, newPartitions);
   CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
   createPartitionsResult.all().get();
}
删除Topic
//删除Topic
public static void delTopics() throws Exception {
   AdminClient adminClient = adminClient();
   //删除一批topic。
   //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
   //DeleteTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已消失。 在此期间, listTopics()和describeTopics(Collection)可能会继续返回有关已删除主题的信息。
   DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
   deleteTopicsResult.all().get();
}
完整代码
public class AdminSample {

    public final static String TOPIC_NAME="xt";

    //设置AdminClient
    public static AdminClient adminClient(){
        //Properties类表示一组持久的属性。 Properties可以保存到流中或从流中加载。属性列表中的每个键及其对应的值都是一个字符串。继承自Hashtable
        Properties properties = new Properties();
        //AdminClientConfig是AdminClient配置类,它还包含用于配置条目名称的常量。
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"81.68.82.48:9092");
        properties.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        //内置管理客户端的基类 ,是一个抽象类,下面有一个实现 为KafkaAdminClient,此类是保证了线程安全
        AdminClient adminClient = AdminClient.create(properties);
        return adminClient;
    }


    public static void main(String[] args) throws Exception {
        AdminClient adminClient = AdminSample.adminClient();

        System.out.println("adminClient : "+ adminClient);
        // 创建Topic实例
        createTopic();
        // 获取Topic列表
        topicLists();
        // 描述Topic
        describeTopics();
        // 修改Config
        alterConfig();
        // 查询Config
        describeConfig();
        // 增加partition数量
        incrPartitions(2);
        // 删除Topic实例
        delTopics();
    }


     //创建Topic实例
    public static void createTopic() {

        AdminClient adminClient = adminClient();
        // 副本因子
        Short rs = 1;
        //创建具有指定副本因子和分区数的新topic。
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs);
        //创建一批新主题。
        //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
        //CreateTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已创建。在此期间, listTopics()和describeTopics(Collection)可能不会返回有关新主题的信息。
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        System.out.println("创建topic成功 : "+ topics.toString());
        System.out.println("---------------------------------------------------------------");

    }


    //获取Topic列表
    public static void topicLists() throws Exception {
        AdminClient adminClient = adminClient();
        // 是否查看internal选项
        ListTopicsOptions options = new ListTopicsOptions();
        //设置我们是否应该列出内部topic。
        options.listInternal(true);

        //列出集群中可用的topic。
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        //返回一个topic名称集合的future(这里是KafkaFuture)
        Set names = listTopicsResult.names().get();
        //返回一个KafkaFuture,它产生一个 TopicListing 对象的集合
        Collection topicListings = listTopicsResult.listings().get();
        //返回一个KafkaFuture,它产生一个topic名称到 TopicListing 对象的映射。
        KafkaFuture> mapKafkaFuture = listTopicsResult.namesToListings();
        // 打印names
        names.stream().forEach(System.out::println);
        System.out.println("---------------------------topic列表-------------------------");
        // 打印topicListings
        topicListings.stream().forEach((topicList)->{
            System.out.println(topicList);
        });
        System.out.println("---------------------------topic列表-------------------------");
    }

    // 获取描述topic的信息
    public static void describeTopics() throws Exception {
        AdminClient adminClient = adminClient();
        //描述集群中的一些topic。
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));

        Map stringTopicDescriptionMap = describeTopicsResult.all().get();

        Set> entries = stringTopicDescriptionMap.entrySet();
        System.out.println("----------------------------topic信息-----------------------------");
        entries.stream().forEach((entry)->{
            System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
        });
        System.out.println("----------------------------topic信息-----------------------------");
    }

    //修改Config信息
    public static void alterConfig() throws Exception{
        AdminClient adminClient = adminClient();

        Map> configMaps = new HashMap<>();
        //具有配置的资源的类,需要提供type和名称  Type是他内部维护的枚举类,共有四种类型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0)
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        //包含名称、值和操作类型的更改配置条目的类  ,需要注入ConfigEntry,和操作类型,同样OpType是个枚举类
        AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
        configMaps.put(configResource,Arrays.asList(alterConfigOp));
        //逐步更新指定资源的配置
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
        alterConfigsResult.all().get();
    }

    //获取描述配置的信息
    public static void describeConfig() throws Exception{
        AdminClient adminClient = adminClient();

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        //获取指定资源的配置
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        Map configResourceConfigMap = describeConfigsResult.all().get();
        System.out.println("----------------------------配置信息-----------------------------");
        configResourceConfigMap.entrySet().stream().forEach((entry)->{
            System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
        });
        System.out.println("----------------------------配置信息-----------------------------");
    }


    //增加partition数量
    public static void incrPartitions(int partitions) throws Exception{
        AdminClient adminClient = adminClient();
        Map partitionsMap = new HashMap<>();

        NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
        partitionsMap.put(TOPIC_NAME, newPartitions);
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
        createPartitionsResult.all().get();
        
    }


    //删除Topic
    public static void delTopics() throws Exception {
        AdminClient adminClient = adminClient();
        //删除一批topic。
        //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
        //DeleteTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已消失。 在此期间, listTopics()和describeTopics(Collection)可能会继续返回有关已删除主题的信息。
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
        deleteTopicsResult.all().get();
    }


}

References:

https://kafka.apache.org/introhttps://coding.imooc.com/class/434.html

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/722619.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号