Kafka 提供了一个用于集群管理的 API 工具 AdminClient。在 Spring Boot 中集成 AdminClient 后,即可对 Kafka 进行管理。本文实现了查询、新建、删除 topic,调整 partition,获取 lag,以及查看和修改配置信息等常用功能;基于这些功能,可以为业务人员构建一个简单的 Kafka 管理界面。
pom
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.10</version>
</dependency>
application.yml
spring:
  kafka:
    bootstrap-servers: 192.188.234.12:9092
    consumer:
      auto-offset-reset: earliest
      group-id: test
Util类:
@Component
public class KafkaTools {
private static KafkaTools _this;
@Autowired
private ConsumerFactory consumerFactory;
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private KafkaProperties kafkaProperties;
/**
 * Publishes this Spring-managed instance through the static {@code _this}
 * reference so that the static helper methods of this class can reach the
 * injected collaborators.
 */
@PostConstruct
public void init() {
    KafkaTools._this = this;
}
/**
 * Aggregates offset statistics over every partition of {@code topic}.
 *
 * <p>The partitions are taken from {@code kafkaTemplate.partitionsFor(topic)}.
 * A short-lived consumer is manually assigned to all of them; its position and
 * the log-end offset of each partition are summed up.
 *
 * <p>NOTE(review): the "current offset" is whatever {@code Consumer.position}
 * resolves for a freshly assigned consumer - presumably the committed offset
 * of the configured {@code group-id}, falling back to {@code auto-offset-reset};
 * confirm against the KafkaConsumer documentation.
 *
 * @param topic name of the topic to inspect
 * @return a three-element array: {@code [0]} sum of current positions,
 *         {@code [1]} sum of log-end offsets, {@code [2]} total lag
 *         ({@code [1] - [0]})
 */
private static long[] getDescribe(String topic) {
    long[] describe = new long[3];
    List<PartitionInfo> partitionInfos = _this.kafkaTemplate.partitionsFor(topic);

    List<TopicPartition> partitions = new ArrayList<>(partitionInfos.size());
    for (PartitionInfo partitionInfo : partitionInfos) {
        partitions.add(new TopicPartition(topic, partitionInfo.partition()));
    }

    // The consumer owns network resources; close it on every path instead of
    // leaking one consumer per call.
    try (Consumer<?, ?> consumer = createConsumer()) {
        // One request for all partitions instead of one round trip per partition.
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        consumer.assign(partitions);
        for (TopicPartition partition : partitions) {
            describe[0] += consumer.position(partition);
            describe[1] += endOffsets.get(partition);
        }
    }

    describe[2] = describe[1] - describe[0];
    return describe;
}
/**
 * Creates a new consumer from the injected {@code ConsumerFactory}.
 *
 * @return the consumer produced by {@code consumerFactory.createConsumer()}
 */
private static Consumer createConsumer() {
    ConsumerFactory factory = _this.consumerFactory;
    return factory.createConsumer();
}
/**
 * Returns the total lag of {@code topic}, i.e. the third element of the
 * statistics array computed by {@code getDescribe(topic)}.
 *
 * @param topic name of the topic to inspect
 * @return total lag across all partitions of the topic
 */
public static Long getLag(String topic) {
    long[] statistics = getDescribe(topic);
    return statistics[2];
}
/**
 * Builds a per-partition offset report for {@code topic}.
 *
 * <p>For every partition reported by {@code kafkaTemplate.partitionsFor(topic)}
 * a {@code PartInfo} is produced holding the partition description
 * ({@code PartitionInfo.toString()}), the consumer position, the log-end
 * offset and their difference (lag). The topic-level lag is the sum of the
 * partition lags.
 *
 * <p>NOTE(review): the "current offset" is whatever {@code Consumer.position}
 * resolves for a freshly assigned consumer - presumably the committed offset
 * of the configured {@code group-id}; confirm against the KafkaConsumer
 * documentation.
 *
 * @param topic name of the topic to inspect
 * @return the populated topic summary; its {@code lag} is saturated to the
 *         {@code int} range because {@code KafkaTopic} stores it as an int
 */
public static KafkaTopic getTopicDetail(String topic) {
    KafkaTopic detail = new KafkaTopic();
    detail.setName(topic);

    List<PartitionInfo> partitionInfos = _this.kafkaTemplate.partitionsFor(topic);
    List<TopicPartition> partitions = new ArrayList<>(partitionInfos.size());
    for (PartitionInfo partitionInfo : partitionInfos) {
        partitions.add(new TopicPartition(topic, partitionInfo.partition()));
    }

    List<PartInfo> partInfos = new ArrayList<>(partitionInfos.size());
    long totalLag = 0L;

    // The consumer owns network resources; close it on every path instead of
    // leaking one consumer per call.
    try (Consumer<?, ?> consumer = createConsumer()) {
        // One request for all partitions instead of one round trip per partition.
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        consumer.assign(partitions);

        for (int i = 0; i < partitions.size(); i++) {
            TopicPartition partition = partitions.get(i);
            long logEndOffset = endOffsets.get(partition);
            long currentOffset = consumer.position(partition);
            long partitionLag = logEndOffset - currentOffset;

            PartInfo info = new PartInfo();
            info.setPartName(partitionInfos.get(i).toString());
            info.setCurrentOffset(currentOffset);
            info.setEndOffset(logEndOffset);
            info.setLag(partitionLag);
            partInfos.add(info);

            totalLag += partitionLag;
        }
    }

    detail.setPartInfos(partInfos);
    detail.setPartition(partitionInfos.size());
    // A plain (int) cast would silently wrap around for totals beyond the int range.
    detail.setLag((int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, totalLag)));
    return detail;
}
/**
 * Creates a topic with the given number of partitions and a replication
 * factor of 1.
 *
 * <p>Kept under its historical (misspelled) name for existing callers;
 * delegates to {@link #createTopic(String, int)}.
 *
 * @param topicName name of the topic to create
 * @param numPar    number of partitions
 * @return {@code true} if the broker confirmed the creation, {@code false} otherwise
 */
