@SpringBootApplication
public class CcApplication {
public static void main(String[] args) {
SpringApplication.run(CcApplication.class, args);
String topic = "ty_analysis";
String groupId = "analysis";
Properties properties = new Properties();
properties.put("bootstrap.servers","172.16.9.10:9092");
//必须指定有业务意义的名字
properties.put("group.id",groupId);
properties.put("enable.auto.commit","true");
properties.put("auto.commit.interval.ms","1000");
//从最早的消息开始读取
properties.put("auto.offset.reset","earliest");
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer(properties);
//订阅主题,可以订阅多个主题,还可以使用正则表达式订阅主题
//注意:多次订阅,会覆盖前面的
consumer.subscribe(Arrays.asList(topic));
try{
while(true){
//1000是超时设定,如果有定时要求,可设置,否则建议设置个比较大的值
//通常consumer拿到足够多的数据,会立即返回,否则会阻塞
//poll返回则认为是成功消费了消息,如果发现消费慢需要分析是poll慢还是本身业务逻辑处理慢
ConsumerRecords records = consumer.poll(1000);
for(ConsumerRecord record : records){
System.out.printf("offset=%d, key=%s,value= %s%n",record.offset(),record.key(),record.value());
}
}
}finally {
consumer.close();
}
}
}
# 主要的pom依赖
org.apache.kafka
kafka-clients
2.6.0
com.fasterxml.jackson.core
jackson-databind
2.9.5