本人是一名软件工程专业的大四帅哥，目前在一家科技公司实习。在用 Flink 处理文件时发现了一些好玩的东西，看好像没有人发过，就发上来分享一下。
想让 Flink 程序一直处于 RUNNING 状态的话，只能让它不断地循环读取数据，自定义一个 Source 即可。代码如下：
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.time.Duration; import java.time.LocalDateTime; import java.time.LocalTime; public class ReadeHdfsSource extends RichParallelSourceFunction{ //循环标志符 private volatile Boolean running; //配置hadoop hdfs fileSystem private transient Configuration configuration; //要输入的文件路径 private String path; public ReadeHdfsSource(String path) { this.path = path; } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { super.open(parameters); configuration = new Configuration(); } @Override public void run(SourceContext sourceContext) throws Exception { Boolean subRunning = true; String inputPath; init();//初始化 String nowDataTime; try { while (running) { //获取当天日期并且转为字符串 nowDataTime = LocalDateTime.now().toLocalDate().toString(); //目录拼接上日期 inputPath = path + nowDataTime; FileSystem fileSystem = FileSystem.get(configuration); subRunning = true; LocalDateTime startTime = LocalDateTime.now(); //判断是否存在hdfs要读入的内容,没有就跳过 if (PathUtil.isCreate(configuration, inputPath)) { collectRecords(sourceContext, fileSystem, path + nowDataTime); } while (subRunning) { //将输入时间和现在时间作比较 相差一天且当前小时为凌晨3到9点就跳出循环进行外部循环读文件写文件 //与每天凌晨3点操作上面的读写文件相同 LocalDateTime endTime = LocalDateTime.now(); if (Duration.between(startTime, endTime).toDays() == 1 && ((endTime.getHour() < 9) || (endTime.getHour()) >= 3)) { subRunning = false; } Thread.sleep(2000L); } } } catch (Exception e) { throw new RuntimeException(e); } } //输出文件流 public void collectRecords(SourceContext ctx, FileSystem fs, String path) throws IOException { String fileName; //循环根目录 得到文件名拼接 //获取文件列表 FileStatus[] fileStatuses = 
fs.listStatus(new Path(path)); //循环文件 for (int i = 0; i < fileStatuses.length; i++) { if (fileStatuses[i].isFile()) { fileName = fileStatuses[i].getPath().getName();//获取文件名字 //打开输入流 FSDataInputStream dataInputStream = fs.open(new Path(path + "/" + fileName)); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(dataInputStream)); String line; while ((line = bufferedReader.readLine()) != null) { ctx.collect(line); } //输出完成整个文件了 } } } @Override public void cancel() { //点击页面cancel取消按钮的时候 running = false; } //初始化 private void init() { running = true; } }
cancel()：在 Flink Web UI（Dashboard）中点击作业的 Cancel 按钮取消程序时触发；open()：初始化方法；run()：Source 程序运行的主逻辑。



