Deploying and Using Livy


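Deployment

Version used: apache-livy-0.7.1

Environment: Spark (with hive-site.xml added to its configuration), Hive, and Hadoop (HDFS + YARN), running on CentOS

Livy needs only two configuration files: livy-env.sh and livy.conf.

Add the following environment variables to livy-env.sh: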

export JAVA_HOME=/usr/local/jdk/jdk1.8.0_141
export HADOOP_HOME=/usr/local/jdk/azkabanWebServer/hadoop-2.7.1
export SPARK_CONF_DIR=/usr/local/jdk/azkabanWebServer/spark-2.4.8/conf
export SPARK_HOME=/usr/local/jdk/azkabanWebServer/spark-2.4.8
export HADOOP_CONF_DIR=/usr/local/jdk/azkabanWebServer/hadoop-2.7.1/etc/hadoop
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"

HADOOP_CONF_DIR must be set. Otherwise, a newly created session contains no appId or appInfo, a pitfall I ran into earlier.
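One way to catch this pitfall early is to look at the session JSON that Livy returns and check whether appId is populated. The following is only a minimal sketch under assumptions: the class name AppIdCheck and the sample JSON strings are hypothetical, and a regular expression is used just to keep the example dependency-free (a real client would use a JSON parser).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AppIdCheck {
    // Matches "appId":"<non-empty value>"; a JSON null or a missing field does not match.
    private static final Pattern APP_ID =
            Pattern.compile("\"appId\"\\s*:\\s*\"([^\"]+)\"");

    /** Returns the YARN application id found in a session JSON string, or null if absent. */
    public static String extractAppId(String sessionJson) {
        Matcher m = APP_ID.matcher(sessionJson);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Hypothetical responses, for illustration only.
        String healthy = "{\"id\":0,\"appId\":\"application_0000000000000_0001\",\"state\":\"idle\"}";
        String broken = "{\"id\":1,\"appId\":null,\"state\":\"idle\"}";
        System.out.println(extractAppId(healthy));
        System.out.println(extractAppId(broken));
    }
}
```

If extractAppId returns null for a session that has already reached the idle state, HADOOP_CONF_DIR in livy-env.sh is the first thing worth checking.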

livy.conf settings:

livy.spark.master = yarn
livy.spark.deploy-mode = cluster
livy.environment = production
livy.impersonation.enabled = true
livy.server.port = 8998
livy.server.session.timeout = 3600000
livy.server.recovery.mode = recovery
livy.server.recovery.state-store=filesystem
livy.server.recovery.state-store.url=/opt/apache/livy/tmp
livy.task.max.concurrent.count = 20
livy.repl.enableHiveContext = true

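If Spark SQL should be able to work with the data in Hive, livy.repl.enableHiveContext must be set to true. Otherwise, a table created by a SQL statement submitted through Livy is visible only in the current Livy session. A new Livy session will not see tables created in other sessions, because those tables are not stored in the Hive metastore.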

Start the server: sh livy-server start

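Using Livy

Livy has two operating modes:

1. Interactive mode: this is similar to the interactive shell you get by running spark-sql on Linux. A session is created for the client and is kept as long as you do not quit. Only one SQL statement can be run at a time, and every statement returns its result.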

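Creating a session: POST /sessions (the parameters for running on YARN, such as the queue and executor settings, can be set in the request body).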
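As a rough illustration of the session-creation call, the sketch below builds the request with the JDK's java.net.http API (Java 11+) instead of OkHttp. The class name CreateSessionSketch, the localhost URL, and the sample values are assumptions; the field names mirror those used by the utility class later in this article, and the values are assumed to need no JSON escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CreateSessionSketch {
    /** Builds the JSON body for POST /sessions; only a few of the accepted fields are shown. */
    public static String sessionBody(String proxyUser, String queue, int numExecutors) {
        // Assumes the values contain no characters that need JSON escaping.
        return "{\"kind\":\"sql\","
                + "\"proxyUser\":\"" + proxyUser + "\","
                + "\"queue\":\"" + queue + "\","
                + "\"numExecutors\":" + numExecutors + "}";
    }

    /** Builds (but does not send) the HTTP request that creates an interactive session. */
    public static HttpRequest createSessionRequest(String livyUrl, String body) {
        return HttpRequest.newBuilder(URI.create(livyUrl + "/sessions"))
                .header("Content-Type", "application/json")
                .header("X-Requested-By", "DataQuery")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        String body = sessionBody("hive", "default", 2);
        HttpRequest request = createSessionRequest("http://localhost:8998", body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
    }
}
```

The request is only constructed here; sending it with HttpClient.send would return the new session's JSON, whose state starts as "starting" and must reach "idle" before statements can run.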

Submitting a statement: POST /sessions/{sessionId}/statements.
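The statement body carries the SQL text in a "code" field and the statement type in "kind", as in the utility class later in this article. The sketch below shows one way to build that body by hand, including escaping of quotes and newlines in the SQL; the class and method names are hypothetical, and a JSON library would normally do this work.

```java
public class StatementBodySketch {
    /** Escapes the characters that must not appear raw inside a JSON string literal. */
    public static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Path of the statements endpoint for a given session. */
    public static String statementsPath(String sessionId) {
        return "/sessions/" + sessionId + "/statements";
    }

    /** JSON body for submitting a single SQL statement. */
    public static String statementBody(String sql) {
        return "{\"code\":\"" + escapeJson(sql) + "\",\"kind\":\"sql\"}";
    }

    public static void main(String[] args) {
        System.out.println("POST " + statementsPath("0"));
        System.out.println(statementBody("select \"id\" from t"));
    }
}
```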

Querying the result: GET /sessions/{sessionId}/statements/{statementId}.
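A statement runs asynchronously, so the result endpoint usually has to be polled until the statement leaves the "waiting" and "running" states. The following is a minimal sketch of such a polling loop; the class name, the Supplier-based design, and the attempt limit are assumptions made for illustration, and the state supplier stands in for the actual GET request.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class StatementPollerSketch {
    /**
     * Polls stateSupplier until the statement is no longer "waiting" or "running",
     * or until maxAttempts is reached, sleeping intervalMillis between attempts.
     * Returns the last observed state.
     */
    public static String waitForStatement(Supplier<String> stateSupplier,
                                          int maxAttempts,
                                          long intervalMillis) {
        String state = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            state = stateSupplier.get();
            boolean inProgress = "waiting".equals(state) || "running".equals(state);
            if (!inProgress) {
                return state; // e.g. "available", "error", "cancelled"
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                return state;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Simulated sequence of states; a real supplier would call
        // GET /sessions/{sessionId}/statements/{statementId} and read "state".
        Iterator<String> states = List.of("waiting", "running", "available").iterator();
        System.out.println(waitForStatement(states::next, 10, 10));
    }
}
```

Once the state is "available", the query result can be read from the "output" field of the statement JSON.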

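Note: by default, a query returns at most 1000 rows.

According to the Livy source code, this limit is governed by a configuration entry in LivyConf.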

 

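2. Non-interactive mode (i.e. batch mode): the typical usage is to write a Scala program, package it as a jar, and upload it to HDFS. When submitting through the REST batch endpoint, you specify which jar and which main class to run. No result is returned to the caller, because the session that is created is released automatically once the program finishes and is not kept.

This mode does not require creating a session first.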
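A batch submission is a single POST /batches request whose body names the jar ("file") and the main class ("className"). The sketch below builds such a request with the JDK's java.net.http API (Java 11+); the class name, the jar path, and the main class are hypothetical placeholders, and the values are assumed to need no JSON escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class BatchSubmitSketch {
    /** JSON body for POST /batches: the application jar and its main class. */
    public static String batchBody(String jarPath, String mainClass) {
        // Assumes the values contain no characters that need JSON escaping.
        return "{\"file\":\"" + jarPath + "\",\"className\":\"" + mainClass + "\"}";
    }

    /** Builds (but does not send) the HTTP request that submits the batch job. */
    public static HttpRequest batchRequest(String livyUrl, String body) {
        return HttpRequest.newBuilder(URI.create(livyUrl + "/batches"))
                .header("Content-Type", "application/json")
                .header("X-Requested-By", "DataQuery")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical jar location and main class, for illustration only.
        String body = batchBody("hdfs:///path/to/app.jar", "com.example.Main");
        HttpRequest request = batchRequest("http://localhost:8998", body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
    }
}
```

Because no result is returned, the caller can only follow the job's progress, for example through GET /batches/{batchId}/state.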

 

The following Java utility class wraps the REST API for interactive mode:

package com.kyexpress.bdp.common.utils;


import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Stopwatch;
import com.kyexpress.bdp.common.enums.LivySessionState;
import com.kyexpress.bdp.common.model.BdpLivyInfo;
import com.kyexpress.bdp.common.model.livy.LivySession;
import com.kyexpress.bdp.common.model.livy.LivySessionListResponse;
import com.kyexpress.bdp.common.model.livy.LivyStatement;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


@Slf4j
public class LivyClient {
    protected String url;
    public static final int CREATE_SESSION_TIMEOUT = 10 * 60;
    public static String NAME_PREFIX = "bdp@";
    private OkHttpClient okHttpClient;

    public LivyClient(String livyUrl) {
        this.url = livyUrl;
        this.okHttpClient = new OkHttpClient.Builder()
                .connectTimeout(50000, TimeUnit.MILLISECONDS)
                .readTimeout(50000, TimeUnit.MILLISECONDS)
                .build();
    }

    //create session
    public LivySession createSession(BdpLivyInfo bdpLivyInfo) throws IOException {
        JSONObject body = new JSONObject();
        body.put("proxyUser", bdpLivyInfo.getProxyUser());
        body.put("kind", "sql");
        body.put("driverMemory", bdpLivyInfo.getDriverMemory());
        body.put("driverCores", bdpLivyInfo.getDriverCores());
        body.put("executorMemory", bdpLivyInfo.getExecutorMemory());
        body.put("executorCores", bdpLivyInfo.getExecutorCores());
        body.put("numExecutors", bdpLivyInfo.getNumExecutors());
        body.put("queue", bdpLivyInfo.getQueue());
        body.put("heartbeatTimeoutInSecond",bdpLivyInfo.getHeartbeatTimeoutInSecond());
        body.put("name", NAME_PREFIX + GuidUtils.newGuild());
        JSONObject sparkConf = new JSONObject();
        sparkConf.put("spark.dynamicAllocation.enabled", true);
        sparkConf.put("spark.shuffle.service.enabled", true);
        body.put("conf", sparkConf);
        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), body.toString());
        Request request = new Request.Builder()
                .url(url + "/sessions")
                .post(requestBody)
                .header("X-Requested-By", "DataQuery")
                .build();
        Response response = okHttpClient.newCall(request).execute();
        String responseContent = response.body().string();
        int code = response.code();
        if (!(code >= 200 && code < 300)) {
            throw new IOException("create session exception: " + responseContent);
        }
        LivySession session = JSONObject.parseObject(responseContent, LivySession.class);
        session.setLivyClient(this);
        session.setBdpLivyInfo(bdpLivyInfo);
        // Poll until the session is ready
        Stopwatch sw = Stopwatch.createStarted();
        for (; ; ) {
            try {
                Thread.sleep(2000);
                log.info("waiting some time for creating session:{}", session.getId());
                // Give up once the timeout has elapsed
                long elapsed = sw.elapsed(TimeUnit.SECONDS);
                if (elapsed > CREATE_SESSION_TIMEOUT) {
                    log.error("create session timeout(s) : " + CREATE_SESSION_TIMEOUT);
                    deleteSession(session.getId());
                    throw new IOException("create session timeout");
                }
                LivySession session_current = getSession(session.getId());
                if (session_current == null) {
                    // getSession returns null on 404, i.e. the session no longer exists
                    throw new IOException("session not found: " + session.getId());
                }
                session_current.setLivyClient(this);
                session_current.setBdpLivyInfo(bdpLivyInfo);
                if (StringUtils.equals(session_current.getState(), LivySessionState.idle.name())) {
                    return session_current;
                } else if (StringUtils.equals(session_current.getState(), LivySessionState.starting.name())) {
                    continue;
                } else {
                    // Any other state means creation failed: clean up and stop polling
                    log.error("create session error,state:{}", session_current.getState());
                    deleteSession(session.getId());
                    throw new IOException("unexpected session state: " + session_current.getState());
                }
            } catch (Exception e) {
                log.error("create session error:{}", e.getMessage());
                throw new IOException("create session error : " + e.getMessage());
            }
        }
    }

    //delete session
    public void deleteSession(String sessionId) throws IOException {
        Request request = new Request.Builder()
                .url(url + "/sessions/" + sessionId)
                .delete()
                .header("X-Requested-By", "DataQuery")
                .build();
        Response response = okHttpClient.newCall(request).execute();
        String responseContent = response.body().string();
        int code = response.code();
        if (!(code >= 200 && code < 300)) {
            throw new IOException("delete session exception: " + responseContent);
        }
    }

    //get session
    public LivySession getSession(String sessionId) throws IOException {
        Request request = new Request.Builder()
                .url(url + "/sessions/" + sessionId)
                .get()
                .header("X-Requested-By", "DataQuery")
                .build();
        Response response = okHttpClient.newCall(request).execute();
        String responseContent = response.body().string();
        int code = response.code();
        if (code == 404) {
            return null;
        }
        if (!(code >= 200 && code < 300)) {
            throw new IOException("get session exception: " + responseContent);
        }
        LivySession session = JSONObject.parseObject(responseContent, LivySession.class);
        return session;
    }

    //list session
    public LivySessionListResponse listSession() throws IOException {
        Request request = new Request.Builder()
                .url(url + "/sessions/")
                .get()
                .header("X-Requested-By", "DataQuery")
                .build();
        Response response = okHttpClient.newCall(request).execute();
        String responseContent = response.body().string();
        int code = response.code();
        if (!(code >= 200 && code < 300)) {
            throw new IOException("list session exception: " + responseContent);
        }
        LivySessionListResponse listSessionResponse = JSONObject.parseObject(responseContent, LivySessionListResponse.class);
        return listSessionResponse;
    }

    //execute statement
    public LivyStatement executeStatement(String sessionId, String code_, String kind) throws IOException {
        JSONObject body = new JSONObject();
        body.put("code", code_);
        body.put("kind", kind);
        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), body.toString());
        Request request = new Request.Builder()
                .url(url + "/sessions/" + sessionId + "/statements")
                .post(requestBody)
                .header("X-Requested-By", "DataQuery")
                .build();
        Response response = okHttpClient.newCall(request).execute();
        String responseContent = response.body().string();
        int code = response.code();
        if (!(code >= 200 && code < 300)) {
            throw new IOException("execute statement exception: " + responseContent);
        }
        LivyStatement statement = JSONObject.parseObject(responseContent, LivyStatement.class);
        return statement;
    }

    //get statement
    public LivyStatement getStatement(String sessionId, String statementId) throws IOException {
        Request request = new Request.Builder()
                .url(url + "/sessions/" + sessionId + "/statements/" + statementId)
                .get()
                .header("X-Requested-By", "DataQuery")
                .build();
        Response response = okHttpClient.newCall(request).execute();
        String responseContent = response.body().string();
        int code = response.code();
        if (!(code >= 200 && code < 300)) {
            log.info("------response------------"+response.toString());
            throw new IOException("get statement exception: " + responseContent);
        }
        LivyStatement statement = JSONObject.parseObject(responseContent, LivyStatement.class);
        return statement;
    }

    //cancel statement
    public void cancelStatement(String sessionId, String statementId) throws IOException {
        Request request = new Request.Builder()
                .url(url + "/sessions/" + sessionId + "/statements/" + statementId + "/cancel")
                .post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), ""))
                .header("X-Requested-By", "DataQuery")
                .build();
        Response response = okHttpClient.newCall(request).execute();
        String responseContent = response.body().string();
        int code = response.code();
        if (!(code >= 200 && code < 300)) {
            throw new IOException("cancel statement exception: " + responseContent);
        }
    }
}
