栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark进阶(五):DataFrame和DataSet使用

spark进阶(五):DataFrame和DataSet使用

spark进阶(五):Dataframe和DataSet使用

Dataframe是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,Dataframe的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为Dataframe,例如Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。

在Spark中,一个Dataframe所代表的是一个元素类型为Row的Dataset,即Dataframe只是Dataset[Row]的一个类型别名。相对于RDD,Dataset提供了强类型支持,在RDD的每行数据加了类型约束。而且使用DatasetAPI同样会经过Spark SQL优化器的优化,从而提高程序执行效率。

Dataframe和R的数据结构以及python pandas Dataframe的数据结构和操作基本一致。

一、创建Dataframe、DataSet
  • 创建RDD
  • RDD转化为ROW
  • 通过ROW和元数据信息生成Dataframe
  • 然后通过Dataframe和对应的类转化为DataSet
  • 也就是说Dataframe是DataSet[Row],这里可以通过指定的类将其转化,DataSet[User]
  • 需要注意的事转化使用的类需要时内部类,然后就是类里的变量名要和元数据信息的列名保持对齐
object MovieLenDataSet {
  case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .appName("MovieLenDataSet")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val dataPath = "/home/ffzs/data/ml-1m"
    val schema4users = StructType(
      "UserID::Gender::Age::Occupation::Zip_code"
        .split("::")
        .map(it => StructField(it, StringType, nullable = true))
    )

    val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")
    val usersRows = usersRdd.map(_.split("::"))
      .map(it => {
        it.map(_.trim)
      })
      .map(it => Row(it(0), it(1), it(2), it(3), it(4)))
    val usersDF: Dataframe = spark.createDataframe(usersRows, schema4users)
    val usersDataSet = usersDF.as[User]
    usersDataSet.show(5)
  }
}
二、DataSet使用练习 1.最常见电影类型
  • 对电影类型进行split,然后再聚合计数
  • 然后再通过计数进行排序
println("最常见电影类型:")
moviesDataSet.select("Genres")
  .flatMap(_(0).toString.split("\|"))
  .map(genre => (genre, 1))
  .groupBy("_1")
  .sum()
  .withColumnRenamed("_1", "genre")
  .withColumnRenamed("sum(_2)", "sum")
  .orderBy($"sum".desc)
  .show(5)

最多的居然是戏剧有点意外。

2.最常见的电影类型中最受欢迎的电影
  • 首先筛选出 有戏剧(Drama)类型的电影
  • 然后通过MovieID获取这些电影的观看的用户
  • 然后通过通过电影聚合获取电影的平均评分和评论数
  • 筛选出评论数大于10的电影
  • 最后排序输出
println("最常见的电影类型中最受欢迎的电影(观看人数大于10):")
val mostMovieGenre = "Drama"
moviesDataSet.filter(it => it.Genres.split("\|").toSet.contains(mostMovieGenre))
  .join(ratingsDataSet.select("MovieID", "Rating"), usingColumn = "MovieID")
  .groupBy("MovieID", "Title")
  .agg("Rating"->"avg", "Rating"->"count")
  .filter($"count(Rating)">10)
  .orderBy($"avg(Rating)".desc, $"count(Rating)".desc)
  .show(3)

口碑最好的是:七武士

3.获取评论人数最多的电影
  • 直接通过MovieID聚合求出每个电影的评论数
  • 然后再对数量进行排序
println("获取评论数量最多电影:")
ratingsDataSet.groupBy("MovieID")
  .count()
  .orderBy($"count".desc)
  .show(2)

4.最多评论电影不同年龄段男女观看情况
    println("最多评论电影不同年龄段男女观看情况")
    ratingsDataSet.filter(_.MovieID.equals(mostReviewMovieID)).select("UserID")
      .join(usersDataSet.select("UserID", "Age", "Gender"), "UserID")
      .groupBy("Age")
      .pivot("Gender")
      .count()
      .orderBy($"Age")
      .show()

