kafka其实没有所谓单节点这一说,即使是一个节点也是一个节点的集群,所以kafka默认就是一个集群的形式。
kafka客户端API类型- AdminClient API:允许管理和检测Topic、broker以及其他kafka对象。类似服务器上命令行对topic等操作。
- Producer API:发布消息到1个或多个Topic
- Consumer API:订阅一个或多个Topic,并处理产生的消息,相对比较复杂的API
- Streams API:高效地将输入流转换到输出流
- Connector API:从一些源系统或应用程序中拉取数据到kafka
我们先新建一个SpringBoot项目,并引入web及kafka的依赖,kafka的依赖可以在maven repository或者kafka官网中获取
org.apache.kafka kafka-clients 2.4.0
常见Admin API
| API | 作用 |
|---|---|
| AdminClient | AdminClient客户端对象 |
| NewTopic | 创建Topic |
| CreateTopicsResult | 创建Topic的返回结果 |
| ListTopicsResult | 查询Topic列表 |
| ListTopicsOptions | 查询Topoic列表及选项 |
| DescribeTopicsResult | 查询Topics |
| DescribeConfigsResult | 查询Topics配置项 |
- AdminClient:
public class AdminSample {
public static void main(String[] args) {
AdminClient adminClient = getAdminClient();
System.out.println("adminClient: " + adminClient);
}
private static AdminClient getAdminClient() {
// 启动AdminClient时需要的配置信息
Properties properties = new Properties();
// 具体需要配置什么我们可以点进AdminClientConfig中查看,目前只为了启动可以只配置bootstrap_server
// 可以新建一个类专门用于存放kafka的配置信息,也可直接写入
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVER);
// properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, xxx.xxx.xxx.xxx:9092);
// 最后返回.create(Properties properties),将写好的配置放进去即可
return AdminClient.create(properties);
}
}
- NewTopic:
private static void createNewTopic() throws Exception {
// 先获取到我们的adminClient
AdminClient adminClient = getAdminClient();
// 实例化我们所需要创建的topic,点进源码我们可看到该构造函数需要传入三个值:
// name:待创建的topic的名称
// numPartitions:新topic的分区数
// replicationFactor:副本因子
NewTopic newTopic = new NewTopic(KafkaConfig.TOPIC_NAME, KafkaConfig.NUM_PARTITIONS, KafkaConfig.REPLICATION_FACTOR);
// 调用adminClient的createTopics方法,传入待创建的topic的集合即可
// 为此操作设置请求超时(以毫秒为单位),如果应使用AdminClient的默认请求超时,则设置为null
CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic), new CreateTopicsOptions().timeoutMs(10000));
// all():如果所有topic都创建成功,则返回一个成功的future。 get():如有必要,等待此future完成,然后返回其结果
topics.all().get();
System.out.println("新的topic已创建成功");
}
点进源码我们能看到,NewTopic的构造函数中,有这么几个参数:
public NewTopic(String name, int numPartitions, short replicationFactor) {
this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
}
在官网上也有对于这几个参数的文档说明,是这么描述的:
- ListTopicsResult
private static void showTopicsList() throws Exception {
AdminClient adminClient = getAdminClient();
// listInternal:是否为内部topic,默认为false
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
// ListTopicsResult listTopicsResult = adminClient.listTopics(listTopicsOptions);
// 无参的listTopics查看的是listInternal = false的topic
ListTopicsResult listTopicsResult = adminClient.listTopics();
// 获取listTopic的name及listInternal信息
Collection topicListings = listTopicsResult.listings().get();
// 打印该信息列表
topicListings.stream().forEach((topicList -> {
System.out.println(topicList);
}));
// 获取topic的名字列表
// Set topicNames = listTopicsResult.names().get();
// topicNames.stream().forEach(System.out::println);
}
- deleteTopics
public static void delTopic() throws Exception {
AdminClient adminClient = getAdminClient();
// 该结果也是返回一个Future
DeleteTopicsResult delRes = adminClient.deleteTopics(Arrays.asList(KafkaConfig.TOPIC_NAME));
delRes.all().get();
}
- DescribeTopicsResult
public static void desTopic() throws Exception {
AdminClient adminClient = getAdminClient();
// 调用describeTopics方法,将我们要查看的topic名称列表放入即可
DescribeTopicsResult desRes = adminClient.describeTopics(Arrays.asList(KafkaConfig.TOPIC_NAME));
// 获取回的返回值为 topic名-topic描述 的键值对
Map descriptionMap = desRes.all().get();
// TopicDescription topicDescription = descriptionMap.get(KafkaConfig.TOPIC_NAME);
Set> entries = descriptionMap.entrySet();
entries.stream().forEach(entry -> {
System.out.println("topic名:" + entry.getKey() + ",topic描述:" + entry.getValue());
});
}
在返回的TopicDescription中,有四个参数:
name String : topic名称 internal boolean : 是否为内部topic partitions List: 一个分区列表,其中索引表示分区id,元素包含该分区的leader和副本信息。 authorizedOperations Set : 此topic的授权操作,如果未知,则为null
- DescribConfigsResult
public static void descConfig() throws Exception {
AdminClient adminClient = getAdminClient();
// 表示具有配置的资源的类 (资源类的类型, 资源名称)
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, KafkaConfig.TOPIC_NAME);
DescribeConfigsResult describeConfigs = adminClient.describeConfigs(Arrays.asList(configResource));
Map configMap = describeConfigs.all().get();
configMap.entrySet().stream().forEach((entry) -> {
System.out.println(entry.getKey() + ":" + entry.getValue());
});
}
- alterConfigs
public static void altConfig() throws Exception {
AdminClient adminClient = getAdminClient();
// 方式一:比较老的方式,通过传Map修改,键:ConfigResource,值:Config
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, KafkaConfig.TOPIC_NAME);
// // ConfigEntry中即为要修改的配置名及值
// Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
// Map mapForAlt = new HashMap<>();
// mapForAlt.put(configResource, config);
// // 将该map放入即可
// AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(mapForAlt);
// alterConfigsResult.all().get();
// 方式二:用kafka推荐的方式
Map> mapForAlt = new HashMap<>();
// 其实都差不多,只是需要再添加一个操作的类型,要修改就选SET即可
AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET);
mapForAlt.put(configResource, Arrays.asList(alterConfigOp));
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(mapForAlt);
alterConfigsResult.all().get();
}



