栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他 > Spark

通过扩展 Spark SQL ,打造自己的大数据分析引擎

Spark 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力



Spark SQL 的 Catalyst ,这部分真的很有意思,值得去仔细研究一番,今天先来说说Spark的一些扩展机制吧,上一次写Spark,对其SQL的解析进行了一定的魔改,今天我们按套路来,使用砖厂为我们提供的机制,来扩展Spark...

首先我们先来了解一下 Spark SQL 的整体执行流程,输入的查询先被解析成未关联元数据的逻辑计划,然后根据元数据和解析规则,生成逻辑计划,再经过优化规则,形成优化过的逻辑计划(RBO),将逻辑计划转换成物理计划在经过代价模型(CBO),输出真正的物理执行计划。



我们今天举三个扩展的例子,来进行说明。

扩展解析器

这个例子,我们扩展解析引擎,我们对输入的SQL,禁止泛查询即不许使用select *来做查询,以下是解析的代。

  1. package wang.datahub.parser  
  2. import org.apache.spark.sql.catalyst.analysis.UnresolvedStar import org.apache.spark.sql.catalyst.expressions.expression 
  3. import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} 
  4. import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.types.{DataType, StructType} 
  5.  class MyParser(parser: ParserInterface) extends ParserInterface { 
  6.    override def parsePlan(sqlText: String): LogicalPlan = { 
  7.    val logicalPlan = parser.parsePlan(sqlText)    logicalPlan transform { 
  8.      case project @ Project(projectList, _) =>        projectList.foreach { 
  9.          name =>            if (name.isInstanceOf[UnresolvedStar]) { 
  10.              throw new RuntimeException("You must specify your project column set," +                " * is not allowed.") 
  11.           }       } 
  12.        project   } 
  13.    logicalPlan } 
  14.    
  15.  override def parseexpression(sqlText: String): expression = parser.parseexpression(sqlText)  
  16.    override def parseTableIdentifier(sqlText: String): TableIdentifier = 
  17.    parser.parseTableIdentifier(sqlText)  
  18.    override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = 
  19.    parser.parseFunctionIdentifier(sqlText)  
  20.   
  21.  override def parseTableSchema(sqlText: String): StructType =    parser.parseTableSchema(sqlText) 
  22.    
  23.  override def parseDataType(sqlText: String): DataType = parser.parseDataType(sqlText) } 

接下来,我们测试一下

  1. package wang.datahub.parser  
  2. import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} import org.apache.spark.sql.catalyst.parser.ParserInterface 
  3.  object MyParserApp { 
  4.  def main(args: Array[String]): Unit = {    System.setProperty("hadoop.home.dir","E:\devlop\envs\hadoop-common-2.2.0-bin-master"); 
  5.    type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface    type ExtensionsBuilder = SparkSessionExtensions => Unit 
  6.    val parserBuilder: ParserBuilder = (_, parser) => new MyParser(parser)    val extBuilder: ExtensionsBuilder = { e => e.injectParser(parserBuilder)} 
  7.    val spark =  SparkSession     .builder() 
  8.     .appName("Spark SQL basic example")     .config("spark.master", "local[*]") 
  9.     .withExtensions(extBuilder)     .getOrCreate() 
  10.     spark.sparkContext.setLogLevel("ERROR") 
  11.     import spark.implicits._ 
  12.     val df = Seq( 
  13.     ( "First Value",1, java.sql.Date.valueOf("2010-01-01")),     ( "First Value",4, java.sql.Date.valueOf("2010-01-01")), 
  14.     ("Second Value",2,  java.sql.Date.valueOf("2010-02-01")),     ("Second Value",9,  java.sql.Date.valueOf("2010-02-01")) 
  15.   ).toDF("name", "score", "date_column")    df.createTempView("p") 
  16.     //   val df = spark.read.json("examples/src/main/resources/people.json") 
  17.    //   df.toDF().write.saveAsTable("person")    //,javg(score) 
  18.     // custom parser 
  19.    //   spark.sql("select * from p ").show  
  20.    spark.sql("select * from p").show() } 

下面是执行结果,符合我们的预期。



扩展优化器

接下来,我们来扩展优化器,砖厂提供了很多默认的RBO,这里可以方便的构建我们自己的优化规则,本例中我们构建一套比较奇怪的规则,而且是完全不等价的,这里只是为了说明。

针对字段+0的操作,规则如下:

    如果0出现在+左边,则直接将字段变成右表达式,即 0+nr 等效为 nr 如果0出现在+右边,则将0变成3,即 nr+0 变成 nr+3 如果没出现0,则表达式不变

下面是代码:

  1. package wang.datahub.optimizer  
  2. import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Add, expression, Literal} 
  3. import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule 
  4.  object MyOptimizer extends Rule[LogicalPlan] { 
  5.   def apply(logicalPlan: LogicalPlan): LogicalPlan = { 
  6.    logicalPlan.transformAllexpressions {      case Add(left, right) => { 
  7.        println("this this my add optimizer")        if (isStaticAdd(left)) { 
  8.          right       } else if (isStaticAdd(right)) { 
  9.          Add(left, Literal(3L))       } else { 
  10.          Add(left, right)       } 
  11.     }   } 
  12. }  
  13.  private def isStaticAdd(expression: expression): Boolean = {    expression.isInstanceOf[Literal] && expression.asInstanceOf[Literal].toString == "0" 
  14. }  
  15.  def main(args: Array[String]): Unit = {    System.setProperty("hadoop.home.dir","E:\devlop\envs\hadoop-common-2.2.0-bin-master"); 
  16.    val testSparkSession: SparkSession = SparkSession.builder().appName("Extra optimization rules")     .master("local[*]") 
  17.     .withExtensions(extensions => {        extensions.injectOptimizerRule(session => MyOptimizer) 
  18.     })     .getOrCreate() 
  19.     testSparkSession.sparkContext.setLogLevel("ERROR") 
  20.     import testSparkSession.implicits._ 
  21.    testSparkSession.experimental.extraOptimizations = Seq()    Seq(-1, -2, -3).toDF("nr").write.mode("overwrite").json("./test_nrs") 
  22. //   val optimizedResult = testSparkSession.read.json("./test_nrs").selectExpr("nr + 0")    testSparkSession.read.json("./test_nrs").createTempView("p") 
  23.     var sql = "select nr+0 from p"; 
  24.    var t = testSparkSession.sql(sql)    println(t.queryExecution.optimizedPlan) 
  25.    println(sql)    t.show() 
  26.     sql = "select 0+nr from p"; 
  27.    var  u = testSparkSession.sql(sql)    println(u.queryExecution.optimizedPlan) 
  28.    println(sql)    u.show() 
  29.     sql = "select nr+8 from p"; 
  30.    var  v = testSparkSession.sql(sql)    println(v.queryExecution.optimizedPlan) 
  31.    println(sql)    v.show() 
  32. //   println(optimizedResult.queryExecution.optimizedPlan.toString() ) //   optimizedResult.collect().map(row => row.getAs[Long]("(nr + 0)")) 
  33.    Thread.sleep(1000000) } 
  34.  } 

