栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

您如何使用Google DataProc Java Client通过关联的GS存储桶中的jar文件和类来提交Spark作业?

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

您如何使用Google DataProc Java Client通过关联的GS存储桶中的jar文件和类来提交Spark作业?

我们希望很快能获得有关正式文档的更全面的指南,但要开始使用,请访问以下API概述:https : //developers.google.com/api-client-
library/java/apis/dataproc/v1

它包括到Dataproc javadocs的链接;如果您的服务器代表自己的项目而不是最终用户的Google项目进行调用,则您可能希望此处说明的基于密钥文件的服务帐户身份验证创建

Credential
用于初始化
Dataproc
客户端存根的对象。

对于特定于dataproc的部分,这仅意味着如果使用Maven,则将以下依赖项添加到Maven pomfile中:

<project>  <dependencies>    <dependency>      <groupId>com.google.apis</groupId>      <artifactId>google-api-services-dataproc</artifactId>      <version>v1-rev4-1.21.0</version>    </dependency>  </dependencies></project>

然后,您将获得如下代码:

Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)    .setApplicationName("my-webabb/1.0")    .build();dataproc.projects().regions().jobs().submit(    projectId, "global", new SubmitJobRequest()        .setJob(new Job() .setPlacement(new JobPlacement()     .setClusterName("my-spark-cluster")) .setSparkJob(new SparkJob()     .setMainClass("FooSparkJobMain")     .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))     .setArgs(ImmutableList.of(         "arg1", "arg2", "arg3")))))    .execute();

由于不同的中间服务器可能会进行低级重试,或者您的请求可能会引发IOException,而您不知道提交作业是否成功,因此您可能要执行的另一步骤是生成自己的作业

jobId
;那么您知道要轮询哪个jobId,以弄清它是否已提交,即使您的请求超时或引发一些未知的异常:

import java.util.UUID;...Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)    .setApplicationName("my-webabb/1.0")    .build();String curJobId = "json-agg-job-" + UUID.randomUUID().toString();Job jobSnapshot = null;try {  jobSnapshot = dataproc.projects().regions().jobs().submit(      projectId, "global", new SubmitJobRequest()          .setJob(new Job()   .setReference(new JobReference()        .setJobId(curJobId))   .setPlacement(new JobPlacement()       .setClusterName("my-spark-cluster"))   .setSparkJob(new SparkJob()       .setMainClass("FooSparkJobMain")       .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))       .setArgs(ImmutableList.of("arg1", "arg2", "arg3")))))      .execute();} catch (IOException ioe) {  try {    jobSnapshot = dataproc.projects().regions().jobs().get(        projectId, "global", curJobId).execute();    logger.info(ioe, "Despite exception, job was verified submitted");  } catch (IOException ioe2) {    // Handle differently; if it's a GoogleJsonResponseException you can inspect the error    // pre, and if it's a 404, then it means the job didn't get submitted; you can add retry    // logic in that case.  }}// We can poll on dataproc.projects().regions().jobs().get(...) until the job reports being// completed or failed now.


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/509379.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号