1. DataFrame代码的构建 - SparkSession读取文件
#coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # SparkSession is the unified entry point; local[*] uses every local core.
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Headerless comma-separated file: every column arrives as a string.
    df = spark.read.csv('hdfs://101.133.232.96:8020/input/stu_score.txt', sep=',', header=False)
    # Rename the positional columns to meaningful names.
    df2 = df.toDF("id", "name", "score")
    df2.printSchema()
    df2.show()

    # Register a session-scoped view so the data can be queried via SQL.
    df2.createTempView("score")
    spark.sql("""
    select * from score where name = '语文' limit 5
    """).show()
2. DataFrame代码的构建 - 基于RDD方式1
#coding:utf8
from pyspark.sql import SparkSession

# Submit with: /opt/module/spark/bin/spark-submit /opt/Code/SparkSQL.py
if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Parse "name,age" lines into (str, int) tuples.
    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))

    # Build a DataFrame from the RDD.
    # arg 1: the RDD to convert
    # arg 2: column names, given as a list
    # BUG FIX: the API is createDataFrame (capital F); `createDataframe`
    # raises AttributeError at runtime.
    df = spark.createDataFrame(rdd, schema=['name', 'age'])
    df.printSchema()
    df.show()
基于RDD方式2
from pyspark.sql.types import StructType, StringType, IntegerType  # required for the explicit schema below

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))

    # Explicit schema: column name, type, and nullability for each field.
    schema = StructType().add('name', StringType(), nullable=True) \
        .add('age', IntegerType(), nullable=False)
    # BUG FIX: createDataFrame (capital F), not createDataframe.
    df = spark.createDataFrame(rdd, schema=schema)
    df.printSchema()
    df.show()
基于RDD方式3
from pyspark.sql.types import StructType, StringType, IntegerType  # required for the StructType variant below

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))

    # toDF variant 1: only column names; types are inferred from the data.
    df1 = rdd.toDF(["name", "age"])
    df1.printSchema()
    df1.show()

    # toDF variant 2: full StructType schema with explicit types/nullability.
    scheme = StructType().add('name', StringType(), nullable=True) \
        .add('age', IntegerType(), nullable=True)
    df2 = rdd.toDF(scheme)
    df2.printSchema()
    df2.show()
DataFrame的代码构建 - 基于Pandas的DataFrame
import pandas as pd  # required for the pandas DataFrame below

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Build a SparkSQL DataFrame from a pandas DataFrame.
    # BUG FIX: the pandas class is pd.DataFrame (capital F), not pd.Dataframe.
    pdf = pd.DataFrame({
        "id": [1, 2, 3],
        'name': ['张大仙', '王小小', '吕不韦'],
        'age': [11, 21, 32]
    })
    # BUG FIX: createDataFrame (capital F), not createDataframe.
    df = spark.createDataFrame(pdf)
    df.printSchema()
    df.show()
from pyspark.sql.types import StructType, StringType  # required for the schema below

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # The text source yields one string column per line; name it 'data'.
    schema = StructType().add('data', StringType(), nullable=True)
    df = spark.read.format('text').schema(schema=schema) \
        .load('hdfs://101.133.232.96/input/people.txt')
    df.printSchema()
    df.show()
if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # The json source infers the schema from the documents themselves.
    df = spark.read.format('json').load('hdfs://101.133.232.96/input/people.json')
    df.printSchema()
    df.show()
if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # csv source: explicit separator, header row, encoding, and a DDL-string schema.
    df = spark.read.format('csv') \
        .option('sep', ';') \
        .option('header', True) \
        .option('encoding', "utf-8") \
        .schema('name STRING,age INT,job STRING') \
        .load('hdfs://101.133.232.96/input/people.csv')
    df.printSchema()
    df.show()
# parquet source: the schema is embedded in the file, no options needed.
spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
sc = spark.sparkContext
df = spark.read.format('parquet') \
    .load('hdfs://101.133.232.96/input/users.parquet')
