前言:项目分为批处理、流处理两大模块,批处理每天数据量TB级别,流处理每天10亿左右,会陆续替换成Flink,由于项目逻辑比较复杂,时间流程略长,感兴趣的可以关注一下,之后会分享重构过程中的各种问题和经验,包括但不限于:部分业务逻辑分享、源码分享、Flink源码重构等。
重构的第一个流程,数据源由三部分组成,在HDFS的三个不同目录下。由于该任务每天执行一次,所以每次运行按照日期匹配HDFS上的目录和文件,Spark匹配文件时可以模糊匹配和多路径读取,十分灵活,例如,读取2021年12月23日的文件,则可以写成hdfs://…/20211223*/*,或者多个目录由逗号隔开(其中每个目录仍然可以模糊匹配),Flink则不支持模糊匹配和多路径读取,不过可以自己写方法,达到同样的效果,部分代码如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //filePath为上级目录 inputFormat通过自己写方法获取getTextInputFormat env.readFile(FileInputFormatinputFormat, String filePath) ** * @param fileName 模糊路径 * @param childFlag 子目录中都包含的字符 * @param isPreMatch 是否为前缀匹配,如果是,则会加/ * @param name 路径名称,打印log使用 * @return */ public static TextInputFormat getTextInputFormat(String fileName, String childFlag, Boolean isPreMatch, String name) { // 外层父级目录 String baseDir = getbaseDir(fileName); // 获取匹配字段 List patterns = getDirPattern(fileName, isPreMatch); Path path = new Path(baseDir); Configuration configuration = new Configuration(); // 设置递归获取文件 configuration.setBoolean("recursive.file.enumeration", true); TextInputFormat textInputFormat = new TextInputFormat(path); textInputFormat.supportsMultiPaths(); textInputFormat.configure(configuration); textInputFormat.setFilesFilter(new FilePathFilter() { @Override public boolean filterPath(Path filePath) { // 过滤想要的路径 for (String pattern : patterns) { // 一级目录匹配eg:/20210101/partition1.txt /20210102/partition1.txt if (!pattern.contains(",") && filePath.toString().contains(pattern)) { logger.info(name + " FILTER_DIR: " + filePath); return false; } // 多级目录匹配 父目录 if (pattern.contains(",") && !filePath.toString().contains(childFlag) && filePath.toString().contains(pattern.split(",")[0])) { logger.info(name + " Father FILTER_DIR: " + filePath); return false; } // 多级目录匹配 子目录 if (pattern.contains(",") && filePath.toString().contains(childFlag) && filePath.toString().contains(pattern.split(",")[0]) && filePath.toString().contains(childFlag + pattern.split(",")[1])) { logger.info(name + " Child FILTER_DIR: " + filePath); return false; } } return true; } }); return textInputFormat; }
重构后,Flink处理流程和Spark保持一致(多次聚合运算,较为复杂),由于运行参数和底层原理的不同,占用集群资源没有达到1:1,但基本相当,并行度为800、CPU Core占用400个、内存占用1TB,具体情况如下:
运行过程对比,以下是Spark每个Executor处理数据的情况,分别为input列升序、降序后的截图:
可以看出数据倾斜严重,Executor最多处理3.5G,最少处理0MB、129MB,而且从整体来看,数据在Executors上的分布很不均匀,对于这种现象,需要优化处理,解决数据倾斜问题。
以下是某一reduce算子执行时,每个task中处理数据的情况,同样存在数据倾斜:
以下是Flink同一reduce算子执行时,每个TaskManager处理数据的情况,分别为Bytes received列升序、降序后的截图:
可以看出数据分布很均匀,根据上面的对比来看,Flink处理要优于Spark 。
等程序运行结束后,从Yarn Web UI查看耗时情况,Flink耗时12分钟左右,Spark耗时25分钟左右,胜负已分。
以下是Flink遇到的一些问题及解决方案。
问题1:写入HDFS并行度过大
Caused by: java.io.IOException: Insufficient number of network buffers: required 2, but only 0 available. The total number of network buffers is currently set to 9418 of 32768 bytes each. You can increase this number by setting the configuration keys ‘taskmanager.network.memory.fraction’, ‘taskmanager.network.memory.min’, and ‘taskmanager.network.memory.max’.
调整-ytm 4096——>8192
或
修改flink-conf.yaml
调整
taskmanager.network.memory.fraction:0.1
——>
taskmanager.network.memory.min:64mb
taskmanager.network.memory.max:1g
问题2:Flink接入CDH集群时,一般不会有HDFS根目录权限,但官方下载的Flink默认需要在根目录下创建文件夹。网上很多解决方案为执行chmod 777,其实这样是不正确的,因为根目录权限不该随意赋予其他用户。
flink-conf.yaml可以增加如下配置,指定目录:
yarn.staging-directory: hdfs://user/wd/dc5/flink_application/
(Flink1.9)#yarn.storage-file.location: hdfs://user/wd/dc5/flink_application
但是,官方下载的flink配置这两个参数不会生效,需要下载flink源码,编译成CDH可用的,然后替换flink lib目录下的flink-dist.jar,具体步骤可以自行搜索,或者联系作者获取已经编译好的文件,这里不在阐述。
点击下方链接查看完整内容,关注公众号,订阅后续推送:
Spark VS Flink:TB级别大数据项目分享



