1. Use AdminClient for administrative operations
2. Code for topic creation, deletion, and related operations
pom file (dependencies)
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
Code
package com.vince.xq.kafka;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertTrue;
public class AppTest {
private static final String BROKER_LIST = "localhost:9092";
@Test
public void shouldAnswerWithTrue() {
assertTrue(true);
}
@Test
public void createTopic() throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); //Kafka broker address; bootstrap.servers is the only setting AdminClient needs here
AdminClient client = AdminClient.create(props);//create the admin client
//create a topic named test1 with 5 partitions and a replication factor of 1
NewTopic topic = new NewTopic("test1", 5, (short) 1);
CreateTopicsResult createTopicsResult = client.createTopics(Arrays.asList(topic));
createTopicsResult.all().get();//createTopics is asynchronous; wait for it to complete
client.close();//close the client
}
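//Sketch: a variation on createTopic that first checks whether the topic already exists, so that
//re-running the test does not fail when test1 is already present. Same local broker assumed;
//the method name createTopicIfMissing is only illustrative.
@Test
public void createTopicIfMissing() throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient client = AdminClient.create(props);
//listTopics().names() returns the names of all existing topics
Set<String> existing = client.listTopics().names().get();
if (!existing.contains("test1")) {
NewTopic topic = new NewTopic("test1", 5, (short) 1);
client.createTopics(Arrays.asList(topic)).all().get();
}
client.close();
}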
@Test
public void deleteTopic() throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); //Kafka broker address
AdminClient client = AdminClient.create(props);//create the admin client
//delete the topic named test2; deleteTopics is asynchronous, so wait for it to complete
client.deleteTopics(Arrays.asList("test2")).all().get();
client.close();//close the client
}
@Test
public void getTopic() throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); //Kafka broker address
AdminClient client = AdminClient.create(props);//create the admin client
//describe the topic named test1
DescribeTopicsResult describeTopicsResult = client.describeTopics(Arrays.asList("test1"));
Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
descriptionMap.forEach((key, value) -> {
System.out.println("name: " + key + " desc: " + value);
});
client.close();//close the client
}
@Test
public void listTopic() throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); //Kafka broker address
AdminClient client = AdminClient.create(props);//create the admin client
ListTopicsResult listTopicsResult = client.listTopics();
Set<String> names = listTopicsResult.names().get();
//print the topic names
names.forEach(System.out::println);
client.close();//close the client
}
@Test
public void producer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//send 20 messages asynchronously
for (int i = 1; i <= 20; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key" + i, "message" + i);
//fire-and-forget send (no callback)
//producer.send(record);
//asynchronous send with a callback invoked once the broker acknowledges the record
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("success:" + recordMetadata.offset());
} else {
e.printStackTrace();
}
}
});
}
producer.close();
}
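//Sketch: the callback form above is still asynchronous. For a truly synchronous send, block on the
//Future returned by send(). Assumes the same local broker and test1 topic as above; the method
//name producerSync and the key/value strings are only illustrative.
@Test
public void producerSync() throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test1", "sync-key", "sync-message");
//send() returns a Future<RecordMetadata>; get() blocks until the broker responds
RecordMetadata metadata = producer.send(record).get();
System.out.println("sync send offset=" + metadata.offset());
producer.close();
}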
@Test
public void consumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");//consumers with the same group id belong to the same consumer group
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//disable auto commit; offsets are committed manually below
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//subscribe to the test1 topic
consumer.subscribe(Arrays.asList("test1"));
while (true) {
System.out.println("consumer is polling");
//wait up to 1 second for records
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("offset=%d,key=%s,value=%s",
record.offset(), record.key(), record.value()));
}
//synchronous commit; retried on failure
consumer.commitSync();
//asynchronous commit; not retried on failure
//consumer.commitAsync();
}
}
}
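Besides the partition count and replication factor, topic-level settings can also be passed at creation time through NewTopic.configs(...). A minimal standalone sketch, assuming a broker at localhost:9092 and kafka-clients 2.0.0 (the class name, the topic name test3, and the 7-day retention value are only illustrative):
package com.vince.xq.kafka;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
public class CreateTopicWithConfig {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient client = AdminClient.create(props);
//one partition, replication factor 1, topic-level retention.ms set to 7 days (604800000 ms)
NewTopic topic = new NewTopic("test3", 1, (short) 1)
.configs(Collections.singletonMap("retention.ms", "604800000"));
client.createTopics(Arrays.asList(topic)).all().get();
client.close();
}
}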