5.通过SQL进行操作
  • 通过将DataSet生成视图
  • 然后通过视图名进行SQL操作
ratingsDataSet.createTempView("rating")
spark.sql("select MovieID, count(1) cnt from rating group by MovieID limit 1").show()

+-------+--------+
|MovieID|count(1)|
+-------+--------+
|   2294|     645|
+-------+--------+
6.存储
  • 将DataSet进行存储
  • mode有四种:
    • overwrite: 替换
    • Append:在之后进行追加
    • ErrorIfExists:如果存在则报错,默认方式
    • Ignore:存在的话不进行操作
    println("将DataSet数据存储打到HDFS中")
    ratingsDataSet.write
      .mode(SaveMode.Overwrite)
      .parquet("hdfs://localhost:9000/movieLen/rating")

写入成功:

完整代码:
object MovieLenDataSet {
  case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)
  case class Rating(UserID:String, MovieID:String, Rating:Double, Timestamp: String)
  case class Movie(MovieID:String, Title:String, Genres:String)
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .appName("MovieLenDataSet")
      .master("local[*]")
//      .master("spark://localhost:7077")
      .getOrCreate()
    import spark.implicits._

    val dataPath = "/home/ffzs/data/ml-1m"

    // 构造user的DataSet  通过Row转化为Dataframe 然后再转化为DataSet
    val schema4users = StructType(
      "UserID::Gender::Age::Occupation::Zip_code"
        .split("::")
        .map(it => StructField(it, StringType, nullable = true))
    )
    val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")
    val usersRows = usersRdd.map(_.split("::"))
      .map(it => {it.map(_.trim)})
      .map(it => Row(it(0), it(1), it(2), it(3), it(4)))
    val usersDF: Dataframe = spark.createDataframe(usersRows, schema4users)
    val usersDataSet = usersDF.as[User].cache()

    // 构造movie的DataSet 直接读取文件生成Dataframe 然后转化为DataSet
    val moviesRows = spark.read.textFile(f"$dataPath/movies.dat")
    val moviesDataSet = moviesRows.map(row => {
      val values = row.split("::").map(_.trim)
      Movie(values(0), values(1), values(2))
    }).cache()

    // 构造rating的DataSet
    val ratingsRows = spark.read.textFile(f"$dataPath/ratings.dat")
    val ratingsDataSet = ratingsRows.map(row => {
      val values = row.split("::").map(_.trim)
      Rating(values(0), values(1), values(2).toDouble, values(3))
    }).cache()


    println("最常见的电影类型中最受欢迎的电影(观看人数大于10):")
    val mostMovieGenre = "Drama"
    moviesDataSet.filter(it => it.Genres.split("\|").toSet.contains(mostMovieGenre))
      .join(ratingsDataSet.select("MovieID", "Rating"), usingColumn = "MovieID")
      .groupBy("MovieID", "Title")
      .agg("Rating"->"avg", "Rating"->"count")
      .filter($"count(Rating)">10)
      .orderBy($"avg(Rating)".desc, $"count(Rating)".desc)
      .show(3)

    println("获取评论数量最多电影ID:")
    val mostReviewMovieID = ratingsDataSet.groupBy("MovieID")
      .count()
      .orderBy($"count".desc)
      .first()(0)

    println("最多评论电影不同年龄段男女观看情况")
    ratingsDataSet.filter(_.MovieID.equals(mostReviewMovieID)).select("UserID")
      .join(usersDataSet.select("UserID", "Age", "Gender"), "UserID")
      .groupBy("Age")
      .pivot("Gender")
      .count()
      .orderBy($"Age")
      .show()

    println("通过SQL语句对dataset数据进行操作")
    ratingsDataSet.createTempView("rating")
    spark.sql("select MovieID, count(1) cnt from rating group by MovieID limit 1").show()

    println("将DataSet数据存储打到HDFS中")
    ratingsDataSet.write
      .mode(SaveMode.Overwrite)
      .parquet("hdfs://localhost:9000/movieLen/rating")
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/302862.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号