目录
一、环境准备
二、配置更改
三、启动kafka
四、使用springboot项目测试
一、环境准备
kafka-xx-yy
xx 是 Scala 版本,yy 是 Kafka 版本(Kafka 由 Scala/Java 编写,运行在 JVM 上,因此需要先安装 JDK 环境)
下载地址:http://kafka.apache.org/downloads
伪集群搭建,3个节点同个机器端口区分 9092/9093/9094
二、配置更改
kafka1配置:
解压缩: tar -zxvf kafka_2.13-2.8.0.tgz
重命名: mv kafka_2.13-2.8.0 kafka1
进入config目录下,修改 server.properties:
#内网中使用,内网部署kafka集群只需要用到listeners,内外网需要作区分时才需要用到advertised.listeners
listeners=PLAINTEXT://172.18.123.xxxxx:9092
advertised.listeners=PLAINTEXT://112.74.55.xxxxx:9092
#每个节点编号1、2、3
broker.id=1
#端口
port=9092
#日志目录,3个节点各配置一个
log.dirs=/tmp/kafka-logs-1
#zk地址
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
kafka1的server.properties完成后,将节点复制出来2个。
kafka2配置更改
broker.id=2
#阿里云内网地址
listeners=PLAINTEXT://172.30.xxxx.xxxx:9093
#阿里云外网地址
advertised.listeners=PLAINTEXT://112.74.xxxx.xxxx:9093
port=9093
log.dirs=/tmp/kafka-logs-2
kafka3配置更改
broker.id=3
#阿里云内网地址
listeners=PLAINTEXT://172.30.xxxx.xxxx:9094
#阿里云外网地址
advertised.listeners=PLAINTEXT://112.74.xxxx.xxxx:9094
port=9094
log.dirs=/tmp/kafka-logs-3
三、启动kafka
进入bin后
守护进程方式启动:
./kafka-server-start.sh -daemon ../config/server.properties &
直接启动:
./kafka-server-start.sh ../config/server.properties &
创建topic:
./kafka-topics.sh --create --zookeeper 112.74.97.xxxx:2181,112.74.97.xxx:2182,112.74.97.xxx:2183 --replication-factor 3 --partitions 6 --topic wnn-cluster-topic
四、使用springboot项目测试
SpringBoot项目测试:通过 AdminClient 连接 kafka 集群(bootstrap.servers 需填写 broker 地址,而不是 zookeeper 地址)
/** Name of the topic that the tests below create, describe and publish to. */
private static final String TOPIC_NAME = "wnn-topic-test-12.27-cluster";

/**
 * Builds an {@link AdminClient} for the three-broker pseudo-cluster.
 *
 * <p>{@code bootstrap.servers} must list Kafka broker listeners (ports 9092/9093/9094 in this
 * setup), not the ZooKeeper ports 2181-2183: the admin client talks to brokers directly and
 * cannot bootstrap from a ZooKeeper endpoint.
 *
 * @return a new client; the caller owns it and is responsible for closing it
 */
public static AdminClient initAdminClient() {
    Properties properties = new Properties();
    properties.setProperty(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            "112.74.xxx.x40:9092,112.74.xxxx.x40:9093,112.74.xxx.x40:9094");
    return AdminClient.create(properties);
}
创建topic
/**
 * Creates {@code TOPIC_NAME} with 6 partitions and a replication factor of 3, and fails the
 * test if the cluster rejects the request.
 *
 * @throws IllegalStateException if topic creation fails or the wait is interrupted
 */
