栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafak消费者从头开始消费(消费者组)

kafak消费者从头开始消费(消费者组)

【README】

本文主要用于描述 kafka 消费者如何从头开始消费;


【1】从头开始消费

1)从头开始消费,需要满足两个条件, 如下:

  • 条件1, 使用一个全新的消费者组id;
  • 条件2,指定 auto.offset.reset 为 earliest ;

2)代码如下:

public static void main(String[] args) {
		
		Properties props = new Properties();
		
		
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
		
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan2"); // group.id 
		 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		
		KafkaConsumer consumer = new KafkaConsumer<>(props); 
		
		consumer.subscribe(Arrays.asList("third", "second"));
		 
		int i =0;
		while(true) {
			if (i++ > 10) break; // 只消费10条数据 
			
			ConsumerRecords consumerRds  = consumer.poll(100);
			
			
			for(ConsumerRecord rd : consumerRds) {
				System.out.println("[消费者] " + rd.key() + "--" + rd.value()); 
			}
		} 
		 
		consumer.close(); 
	}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/460621.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号