栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

如何通过Java在Kafka中创建主题

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何通过Java在Kafka中创建主题

编辑 -较新版本的Kafka不需要Zookeeper。请参阅@Neeleshkumar Srinivasan Mannur的API版本1.01.0以上的答案

API 0.11.0+中的过程似乎已大大简化。使用它,可以完成以下操作

import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.CreateTopicsResult;import org.apache.kafka.clients.admin.NewTopic;Properties properties = new Properties();properties.load(new FileReader(new File("kafka.properties")));AdminClient adminClient = AdminClient.create(properties);NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)List<NewTopic> newTopics = new ArrayList<NewTopic>();newTopics.add(newTopic);adminClient.createTopics(newTopics);adminClient.close();

kafka.properties文件的内容如下

bootstrap.servers=localhost:9092group.id=testenable.auto.commit=trueauto.commit.interval.ms=1000key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializer

请注意,必须关闭AdminClient的实例才能反映新创建的主题。



原始答案

我修好了..经过长时间的研究..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

通过上面的代码,ZkClient将创建一个主题,但是该主题信息将不了解kafka。所以我们要做的是,我们需要通过以下方式为ZkClient创建对象,

首先导入以下语句,

import kafka.utils.ZKStringSerializer$;

并通过以下方式为ZkClient创建对象,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

编辑1:(对于@ajkret评论)

由于api已更改,因此上述代码不适用于kafka> 0.9,请使用以下代码适用于kafka> 0.9


import java.util.Properties;import kafka.admin.AdminUtils;import kafka.utils.ZKStringSerializer$;import kafka.utils.ZkUtils;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.ZkConnection;public class KafkaTopicCreationInJava{    public static void main(String[] args) throws Exception {        ZkClient zkClient = null;        ZkUtils zkUtils = null;        try { String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; int sessionTimeOutInMs = 15 * 1000; // 15 secs int connectionTimeOutInMs = 10 * 1000; // 10 secs zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); String topicName = "testTopic"; int noOfPartitions = 2; int noOfReplication = 3; Properties topicConfiguration = new Properties(); AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);        } catch (Exception ex) { ex.printStackTrace();        } finally { if (zkClient != null) {     zkClient.close(); }        }    }}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/413872.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号