编辑 -较新版本的Kafka不需要Zookeeper。请参阅@Neeleshkumar Srinivasan Mannur的API版本1.01.0以上的答案
API 0.11.0+中的过程似乎已大大简化。使用它,可以完成以下操作
import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.CreateTopicsResult;import org.apache.kafka.clients.admin.NewTopic;Properties properties = new Properties();properties.load(new FileReader(new File("kafka.properties")));AdminClient adminClient = AdminClient.create(properties);NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)List<NewTopic> newTopics = new ArrayList<NewTopic>();newTopics.add(newTopic);adminClient.createTopics(newTopics);adminClient.close();kafka.properties文件的内容如下
bootstrap.servers=localhost:9092group.id=testenable.auto.commit=trueauto.commit.interval.ms=1000key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializer
请注意,必须关闭AdminClient的实例才能反映新创建的主题。
原始答案
我修好了..经过长时间的研究..
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());通过上面的代码,ZkClient将创建一个主题,但是该主题信息将不了解kafka。所以我们要做的是,我们需要通过以下方式为ZkClient创建对象,
首先导入以下语句,
import kafka.utils.ZKStringSerializer$;
并通过以下方式为ZkClient创建对象,
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());编辑1:(对于@ajkret评论)
由于api已更改,因此上述代码不适用于kafka> 0.9,请使用以下代码适用于kafka> 0.9
import java.util.Properties;import kafka.admin.AdminUtils;import kafka.utils.ZKStringSerializer$;import kafka.utils.ZkUtils;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.ZkConnection;public class KafkaTopicCreationInJava{ public static void main(String[] args) throws Exception { ZkClient zkClient = null; ZkUtils zkUtils = null; try { String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; int sessionTimeOutInMs = 15 * 1000; // 15 secs int connectionTimeOutInMs = 10 * 1000; // 10 secs zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); String topicName = "testTopic"; int noOfPartitions = 2; int noOfReplication = 3; Properties topicConfiguration = new Properties(); AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); } catch (Exception ex) { ex.printStackTrace(); } finally { if (zkClient != null) { zkClient.close(); } } }}


