
Basic Usage of the Kafka AdminClient API

Kafka now provides an administrative API tool, AdminClient. By integrating AdminClient into a Spring Boot application you can manage Kafka programmatically: query, create, and delete topics, increase partition counts, read consumer lag, and inspect or alter configuration. These building blocks are enough to put a simple Kafka management UI in front of business users.
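
For context, AdminClient does not require Spring at all; it can also be created directly from plain client properties. Here is a minimal standalone sketch (the broker address is a placeholder):

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminClientDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker address; point this at your own cluster
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // AdminClient is AutoCloseable, so try-with-resources shuts it down cleanly
        try (AdminClient client = AdminClient.create(props)) {
            // Admin calls are asynchronous and return *Result objects; get() blocks for the response
            Set<String> topics = client.listTopics().names().get();
            System.out.println(topics);
        }
    }
}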

pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.10</version>
</dependency>
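
spring-kafka 2.6.10 pulls in a matching 2.6.x kafka-clients, which supports every AdminClient call used below (incrementalAlterConfigs additionally requires brokers at version 2.3 or newer).
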
application.yml

spring:
  kafka:
    bootstrap-servers: 192.188.234.12:9092
    consumer:
      auto-offset-reset: earliest
      group-id: test
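
Note that the consumer group-id matters for the lag queries below: consumer.position() reports the committed offset of this group (falling back to auto-offset-reset when the group has not committed anything yet), so the lag figures are always relative to the test group.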

Utility class:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaTools {

    private static KafkaTools _this;

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaProperties kafkaProperties;

    // Capture the Spring-managed instance so the static utility methods can reach the injected beans
    @PostConstruct
    public void init() {
        _this = this;
    }

    
    // Returns {currentOffset, endOffset, lag} totals across all partitions of a topic
    private static long[] getDescribe(String topic) {
        long[] describe = new long[3];
        Consumer<String, String> consumer = createConsumer();

        List<PartitionInfo> partitionInfos = _this.kafkaTemplate.partitionsFor(topic);
        List<TopicPartition> tp = new ArrayList<>();
        partitionInfos.forEach(str -> {
            TopicPartition topicPartition = new TopicPartition(topic, str.partition());
            tp.add(topicPartition);
            long logEndOffset = consumer.endOffsets(tp).get(topicPartition);

            consumer.assign(tp);
            long currentOffset = consumer.position(topicPartition);

            //System.out.println("logEndOffset : " + logEndOffset + ", currentOffset : "+ currentOffset);
            describe[0] += currentOffset;
            describe[1] += logEndOffset;
            describe[2] = describe[1] - describe[0];

            tp.clear();
        });

        // Close the consumer so it is not leaked on every call
        consumer.close();
        return describe;
    }

    
    // Creates a consumer from the Spring-managed ConsumerFactory
    private static Consumer<String, String> createConsumer() {
        return _this.consumerFactory.createConsumer();
    }

    
    // Total lag (endOffset - currentOffset) summed across all partitions
    public static Long getLag(String topic) {
        return getDescribe(topic)[2];
    }

    // Builds a KafkaTopic detail object with per-partition offsets and lag
    public static KafkaTopic getTopicDetail(String topic) {
        KafkaTopic detail = new KafkaTopic();

        detail.setName(topic);

        List<PartInfo> list = new ArrayList<>();
        Consumer<String, String> consumer = createConsumer();
        long[] lag = new long[1];

        List<PartitionInfo> partitionInfos = _this.kafkaTemplate.partitionsFor(topic);
        List<TopicPartition> tp = new ArrayList<>();
        partitionInfos.forEach(str -> {
            PartInfo info = new PartInfo();

            info.setPartName(str.toString());
            TopicPartition topicPartition = new TopicPartition(topic, str.partition());
            tp.add(topicPartition);
            long logEndOffset = consumer.endOffsets(tp).get(topicPartition);

            consumer.assign(tp);
            long currentOffset = consumer.position(topicPartition);

            info.setCurrentOffset(currentOffset);
            info.setEndOffset(logEndOffset);

            info.setLag(logEndOffset - currentOffset);
            lag[0] += (logEndOffset - currentOffset);
            list.add(info);

            tp.clear();
        });

        consumer.close();

        detail.setPartInfos(list);
        detail.setPartition(partitionInfos.size());
        detail.setLag((int)lag[0]);

        return detail;
    }

    
    // Creates a topic with numPar partitions and replication factor 1
    public static boolean createTopic(String topicName, int numPar) {
        AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());

        if (client != null) {
            try {
                Collection<NewTopic> newTopics = new ArrayList<>(1);
                newTopics.add(new NewTopic(topicName, numPar, (short) 1));
                // Wait for the creation to complete so broker-side failures reach the catch block
                client.createTopics(newTopics).all().get();
            }
            catch (Throwable e) {
                e.printStackTrace();
                return false;
            }
            finally {
                client.close();
            }
        }

        return true;
    }

    
    // Deletes a topic
    public static boolean deleteTopic(String topic) {
        AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());

        // The broker's server.properties must set delete.topic.enable=true for the deletion to actually happen; otherwise the topic is only marked for deletion
        try {
            client.deleteTopics(Arrays.asList(topic)).all().get();
        }
        catch (Throwable e) {
            e.printStackTrace();
            return false;
        }
        finally {
            client.close();
        }

        return true;
    }

    
    // Lists the names of all topics in the cluster
    public static String listTopics() {
        AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
        String r = "";
        if (client != null) {
            try {
                ListTopicsResult result = client.listTopics();
                Set<String> topics = result.names().get();
                r = topics.toString();
            }
            catch (Throwable e) {
                e.printStackTrace();
            }
            finally {
                client.close();
            }
        }

        return r;
    }

    
    // Returns the partition count of a topic
    public static int getPartition(String topic) {
        AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
        int num = 0;
        try {
            TopicDescription description = client.describeTopics(Arrays.asList(topic)).all().get().get(topic);
            num = description.partitions().size();
        }
        catch (Throwable e) {
            e.printStackTrace();
        }
        finally {
            client.close();
        }

        return num;
    }

    
    // Increases the partition count of a topic (Kafka only supports increasing, never decreasing)
    public static boolean updatePartitions(String topic, Integer numPartitions) {
        AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());

        NewPartitions newPartitions = NewPartitions.increaseTo(numPartitions);
        Map<String, NewPartitions> map = new HashMap<>(1, 1);
        map.put(topic, newPartitions);

        try {
            client.createPartitions(map).all().get();
        }
        catch (Throwable e) {
            e.printStackTrace();
            return false;
        }
        finally {
            client.close();
        }

        return true;
    }

    
    // Prints every config entry of the hard-coded topic "test1"
    public static void describeConfig() {
        AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test1");
        Collection<ConfigResource> coll = new ArrayList<>();
        coll.add(configResource);

        DescribeConfigsResult result = client.describeConfigs(coll);

        try {
            Map<ConfigResource, Config> map = result.all().get();

            map.forEach((key, value) ->
                    System.out.println("name: " + key.name() + ", desc: " + value));
        }
        catch (Throwable e) {
            e.printStackTrace();
        }
        finally {
            client.close();
        }
    }
    
    
    // Incrementally alters one config entry on the hard-coded topic "MyTopic" (requires broker 2.3+)
    public static void incrementalAlterConfig() {
        // Specify the type and name of the ConfigResource
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "MyTopic");

        // Config entries are still ConfigEntry objects, but each is wrapped in an AlterConfigOp that
        // carries an operation type; several entries can be altered in one call, which makes this API
        // more capable and flexible than the older alterConfigs
        Collection<AlterConfigOp> configs = new ArrayList<>();
        configs.add(new AlterConfigOp(
                new ConfigEntry("preallocate", "false"),
                AlterConfigOp.OpType.SET
        ));

        AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
        Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
        configMaps.put(configResource, configs);
        AlterConfigsResult result = client.incrementalAlterConfigs(configMaps);

        try {
            System.out.println(result.all().get());
        }
        catch (Throwable e) {
            e.printStackTrace();
        }
        finally {
            client.close();
        }
    }

    
    // Alters a config entry on the hard-coded topic "test1" via the older alterConfigs API (deprecated since Kafka 2.3)
    public static void alterConfig() {
        // Specify the type and name of the ConfigResource
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test1");
        // Config entries are plain ConfigEntry objects collected into a Config
        Collection<ConfigEntry> coll = new ArrayList<>();
        coll.add(new ConfigEntry("preallocate", "true"));
        Config config = new Config(coll);

        AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
        Map<ConfigResource, Config> configMaps = new HashMap<>();
        configMaps.put(configResource, config);
        AlterConfigsResult result = client.alterConfigs(configMaps);

        try {
            System.out.println(result.all().get());
        }
        catch (Throwable e) {
            e.printStackTrace();
        }
        finally {
            client.close();
        }
    }
}
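
Since all of the utility methods are static (the Spring beans are reached through the _this reference captured in init()), they can be called from anywhere once the application context has started. A hypothetical usage sketch (demo-topic is a made-up name):

// Hypothetical usage after the Spring context is up; the topic name is made up
if (KafkaTools.createTopic("demo-topic", 3)) {
    System.out.println("partitions: " + KafkaTools.getPartition("demo-topic"));
    System.out.println("lag: " + KafkaTools.getLag("demo-topic"));
    System.out.println("topics: " + KafkaTools.listTopics());
}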

Controller class:

@RestController
public class TopicController {

    // Get the lag of a topic
    @GetMapping("/topics/{topic}/lag")
    public Result getTopicLag(@PathVariable String topic) {
        return Result.succeed(KafkaTools.getLag(topic));
    }

    // List all topics
    @GetMapping("/topics")
    public Result getTopics() {
        return Result.succeed(KafkaTools.listTopics());
    }

    // Get the partition count of a topic
    @GetMapping("/topics/{topic}/partition")
    public Result getTopicPartition(@PathVariable String topic) {
        return Result.succeed(KafkaTools.getPartition(topic));
    }

    // Update the partition count of a topic
    @PutMapping("/topics/{topic}/partition/{num}")
    public Result updateTopicPartition(@PathVariable String topic, @PathVariable int num) {
        if(KafkaTools.updatePartitions(topic, num))
            return Result.succeed("");
        else
            return Result.failed("");
    }

    // Create a new topic
    @PostMapping("/topics")
    public Result newTopic(@RequestBody KafkaTopic info) {
        if(KafkaTools.createTopic(info.getName(), info.getPartition()))
            return Result.succeed("");
        else
            return Result.failed("");
    }

    // Delete a topic
    @DeleteMapping("/topics/{topic}")
    public Result deleteTopic(@PathVariable String topic) {
        if(KafkaTools.deleteTopic(topic))
            return Result.succeed("");
        else
            return Result.failed("");
    }

    // Get topic details
    @GetMapping("/topics/{topic}/details")
    public Result getTopicDetail(@PathVariable String topic) {
        KafkaTopic rel = KafkaTools.getTopicDetail(topic);
        return Result.succeed(rel);
    }
}

Finally, the basic Java beans:

@Data
public class KafkaTopic {
    String name;
    int partition;
    int lag;
    List<PartInfo> partInfos;
}

@Data
public class PartInfo {
    String partName;
    long currentOffset;
    long endOffset;
    long lag;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Result<T> implements Serializable {

    private T datas;
    private Integer resp_code;
    private String resp_msg;

    public boolean isSucceed() {
        return resp_code == 0;
    }

    public static <T> Result<T> succeed(String msg) {
        return of(null, CodeEnum.SUCCESS.getCode(), msg);
    }

    public static <T> Result<T> succeed(T model, String msg) {
        return of(model, CodeEnum.SUCCESS.getCode(), msg);
    }

    public static <T> Result<T> succeed(T model) {
        return of(model, CodeEnum.SUCCESS.getCode(), "");
    }

    public static <T> Result<T> of(T datas, Integer code, String msg) {
        return new Result<>(datas, code, msg);
    }

    public static <T> Result<T> failed(String msg) {
        return of(null, CodeEnum.ERROR.getCode(), msg);
    }

    public static <T> Result<T> failed(T model, String msg) {
        return of(model, CodeEnum.ERROR.getCode(), msg);
    }
}
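
Result references a CodeEnum that the article does not show. A minimal sketch of its assumed shape (the exact codes are a guess, chosen so that isSucceed(), which checks resp_code == 0, behaves correctly):

// Assumed shape of CodeEnum; not shown in the original article
public enum CodeEnum {
    SUCCESS(0),
    ERROR(1);

    private final Integer code;

    CodeEnum(Integer code) {
        this.code = code;
    }

    public Integer getCode() {
        return code;
    }
}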