执行如下

  1. this this my add optimizer this this my add optimizer 
  2. this this my add optimizer Project [(nr#12L + 3) AS (nr + CAST(0 AS BIGINT))#14L] 
  3. +- Relation[nr#12L] json  
  4. select nr+0 from p this this my add optimizer 
  5. this this my add optimizer this this my add optimizer 
  6. +------------------------+ |(nr + CAST(0 AS BIGINT))| 
  7. +------------------------+ |                       2| 
  8. |                       1| |                       0| 
  9. +------------------------+  
  10. this this my add optimizer Project [nr#12L AS (CAST(0 AS BIGINT) + nr)#21L] 
  11. +- Relation[nr#12L] json  
  12. select 0+nr from p this this my add optimizer 
  13. +------------------------+ |(CAST(0 AS BIGINT) + nr)| 
  14. +------------------------+ |                     -1| 
  15. |                     -2| |                     -3| 
  16. +------------------------+  
  17. this this my add optimizer this this my add optimizer 
  18. this this my add optimizer Project [(nr#12L + 8) AS (nr + CAST(8 AS BIGINT))#28L] 
  19. +- Relation[nr#12L] json  
  20. select nr+8 from p this this my add optimizer 
  21. this this my add optimizer this this my add optimizer 
  22. +------------------------+ |(nr + CAST(8 AS BIGINT))| 
  23. +------------------------+ |                       7| 
  24. |                       6| |                       5| 
  25. +------------------------+ 
扩展策略

SparkStrategies包含了一系列特定的Strategies,这些Strategies是继承自QueryPlanner中定义的Strategy,它定义接受一个Logical Plan,生成一系列的Physical Plan

通过Strategies把逻辑计划转换成可以具体执行的物理计划,代码如下

  1. package wang.datahub.strategy  
  2. import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan 
  3. import org.apache.spark.sql.execution.SparkPlan  
  4. object MyStrategy extends Strategy {  def apply(plan: LogicalPlan): Seq[SparkPlan] = { 
  5.    println("Hello world!")    Nil 
  6. }  
  7.  def main(args: Array[String]): Unit = {    System.setProperty("hadoop.home.dir","E:\devlop\envs\hadoop-common-2.2.0-bin-master"); 
  8.    val spark = SparkSession.builder().master("local").getOrCreate()  
  9.    spark.experimental.extraStrategies = Seq(MyStrategy)    val q = spark.catalog.listTables.filter(t => t.name == "six") 
  10.    q.explain(true)    spark.stop() 
  11. } } 

执行效果



好了,扩展部分就先介绍到这,接下来我计划可能会简单说说RBO和CBO,结合之前做过的一个小功能,一条SQL的查询时间预估。

本文转载自微信公众号「麒思妙想」,可以通过以下二维码关注。转载本文请联系麒思妙想公众号。



 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/796551.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号