栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark SQL将计算指标的特定的列转换为Json,其他列不变

Spark SQL将计算指标的特定的列转换为Json,其他列不变

1.实现效果前后对比

parquet文件链接:https://pan.baidu.com/s/1dmugj-ty47Hgi6WLAPaiGQ?pwd=yyds
提取码:yyds
--来自百度网盘超级会员V2的分享

          原表格(spark自带的parquet文件)user.parquet
namefavorite_colorfavorite_numbers
Alyssanull[3, 9, 15, 20]
 Ben red  [  ]

======>转换后

          实现name,favorite_color转json的表格
  Newcol    favorite_numbers
{“name”: “Alyssa”,”favorite_color”: “null”}[3, 9, 15, 20]
{“name”: “ Ben”,”favorite_color”: “red”}  [  ]
2.代码附上
package com.sz.table_ddl.test

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{struct, to_json}

object col_json {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    //1.todo 建立和spark框架的链接
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //禁用广播
    spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
    //2. todo 读取文件
    val string_quet=spark.read.format("parquet").load("D:\a\users.parquet")
    //创建虚拟表
    string_quet.createOrReplaceTempView("users")
    //显示parquet文件的数据
    string_quet.show(5)

    //3. todo 计算指标
    val rs=spark.sql(
      """
        |select
        |s.name as name,
        |s.favorite_color as color,
        |s.favorite_numbers as numbers
        |from users s
        |""".stripMargin)

    rs.show()
    //导包--> import org.apache.spark.sql.functions.{struct, to_json}
    val finalDF = rs.withColumn("Newcol", to_json(struct("color","numbers")))
    //一定要用sql得到列的别名
    //selectExpr查询指定列
    finalDF.selectExpr("Newcol","numbers").show(false)



    
    //当然得到的结果也可以导入到mysql

    //1. todo mysql数据库建表
    // 建表中`numbers` enum,没有测过,思路是这样
    
   //2. todo 导入数据到mysql
    finalDF.selectExpr("Newcol","numbers").write.format("jdbc")
      // todo option:jdbc里面的四大金刚 url,table,user,password
      .option("url", "jdbc:mysql://localhost:3306/names?&useUnicode=true&characterEncoding=utf8")
      .option("dbtable", "tab_json02")
      .option("user", "root")
      .option("password", "root")
      // todo 更新
      // .mode(SaveMode.Overwrite)
      // todo 追加
      .mode(SaveMode.Append)
      .save()

    
    // todo 释放资源
    spark.stop()
  }
}

3.代码结果

参考:

Spark SQL指定特定的列转换为Json_呼呼的小窝-CSDN博客_spark sql 结果转为json

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/720957.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号