栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark VS Flink:TB级别大数据项目分享

Spark VS Flink:TB级别大数据项目分享

前言:项目分为批处理、流处理两大模块,批处理每天数据量TB级别,流处理每天10亿左右,会陆续替换成Flink,由于项目逻辑比较复杂,时间流程略长,感兴趣的可以关注一下,之后会分享重构过程中的各种问题和经验,包括但不限于:部分业务逻辑分享、源码分享、Flink源码重构等。

重构的第一个流程,数据源由三部分组成,在HDFS的三个不同目录下。由于该任务每天执行一次,所以每次运行按照日期匹配HDFS上的目录和文件,Spark匹配文件时可以模糊匹配和多路径读取,十分灵活,例如,读取2021年12月23日的文件,则可以写成hdfs://…/20211223*/*,或者多个目录由逗号隔开(其中每个目录仍然可以模糊匹配),Flink则不支持模糊匹配和多路径读取,不过可以自己写方法,达到同样的效果,部分代码如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//filePath为上级目录 inputFormat通过自己写方法获取getTextInputFormat
env.readFile(FileInputFormat inputFormat, String filePath)

**
* @param fileName   模糊路径
* @param childFlag  子目录中都包含的字符
* @param isPreMatch 是否为前缀匹配,如果是,则会加/
* @param name       路径名称,打印log使用
* @return
*/
public static TextInputFormat getTextInputFormat(String fileName, String childFlag, Boolean isPreMatch, String name) {
    // 外层父级目录
    String baseDir = getbaseDir(fileName);
  // 获取匹配字段
    List patterns = getDirPattern(fileName, isPreMatch);

    Path path = new Path(baseDir);
    Configuration configuration = new Configuration();
    // 设置递归获取文件
    configuration.setBoolean("recursive.file.enumeration", true);

    TextInputFormat textInputFormat = new TextInputFormat(path);
    textInputFormat.supportsMultiPaths();
    textInputFormat.configure(configuration);
    textInputFormat.setFilesFilter(new FilePathFilter() {
        @Override
        public boolean filterPath(Path filePath) {
            // 过滤想要的路径
            for (String pattern : patterns) {
                // 一级目录匹配eg:/20210101/partition1.txt  /20210102/partition1.txt
                if (!pattern.contains(",") && filePath.toString().contains(pattern)) {
                    logger.info(name + " FILTER_DIR: " + filePath);
                    return false;
                }
                // 多级目录匹配 父目录
                if (pattern.contains(",") && !filePath.toString().contains(childFlag) && filePath.toString().contains(pattern.split(",")[0])) {
                    logger.info(name + " Father FILTER_DIR: " + filePath);
                    return false;
                }
                // 多级目录匹配 子目录
                if (pattern.contains(",") && filePath.toString().contains(childFlag) && filePath.toString().contains(pattern.split(",")[0]) && filePath.toString().contains(childFlag + pattern.split(",")[1])) {
                    logger.info(name + " Child FILTER_DIR: " + filePath);
                    return false;
                }
            }
            return true;
        }
    });
    return textInputFormat;
}

重构后,Flink处理流程和Spark保持一致(多次聚合运算,较为复杂),由于运行参数和底层原理的不同,占用集群资源没有达到1:1,但基本相当,并行度为800、CPU Core占用400个、内存占用1TB,具体情况如下:


运行过程对比,以下是Spark每个Executor处理数据的情况,分别为input列升序、降序后的截图:


可以看出数据倾斜严重,Executor最多处理3.5G,最少处理0MB、129MB,而且从整体来看,数据在Executors上的分布很不均匀,对于这种现象,需要优化处理,解决数据倾斜问题。

以下是某一reduce算子执行时,每个task中处理数据的情况,同样存在数据倾斜:


以下是Flink同一reduce算子执行时,每个TaskManager处理数据的情况,分别为Bytes received列升序、降序后的截图:



可以看出数据分布很均匀,根据上面的对比来看,Flink处理要优于Spark 。
等程序运行结束后,从Yarn Web UI查看耗时情况,Flink耗时12分钟左右,Spark耗时25分钟左右,胜负已分。

以下是Flink遇到的一些问题及解决方案。

问题1:写入HDFS并行度过大
Caused by: java.io.IOException: Insufficient number of network buffers: required 2, but only 0 available. The total number of network buffers is currently set to 9418 of 32768 bytes each. You can increase this number by setting the configuration keys ‘taskmanager.network.memory.fraction’, ‘taskmanager.network.memory.min’, and ‘taskmanager.network.memory.max’.
调整-ytm 4096——>8192

修改flink-conf.yaml
调整
taskmanager.network.memory.fraction:0.1
——>
taskmanager.network.memory.min:64mb
taskmanager.network.memory.max:1g

问题2:Flink接入CDH集群时,一般不会有HDFS根目录权限,但官方下载的Flink默认需要在根目录下创建文件夹。网上很多解决方案为执行chmod 777,其实这样是不正确的,因为根目录权限不该随意赋予其他用户。
flink-conf.yaml可以增加如下配置,指定目录:
yarn.staging-directory: hdfs://user/wd/dc5/flink_application/
(Flink1.9)#yarn.storage-file.location: hdfs://user/wd/dc5/flink_application
但是,官方下载的flink配置这两个参数不会生效,需要下载flink源码,编译成CDH可用的,然后替换flink lib目录下的flink-dist.jar,具体步骤可以自行搜索,或者联系作者获取已经编译好的文件,这里不在阐述。

点击下方链接查看完整内容,关注公众号,订阅后续推送:

Spark VS Flink:TB级别大数据项目分享

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/682768.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号