df.printSchema()
df.show()
DSL风格 - select, filter/where, group by
if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    df = spark.read.format('csv') \
        .schema('id INT,subject STRING,score INT') \
        .load('hdfs://101.133.232.96/input/stu_score.txt')

    # DSL-style demos
    # select accepts either a list of column names or varargs.
    df.select(["id", "subject"]).show()
    df.select("id", "subject").show()

    # Column objects can be obtained by indexing the DataFrame.
    id_column = df['id']
    subject_column = df['subject']
    df.select(id_column, subject_column).show()

    # filter accepts a SQL-expression string or a Column expression.
    df.filter("score < 99").show()
    df.filter(df['score'] < 99).show()

    # groupBy: the aggregate call returns a DataFrame again, so it can be chained.
    df.groupBy("subject").count().show()
    df.groupBy(df['subject']).count().show()
if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    df = spark.read.format('csv') \
        .schema('id INT,subject STRING,score INT') \
        .load('hdfs://101.133.232.96/input/stu_score.txt')

    # Register temporary views:
    df.createTempView("score")                # session-scoped, errors if it already exists
    df.createOrReplaceTempView("score_2")     # session-scoped, replaces an existing view
    df.createGlobalTempView("score_3")        # cross-session, lives in the global_temp database

    # Query through the SparkSession sql API.
    spark.sql("""
    select subject,count(*) as cnt from score group by subject
    """).show()
    spark.sql("""
    select subject,count(*) as cnt from score_2 group by subject
    """).show()
    # BUG FIX: global temp views must be qualified with `global_temp`
    # (not `global_tmp`) — the original query failed with table-not-found.
    spark.sql("""
    select subject,count(*) as cnt from global_temp.score_3 group by subject
    """).show()
词频统计案例练习
from pyspark.sql import functions as F  # required for split/explode below

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # SQL-style word count.
    rdd = sc.textFile('file:///opt/Data/sql/words.txt') \
        .flatMap(lambda x: x.split(" ")) \
        .map(lambda x: [x])
    df = rdd.toDF(["word"])
    df.createTempView("words")
    spark.sql("select word,count(*) as cnt from words group by word order by cnt desc").show()

    # DSL-style: read raw lines, then split/explode the 'value' column.
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    # BUG FIX: the original chained .show() onto load(); show() returns None,
    # so the subsequent df.select(...) calls crashed. Load and show separately.
    df = spark.read.format('text').load('file:///opt/Data/sql/words.txt')
    df.show()
    df.select(F.split(df['value'], " ")).show()
    df.select(F.explode(F.split(df['value'], " "))).show()
from pyspark.sql import functions as F  # required for split/explode below

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    df = spark.read.format('text').load('file:///opt/Data/sql/words.txt')

    # withColumn replaces the column when the name already exists,
    # otherwise it appends a new one; here 'value' is replaced by the words.
    df2 = df.withColumn('value', F.explode(F.split(df['value'], " ")))

    # Aggregate, rename the generated 'count' column, sort descending.
    df2.groupBy('value').count() \
        .withColumnRenamed("count", "cnt") \
        .orderBy('cnt', ascending=False) \
        .show()
1. 用户平均分
from pyspark.sql.types import StructType, StringType, IntegerType  # schema below
from pyspark.sql import functions as F  # round() below

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    schema = StructType() \
        .add('user_id', StringType(), nullable=True) \
        .add('movie_id', IntegerType(), nullable=True) \
        .add('rank', IntegerType(), nullable=True) \
        .add('ts', StringType(), nullable=True)
    # BUG FIX: ml-100k u.data is TAB-separated; the original sep 't'
    # split rows on the letter t instead of the tab character.
    df = spark.read.format('csv') \
        .option('sep', '\t') \
        .option('header', False) \
        .option('encoding', 'utf-8') \
        .schema(schema=schema) \
        .load('file:///opt/Data/ml-100k/u.data')

    # 1. average rating per user, rounded to 2 decimals, highest first.
    df.groupBy('user_id').avg('rank') \
        .withColumnRenamed('avg(rank)', 'avg_rank') \
        .withColumn('avg_rank', F.round('avg_rank', 2)) \
        .orderBy('avg_rank', ascending=False).show()
