Integrating Flink with DolphinScheduler

Versions
  • dolphinscheduler 1.3.6
  • hadoop 3.2.1
  • flink 1.13.1
Main issues

classloader.check-leaked-classloader

With the default setup (Flink copied to /opt/soft/flink on the worker nodes), you will run into this error:

Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
        at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
        at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
        at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
        at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
        at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)

As the message suggests, adding "classloader.check-leaked-classloader: false" to flink-conf.yaml is enough:

[root@37e3e6d56452 flink]# tail  conf/flink*.yaml  
# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000
classloader.check-leaked-classloader: false
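Every worker node that runs Flink tasks needs this setting. A small idempotent helper avoids adding the key twice. Below is a minimal sketch. It assumes flink-conf.yaml is a flat file with top-level keys starting at column 0. The function name disable_leak_check is hypothetical and is not part of Flink or DolphinScheduler.

```shell
#!/usr/bin/env bash
# Sketch (hypothetical helper): force classloader.check-leaked-classloader to false.
# Assumes top-level keys in flink-conf.yaml start at column 0.
# disable_leak_check <path-to-flink-conf.yaml>
disable_leak_check() {
  local conf="$1"
  local key='classloader.check-leaked-classloader'
  local tmp
  tmp="$(mktemp)" || return 1
  # Rewrite an active setting of the key (any value) to false; commented lines are kept.
  # If no active setting exists, append one at the end of the file.
  awk -v k="$key" '
    index($0, k ":") == 1 { print k ": false"; found = 1; next }
    { print }
    END { if (!found) print k ": false" }
  ' "$conf" > "$tmp" && cat "$tmp" > "$conf"   # cat > keeps the original file permissions
  rm -f "$tmp"
}
```

For the layout used here, the call would be disable_leak_check /opt/soft/flink/conf/flink-conf.yaml. Running it a second time leaves the file unchanged.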

HADOOP_CLASSPATH environment variable

With Flink 1.13.1, HADOOP_CLASSPATH must be set before a job can be submitted to YARN. This takes two steps:

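  1. Install Hadoop on the DolphinScheduler worker nodes as well, and configure it correctly.
  2. Set HADOOP_CLASSPATH before submitting. The recommended way is to edit the bin/flink script and add export HADOOP_CLASSPATH=`hadoop classpath` at the top.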
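You can verify both steps on a worker before DolphinScheduler schedules any Flink task. Below is a minimal sketch. It assumes bash and that the worker's hadoop launcher is reachable on PATH. check_hadoop_env is a hypothetical helper name. It fails if hadoop is missing or if hadoop classpath prints nothing. Otherwise it prints the classpath that bin/flink would export.

```shell
#!/usr/bin/env bash
# Sketch (hypothetical helper): pre-flight check for Flink-on-YARN on a worker node.
# check_hadoop_env: returns 1 on failure, prints the Hadoop classpath on success.
check_hadoop_env() {
  # Step 1: Hadoop must be installed on this worker and on PATH
  if ! command -v hadoop >/dev/null 2>&1; then
    echo "hadoop not found on PATH; install and configure Hadoop on this worker first" >&2
    return 1
  fi
  # Step 2: 'hadoop classpath' is what HADOOP_CLASSPATH will be set to
  local cp
  cp="$(hadoop classpath 2>/dev/null)"
  if [ -z "$cp" ]; then
    echo "'hadoop classpath' printed nothing; check the Hadoop installation" >&2
    return 1
  fi
  echo "$cp"
}
```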

The modified bin/flink file looks like this:

[root@37e3e6d56452 flink]# head -n 30 bin/flink
#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

export HADOOP_CLASSPATH=`hadoop classpath`
target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
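Editing bin/flink by hand on every worker is error-prone. The minimal sketch below inserts the export line idempotently. It places the line directly after the shebang instead of after the license header. This is assumed to be equivalent to the listing above, because the header contains only comments. add_hadoop_classpath is a hypothetical name, and the duplicate check is an exact text match.

```shell
#!/usr/bin/env bash
# Sketch (hypothetical helper): add the HADOOP_CLASSPATH export to bin/flink once.
# add_hadoop_classpath <path-to-bin/flink>
add_hadoop_classpath() {
  local script="$1"
  # Single quotes keep the backticks literal, so they run when bin/flink runs
  local line='export HADOOP_CLASSPATH=`hadoop classpath`'
  # Skip if the exact line is already present
  if grep -qF "$line" "$script"; then
    return 0
  fi
  local tmp
  tmp="$(mktemp)" || return 1
  # Keep the shebang as line 1 and put the export right after it
  awk -v l="$line" 'NR == 1 { print; print l; next } { print }' "$script" > "$tmp" \
    && cat "$tmp" > "$script"   # cat > keeps bin/flink executable
  rm -f "$tmp"
}
```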

non-leaf queue

You may also run into the following error:

Caused by: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1626093777623_0001 to YARN : Application application_1626093777623_0001 submitted by user : root to non-leaf queue : root
		at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:327)
		at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1178)
		at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:593)
		at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:474)

The task log shows the following Flink command being executed:

flink run -m yarn-cluster -ys 1 -ynm WordCount -yjm 1G -ytm 1G -yqu root -p 1 -sae -c org.apache.flink.examples.java.wordcount.WordCount WordCount.jar --input  hdfs:///griffin/env.json    --output hdfs:///opt/dol-wcoutput/

Note the -yqu option. Running bin/flink run -h shows this description:

 -yqu,--yarnqueue                Specify YARN queue.
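The failure depends only on which queue follows -yqu. It can therefore help to pull that value out of the command logged by DolphinScheduler. Below is a minimal sketch. It assumes bash and a logged command with no quoted arguments containing spaces. yarn_queue_of is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch (hypothetical helper): print the YARN queue passed via -yqu / --yarnqueue.
# Returns 1 if the option is absent or has no value.
# yarn_queue_of "<flink run command line>"
yarn_queue_of() {
  local -a words
  local i
  # Split on whitespace without glob expansion
  read -r -a words <<< "$1"
  for ((i = 0; i + 1 < ${#words[@]}; i++)); do
    case "${words[i]}" in
      -yqu|--yarnqueue) echo "${words[i+1]}"; return 0 ;;
    esac
  done
  return 1
}
```

Applied to the command logged above, it prints root.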

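Check the YARN queue configuration.

The root queue has child queues, so it is a parent (non-leaf) queue. YARN accepts applications only into leaf queues, which is why the submission fails with the non-leaf error. You can fix this through the task's custom parameters by pointing -yqu at a leaf queue (default in the command below).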
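You can also read whether a queue is a leaf from the scheduler configuration. The sketch below makes three assumptions. First, the cluster uses the Capacity Scheduler, the default in Hadoop 3.x. Second, capacity-scheduler.xml has one name or value tag per line, as in the stock file. Third, the file sits under $HADOOP_CONF_DIR. In the Capacity Scheduler, yarn.scheduler.capacity.<queue-path>.queues lists a queue's children, so a queue without that property is a leaf. child_queues and is_leaf_queue are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Sketch (hypothetical helpers) for the Capacity Scheduler's capacity-scheduler.xml.
# Assumes each <name>...</name> and <value>...</value> sits on its own line.

# child_queues <capacity-scheduler.xml> <queue-path, e.g. root>
# Prints the comma-separated child queues, or nothing for a leaf queue.
child_queues() {
  local xml="$1" queue="$2"
  awk -v key="yarn.scheduler.capacity.${queue}.queues" '
    /<name>/ { gsub(/.*<name>[ \t]*|[ \t]*<\/name>.*/, ""); name = $0; next }
    /<value>/ && name == key { gsub(/.*<value>[ \t]*|[ \t]*<\/value>.*/, ""); print; exit }
  ' "$xml"
}

# is_leaf_queue <capacity-scheduler.xml> <queue-path>: succeeds if the queue has no children
is_leaf_queue() {
  [ -z "$(child_queues "$1" "$2")" ]
}
```

With the stock file, child_queues "$HADOOP_CONF_DIR/capacity-scheduler.xml" root prints default, so root is rejected and root.default is a valid target.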

After this change, the log shows that the YARN submission command has become:

flink run -m yarn-cluster -ys 1 -ynm WordCount -yjm 1G -ytm 1G -p 1 -sae -yqu default -c org.apache.flink.examples.java.wordcount.WordCount WordCount.jar --input  hdfs:///griffin/env.json    --output hdfs:///opt/dol-wcoutput/
Source: www.mshxw.com
Original article: https://www.mshxw.com/it/304796.html