栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

对于kafka的一些问题

对于kafka的一些问题

  1.kafka再次消费消费过的消息
   出现问题的原因:
     1.强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等
     2.设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费
     3.(常见)消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费
     4.当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题
     5.当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题
  2.手动设置数据重复消费
     1.如果想再次看到重复数据,再次设置一个消费者SimpleConsumer中设置topic01 重新读之前的数据
  3.消费完的数据会删掉吗
    不会删掉,offset会增加,数据在partition中存在
  4.kafka中数据存在哪里
    config/server.config中,log.dir设置的
  3.kafka怎么删除掉过去的信息
   1.修改server.properties#delete.topic.enable=true
   根据topic删除命令./bin/kafka-topics --delete --zookeeper 【zookeeper server:port】 --    topic 【topic name】
   2.logdir删除下文件手动删除(不推荐)
 
private static final String KAFKA_TOPIC = "kafka-topic";

    private static final String KAFKA_HOST = "host";

    private static final String KAFKA_GROUP_ID = "group-id";

    //获取topic列表所有信息 手动设置偏移量
    public void textResetOffset(Long offset){
        //form a properties to new consumer
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP_ID);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer consumer = new KafkaConsumer<>(properties);
        //subscribe incoming topic
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
        //get consumer consume partitions
        List partitionInfos = consumer.partitionsFor(KAFKA_TOPIC);
        List topicPartitions = new ArrayList<>();
        for(PartitionInfo partitionInfo : partitionInfos){
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitions.add(topicPartition);
        }
        // poll data from kafka server to prevent lazy operation
        consumer.poll(Duration.ofSeconds(200L));
        //reset offset from beginning
        consumer.seekToBeginning(topicPartitions);
        //reset designated partition offset by designated spot
        consumer.seek(topicPartitions.get(0), offset);
        //reset offset to end
        consumer.seekToEnd(topicPartitions);
        //consume message as usual
        ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
        Iterator> iterator = records.iterator();
        while (iterator.hasNext()){
            ConsumerRecord record = iterator.next();
            System.out.println("topic2:"+record.topic());
            System.out.println("value2:"+record.value());
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745315.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号