2. 电影平均分
# Register the ratings DataFrame as a view, then compute each movie's average rating.
df.createTempView('movie')
spark.sql("""
select movie_id,round(avg(rank),2) as avg_rank
from movie
group by movie_id
order by avg_rank desc
""").show()
3. 大于电影平均分的评分的数量
# Count of ratings strictly above the global average rating.
# df.select(F.avg(...))  -> single-row DataFrame
# .first()               -> Row object
# .first()['avg(rank)']  -> plain number
# IMPROVED: compute the global average once instead of running
# the same aggregation job twice.
print('大于平均分电影的数量')
avg_rank = df.select(F.avg(df['rank'])).first()['avg(rank)']
print(avg_rank)
print(df.where(df['rank'] > avg_rank).count())
4. 查询高分电影中>3打分次数最多的用户,并求出此人打的平均分
# Among high ratings (> 3), find the user who rated the most movies,
# then show that user's overall average rating.
user_id = df.where(df['rank'] > 3) \
    .groupBy('user_id').count() \
    .withColumnRenamed("count", "cnt") \
    .orderBy("cnt", ascending=False) \
    .first()['user_id']
df.filter(df['user_id'] == user_id).select(F.round(F.avg('rank'), 2)).show()
5. 查询每个用户的平均打分,最低打分,最高打分
# Per-user average / minimum / maximum rating — SQL version ...
spark.sql("""
select user_id,min(rank) as min_rank,max(rank) as max_rank,avg(rank) as avg_rank
from movie group by user_id
""").show()
# ... and the equivalent DSL version using agg() with multiple aggregates.
df.groupby('user_id').agg(
    F.round(F.avg('rank'), 2).alias('avg_rank'),
    F.round(F.min('rank'), 2).alias('min_rank'),
    F.round(F.max('rank'), 2).alias('max_rank')
).show()
6. 查询被评分超过100次的电影的平均分排名TOP10
# TOP-10 movies by average rating among movies rated more than 100 times — SQL ...
spark.sql("""
select movie_id,count(movie_id) as cnt,avg(rank) as avg_rank
from movie
group by movie_id
having cnt > 100
order by avg_rank desc
limit 10
""").show()
# ... and the equivalent DSL chain (where() on the aggregated alias = HAVING).
df.groupby('movie_id').agg(
    F.count('movie_id').alias('cnt'),
    F.round(F.avg('rank'), 2).alias('avg_rank'),
).where('cnt>100').orderBy('avg_rank', ascending=False).limit(10).show()
电影评分数据分析总体代码
#coding:utf8
# Consolidated version of the movie-ratings exercises above.
# NOTE(review): removed `from audioop import add` and `from cmd import IDENTCHARS`
# — accidental IDE auto-imports; neither name is used anywhere in this script.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F

