栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink通过flink-yarn远程提交任务到yarn集群

Flink通过flink-yarn远程提交任务到yarn集群

 /**
  * Submits a Flink job to a YARN cluster in application mode using the
  * {@code flink-yarn} client API, then logs the resulting YARN application id
  * and Flink web interface URL.
  *
  * <p>The Flink configuration is loaded from a local {@code conf} directory,
  * while the Flink libraries, the Flink dist jar and the user jar are all
  * referenced by HDFS paths.
  *
  * <p>Deployment failures are logged and not rethrown. Exceptions raised while
  * building the configuration (before deployment starts) propagate to the caller.
  * The YARN client created here is always stopped before the method returns.
  */
 public void crateStreamTaskByFlinkClient() {
     // Local Flink configuration directory, used to load flink-conf.yaml.
     // If deployment fails with
     //   org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
     // add the following line to flink-conf.yaml:
     //   classloader.resolve-order: parent-first
     String configurationDirectory = "/opt/module/flink-1.11.4/conf";

     // HDFS directory holding the Flink cluster jars.
     String flinkLibs = "hdfs://hadoop113:8020/jar/flink11/libs";
     // User job jar.
     String userJarPath = "hdfs://hadoop113:8020/jar/userTask/Flink1.11-1.0-SNAPSHOT-jar-with-dependencies.jar";
     String flinkDistJar = "hdfs://hadoop113:8020/jar/flink11/libs/flink-dist_2.12-1.11.4.jar";

     YarnConfiguration yarnConfiguration = new YarnConfiguration();
     YarnClient yarnClient = YarnClient.createYarnClient();
     yarnClient.init(yarnConfiguration);
     yarnClient.start();

     try {
         YarnClusterInformationRetriever clusterInformationRetriever =
             YarnClientYarnClusterInformationRetriever.create(yarnClient);

         // Load the Flink configuration from the local conf directory.
         Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
             configurationDirectory);

         flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

         flinkConfiguration.set(
             PipelineOptions.JARS,
             Collections.singletonList(userJarPath));

         Path remoteLib = new Path(flinkLibs);
         flinkConfiguration.set(
             YarnConfigOptions.PROVIDED_LIB_DIRS,
             Collections.singletonList(remoteLib.toString()));

         flinkConfiguration.set(
             YarnConfigOptions.FLINK_DIST_JAR,
             flinkDistJar);

         // Deploy in application mode.
         flinkConfiguration.set(
             DeploymentOptions.TARGET,
             YarnDeploymentTarget.APPLICATION.getName());

         // YARN application name.
         flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");

         // Register the logging configuration; without it no job logs are visible.
         YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

         ClusterSpecification clusterSpecification =
             new ClusterSpecification.ClusterSpecificationBuilder()
                 .createClusterSpecification();

         // Program arguments and main class of the user jar.
         ApplicationConfiguration appConfig = new ApplicationConfiguration(
             new String[] {"test"},
             "com.starnet.server.bigdata.flink.WordCount");

         // The descriptor is created with sharedYarnClient = true, so closing it
         // does not stop yarnClient; that is done in the finally block below.
         try (YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                 flinkConfiguration,
                 yarnConfiguration,
                 yarnClient,
                 clusterInformationRetriever,
                 true)) {

             ClusterClientProvider<ApplicationId> clusterClientProvider =
                 yarnClusterDescriptor.deployApplicationCluster(
                     clusterSpecification,
                     appConfig);

             // Closing the client only releases local client resources.
             // To shut the application down explicitly, call
             // yarnClusterDescriptor.killCluster(applicationId) instead.
             try (ClusterClient<ApplicationId> clusterClient =
                     clusterClientProvider.getClusterClient()) {
                 ApplicationId applicationId = clusterClient.getClusterId();
                 String webInterfaceURL = clusterClient.getWebInterfaceURL();
                 log.info("applicationId is {}", applicationId);
                 log.info("webInterfaceURL is {}", webInterfaceURL);
             }
         } catch (Exception e) {
             log.error("Failed to deploy Flink application to YARN", e);
         }
     } finally {
         yarnClient.stop();
     }
 }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/283042.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号