前言:
这是一个使用Flink实时读取kafka数据,并将单词个数统计实时输出到控制台的demo.
依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
导包:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.FlatMapIterator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Arrays; import java.util.Iterator; import java.util.Properties;
java代码:
public class WordCounInTime {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
//设置监控数据流时间间隔(官方叫状态与检查点)
see.enableCheckpointing(5000);
Properties properties = new Properties();
//kafka默认端口号9092
properties.setProperty("bootstrap.servers","localhost:9092");
//zookeeper默认端口2181
properties.setProperty("zookeeper.connect", "localhost:2181");
//给消费者分组,同一组的消费者不会消费相同的消息
properties.setProperty("group.id","test");
//反序列化
// properties.setProperty("key.deserializer","org.apache.kafka .common.serialization.StringDeserializer");
// properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("kafka-test",new SimpleStringSchema(), properties);
//指定偏移量,这里指定从最后一个之后再读取。
consumer.setStartFromLatest();
DataStreamSource data = see.addSource(consumer);
SingleOutputStreamOperator> map = data.flatMap(new FlatMapIterator() {
@Override
public Iterator flatMap(String s) throws Exception {
String replace = s.replace(",", "").replace(".", "").replace("?", "");
return Arrays.asList(replace.split(" ")).iterator();
}
}).map(new MapFunction>() {
@Override
public Tuple2 map(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
map.keyBy(0).sum(1).print();
see.execute();
}
}



