// (1) For each company, compute the year-to-date (month-by-month cumulative) income per year.
// (2) For each company, compute the growth rate of the current month versus the same month last year.
// coalesce() accepts multiple arguments: if the first is null it returns the second,
// if the second is also null it returns the third, and so on.
object Demo05liti2 {
  def main(args: Array[String]): Unit = {
    // Local SparkSession; 3 shuffle partitions keep small-data demos fast.
    val spark: SparkSession = SparkSession.builder()
      .appName("Demo05liti2")
      .master("local")
      .config("spark.sql.shuffle.partitions", 3)
      .getOrCreate()

    // Bring in $"col" syntax and the built-in SQL function library.
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Input layout: one row per company (burk) per year, with the twelve
    // monthly income figures spread across columns tsl01..tsl12.
    val burDf = spark.read
      .format("csv")
      .schema("burk String,year String,tsl01 String,tsl02 String,tsl03 String," +
        "tsl04 String,tsl05 String,tsl06 String,tsl07 String,tsl08 String,tsl09 String," +
        "tsl10 String,tsl11 String,tsl12 String")
      // FIX: the CSV delimiter option key is "sep"; the original "seq" was
      // silently ignored (it only worked because comma is the default).
      .option("sep", ",")
      .load("sparkproject/data/bulks.txt")

    // Register as a temp view so the first solution can be written in SQL.
    burDf.createOrReplaceTempView("burlks")

    // --- Solution 1 (SQL): unpivot the 12 month columns into (month, price)
    // rows via explode(map(...)), then take a running sum per company+year.
    // FIX: alias renamed rk -> sum_price; this is a cumulative sum, not a rank.
    spark.sql(
      """
        |select burk
        |      ,year
        |      ,month
        |      ,price
        |      ,sum(price) over(partition by burk,year order by month) as sum_price
        |from burlks
        |lateral view explode(Map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12))v1 as month,price
        |
      """.stripMargin)
      .show()
    println("-" * 50)

    // --- Solution 2 (DSL): same unpivot. A map literal cannot take bare
    // Scala Ints as keys, so each key is wrapped with expr(...) to get an
    // integer Column (which also makes "month" sort numerically).
    val mapl = map(expr("1"), $"tsl01"
      , expr("2"), $"tsl02"
      , expr("3"), $"tsl03"
      , expr("4"), $"tsl04"
      , expr("5"), $"tsl05"
      , expr("6"), $"tsl06"
      , expr("7"), $"tsl07"
      , expr("8"), $"tsl08"
      , expr("9"), $"tsl09"
      , expr("10"), $"tsl10"
      , expr("11"), $"tsl11"
      , expr("12"), $"tsl12"
    )
    burDf
      .select($"burk", $"year", explode(mapl) as Array("month", "price"))
      // FIX: orderBy listed $"month" twice (redundant duplicate removed).
      .select($"burk", $"year", $"month", $"price",
        sum($"price") over Window.partitionBy($"burk", $"year").orderBy($"month") as "sum_price")
      // FIX: the result was computed but never displayed — without an action
      // this whole query was dead code.
      .show()
    println("-" * 60)

    // --- Solution 3 (DSL): year-over-year growth for each month. lag(price, 1)
    // partitioned by company+month and ordered by year looks back one year at
    // the same calendar month.
    burDf
      .select($"burk", $"year", explode(mapl) as Array("month", "price"))
      .select($"burk", $"year", $"month", $"price",
        lag($"price", 1) over Window.partitionBy($"burk", $"month").orderBy($"year") as "last_price")
      // In the first observed year last_price is null, so the ratio is null;
      // coalesce falls back to 1 (the original's chosen placeholder growth).
      .select($"burk", $"year", $"month",
        round(coalesce($"price" / $"last_price" - 1, expr("1")), 5) as "incrprer")
      .show(100)
  }
}



