这个版本创建topic必须使用zookeeper,一般可以使用如下命令手动创建topic:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 3 --topic test
那如何使用代码自动创建topic呢,可以使用AdminZkClient,示例如下:
KafkaZkClient zkClient = KafkaZkClient.apply("127.0.0.1:2181", false, 60000, 60000, 10, Time.SYSTEM, "metricGroup", "create topic", scala.Option.apply(null), scala.Option.apply(null));
if (zkClient == null) {
throw new UnsupportedOperationException("KafkaZkClient null");
}
try {
// 创建topic
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.createTopic("topic-name", 3, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
} finally {
zkClient.close();
}
由于AdminZkClient是scala代码,所以使用kafka的依赖必须使用kafka_2.12,依赖引入如下:
kafka 0.11及其以上版本创建topicorg.apache.kafka kafka_2.12
从kafka 0.11版本开始,就不再强制使用zookeeper创建topic,可以直接使用kafka的地址操作topic,并且提供了更简单的操作类AdminClient,使用如下:
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
// 创建管理者
AdminClient adminClient = AdminClient.create(properties);
try {
// 创建topic前,可以先检查topic是否存在,如果已经存在,则不用再创建了
Set topics = adminClient.listTopics().names().get();
if (topics.contains(topicName)) {
return;
}
// 创建topic
NewTopic newTopic = new NewTopic("topic-name", 3, (short) 1);
CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
result.all().get();
} finally {
adminClient.close();
}
AdminClient是kafka客户端自带的,所以使用kafka只需要引入kafka-clients就行了,例如:
org.apache.kafka kafka-clients 2.5.1 compile



