
Reading array-type fields from Elasticsearch with Spark

Recently, while using Spark to read data from ES, Xigua ran into the following error:

21/12/07 10:44:35 WARN ScalaRowValueReader: Field 'vod_cat_tags' is backed by an array but the associated Spark Schema does not reflect this;
              (use es.read.field.as.array.include/exclude) 
21/12/07 10:44:35 WARN ScalaRowValueReader: Field 'vod_cat_tags' is backed by an array but the associated Spark Schema does not reflect this;
              (use es.read.field.as.array.include/exclude) 
21/12/07 10:44:35 WARN ScalaRowValueReader: Field 'vod_cat_tags' is backed by an array but the associated Spark Schema does not reflect this;
              (use es.read.field.as.array.include/exclude) 
21/12/07 10:44:35 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.elasticsearch.hadoop.rest.EsHadoopParsingException: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'vod_cat_tags.level5_name' not found; typically this occurs with arrays which are not mapped as single value
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:514)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:292)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:262)
	at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:313)
	at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:93)
	at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

The data is shown in a screenshot in the original post (not reproduced here); the key point is that vod_cat_tags holds an array of objects.

From this we can tell that the error comes from vod_cat_tags not being read correctly: an Elasticsearch mapping does not distinguish a single value from an array, so the connector infers a non-array Spark schema unless it is told which fields are arrays. The first article Xigua consulted was https://blog.csdn.net/data_curd/article/details/109497126, but it carried no detailed comments, and without understanding the individual parameters Xigua knew where the error was but not how to fix it. After reading more material online, the problem turned out to be the es.read.field.as.array.include parameter; once it is set properly, everything works. It can be set either with conf.set or with .option() on spark.sqlContext.read.format(""). The final code is as follows:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")   // create the index automatically if it does not exist
    conf.set("es.nodes", "192.168.128.130")    // ES nodes
    conf.set("es.port", "9200")                // ES port
    conf.set("es.nodes.wan.only", "true")      // route every request through the declared es.nodes; required when connecting over a public network
    conf.set("es.read.field.as.array.include", "tags,vod_cat_tags,vod_cat_tags.level5_name")   // fields that should be read as array types
    val spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
    val ess = spark.sqlContext.read.format("org.elasticsearch.spark.sql")
      .option("inferSchema", "true")
//      .option("es.read.field.as.array.include", "tags,vod_cat_tags,vod_cat_tags.level5_name")    // same effect as setting it on the SparkConf above
      .load("media_index")      // index name
    ess.show(false)
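
Once the field is declared in es.read.field.as.array.include, it shows up in the DataFrame schema as an ArrayType and can be handled with the usual Spark SQL functions. The following is a minimal sketch (not from the original post) that assumes vod_cat_tags is an array of objects, each carrying a level5_name sub-field, as the error message above suggests:

    import org.apache.spark.sql.functions.{col, explode}

    // Check that vod_cat_tags is now read as array<struct<...>>
    ess.printSchema()

    // Flatten the array into one row per tag, then pull out the nested field
    // (level5_name is taken from the error message; adjust to your mapping)
    val tagNames = ess
      .select(explode(col("vod_cat_tags")).as("tag"))
      .select(col("tag.level5_name").as("level5_name"))
    tagNames.show(false)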

Some readers may not fully follow the comments above, so here are the references Xigua found: ES configuration: https://www.cnblogs.com/upupfeng/p/12205657.html

option parameters: https://blog.csdn.net/OldDirverHelpMe/article/details/106120312
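
Among the options covered in those pages, a few read settings often appear alongside es.read.field.as.array.include. The snippet below is only an illustration (the field names, index, and query are placeholders, not from the original post):

    // Illustrative values only: field names, index, and query are placeholders
    val filtered = spark.sqlContext.read.format("org.elasticsearch.spark.sql")
      .option("es.read.field.include", "tags,vod_cat_tags")            // fetch only these fields
      .option("es.read.field.as.array.include", "tags,vod_cat_tags")   // read them as arrays
      .option("es.query", """{"query":{"match_all":{}}}""")            // push a query down to Elasticsearch
      .load("media_index")
    filtered.show(false)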

If you run into other problems, feel free to raise them and we can improve together.

Finally, if this isn't your thing, please don't flame.