public static boolean createToipc(String topicName, int numPar) {
    return createTopic(topicName, numPar);
}

/**
 * Creates a topic with the given number of partitions and a replication
 * factor of 1, and waits for the broker's answer.
 *
 * @param topicName name of the topic to create
 * @param numPar    number of partitions
 * @return {@code true} if the broker confirmed the creation; {@code false}
 *         if the request failed (the failure is printed to stderr)
 */
public static boolean createTopic(String topicName, int numPar) {
    AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
    try {
        Collection<NewTopic> newTopics = new ArrayList<>(1);
        newTopics.add(new NewTopic(topicName, numPar, (short) 1));
        // createTopics() is asynchronous: without waiting on the result a
        // rejected request (existing topic, invalid partition count, ...)
        // would still be reported as success.
        client.createTopics(newTopics).all().get();
        return true;
    }
    catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        return false;
    }
    finally {
        client.close();
    }
}
/**
 * Deletes {@code topic} and waits for the broker's answer.
 *
 * <p>Note from the original author: the broker's {@code server.properties}
 * must set {@code delete.topic.enable=true} for the topic to actually be
 * removed; otherwise it is only marked for deletion.
 *
 * @param topic name of the topic to delete
 * @return {@code true} if the broker confirmed the request; {@code false}
 *         if it failed (the failure is printed to stderr)
 */
public static boolean deleteTopic(String topic) {
    AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
    try {
        // deleteTopics() is asynchronous: wait on the result so that a failure
        // (e.g. unknown topic) is not reported as success.
        client.deleteTopics(Arrays.asList(topic)).all().get();
        return true;
    }
    catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        return false;
    }
    finally {
        client.close();
    }
}
/**
 * Lists the topic names known to the cluster.
 *
 * @return the {@code toString()} form of the set of topic names, or an empty
 *         string if the lookup failed (the failure is printed to stderr)
 */
public static String listTopics() {
    AdminClient admin = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
    if (admin == null) {
        return "";
    }
    try {
        return admin.listTopics().names().get().toString();
    }
    catch (Throwable failure) {
        failure.printStackTrace();
        return "";
    }
    finally {
        admin.close();
    }
}
/**
 * Looks up the number of partitions of {@code topic} via
 * {@code AdminClient.describeTopics}.
 *
 * @param topic name of the topic to inspect
 * @return the partition count, or {@code 0} if the lookup failed
 *         (the failure is printed to stderr)
 */
