
Replacing Hive and Hive on Spark with Kyuubi + Spark SQL

Our data warehouse is a CDH 5.14.4 Hadoop cluster running batch jobs on Hive and Hive on Spark. As job counts and data volumes have grown, the cluster runs at full load every night: HDFS and network I/O reach about 40 Gb/s, job runtimes keep stretching, and the queue backlog averages around 5,000 pending tasks, peaking near 15,000. There is also a standing problem: Hive on Spark needs a core-to-memory ratio as high as 1:8, meaning one container uses 1 core with 8 GB or even 10 GB of memory; set it any lower and Hive on Spark jobs simply fail. To raise cluster concurrency, stop wasting memory, and get a genuinely faster ETL engine, I benchmarked Spark 3.1.2's SQL performance: roughly a 5-10x improvement. A job that used to take an hour and a half now takes 20 minutes; a 30-minute job now takes about 5-7 minutes. Spark SQL is also about 99% compatible with Hive SQL, so most Hive SQL can be moved to Spark SQL by doing little more than changing a JDBC URL.
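Concretely, migrating a job usually comes down to swapping the HiveServer2 JDBC URL for the Kyuubi one (the hostnames below are placeholders; 10000 and 10009 are the customary HiveServer2 and Kyuubi ports):

```
# Before: HiveServer2 (Hive / Hive on Spark)
jdbc:hive2://hiveserver2-host:10000/default

# After: Kyuubi (Spark SQL) - same hive2 driver, same SQL
jdbc:hive2://kyuubi-host:10009/default
```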

After some research and testing, Kyuubi + Spark 3.1.2 turned out to be a good fit. What follows is mostly a memo of the setup.

1. CDH 5.14.4's Hadoop does not ship the hadoop-client-api artifact that Spark 3.2 requires, so Spark 3.2 cannot be compiled against it. I settled on Spark 3.1.2.

2. For Kyuubi I chose the 1.2.0 "without Spark" distribution, because the Spark bundled with Kyuubi is built against Hadoop 2.7. Why Kyuubi rather than Spark's own Thriftserver? The cluster needs multi-tenancy; I did not test Thriftserver in depth and am not sure it can provide that.

3. The cluster has Kerberos + Sentry enabled. This is largely orthogonal to Kyuubi + Spark; the setup below works whether or not they are enabled.

1) Build Spark 3.1.2. Download the Spark source from the official site.

Set up the build environment (I am on a Mac, so the JAVA_HOME path looks a bit odd):

1. Add environment variables
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home
export MAVEN_HOME=/Users/jia-long.wang/Downloads/apache-maven-3.8.1                
export MAVEN_OPTS="-Xmx4g -XX:ReservedCodeCacheSize=1024m"
export SCALA_HOME=/Users/jia-long.wang/Downloads/scala-2.12.8
export PATH=$MAVEN_HOME/bin:$PATH:$SCALA_HOME/bin

2. Edit pom.xml

Add Cloudera's Maven repository:

<repository>
  <id>cloudera</id>
  <name>cloudera Repository</name>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>


Change the matching Hadoop version; leave everything else alone. The Hive version, for example, defaults to 2.3.7; do not change it, or you will run into a pile of dependency errors. Spark has long been able to work with any Hive metastore version. You only need to point it at your own Hive with:
spark.sql.hive.metastore.jars=/data1/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.hive.metastore.version=1.1.0

<hadoop.version>2.6.0-cdh5.14.4</hadoop.version>

Source change 1: Spark 3 has a compile problem on YARN against some Hadoop 2.6.x releases. See the official fix: [SPARK-19545][YARN] Fix compile issue for Spark on Yarn when building against Hadoop 2.6.0~2.6.3 by jerryshao, Pull Request #16884, apache/spark on GitHub.

Source change 2: Spark 3.1.2 has a permissions bug: when an ordinary user creates a table it fails with "no privileges". The cause is that Spark validates the HDFS location, but at CREATE TABLE time that location does not exist yet (the table has not been created, so where would the location come from?), and the check fails. The Hive superuser is not affected. Reference (in Chinese): "sparksql集成sentry遇到的问题", CSDN blog of u012477420.

In short, replace:

override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
    verifyColumnDataType(table.dataSchema)
    client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
  }


with:

override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
    verifyColumnDataType(table.dataSchema)
    val hiveTable = toHiveTable(table, Some(userName))
    // With Sentry enabled, clear the not-yet-existing HDFS location so the
    // privilege check does not fail for ordinary users at CREATE TABLE time.
    if (sparkConf.getBoolean("spark.sql.enable.sentry", defaultValue = false)) {
      hiveTable.getTTable.getSd.setLocation(null)
    }
    client.createTable(hiveTable, ignoreIfExists)
  }

Edit dev/make-distribution.sh. This is Spark's recommended way to build a distribution; building directly with Maven also works, but then you cannot produce a standalone binary tarball (the source tree is mixed in), so for a clean result use the recommended script. Hard-coding the version variables below also skips several slow "mvn help:evaluate" invocations.

VERSION=3.1.2
SCALA_VERSION=2.12
SPARK_HADOOP_VERSION=2.6.0-CDH5.14.4
SPARK_HIVE=1

#VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ \
#    | grep -v "INFO" \
#    | grep -v "WARNING" \
#    | tail -n 1)
#SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ \
#    | grep -v "INFO" \
#    | grep -v "WARNING" \
#    | tail -n 1)
#SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ \
#    | grep -v "INFO" \
#    | grep -v "WARNING" \
#    | tail -n 1)
#SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ \
#    | grep -v "INFO" \
#    | grep -v "WARNING" \
#    | fgrep --count "<id>hive</id>";\
#    # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing
#    # because we use "set -o pipefail"
#    echo -n)

Start the build:

dev/make-distribution.sh --tgz --name 2.6.0-cdh5.14.4  -Pyarn -Phadoop2.6.0-cdh5.14.4 -Dhadoop.version=2.6.0-cdh5.14.4 -Phive  -Phive-thriftserver

By the time you finish a cup of tea it should be done. The Spark 3.1.2 directory will contain the tarball: spark-3.1.2-bin-2.6.0-cdh5.14.4.tgz

Edit spark-env.sh

JAVA_HOME=/usr/java/jdk1.8.0_121
SPARK_CONF_DIR=/data1/software/spark-3.1.2/conf
HADOOP_CONF_DIR=/etc/hadoop/conf
YARN_CONF_DIR=/etc/hadoop/conf

Edit spark-defaults.conf

spark.master                     yarn
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://nameservice/tmp/spark-events
spark.serializer                 org.apache.spark.serializer.KryoSerializer

Copy hive-site.xml, core-site.xml, hdfs-site.xml and yarn-site.xml into Spark's conf directory. Spark itself needs no further changes.

2) Install Kyuubi. I used the officially built release, "without Spark". Unpacking it is the whole installation; the rest is editing configuration files.

The spark-defaults.conf above only adds a few simple settings such as the event-log directory, because all the real configuration lives in Kyuubi: on startup, Kyuubi loads Spark's configuration as well. Does spark-defaults.conf need anything more? It does not matter either way; add settings there or not, as you like.

Edit kyuubi-defaults.conf

## Kyuubi Configurations

kyuubi.engine.share.level=CONNECTION
#kyuubi.engine.share.level=USER
#kyuubi.session.engine.idle.timeout=PT30M
kyuubi.session.engine.initialize.timeout=PT5M
kyuubi.authentication=KERBEROS
kyuubi.kinit.principal=hive/host224.slave.cluster@xxx.CN
kyuubi.kinit.keytab=/data1/software/kyuubi/kyuubi224.keytab
kyuubi.frontend.bind.host       10.37.54.224
kyuubi.frontend.bind.port       10009
#kyuubi.ha.enabled=true
#kyuubi.ha.zookeeper.quorum=10.37.54.187
#kyuubi.ha.zookeeper.client.port=2181
#kyuubi.ha.zookeeper.namespace=kyuubi
#kyuubi.ha.zookeeper.session.timeout=60000
#kyuubi.ha.zookeeper.acl.enabled=false
kyuubi.zookeeper.embedded.max.session.timeout=120000
hive.metastore.uris             thrift://host12.master.cluster.xxx.cn:9083,thrift://host13.master.cluster.xxx.cn:9083
javax.jdo.option.ConnectionURL  jdbc:mysql://manager.cluster.xxx.cn:3306/hive?useUnicode=true&characterEncoding=UTF-8
javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver
javax.jdo.option.ConnectionUserName root
javax.jdo.option.ConnectionPassword 123456
spark.sql.hive.metastore.jars=/data1/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.hive.metastore.version=1.1.0
spark.master=yarn
#spark.yarn.queue=root.default
spark.submit.deployMode=cluster
spark.executor.cores=2
spark.yarn.am.memory=4G
spark.driver.memory=4G
spark.executor.memory=12G 
#spark.default.parallelism=10
spark.shuffle.service.port=7337
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.authenticate=false
spark.authenticate.enableSaslEncryption=false
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.maxExecutors=1000
spark.shuffle.service.enabled=true
spark.shuffle.useOldFetchProtocol=true
spark.executor.heartbeatInterval=20s
spark.hadoop.fs.hdfs.impl.disable.cache=true
#spark.kerberos.keytab=/data1/software/kyuubi/kyuubi224.keytab
#spark.kerberos.principal=hive/host224.slave.cluster@xxx.CN
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
#spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG=true
#spark.yarn.am.extraJavaOptions="-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true"
spark.security.credentials.hdfs.enabled=true
spark.kerberos.access.hadoopFileSystems=hdfs://nameservice:8020,hdfs://host12.master.cluster.xxx.cn:8020,hdfs://host13.master.cluster.xxx.cn:8020
#spark.kerberos.renewal.credentials=ccache
spark.sql.hive.filesourcePartitionFileCacheSize=786432000
spark.sql.hive.verifyPartitionPath=true
spark.sql.hive.convertMetastoreParquet.mergeSchema=true
spark.hadoopRDD.ignoreEmptySplits=true
spark.sql.hive.convertMetastoreParquet=false
#spark.sql.hive.convertMetastoreParquet.mergeSchema=true
#spark.yarn.jars=hdfs://nameservice:8020/sparkjars
#spark.jars=/data1/software/kyuubi_2/sparkjars/hive-hcatalog-core-1.1.0-cdh5.14.4.jar
spark.sql.enable.sentry=true
spark.hive.warehouse.subdir.inherit.perms=false

Quite a lot of configuration, isn't it? It is the result of a fair amount of grinding; I think you can use it as-is with few problems.

Edit kyuubi-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_121
export HADOOP_CONF_DIR=/etc/hadoop/conf
export SPARK_HOME=/data1/software/spark-3.1.2
export SPARK_CONF_DIR=/data1/software/spark-3.1.2/conf
export KYUUBI_JAVA_OPTS="-Xmx10g -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=4096 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCondCardMark -XX:MaxDirectMemorySize=1024m  -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./logs -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:./logs/kyuubi-server-gc-%t.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=5M -XX:NewRatio=3 -XX:MetaspaceSize=512m"

Create a Kerberos principal to run Kyuubi. Whatever the machine's hostname is, create a principal of the form xxx/hostname@REALM; anyone familiar with Kerberos needs no further explanation. Mine is hive/host224.slave.cluster@xxx.CN. Why did I choose hive? Kyuubi is multi-tenant and relies on user impersonation: with a principal like kyuubi I would also have to add proxy-user settings to core-site.xml and then restart the Hadoop cluster, and a restart is simply out of the question. The hive user already had impersonation rights from the original Hadoop installation.
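For reference, had I run Kyuubi as a dedicated user such as kyuubi, the core-site.xml additions would look roughly like this (a sketch; the user name and the wildcard scope are assumptions you would tighten for production):

```xml
<!-- Allow a hypothetical kyuubi service user to impersonate other users. -->
<property>
  <name>hadoop.proxyuser.kyuubi.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.kyuubi.groups</name>
  <value>*</value>
</property>
```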

3) Start Kyuubi

Run bin/kyuubi start, then check the log and port 10009; as long as there are no exceptions you are done.

4) Connect to port 10009 with beeline

beeline -u "jdbc:hive2://host224.slave.cluster.xxx.cn:10009/;principal=hive/host224.slave.cluster.xxx.cn@xxx.CN"

The beeline login log looks like this:

21/11/29 21:53:52 INFO service.FrontendService: Client protocol version: HIVE_CLI_SERVICE_PROTOCOL_V7
21/11/29 21:53:52 INFO imps.CuratorFrameworkImpl: Starting
21/11/29 21:53:52 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=10.37.54.224:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@79fed433
21/11/29 21:53:52 INFO zookeeper.ClientCnxn: Opening socket connection to server host224.slave.cluster.enn.cn/10.37.54.224:2181. Will not attempt to authenticate using SASL (unknown error)
21/11/29 21:53:52 INFO zookeeper.ClientCnxn: Socket connection established to host224.slave.cluster.enn.cn/10.37.54.224:2181, initiating session
21/11/29 21:53:52 INFO server.NIOServerCnxnFactory: Accepted socket connection from /10.37.54.224:43544
21/11/29 21:53:52 INFO server.ZooKeeperServer: Client attempting to establish new session at /10.37.54.224:43544
21/11/29 21:53:52 INFO server.ZooKeeperServer: Established session 0x100cec978840010 with negotiated timeout 60000 for client /10.37.54.224:43544
21/11/29 21:53:52 INFO zookeeper.ClientCnxn: Session establishment complete on server host224.slave.cluster.enn.cn/10.37.54.224:2181, sessionid = 0x100cec978840010, negotiated timeout = 60000
21/11/29 21:53:52 INFO state.ConnectionStateManager: State change: CONNECTED
21/11/29 21:53:52 INFO engine.EngineRef: Launching engine:
/data1/software/spark-3.1.2/bin/spark-submit 
        --class org.apache.kyuubi.engine.spark.SparkSQLEngine 
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
        --conf spark.sql.hive.convertMetastoreParquet=false 
        --conf spark.sql.hive.verifyPartitionPath=true 
        --conf spark.kerberos.access.hadoopFileSystems=hdfs://nameservice:8020,hdfs://host12.master.cluster.xxx.cn:8020,hdfs://host13.master.cluster.xxx.cn:8020 
        --conf spark.executor.heartbeatInterval=20s 
        --conf spark.sql.adaptive.enabled=true 
        --conf spark.shuffle.service.enabled=true 
        --conf spark.hadoop.fs.hdfs.impl.disable.cache=true 
        --conf spark.kyuubi.engine.share.level=CONNECTION 
        --conf spark.app.name=kyuubi_CONNECTION_hive_8124e89e-ce69-42e8-9ab2-390b9c6321e3 
        --conf spark.sql.hive.metastore.jars=/data1/cloudera/parcels/CDH/lib/hive/lib/* 
        --conf spark.security.credentials.hdfs.enabled=true 
        --conf spark.sql.hive.convertMetastoreParquet.mergeSchema=true 
        --conf spark.dynamicAllocation.schedulerBacklogTimeout=1 
        --conf spark.sql.hive.filesourcePartitionFileCacheSize=786432000 
        --conf spark.sql.enable.sentry=true 
        --conf spark.driver.memory=4G 
        --conf spark.hive.warehouse.subdir.inherit.perms=false 
        --conf spark.dynamicAllocation.maxExecutors=1000 
        --conf spark.sql.adaptive.coalescePartitions.enabled=true 
        --conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_CONNECTION/hive/8124e89e-ce69-42e8-9ab2-390b9c6321e3 
        --conf spark.yarn.am.memory=4G 
        --conf spark.kyuubi.ha.zookeeper.quorum=10.37.54.224:2181 
        --conf spark.submit.deployMode=cluster 
        --conf spark.shuffle.service.port=7337 
        --conf spark.master=yarn 
        --conf spark.authenticate.enableSaslEncryption=false 
        --conf spark.shuffle.useOldFetchProtocol=true 
        --conf spark.yarn.tags=KYUUBI 
        --conf spark.authenticate=false 
        --conf spark.kyuubi.session.engine.initialize.timeout=300000 
        --conf spark.executor.memory=12G 
        --conf spark.dynamicAllocation.enabled=true 
        --conf spark.executor.cores=2 
        --conf spark.dynamicAllocation.minExecutors=1 
        --conf spark.hadoopRDD.ignoreEmptySplits=true 
        --conf spark.kyuubi.ha.zookeeper.acl.enabled=false 
        --conf spark.dynamicAllocation.executorIdleTimeout=60 
        --conf spark.sql.hive.metastore.version=1.1.0 
        --proxy-user hive /data1/software/kyuubi/externals/engines/spark/kyuubi-spark-sql-engine-1.2.0.jar
21/11/29 21:53:52 INFO engine.ProcBuilder: Logging to /data1/software/kyuubi/work/hive/kyuubi-spark-sql-engine.log.19
21/11/29 21:54:27 INFO server.NIOServerCnxnFactory: Accepted socket connection from /10.37.54.150:34834
21/11/29 21:54:27 INFO server.ZooKeeperServer: Client attempting to establish new session at /10.37.54.150:34834
21/11/29 21:54:27 INFO server.ZooKeeperServer: Established session 0x100cec978840011 with negotiated timeout 60000 for client /10.37.54.150:34834
21/11/29 21:54:28 INFO client.ServiceDiscovery: Get service instance:host150.slave.cluster.enn.cn:32823 and version:Some(1.2.0) under /kyuubi_CONNECTION/hive/8124e89e-ce69-42e8-9ab2-390b9c6321e3
21/11/29 21:54:28 INFO session.KyuubiSessionImpl: [hive:10.37.54.224] SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] - Connecting to engine [host150.slave.cluster.xxx.cn:32823]
21/11/29 21:54:28 INFO session.KyuubiSessionImpl: [hive:10.37.54.224] SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] - Connected to engine [host150.slave.cluster.xxx.cn:32823]
21/11/29 21:54:28 INFO session.KyuubiSessionImpl: [hive:10.37.54.224] SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] - Sending TOpenSessionReq to engine [host150.slave.cluster.xxx.cn:32823]
21/11/29 21:54:28 INFO session.KyuubiSessionImpl: [hive:10.37.54.224] SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] - Received TOpenSessionResp from engine [host150.slave.cluster.xxx.cn:32823]
21/11/29 21:54:28 INFO imps.CuratorFrameworkImpl: backgroundOperationsLoop exiting
21/11/29 21:54:28 INFO server.PrepRequestProcessor: Processed session termination for sessionid: 0x100cec978840010
21/11/29 21:54:28 INFO zookeeper.ZooKeeper: Session: 0x100cec978840010 closed
21/11/29 21:54:28 INFO server.NIOServerCnxn: Closed socket connection for client /10.37.54.224:43544 which had sessionid 0x100cec978840010
21/11/29 21:54:28 INFO session.KyuubiSessionManager: hive's session with SessionHandle [8124e89e-ce69-42e8-9ab2-390b9c6321e3] is opened, current opening sessions 1
21/11/29 21:54:28 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x100cec978840010

What follows is a Q&A section, self-asked and self-answered:

1. Why doesn't the Spark build need the Hive version changed?

Because Spark can adapt to any Hive metastore version; only two settings are needed:

spark.sql.hive.metastore.jars=/data1/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.hive.metastore.version=1.1.0 (do not write 1.1.0-cdh5.14.4; write plain 1.1.0. If yours is 1.2.0, write 1.2.0. Never include the cdh suffix.)
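The rule is simply: strip the vendor suffix starting at the first hyphen. A one-liner in shell shows the transformation:

```shell
# Derive the value for spark.sql.hive.metastore.version from a
# vendor version string by cutting everything from the first "-" on.
full_version="1.1.0-cdh5.14.4"
metastore_version="${full_version%%-*}"
echo "$metastore_version"   # prints 1.1.0
```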

2. Why does the following setting list three filesystems instead of the usual two namenodes? spark.kerberos.access.hadoopFileSystems=hdfs://nameservice:8020,hdfs://host12.master.cluster.enn.cn:8020,hdfs://host13.master.cluster.enn.cn:8020

Because in testing (perhaps I was just not careful enough), configuring only the nameservice, or only the two actual namenodes, produced intermittent HDFS Kerberos connection errors. Not every time, just occasionally, which is maddening. With all three configured I have seen no errors, so all three it is.

3. Why are the following two settings commented out?

#spark.kerberos.keytab=/data1/software/kyuubi/kyuubi224.keytab
#spark.kerberos.principal=hive/host224.slave.cluster.xxx.cn@xxx.CN

Because --proxy-user and a keytab cannot be used together. Kyuubi is multi-tenant and impersonates users: at launch it adds --proxy-user for the session user, so the keytab settings must stay commented out.

4. Why add spark.hive.warehouse.subdir.inherit.perms=false?

Because under multi-tenancy you may well hold ALL privileges on a table yet not be the owner of its files, which triggers a warning roughly like "xxxx user is not the owner of inode=xxxxxxxx". It is harmless, but annoying to look at.

5. Spark SQL fails TRUNCATE with insufficient privileges

Use DROP TABLE plus CREATE TABLE instead. The error here is again "xxxx user is not the owner". This baffles me: what is Spark's problem? I am not the owner, but I have rwx; why should Spark care whether I am the owner? There is no way around it short of patching the code.
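A sketch of the workaround with a hypothetical table t (I have not verified whether INSERT OVERWRITE passes the ownership check, so drop-and-recreate is the safe path):

```sql
-- Instead of: TRUNCATE TABLE t;   (fails the ownership check)
DROP TABLE IF EXISTS t;
CREATE TABLE t (id BIGINT, name STRING) STORED AS PARQUET;
```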

6. Why is Kyuubi high availability commented out?

Because this Kyuubi release has too many bugs, especially in HA, which is unusable whether or not ZooKeeper has Kerberos enabled. The difference: with ZK Kerberos on, Kyuubi starts fine but beeline fails when creating a connection; with it off, both fail.

7. So how do you get high availability?

Wait for the 1.3.0 release, or patch the code yourself. My approach is a front-end load balancer, exactly the same idea as Hive HA.
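As a sketch of that idea, an HAProxy fragment in TCP mode (the hostnames and the second Kyuubi instance are assumptions; note that with Kerberos the service principal in the JDBC URL must still match the host the client actually reaches):

```
frontend kyuubi_front
    bind *:10010
    mode tcp
    default_backend kyuubi_servers

backend kyuubi_servers
    mode tcp
    balance leastconn
    server kyuubi1 host224.slave.cluster.xxx.cn:10009 check
    server kyuubi2 host225.slave.cluster.xxx.cn:10009 check
```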

8. How do you use custom functions (UDFs)?

My approach: dedicate hdfs://nameservice:8020/sparkjars to custom jars, and ADD JAR them at use time.
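In SQL that looks roughly like this (the jar, class, and table names are made-up examples):

```sql
-- Load a UDF jar from the shared HDFS directory, register it, use it.
ADD JAR hdfs://nameservice:8020/sparkjars/my-udfs.jar;
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper';
SELECT my_upper(name) FROM some_table;
```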

9. Shuffle parameters:

spark.shuffle.service.enabled=true
spark.shuffle.useOldFetchProtocol=true

Because of a Netty version issue, the old fetch protocol must be enabled; otherwise jobs fail with an error mentioning "message type 9" or some such.

Please credit the source when reposting: www.mshxw.com
Original URL: https://www.mshxw.com/it/612867.html