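// 基于电信数据（dian.txt），按城市统计每个区县的游客人数（按 mdn 去重计数）Top 3
// —— 主要演示 SQL 中开窗函数（row_number() over(partition by ... order by ...)）的用法
// Per city, find the top 3 counties by number of visitors (distinct mdn),
// mainly demonstrating SQL window functions.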
/**
 * Per city, ranks counties by the number of distinct visitors (distinct `mdn`)
 * and keeps the top [[Demo04Dian.TopN]] counties. The same result is computed
 * twice: once with a Spark SQL query and once with the DataFrame API, both
 * using the `row_number()` window function partitioned by `city_id`.
 */
object Demo04Dian {

  /** Number of counties kept per city (shared by the SQL and DataFrame versions). */
  private val TopN = 3

  /** Input path used when no path is given as the first program argument. */
  private val DefaultInputPath = "sparkproject/data/dian.txt"

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the input CSV path
   *             (defaults to `sparkproject/data/dian.txt`)
   */
  def main(args: Array[String]): Unit = {
    // Local imports: this file has no top-level import block.
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.expressions.Window

    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("Demo04Dian")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()

    try {
      // Spark SQL built-in functions (countDistinct, row_number, ...)
      import org.apache.spark.sql.functions._
      // Implicit conversions, e.g. the $"column" syntax
      import spark.implicits._

      val inputPath = args.headOption.getOrElse(DefaultInputPath)

      // Read the comma-separated input with an explicit schema (all columns as strings).
      val dianDF: DataFrame = spark.read
        .format("csv")
        .option("sep", ",") // the CSV option is "sep"; "seq" is silently ignored by Spark
        .schema("mdn String,grid_id String,city_id String,county_id String," +
          "t String,start_time String,end_time String,date String")
        .load(inputPath)

      // Register a temp view so the data can be queried with SQL.
      dianDF.createOrReplaceTempView("dian")

      // 1) SQL version: distinct visitors per (city, county), ranked within each city.
      //    county_id is a secondary sort key so ties are broken deterministically.
      spark.sql(
        s"""
           |select tt1.city_id
           |      ,tt1.county_id
           |      ,tt1.sumcount
           |      ,tt1.rk
           |from (select t1.city_id
           |            ,t1.county_id
           |            ,t1.sumcount
           |            ,row_number() over(partition by t1.city_id
           |                               order by t1.sumcount desc, t1.county_id) as rk
           |      from (select city_id
           |                  ,county_id
           |                  ,count(distinct mdn) as sumcount
           |            from dian
           |            group by city_id, county_id
           |           ) as t1
           |     ) as tt1
           |where tt1.rk <= $TopN
           |""".stripMargin)
        .show()

      println("-" * 50)

      // 2) DataFrame API version of the same query, with the same tiebreaker.
      val byCityByCount = Window
        .partitionBy($"city_id")
        .orderBy($"num".desc, $"county_id")

      dianDF.groupBy($"city_id", $"county_id")
        .agg(countDistinct($"mdn") as "num")
        .select($"city_id", $"county_id", $"num", row_number().over(byCityByCount) as "rk")
        .where($"rk" <= TopN)
        .show()
    } finally {
      // Release the local Spark context even if reading or a query fails.
      spark.stop()
    }
  }
}



