栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka topic 管理api

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka topic 管理api

1.使用AdminClient进行处理 

 2.topic 创建、删除等操作代码

pom文件


      junit
      junit
      4.11
      test
    

    
      org.apache.kafka
      kafka-clients
      2.0.0
    

操作代码

package com.vince.xq.kafka;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertTrue;


public class AppTest {

    private String BROKER_LIST = "localhost:9092";

    
    @Test
    public void shouldAnswerWithTrue() {
        assertTrue(true);
    }

    @Test
    public void createTopic() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        //创建名称为test1的topic,有5个分区
        NewTopic topic = new NewTopic("test01", 5, (short) 1);
        CreateTopicsResult createTopicsResult = client.createTopics(Arrays.asList(topic));
        client.close();//关闭
    }

    @Test
    public void deleteTopic() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());


        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        client.deleteTopics(Arrays.asList("test2"));
        client.close();//关闭
    }

    @Test
    public void getTopic() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());


        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端

        DescribeTopicsResult describeTopicsResult = client.describeTopics(Arrays.asList("test1"));
        Map descriptionMap = describeTopicsResult.all().get();
        descriptionMap.forEach((key, value) -> {
            System.out.println("name: " + key + " desc: " + value);
        });
    }


    @Test
    public void listTopic() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        ListTopicsResult listTopicsResult = client.listTopics();
        Set names = listTopicsResult.names().get();
        //打印names
        names.stream().forEach(System.out::println);
        client.close();//关闭
    }


    @Test
    public void producer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer producer = new KafkaProducer<>(props);

        //异步发送20条消息
        for (int i = 1; i <= 20; i++) {
            ProducerRecord record = new ProducerRecord<>("test1", "key" + i, "message" + i);
            //aysn 发送
            //producer.send(record);

            //同步发送
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(Recordmetadata recordmetadata, Exception e) {
                    if (e == null) {
                        System.out.println("success:" + recordmetadata.offset());
                    } else {
                        e.printStackTrace();
                    }
                }
            });

        }

        producer.close();
    }

    @Test
    public void consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");//groupid相同的属于同一个消费者组
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//自动提交offset
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer consumer = new KafkaConsumer<>(props);
        //消费test1主题
        consumer.subscribe(Arrays.asList("test1"));
        while (true) {
            System.out.println("consumer is polling");
            //5秒等待
            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord record : records) {
                System.out.println(String.format("offset=%d,key=%s,value=%s",
                        record.offset(), record.key(), record.value()));
            }
            //同步提交,失败会重试
            consumer.commitSync();
            //异步提交,失败不会重试
            //consumer.commitAsync();
        }
    }


}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/270810.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号