基于文件:
readTextFile(path)TextInputFormat-逐行读取文本文件,即遵守规范的文件,并将它们作为字符串返回。
readFile(fileInputFormat, path)- 根据指定的文件输入格式读取(一次)文件。
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)- 这是前两个内部调用的方法。它path根据给定的fileInputFormat.
根据提供的watchType,此源可以定期监视(每interval毫秒)新数据的路径(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步排除正在处理的文件。
基于套接字:
socketTextStream- 从套接字读取。元素可以用分隔符分隔。 基于集合:
fromCollection(Collection)- 从 Java Java.util.Collection
创建数据流。集合中的所有元素必须属于同一类型。fromCollection(Iterator, Class)- 从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
fromElements(T …)- 从给定的对象序列创建数据流。所有对象必须属于同一类型。
fromParallelCollection(SplittableIterator, Class)-
从迭代器并行创建数据流。该类指定迭代器返回的元素的数据类型。generateSequence(from, to)- 并行生成给定区间内的数字序列。
自定义source:
addSource- 附加一个新的源函数。例如,要从 Apache Kafka 读取,您可以使用 addSource(new
FlinkKafkaConsumer08<>(…)).
案例
package com.zxl.blink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class SourceDemo {
public static void main(String[] args) throws Exception {
//配置WEBUI
Configuration configuration=new Configuration();
configuration.setInteger(RestOptions.PORT,8848);
//创建flink执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
environment.setParallelism(4);
// TODO: 2022/2/7 从内存直接读取数据 一般用于数据测试
//传入字符串
DataStreamSource data = environment.fromElements("hello", "hello");
data.print("读取一个或者多个数据");
//读取集合的数据
List lists = new ArrayList<>();
lists.add("asdzxc,www.baidu.com,1607136604");
lists.add("asdzxv,www.taobao.com,1607136610");
lists.add("asdzxb,www.leetcode.com,1607136645");
lists.add("asdzxm,www.aliyun.com,1607136699");
//传入集合
DataStreamSource list = environment.fromCollection(lists);
list.print("从集合中读取数据");
// TODO: 2022/2/7 从文件中读取数据
//传入路径
DataStreamSource textFile = environment.readTextFile("E:\JAVAWorker\demoday0130\src\main\resources\hello.txt");
textFile.print("读取文件数据");
DataStreamSource file = environment.readTextFile("E:\JAVAWorker\demoday0130\src\main\resources\hello.txt", "utf8");
file.print("读取文件并设置编码格式");
// TODO: 2022/2/7 从端口读取数据
//socketTextStream的参数 hostname(主机名) port(端口号) delimiter(分隔符) maxRetry(最大重试次数)
DataStreamSource socketTextStream = environment.socketTextStream("192.168.200.179", 999,",",4);
socketTextStream.print("从端口读取数据");
// TODO: 2022/2/7 自定义source
//使用addSource addSource()自定义数据读取函数,与外部存储交互,读取数据,如从Kafka、JDBC、HDFS等读取。
DataStreamSource source = environment.addSource(new DataDB());
source.print();
environment.execute();
}
}



