栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

IDEA中PYSPARK的两表关联(字段名相同)

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

IDEA中PYSPARK的两表关联(字段名相同)

前言

       使用GROUPLENS的电影评价的大数据集,Windows中IDEA2020环境中SPARK做两表关联测试学习。

       个人用户学习大数据,一般会搭建的基于Linux虚拟机的HDFS集群。而SPARK主要运行在内存中,若在虚拟机的内存中运行没有在Windows中直接运行有效率吧。所以建议SPARK的学习就在Windows中了。若想在Linux运行,写好的程序也可以修改(主要是SparkSession和读取的文件的路径)后再在Linux虚拟机的HDFS集群上运行。

一、项目环境

    Windows: IDEA2020

    JDK: java version 1.8.0_231

    Python: 3.8.3

    Spark:spark-3.2.1-bin-hadoop2.7.tgz

二、电影评价 大数据集下载

1.下载地址

    http://files.grouplens.org/datasets/movielens/

 

2.测试中用到的CSV数据文件
 2.1 电影名称 movies.csv(示例):
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
 2.2 电影评价 ratings.csv(示例):
userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
 

三、在IDEA中用Python编写

1.引入库
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,DoubleType
from pyspark.sql import functions as F

2.创建SparkSession的执行环境入口
if __name__ == '__main__':
    # 构建SparkSession的执行环境入口对象
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

    # 通过SparkSession对象获取SparkContext 对象
    sc = spark.sparkContext

 3.读取电影评价ratings.csv数据集,创建DataFrame
    # todo 1:读取电影评价ratings.csv数据集
    # 定义结构1
    # userId,movieId,rating,timestamp
    schemaRank = StructType().add("userId", StringType(), nullable=True).
        add("movieId",IntegerType(),nullable=True). 
        add("rating",DoubleType(),nullable=True). 
        add("timestamp",StringType(),nullable=True)

    # 用,分割读取CSV文件1
    dfRank = spark.read.format("csv").option("sep", ",").
        option("header",True).
        option("encoding","utf-8").
        schema(schema=schemaRank).
        load("../data/input/ratings.csv")
            # 文件内容样例
            # userId,movieId,rating,timestamp
            # 1,1,4.0,964982703
            # 1,3,4.0,964981247
            # 1,6,4.0,964982224
            # 1,47,5.0,964983815

 4.读取电影名称movies.csv数据集,创建DataFrame
    # 定义结构2
    # movieId,title,genres
    schemaMovie = StructType().add("movieId",IntegerType(),nullable=True). 
        add("title", StringType(), nullable=True). 
        add("genres", StringType(), nullable=True)

    # 用,分割读取CSV文件2
    dfMovie = spark.read.format("csv").option("sep", ","). 
        option("header",True). 
        option("encoding","utf-8"). 
        schema(schema=schemaMovie). 
        load("../data/input/movies.csv")
        # 文件内容样例
        # movieId,title,genres
        # 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
        # 2,Jumanji (1995),Adventure|Children|Fantasy
        # 3,Grumpier Old Men (1995),Comedy|Romance

 5.用电影名称DataFrame,建表 movies表
    dfMovie.createOrReplaceTempView("movies")
    # 验证movies表内容
    spark.sql("SELECT movieId,title,genres FROM movies").show()
        # 查询结果样例
        # +-------+--------------------+--------------------+
        # |movieId|               title|              genres|
        # +-------+--------------------+--------------------+
        # |      1|    Toy Story (1995)|Adventure|Animati...|
        # |      2|      Jumanji (1995)|Adventure|Childre...|
        # |      3|Grumpier Old Men ...|      Comedy|Romance|

  6.查询 评价次数超100次的电影,平均分排名 Top10的DataFrame
    # todo 3 查询 评价次数超100次的电影,平均分排名 Top10
    print("查询 评价次数超100次的电影,平均分排名 Top10 ")
    dfRank2 = dfRank.groupBy("movieId").agg(
        F.count("movieId").alias("cnt"),
        F.round(F.avg("rating"),2).alias("avgRank")
    ).where("cnt >100").
        orderBy("avgRank",ascending=False).
        limit(10)
    dfRank2.show()
        # 查询结果样例
        # +-------+---+-------+
        # |movieId|cnt|avgRank|
        # +-------+---+-------+
        # |    318|317|   4.43|
        # |    858|192|   4.29|
        # |   2959|218|   4.27|
    # todo 用电影评价,建表 ranks
    dfRank2.createOrReplaceTempView("ranks")

  7.两个dataframe关联,取电影名称
    # todo 4 两个dataframe关联,取电影名称

    dfRank2.join(dfMovie, "movieId", "inner").createOrReplaceTempView("movieRankTable")
    print("DataFrame风格 前10名电影名字 ")
    # todo 字段内容全部显示 show(10,False)
    spark.sql("SELECT movieId,title,avgRank,genres "
              "FROM movieRankTable ").show(10,False)

        # 查询结果样例
        # +-------+--------------------------------+-------+---------------------------------------+
        # |movieId|title                           |avgRank|genres                                 |
        # +-------+--------------------------------+-------+---------------------------------------+
        # |50     |Usual Suspects, The (1995)      |4.24   |Crime|Mystery|Thriller                 |

 8.两张表关联,取电影名称
    print("SQL风格 前10名电影名字  ")
    spark.sql("SELECt r.movieId,m.title,r.avgRank,m.genres "
              " FROM movies m,ranks r"
              " WHERe m.movieId =r.movieId").show(10,False)

        # 查询结果样例
        # +-------+--------------------------------+-------+---------------------------------------+
        # |movieId|title                           |avgRank|genres                                 |
        # +-------+--------------------------------+-------+---------------------------------------+
        # |50     |Usual Suspects, The (1995)      |4.24   |Crime|Mystery|Thriller                 |
        # |318    |Shawshank Redemption, The (1994)|4.43   |Crime|Drama                            |
        # |527    |Schindler's List (1993)         |4.23   |Drama|War                              |

四、运行结果


五.遇到的问题

   两个dataframe关联后, 出现重复列,字段相同时,抽出时报错

pyspark.sql.utils.AnalysisException: Reference 'movieId' is ambiguous, # could be: movieranktable.movieId, movieranktable.movieId.; line 1 pos 7

pyspark.sql.utils.AnalysisException: Reference 'movieId' is ambiguous,could be: movieranktable.movieId, movieranktable.movieId.; line 1 pos 7

 对策 :join(dfMovie,"movieId","inner")的方式

# @错误的写法: dfRank2.join(dfMovie, dfRank2.movieId == dfMovie.movieId).createOrReplaceTempView("movieRankTable")
# @正确的写法: dfRank2.join(dfMovie, "movieId", "inner").createOrReplaceTempView("movieRankTable")

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/886491.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号