本篇博客主要是写了如何从不同的数据源里读取数据,包括Flink从Java集合中读取数据、从本地文件当中读取数据、从Socket读取数据、从Kafka读取数据、从自定义Source获取数据。
目录
- 1、从Java集合中读取数据
- 2、从本地文件当中读取数据
- 3、从Socket读取数据
- 写法1
- 写法2
- 4、从Kafka读取数据
- 新版写法
- 老版写法
- 5、从自定义Source获取数据
1、从Java集合中读取数据
一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。
package com.lqs.five.part1_source;
import com.lqs.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;
/**
 * Reads data from in-memory sources: a Java collection and a fixed list of elements.
 */
public class Test01_SourceList {
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the stream execution environment and run every operator with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Step 2: read from a collection. Declared as List<WaterSensor> (instead of the raw List)
        // so fromCollection yields a typed DataStreamSource<WaterSensor>.
        List<WaterSensor> waterSensors = Arrays.asList(
                new WaterSensor("1", 134L, 34),
                new WaterSensor("2", 1434L, 55),
                new WaterSensor("4", 1367L, 354)
        );
        env.fromCollection(waterSensors).print();

        // Read from individual elements.
        // fromElements creates a non-parallel source, so its parallelism may only be 1
        // ("The parallelism of non parallel operator must be 1").
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5).setParallelism(1);
        streamSource.print();

        env.execute("SourceList");
    }
}
2、从本地文件当中读取数据
注意:
- 参数可以是目录也可以是文件
- 路径可以是相对路径也可以是绝对路径
- 相对路径是从系统属性user.dir获取路径: idea下是project的根目录, standalone模式下是集群节点根目录
- 也可以从HDFS目录下读取, 使用路径: hdfs://nwh120:8020/…。由于Flink没有提供Hadoop相关依赖, 需要在pom中添加相关依赖:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
package com.lqs.five.part1_source;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Reads the lines of the file or directory at {@code input/hcc} and prints each line.
 */
public class Test02_SourceFile {
    public static void main(String[] args) throws Exception {
        // Step 1: stream execution environment with a single parallel instance per operator
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Step 2: the path may be a file or a directory, relative or absolute;
        // relative paths are resolved against the user.dir system property
        final String inputPath = "input/hcc";
        environment.readTextFile(inputPath).print();

        // Step 3: submit the job
        environment.execute("SourceFile");
    }
}
3、从Socket读取数据
写法1
package com.lqs.five.part1_source;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Test03_SourceUnboundStream {
public static void main(String[] args) throws Exception {
//TODO 1、获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//TODO 2、读取无界数据流
//使用的nacat进行数据模拟
DataStreamSource streamSource = env.socketTextStream("nwh120", 8888);
//TODO 3、将数据按照空格切分,切出每个单词
SingleOutputStreamOperator wordDataStream = streamSource.flatMap(
new FlatMapFunction() {
@Override
public void flatMap(String s, Collector out) throws Exception {
for (String s1 : s.split(" ")) {
out.collect(s1);
}
}
}
)
//设置共享度
// .slotSharingGroup("gourp1")
//与前后都断开
// .disableChaining()
//与前面都断开任务链
// .startNewChain()
//为某个算子单独设置并行度
// .setParallelism(5)
;
//TODO 4、将每个单词组成Tuple2元组
SingleOutputStreamOperator> wordToOneDataStream = wordDataStream.map(
new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return Tuple2.of(value, 1);
}
}
);
//TODO 5、将相同的单词聚合到一块
KeyedStream, String> keyedDataStream = wordToOneDataStream.keyBy(
new KeySelector, String>() {
@Override
public String getKey(Tuple2 value) throws Exception {
return value.f0;
}
}
);
//TODO 1、做累加计数
SingleOutputStreamOperator> result = keyedDataStream.sum(1);
result.print();
env.execute("SourceUnboundStream");
}
}
写法2
package com.lqs.five.part1_source;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Test03_SourceSocket {
public static void main(String[] args) throws Exception {
//TODO 1、获取流数据执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//TODO 2、读取无界流数据
DataStreamSource streamSource = env.socketTextStream("nwh120", 8888);
//TODO 3、转换数据结构
//TODO 写法1
SingleOutputStreamOperator> wordAndOne = streamSource.flatMap(
new RichFlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
//按空格切分
String[] split = value.split(" ");
//循环将切分好的数据发送到下游(下一级)
for (String word : split) {
out.collect(Tuple2.of(word, 1L));
}
}
}
);
//TODO 写法2
//TODO 写法3
//TODO 4、分组
KeyedStream, String> keyedStream = wordAndOne.keyBy(
new KeySelector, String>() {
@Override
public String getKey(Tuple2 value) throws Exception {
return value.f0;
}
}
);
//简写
// KeyedStream, String> keyedStream = wordAndOne.keyBy(value -> value.f0);
//TODO 5、求和
SingleOutputStreamOperator> result = keyedStream.sum(1);
//TODO 6、打印
result.print();
//TODO 7、执行
env.execute("WordCount");
}
}
4、从Kafka读取数据
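添加依赖(新版写法和老版写法都需要该依赖):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
新版写法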
package com.lqs.five.part1_source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Reads String records from Kafka using the new unified-Source-API {@code KafkaSource}.
 */
public class Test04_SourceKafka {
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Step 2: build the Kafka source. The explicit <String> type argument on builder()
        // matches the SimpleStringSchema value deserializer below.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("nwh120:9092")
                .setTopics("test_kafka_source")
                .setGroupId("lqs_test")
                // begin reading at the latest offset of each partition
                .setStartingOffsets(OffsetsInitializer.latest())
                // deserialize only the record value, as a String
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // this example does not use event time, so no watermarks are generated
        DataStreamSource<String> streamSource =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
        streamSource.print();

        env.execute("SourceKafka");
    }
}
老版写法
package com.lqs.five.part1_source;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
 * Reads String records from Kafka using the legacy {@code FlinkKafkaConsumer} (SourceFunction
 * based; replaced by {@code KafkaSource} and deprecated in later Flink releases).
 */
public class Test04_SourceKafkaOld {
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Step 2: consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "nwh120:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "lqs_test");
        // FlinkKafkaConsumer starts from the group's committed offsets by default;
        // auto.offset.reset only applies when the group has no committed offset yet
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // diamond operator: a typed FlinkKafkaConsumer<String> instead of the raw type
        env.addSource(new FlinkKafkaConsumer<>("test_kafka_source", new SimpleStringSchema(), properties)).print();

        env.execute("SourceKafkaOld");
    }
}
5、从自定义Source获取数据
package com.lqs.five.part1_source;
import com.lqs.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
/**
 * Reads data from a user-defined source that emits random WaterSensor records.
 */
public class Test05_CustomizeSource {
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Step 2: read from the custom source and print every record
        env.addSource(new MySource()).print();

        env.execute("CustomizeSource");
    }

    /**
     * Parallel source that emits one randomly generated WaterSensor per second until
     * {@link #cancel()} is called.
     */
    public static class MySource implements ParallelSourceFunction<WaterSensor> {
        // Written by cancel() and read by the run() loop, which Flink invokes from different
        // threads; volatile guarantees the loop observes the change and terminates.
        private volatile boolean running = true;

        // one generator per source instance instead of allocating a new Random per record
        private final Random random = new Random();

        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            while (running) {
                ctx.collect(
                        new WaterSensor(
                                "test_kafka_source" + random.nextInt(100),
                                System.currentTimeMillis(),
                                random.nextInt(1000)
                        )
                );
                // throttle to roughly one record per second
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}