public static int getPartition(String topic) {
    AdminClient admin = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
    try {
        TopicDescription topicDescription =
                admin.describeTopics(Arrays.asList(topic)).all().get().get(topic);
        return topicDescription.partitions().size();
    }
    catch (Throwable failure) {
        failure.printStackTrace();
        return 0;
    }
    finally {
        admin.close();
    }
}
/**
 * Increases the partition count of {@code topic} to {@code numPartitions}
 * and waits for the broker's answer.
 *
 * @param topic         name of the topic to alter
 * @param numPartitions the new total number of partitions; must not be
 *                      {@code null} (unboxing a {@code null} throws a
 *                      {@code NullPointerException}, as before)
 * @return {@code true} if the broker confirmed the change; {@code false}
 *         if the request failed (the failure is printed to stderr)
 */
public static boolean updatePartitions(String topic, Integer numPartitions) {
    // Build the request before opening the client: if the arguments are
    // invalid we fail here without leaving an unclosed AdminClient behind.
    Map<String, NewPartitions> request = new HashMap<>(1, 1);
    request.put(topic, NewPartitions.increaseTo(numPartitions));

    AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
    try {
        client.createPartitions(request).all().get();
        return true;
    }
    catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        return false;
    }
    finally {
        client.close();
    }
}
/**
 * Prints the configuration of the demo topic {@code "test1"} to stdout.
 * Equivalent to {@code describeConfig("test1")}.
 */
public static void describeConfig() {
    describeConfig("test1");
}

/**
 * Prints the configuration of {@code topic} to stdout, one line per
 * described resource in the form {@code name: <resource>, desc: <config>}.
 * Failures are printed to stderr.
 *
 * @param topic name of the topic whose configuration is described
 */
public static void describeConfig(String topic) {
    Collection<ConfigResource> resources = new ArrayList<>();
    resources.add(new ConfigResource(ConfigResource.Type.TOPIC, topic));

    AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
    try {
        // Issue the request inside the try so the client is closed even if
        // the call itself throws.
        Map<ConfigResource, Config> configs = client.describeConfigs(resources).all().get();
        configs.forEach((key, value) ->
                System.out.println("name: " + key.name() + ", desc: " + value));
    }
    catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
    }
    finally {
        client.close();
    }
}
/**
 * Demo entry point: sets {@code preallocate=false} on topic {@code "MyTopic"}.
 * Equivalent to {@code incrementalAlterConfig("MyTopic", "preallocate", "false")}.
 */
public static void incrementalAlterConfig() {
    incrementalAlterConfig("MyTopic", "preallocate", "false");
}

/**
 * Sets a single configuration entry on {@code topic} through
 * {@code AdminClient.incrementalAlterConfigs} and prints the outcome to
 * stdout. Failures are printed to stderr.
 *
 * @param topic       name of the topic to alter
 * @param configName  name of the configuration entry to set
 * @param configValue new value of the configuration entry
 */
public static void incrementalAlterConfig(String topic, String configName, String configValue) {
    // Identify the resource by its type and name.
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);

    // Entries are still ConfigEntry objects, but each one carries an
    // operation type, and several entries can be changed in one request.
    Collection<AlterConfigOp> configs = new ArrayList<>();
    configs.add(new AlterConfigOp(
            new ConfigEntry(configName, configValue),
            AlterConfigOp.OpType.SET
    ));

    Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
    configMaps.put(configResource, configs);

    AdminClient client = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
    try {
        // Issue the request inside the try so the client is closed even if
        // the call itself throws.
        AlterConfigsResult result = client.incrementalAlterConfigs(configMaps);
        System.out.println(result.all().get());
    }
    catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
    }
    finally {
        client.close();
    }
}
/**
 * Demo entry point: sends {@code preallocate=true} for topic {@code "test1"}
 * through {@code AdminClient.alterConfigs} and prints the outcome to stdout.
 * Failures are printed to stderr.
 *
 * <p>NOTE(review): {@code alterConfigs} is, as far as the Kafka documentation
 * goes, the older non-incremental API (deprecated in favour of
 * {@code incrementalAlterConfigs}) and replaces the resource's whole
 * configuration - confirm before using it against a real topic.
 */
