
SparkSQL Classic Cases (SQL & DSL Styles) in Detail

Case 1: Consecutively Active Users

#SQL Style

import org.apache.spark.sql.{DataFrame, SparkSession}

//Consecutively active users
//Find the records of users who logged in for two or more consecutive days
object ContenueActiveUser_SQL {
    def main(args: Array[String]): Unit = {
        //Create the SparkSession
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("ContenueActiveUser_SQL")
          .getOrCreate()

        //Read the CSV data
        val df: DataFrame = session
          .read
          .option("header", "true")
          .csv("Files/data1.txt")

//        df.show()
        df.createTempView("view_log")

        
        val df2: DataFrame = session.sql(
            """
              |select
              |    uid,
              |    min(dt) as min_dt,
              |    max(dt) as max_dt,
              |    count(date_diff) as times
              |from
              |(
              |    select
              |        uid,
              |        dt,
              |        date_sub(dt, dt_num) as date_diff
              |    from
              |    (
              |        select
              |            uid,
              |            dt,
              |            row_number() over(partition by uid order by dt asc) as dt_num
              |        from
              |        (
              |            select distinct uid, dt
              |            from view_log
              |        ) t1
              |    ) t2
              |) t3
              |group by uid, date_diff having times >= 2
              |""".stripMargin)
        df2.show()
        session.stop()
    }
}
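
The trick in this query is that subtracting each user's row number from the login date maps every run of consecutive dates to the same anchor date, so grouping by (uid, date_diff) groups exactly the consecutive runs. A minimal plain-Scala sketch of that property (user and dates are hypothetical):

import java.time.LocalDate

object ConsecutiveDateTrick {
    def main(args: Array[String]): Unit = {
        //Hypothetical login dates for one user, sorted ascending
        val dates = Seq("2022-03-01", "2022-03-02", "2022-03-03", "2022-03-07", "2022-03-08")
          .map(s => LocalDate.parse(s))

        //date minus (1-based) row number: consecutive dates share the same anchor
        val anchored = dates.zipWithIndex.map { case (d, rn) => (d, d.minusDays(rn + 1)) }

        //group by anchor, just like "group by uid, date_diff" in the SQL
        anchored.groupBy(_._2).foreach { case (anchor, run) =>
            println(s"anchor=$anchor run=${run.map(_._1).mkString(",")} length=${run.size}")
        }
    }
}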

#DSL Style

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

//Consecutively active users
//Find the records of users who logged in for two or more consecutive days
object ContenueActiveUser_DSL {
    def main(args: Array[String]): Unit = {

        //Create the SparkSession
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("ContenueActiveUser_DSL")
          .getOrCreate()

        //Read the CSV data
        val df: DataFrame = session
          .read
          .option("header", "true")
          .csv("Files/data1.txt")

        //Note: the implicits are imported from the variable named session, not spark
        import session.implicits._
        import org.apache.spark.sql.functions._

        
        df.distinct()
          .select('uid, 'dt,
              (row_number() over (Window.partitionBy("uid").orderBy("dt"))) as 'rn
          )
          .select(
              'uid,
              'dt,
              //date_sub(date, column) requires Spark 3.0+
              date_sub('dt, 'rn) as 'date_diff
          ).groupBy('uid, 'date_diff)
          //use agg when several aggregations are needed at once
          .agg(
              min("dt") as "min_dt",
              max("dt") as "max_dt",
              count("*") as "times"
          ).where('times >= 2)
          .drop("date_diff")
          .show()
    }
}
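
The read above infers both column names and types from the header row; to pin the types (for example parsing dt as a real date), an explicit schema can be passed instead. A sketch, assuming the file's columns are uid and dt, as the queries above imply:

import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}

//hypothetical explicit schema for data1.txt: uid (string), dt (date)
val logSchema = StructType(Seq(
    StructField("uid", StringType, nullable = false),
    StructField("dt", DateType, nullable = false)
))

val typedDf = session.read
  .option("header", "true")
  .schema(logSchema)
  .csv("Files/data1.txt")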

Case 2: Shop Monthly Running Total

#SQL Style

import org.apache.spark.sql.{DataFrame, SparkSession}
//Shop monthly running total

object ShopMonthAdd_SQL {
    def main(args: Array[String]): Unit = {
        //1. Create the SparkSession
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("ShopMonthAdd_SQL")
          .getOrCreate()

        

        //2. Read the CSV data
        val df: DataFrame = session
          .read
          .option("header", "true")
          .csv("Files/data2.txt")

        //3. Create a temporary view
        df.createTempView("view_shop")

        //4. Query in SQL style (columns: sid, dt, money)
        session.sql(
            """
              |select
              |    sid,
              |    mth,
              |    sum(mth_money) over(partition by sid order by mth) as total_money
              |from
              |(
              |    select
              |        sid,
              |        mth,
              |        sum(money) as mth_money
              |    from
              |    (
              |        select
              |            sid,
              |            date_format(dt, 'yyyy-MM') as mth,
              |            cast(money as double) as money
              |        from view_shop
              |    ) t1
              |    group by sid, mth
              |) t2
              |""".stripMargin).show()
        session.stop()
    }
}
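
The running total works because a window with an ORDER BY and no explicit frame defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Since each (sid, mth) pair is unique after the inner group by, spelling the frame out with ROWS is equivalent and makes the intent explicit; a sketch of the same window in the DSL:

import org.apache.spark.sql.expressions.Window

//the frame the ORDER BY window defaults to, written out explicitly
val runningTotal = Window
  .partitionBy("sid")
  .orderBy("mth")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)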

#DSL Style

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.{DataFrame, SparkSession}

//Shop monthly running total

object ShopMonthAdd_DSL {
    def main(args: Array[String]): Unit = {
        //1. Create the SparkSession
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("ShopMonthAdd_DSL")
          .getOrCreate()

        

        //2. Read the CSV data
        val df: DataFrame = session
          .read
          .option("header", "true")
          .csv("Files/data2.txt")

        
        import session.implicits._
        import org.apache.spark.sql.functions._

        df.select($"sid",
            'money.cast(DataTypes.DoubleType) as "money",
            expr("date_format(dt, 'yyyy-MM') as mth")
        ).groupBy("sid", "mth")
          .sum("money")
          .withColumnRenamed("sum(money)", "mth_money")
          .select(
              $"sid",
              $"mth",
              sum("mth_money").over(Window.partitionBy("sid")
                .orderBy("mth")) as "total_money"
          ).show()
    }
}
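
The withColumnRenamed step above depends on Spark's generated name for an un-aliased aggregate ("sum(money)"). Using agg with an alias attaches the name directly and skips that step; a sketch of the equivalent grouping stage, assuming the same imports are in scope:

//equivalent to .sum("money") + withColumnRenamed, with the alias inline
val monthly = df.select($"sid",
    'money.cast(DataTypes.DoubleType) as "money",
    expr("date_format(dt, 'yyyy-MM') as mth")
).groupBy("sid", "mth")
  .agg(sum("money") as "mth_money")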

Case 3: Traffic Statistics

#SQL Style

import org.apache.spark.sql.{DataFrame, SparkSession}
//Traffic statistics
object FlowTotal_SQL {
    def main(args: Array[String]): Unit = {
        //1. Create the SparkSession
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("FlowTotal_SQL")
          .getOrCreate()

        //2. Read the CSV data
        val df: DataFrame = session
          .read
          .option("header", "true")
          .csv("Files/data3.txt")

        //3. Create a temporary view
        df.createTempView("view_flow")

        

        //4. Query in SQL style
        session.sql(
            """
              |select
              |    uid,
              |    min(start_dt) as start_dt,
              |    max(end_dt) as end_dt,
              |    sum(flow) as flow
              |from
              |(
              |    select
              |        uid,
              |        start_dt,
              |        end_dt,
              |        sum(lag_num) over(partition by uid order by start_dt) as flag,
              |        flow
              |    from
              |    (
              |        select
              |            uid,
              |            start_dt,
              |            end_dt,
              |            if((to_unix_timestamp(start_dt) - to_unix_timestamp(lag_time)) / 60 > 10, 1, 0) as lag_num,
              |            flow
              |        from
              |        (
              |            select
              |                uid,
              |                start_dt,
              |                end_dt,
              |                flow,
              |                lag(end_dt, 1, start_dt) over(partition by uid order by start_dt) as lag_time
              |            from view_flow
              |        ) t1
              |    ) t2
              |) t3
              |group by uid, flag
              |""".stripMargin).show()

        session.stop()
    }
}
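
The query sessionizes each user's records in three steps: lag fetches the previous end time, a mark of 1 is emitted when the gap exceeds 10 minutes, and a running sum of the marks yields a session id (flag) to group on. A minimal plain-Scala sketch of the same flag logic (times in minutes, values hypothetical):

object GapSessionSketch {
    def main(args: Array[String]): Unit = {
        //(start, end) of each record in minutes, sorted by start; hypothetical values
        val recs = Seq((0, 5), (7, 12), (30, 33), (35, 40))

        //pair each record with the previous end time; the first record uses
        //its own start, mirroring lag(end_dt, 1, start_dt) in the SQL
        val marks = recs.zip((0, recs.head._1) +: recs.init).map {
            case ((start, _), (_, prevEnd)) => if (start - prevEnd > 10) 1 else 0
        }

        //running sum of the marks = session id, like sum(lag_num) over(...)
        val flags = marks.scanLeft(0)(_ + _).tail
        recs.zip(flags).groupBy(_._2).toSeq.sortBy(_._1).foreach {
            case (flag, rows) => println(s"session $flag: ${rows.map(_._1).mkString(" ")}")
        }
    }
}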

#DSL Style

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

//Traffic statistics
object FlowTotal_DSL {
    def main(args: Array[String]): Unit = {
        //1. Create the SparkSession
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("FlowTotal_DSL")
          .getOrCreate()

        //2. Read the CSV data
        val df: DataFrame = session
          .read
          .option("header", "true")
          .csv("Files/data3.txt")

        
        import session.implicits._
        import org.apache.spark.sql.functions._

        //previous record's end time per user (null for a user's first record)
        df.select('uid, 'start_dt, 'end_dt, 'flow,
            lag("end_dt", 1).over(Window.partitionBy("uid").orderBy("start_dt")) as "lag_time")
          //mark 1 when the gap to the previous record exceeds 10 minutes;
          //the null lag_time of the first record falls into otherwise(0)
          .select('uid, 'start_dt, 'end_dt, 'flow,
              when((unix_timestamp('start_dt) - unix_timestamp('lag_time)) / 60 > 10, 1)
                .otherwise(0) as "lag_num")
          //running sum of the marks gives the session id (flag), as in the SQL
          .select('uid, 'start_dt, 'end_dt, 'flow,
              sum("lag_num").over(Window.partitionBy("uid").orderBy("start_dt")) as "flag")
          .groupBy('uid, 'flag)
          .agg(
              min("start_dt") as "start_dt",
              max("end_dt") as "end_dt",
              sum("flow") as "flow")
          .drop("flag")
          .show()
    }
}
