部署
使用版本:apache-livy-0.7.1
环境:spark(配置中已添加hive-site.xml) , hive ,hadoop(hdfs+yarn) 基于centos
livy只需要配置两个文件(livy-env.sh,livy.conf):
livy-env.sh 中添加环境变量配置
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_141
export HADOOP_HOME=/usr/local/jdk/azkabanWebServer/hadoop-2.7.1
export SPARK_CONF_DIR=/usr/local/jdk/azkabanWebServer/spark-2.4.8/conf
export SPARK_HOME=/usr/local/jdk/azkabanWebServer/spark-2.4.8
export HADOOP_CONF_DIR=/usr/local/jdk/azkabanWebServer/hadoop-2.7.1/etc/hadoop
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"
其中HADOOP_CONF_DIR一定要添加,否则创建会话的时候,会话里面没有 appId 及appInfo信息,这是我之前踩过的坑,如下图:
livy.conf配置:
livy.spark.master = yarn
livy.spark.deploy-mode = cluster
livy.environment = production
livy.impersonation.enabled = true
livy.server.port = 8998
livy.server.session.timeout = 3600000
livy.server.recovery.mode = recovery
livy.server.recovery.state-store=filesystem
livy.server.recovery.state-store.url=/opt/apache/livy/tmp
livy.task.max.concurrent.count = 20
livy.repl.enableHiveContext = true
如果希望像spark-sql一样操作hive里面的数据,livy.repl.enableHiveContext 一定要配置为true。否则通过livy提交建表sql语句时,所建的表只对当前livy session有效,在新建的livy session中看不到其他session创建的表,因为该表并没有存放到hive元数据中
启动:在livy的bin目录下执行 sh livy-server start
使用livy
使用livy有两种操作模式:
1、交互式模式:这种模式就类似于在linux中spark-sql进入的交互模式一样,会给当前客户端创建一个会话,只要不进行quit,此会话一直会保留,且一次只能操作一个sql语句,操作的sql都能返回结果
如:创建会话rest接口(可以配置yarn运行的参数)
作业提交方式rest接口:
查询结果rest接口:
注意:这里查询结果返回默认最多只有1000条
看源码里面LivyConf有个配置:
2、非交互式模式(即batch模式):这种模式常见的使用方式是:编写scala程序,打成jar文件,放到hdfs上,在进行rest batch提交的时候指定要执行的jar及main类。该模式不会给调用方返回结果,因为创建的会话在程序执行完后会自动释放,并不会保留
此种模式并不需要创建会话,如:
以下是交互式模式的Java操作rest接口的工具类:
package com.kyexpress.bdp.common.utils;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Stopwatch;
import com.kyexpress.bdp.common.enums.LivySessionState;
import com.kyexpress.bdp.common.model.BdpLivyInfo;
import com.kyexpress.bdp.common.model.livy.LivySession;
import com.kyexpress.bdp.common.model.livy.LivySessionListResponse;
import com.kyexpress.bdp.common.model.livy.LivyStatement;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@Slf4j
public class LivyClient {
protected String url;
public static final int CREATE_SESSION_TIMEOUT = 10 * 60;
public static String NAME_PREFIX = "bdp@";
private OkHttpClient okHttpClient;
public LivyClient(String livyUrl) {
this.url = livyUrl;
this.okHttpClient = new OkHttpClient.Builder()
.connectTimeout(50000, TimeUnit.MILLISECONDS)
.readTimeout(50000, TimeUnit.MILLISECONDS)
.build();
}
//create session
public LivySession createSession(BdpLivyInfo bdpLivyInfo) throws IOException {
JSonObject body = new JSonObject();
body.put("proxyUser", bdpLivyInfo.getProxyUser());
body.put("kind", "sql");
body.put("driverMemory", bdpLivyInfo.getDriverMemory());
body.put("driverCores", bdpLivyInfo.getDriverCores());
body.put("executorMemory", bdpLivyInfo.getExecutorMemory());
body.put("executorCores", bdpLivyInfo.getExecutorCores());
body.put("numExecutors", bdpLivyInfo.getNumExecutors());
body.put("queue", bdpLivyInfo.getQueue());
body.put("heartbeatTimeoutInSecond",bdpLivyInfo.getHeartbeatTimeoutInSecond());
body.put("name", NAME_PREFIX + GuidUtils.newGuild());
JSonObject sparkConf = new JSonObject();
sparkConf.put("spark.dynamicAllocation.enabled", true);
sparkConf.put("spark.shuffle.service.enabled", true);
body.put("conf", sparkConf);
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), body.toString());
Request request = new Request.Builder()
.url(url + "/sessions")
.post(requestBody)
.header("X-Requested-By", "DataQuery")
.build();
Response response = okHttpClient.newCall(request).execute();
String responseContent = response.body().string();
int code = response.code();
if (!(code >= 200 && code < 300)) {
throw new IOException("create session exception: " + responseContent);
}
LivySession session = JSONObject.parseObject(responseContent, LivySession.class);
session.setLivyClient(this);
session.setBdpLivyInfo(bdpLivyInfo);
//轮询等待
Stopwatch sw = Stopwatch.createStarted();
for (; ; ) {
try {
Thread.sleep(2000);
log.info("waiting some time for creating session:{}", session.getId());
//超时处理
long elapsed = sw.elapsed(TimeUnit.SECONDS);
if (elapsed > CREATE_SESSION_TIMEOUT) {
log.error("create session timeout(s) : " + CREATE_SESSION_TIMEOUT);
deleteSession(session.getId());
throw new IOException("create session timeout");
}
LivySession session_current = getSession(session.getId());
session_current.setLivyClient(this);
session_current.setBdpLivyInfo(bdpLivyInfo);
if (StringUtils.equals(session_current.getState(), LivySessionState.idle.name())) {
return session_current;
} else if (StringUtils.equals(session_current.getState(), LivySessionState.starting.name())) {
continue;
} else {
deleteSession(session.getId());
log.error("create session error,state:{}", session_current.getState());
}
} catch (Exception e) {
log.error("create session error:{}", e.getMessage());
throw new IOException("create session error : " + e.getMessage());
}
}
}
//delete session
public void deleteSession(String sessionId) throws IOException {
Request request = new Request.Builder()
.url(url + "/sessions/" + sessionId)
.delete()
.header("X-Requested-By", "DataQuery")
.build();
Response response = okHttpClient.newCall(request).execute();
String responseContent = response.body().string();
int code = response.code();
if (!(code >= 200 && code < 300)) {
throw new IOException("delete session exception: " + responseContent);
}
}
//get session
public LivySession getSession(String sessionId) throws IOException {
Request request = new Request.Builder()
.url(url + "/sessions/" + sessionId)
.get()
.header("X-Requested-By", "DataQuery")
.build();
Response response = okHttpClient.newCall(request).execute();
String responseContent = response.body().string();
int code = response.code();
if (code == 404) {
return null;
}
if (!(code >= 200 && code < 300)) {
throw new IOException("get session exception: " + responseContent);
}
LivySession session = JSONObject.parseObject(responseContent, LivySession.class);
return session;
}
//list session
public LivySessionListResponse listSession() throws IOException {
Request request = new Request.Builder()
.url(url + "/sessions/")
.get()
.header("X-Requested-By", "DataQuery")
.build();
Response response = okHttpClient.newCall(request).execute();
String responseContent = response.body().string();
int code = response.code();
if (!(code >= 200 && code < 300)) {
throw new IOException("list session exception: " + responseContent);
}
LivySessionListResponse listSessionResponse = JSONObject.parseObject(responseContent, LivySessionListResponse.class);
return listSessionResponse;
}
//execute statement
public LivyStatement executeStatement(String sessionId, String code_, String kind) throws IOException {
JSonObject body = new JSonObject();
body.put("code", code_);
body.put("kind", kind);
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), body.toString());
Request request = new Request.Builder()
.url(url + "/sessions/" + sessionId + "/statements")
.post(requestBody)
.header("X-Requested-By", "DataQuery")
.build();
Response response = okHttpClient.newCall(request).execute();
String responseContent = response.body().string();
int code = response.code();
if (!(code >= 200 && code < 300)) {
throw new IOException("execute statement exception: " + responseContent);
}
LivyStatement statement = JSONObject.parseObject(responseContent, LivyStatement.class);
return statement;
}
//get statement
public LivyStatement getStatement(String sessionId, String statementId) throws IOException {
Request request = new Request.Builder()
.url(url + "/sessions/" + sessionId + "/statements/" + statementId)
.get()
.header("X-Requested-By", "DataQuery")
.build();
Response response = okHttpClient.newCall(request).execute();
String responseContent = response.body().string();
int code = response.code();
if (!(code >= 200 && code < 300)) {
log.info("------response------------"+response.toString());
throw new IOException("get statement exception: " + responseContent);
}
LivyStatement statement = JSONObject.parseObject(responseContent, LivyStatement.class);
return statement;
}
//cancel statement
public void cancelStatement(String sessionId, String statementId) throws IOException {
Request request = new Request.Builder()
.url(url + "/sessions/" + sessionId + "/statements/" + statementId + "/cancel")
.post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), ""))
.header("X-Requested-By", "DataQuery")
.build();
Response response = okHttpClient.newCall(request).execute();
String responseContent = response.body().string();
int code = response.code();
if (!(code >= 200 && code < 300)) {
throw new IOException("cancel statement exception: " + responseContent);
}
}
}