public static void alterConfig() {
    // Configuration entries are expressed as ConfigEntry objects.
    Collection<ConfigEntry> entries = new ArrayList<>();
    entries.add(new ConfigEntry("preallocate", "true"));

    // The target resource is identified by its type and name.
    ConfigResource target = new ConfigResource(ConfigResource.Type.TOPIC, "test1");

    Map<ConfigResource, Config> request = new HashMap<>();
    request.put(target, new Config(entries));

    AdminClient admin = AdminClient.create(_this.kafkaProperties.buildAdminProperties());
    AlterConfigsResult outcome = admin.alterConfigs(request);
    try {
        System.out.println(outcome.all().get());
    }
    catch (Throwable failure) {
        failure.printStackTrace();
    }
    finally {
        admin.close();
    }
}
}
Controller类
/**
 * REST endpoints that expose the static {@code KafkaTools} helpers for
 * topic administration.
 */
@RestController
public class TopicController {

    /** Returns the total lag of a topic. */
    @GetMapping("/topics/{topic}/lag")
    public Result getTopicLag(@PathVariable String topic) {
        return Result.succeed(KafkaTools.getLag(topic));
    }

    /** Returns the list of topics. */
    @GetMapping("/topics")
    public Result getTopics() {
        // The one-argument call would bind to succeed(String msg) and put the
        // topic list into the message with no payload; use the two-argument
        // overload so the list is carried as data like every other endpoint.
        return Result.succeed(KafkaTools.listTopics(), "");
    }

    /** Returns the number of partitions of a topic. */
    @GetMapping("/topics/{topic}/partition")
    public Result getTopicPartition(@PathVariable String topic) {
        return Result.succeed(KafkaTools.getPartition(topic));
    }

    /** Changes the number of partitions of a topic. */
    @PutMapping("/topics/{topic}/partition/{num}")
    public Result updateTopicPartition(@PathVariable String topic, @PathVariable int num) {
        return toResult(KafkaTools.updatePartitions(topic, num));
    }

    /** Creates a topic from the name and partition count in the request body. */
    @PostMapping("/topics")
    public Result newTopic(@RequestBody KafkaTopic info) {
        return toResult(KafkaTools.createToipc(info.getName(), info.getPartition()));
    }

    /** Deletes a topic. */
    @DeleteMapping("/topics/{topic}")
    public Result deleteTopic(@PathVariable String topic) {
        return toResult(KafkaTools.deleteTopic(topic));
    }

    /** Returns the per-partition details of a topic. */
    @GetMapping("/topics/{topic}/details")
    public Result getTopicDetail(@PathVariable String topic) {
        KafkaTopic rel = KafkaTools.getTopicDetail(topic);
        return Result.succeed(rel);
    }

    /** Maps the boolean outcome of a KafkaTools operation to an empty-message result. */
    private static Result toResult(boolean succeeded) {
        if (succeeded) {
            return Result.succeed("");
        }
        return Result.failed("");
    }
}
最后是基本Java Bean:
/**
 * Topic summary bean: used as the request body when creating a topic and as
 * the response payload of the topic-details endpoint. Accessors are generated
 * by Lombok's {@code @Data}.
 */
@Data
public class KafkaTopic {
    /** Topic name. */
    String name;
    /** Number of partitions of the topic. */
    int partition;
    /** Total lag over all partitions, stored as an int. */
    int lag;
    /** Per-partition offset details. */
    List<PartInfo> partInfos;
}
/**
 * Offset details of a single partition. Accessors are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class PartInfo {
// Partition description; KafkaTools.getTopicDetail fills it with PartitionInfo.toString().
String partName;
// Consumer position on the partition at the time of the query.
long currentOffset;
// Log-end offset of the partition at the time of the query.
long endOffset;
// endOffset - currentOffset, as computed by KafkaTools.getTopicDetail.
long lag;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Result implements Serializable {
private T datas;
private Integer resp_code;
private String resp_msg;
public boolean isSucceed() {
return resp_code==0 ? true : false;
}
public static Result succeed(String msg) {
return of(null, CodeEnum.SUCCESS.getCode(), msg);
}
public static Result succeed(T model, String msg) {
return of(model, CodeEnum.SUCCESS.getCode(), msg);
}
public static Result succeed(T model) {
return of(model, CodeEnum.SUCCESS.getCode(), "");
}
public static Result of(T datas, Integer code, String msg) {
return new Result<>(datas, code, msg);
}
public static Result failed(String msg) {
return of(null, CodeEnum.ERROR.getCode(), msg);
}
public static Result failed(T model, String msg) {
return of(model, CodeEnum.ERROR.getCode(), msg);
}
}