@Test
public void createTopicTest() {
    // try-with-resources releases the client's network threads even when creation fails.
    try (AdminClient adminClient = initAdminClient()) {
        // Partition count and replication factor.
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 6, (short) 3);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
        // Block until the brokers answer; a successful creation completes without an exception.
        createTopicsResult.all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the test runner can observe the interruption.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while creating topic " + TOPIC_NAME, e);
    } catch (ExecutionException e) {
        // Rethrow instead of printing: a swallowed failure would leave the test green.
        throw new IllegalStateException("Failed to create topic " + TOPIC_NAME, e);
    }
}
列举topic
/**
 * Prints the name of every topic in the cluster, internal topics included, to standard error.
 *
 * @throws ExecutionException if the list request fails on the broker side
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void listTopicTest() throws ExecutionException, InterruptedException {
    // try-with-resources closes the client once the listing has been printed.
    try (AdminClient adminClient = initAdminClient()) {
        // Optional: also include Kafka's internal topics in the listing.
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        // The element type must be declared; a raw Set cannot be iterated as String.
        Set<String> topics = listTopicsResult.names().get();
        for (String name : topics) {
            System.err.println(name);
        }
    }
}
查看topic详情
/**
 * Describes {@code TOPIC_NAME} and prints each returned topic name with its description.
 *
 * @throws ExecutionException if the describe request fails on the broker side
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void detailTopicTest() throws ExecutionException, InterruptedException {
    // try-with-resources closes the client once the description has been printed.
    try (AdminClient adminClient = initAdminClient()) {
        DescribeTopicsResult describeTopicsResult =
                adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
        // Keyed by topic name; the value's toString() carries the partition/replica details.
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        stringTopicDescriptionMap.forEach(
                (name, description) ->
                        System.out.println("name :" + name + " , desc: " + description));
    }
}
控制台打印的topic详细信息:
name :wnn-topic-test-12.27-cluster-1 , desc: (name=wnn-topic-test-12.27-cluster-1, internal=false, partitions= (partition=0, leader=xxx.xx.xx.240:9092 (id: 1 rack: null), replicas=xxx.xx.xx.240:9092 (id: 1 rack: null), xxx.xx.xx.240:9093 (id: 2 rack: null), xxx.xx.xx.240:9094 (id: 3 rack: null), isr=xxx.xx.xx.240:9092 (id: 1 rack: null), xxx.xx.xx.240:9093 (id: 2 rack: null), xxx.xx.xx.240:9094 (id: 3 rack: null)), (partition=1, leader=xxx.xx.xx.240:9093 (id: 2 rack: null), replicas=xxx.xx.xx.240:9093 (id: 2 rack: null), xxx.xx.xx.240:9094 (id: 3 rack: null), xxx.xx.xx.240:9092 (id: 1 rack: null) , isr=xxx.xx.xx.240:9093 (id: 2 rack: null), xxx.xx.xx.240:9094 (id: 3 rack: null), xxx.xx.xx.240:9092 (id: 1 rack: null)), (partition=2, leader=xxx.xx.xx.240:9094 (id: 3 rack: null), replicas=xxx.xx.xx.240:9094 (id: 3 rack: null), xxx.xx.xx.240:9092 (id: 1 rack: null), xxx.xx.xx.240:9093 (id: 2 rack: null), isr=xxx.xx.xx.240:9094 (id: 3 rack: null), xxx.xx.xx.240:9092 (id: 1 rack: null), xxx.xx.xx.240:9093 (id: 2 rack: null)), (partition=3, leader=xxx.xx.xx.240:9092 (id: 1 rack: null), replicas=xxx.xx.xx.240:9092 (id: 1 rack: null), xxx.xx.xx.240:9094 (id: 3 rack: null), xxx.xx.xx.240:9093 (id: 2 rack: null), isr=xxx.xx.xx.240:9092 (id: 1 rack: null), xxx.xx.xx.240:9094 (id: 3 rack: null), xxx.xx.xx.240:9093 (id: 2 rack: null)), (partition=4, leader=xxx.xx.xx.240:9093 (id: 2 rack: null), replicas=xxx.xx.xx.240:9093 (id: 2 rack: null), xxx.xx.xx.240:9092 (id: 1 rack: null), xxx.xx.xx.240:9094 (id: 3 rack: null) , isr=xxx.xx.xx.240:9093 (id: 2 rack: null), xxx.xx.xx.240:9092 (id: 1 rack: null), xxx.xx.xx.240:9094 (id: 3 rack: null)), (partition=5, leader=xxx.xx.xx.240:9094 (id: 3 rack: null), replicas=xxx.xx.xx.240:9094 (id: 3 rack: null), xxx.xx.xx.240:9093 (id: 2 rack: null), xxx.xx.xx.240:9092 (id: 1 rack: null), isr=xxx.xx.xx.240:9094 (id: 3 rack: null), xxx.xx.xx.240:9093 (id: 2 rack: null), xxx.xx.xx.240:9092 (id: 1 rack: null)), authorizedOperations=null)
发送消息
/**
 * Sends nine keyed records to {@code TOPIC_NAME} and reports each send result through a
 * callback: the record metadata on success, the stack trace on failure.
 */
@Test
public void testSendWithCallback() {
    Properties properties = getProperties();
    // Keys and values are Strings here; this assumes getProperties() configures String
    // serializers - TODO confirm.
    // try-with-resources closes the producer even if send() throws part-way through the loop.
    try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
        for (int i = 0; i < 9; i++) {
            producer.send(
                    new ProducerRecord<>(TOPIC_NAME, "wnn-key" + i, "wnn-content-value" + i),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception == null) {
                                System.err.println("发送状态:" + metadata.toString());
                            } else {
                                exception.printStackTrace();
                            }
                        }
                    });
        }
    }
}
// NOTE(review): this looks like the toString() of the metadata object printed by the send
// callback, quoted from the Kafka client source - confirm against the library.
// Renders the value as "<topicPartition>@<offset>".
@Override
public String toString() {
return topicPartition.toString() + "@" + offset;
}
即发送回调中打印的 metadata.toString() 输出格式为:topic-分区号@offset。