# Submit with: /opt/module/spark/bin/spark-submit /opt/Code/SparkSQL.py
if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()

    schema = StructType() \
        .add('user_id', StringType(), nullable=True) \
        .add('movie_id', IntegerType(), nullable=True) \
        .add('rank', IntegerType(), nullable=True) \
        .add('ts', StringType(), nullable=True)
    # BUG FIX: u.data is TAB-separated; the original sep 't' split on the letter t.
    df = spark.read.format('csv') \
        .option('sep', '\t') \
        .option('header', False) \
        .option('encoding', 'utf-8') \
        .schema(schema=schema) \
        .load('file:///opt/Data/ml-100k/u.data')

    # # 1. average rating per user
    # df.groupBy('user_id').avg('rank').withColumnRenamed('avg(rank)', 'avg_rank') \
    #     .withColumn('avg_rank', F.round('avg_rank', 2)) \
    #     .orderBy('avg_rank', ascending=False).show()

    df.createTempView('movie')

    # # 2. average rating per movie
    # spark.sql("""
    # select movie_id,round(avg(rank),2) as avg_rank
    # from movie
    # group by movie_id
    # order by avg_rank desc
    # """).show()

    # # 3. number of ratings above the global average
    # # F.avg(df['rank'])            -> single-row DataFrame
    # # ...first()                   -> Row object
    # # ...first()['avg(rank)']      -> plain number
    # print('大于平均分电影的数量')
    # print(df.select(F.avg(df['rank'])).first()['avg(rank)'])
    # print(df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())

    # # 4. among ratings > 3, the most active user and that user's overall average
    # user_id = df.where(df['rank'] > 3).groupBy('user_id').count() \
    #     .withColumnRenamed("count", "cnt").orderBy("cnt", ascending=False) \
    #     .first()['user_id']
    # df.filter(df['user_id'] == user_id).select(F.round(F.avg('rank'), 2)).show()

    # # 5. per-user average / minimum / maximum rating
    # spark.sql("""
    # select user_id,min(rank) as min_rank,max(rank) as max_rank,avg(rank) as avg_rank
    # from movie group by user_id
    # """).show()
    # df.groupby('user_id').agg(
    #     F.round(F.avg('rank'), 2).alias('avg_rank'),
    #     F.round(F.min('rank'), 2).alias('min_rank'),
    #     F.round(F.max('rank'), 2).alias('max_rank')
    # ).show()

    # 6. TOP-10 movies by average rating among movies rated more than 100 times
    spark.sql("""
    select movie_id,count(movie_id) as cnt,avg(rank) as avg_rank
    from movie
    group by movie_id
    having cnt > 100
    order by avg_rank desc
    limit 10
    """).show()
    df.groupby('movie_id').agg(
        F.count('movie_id').alias('cnt'),
        F.round(F.avg('rank'), 2).alias('avg_rank'),
    ).where('cnt>100').orderBy('avg_rank', ascending=False).limit(10).show()
if __name__ == '__main__':
    # BUG FIX: the config key is spark.sql.shuffle.partitions (plural);
    # the singular 'spark.sql.shuffle.partition' is silently ignored by Spark.
    spark = SparkSession.builder.appName('create df').master('local[*]') \
        .config('spark.sql.shuffle.partitions', '2') \
        .getOrCreate()
    df = spark.read.format('csv').option('sep', ';').option('header', True) \
        .load('file:///opt/Data/sql/people.csv')

    # Drop fully-duplicated rows, then rows duplicated on a subset of columns.
    df.dropDuplicates().show()
    df.dropDuplicates(['age', 'job']).show()

    # Fill nulls: everywhere, only in the 'job' column, or per-column defaults.
    df.fillna('loss').show()
    df.fillna('loss', subset=['job']).show()
    df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()
from pyspark.sql import functions as F  # required for concat_ws below

if __name__ == '__main__':
    # BUG FIX: config key is spark.sql.shuffle.partitions (plural).
    spark = SparkSession.builder.appName('create df').master('local[*]') \
        .config('spark.sql.shuffle.partitions', '2') \
        .getOrCreate()
    df = spark.read.format('csv').option('sep', ';').option('header', True) \
        .load('file:///opt/Data/sql/people.csv')

    # The text sink accepts exactly one string column: join the fields with '_'.
    df.select(F.concat_ws('_', 'name', 'age', 'job')) \
        .write.mode('overwrite').format('text').save('file:///opt/Data/sql/output/text')
    # csv sink with explicit separator and header row.
    df.write.mode('overwrite').format('csv').option('sep', ";").option("header", True) \
        .save('file:///opt/Data/sql/output/csv')
    # parquet sink: schema is self-describing, no options needed.
    df.write.mode('overwrite').format('parquet').save('file:///opt/Data/sql/output/parquet')



