pyspark
一、数据
二、分析
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
def spark_analyse(filename):
    """Compute per-district price statistics from a housing-listings CSV.

    Args:
        filename: path to a CSV with a header row containing at least a
            `price` column (string; may be '面议', i.e. "negotiable") and
            an `area` (district) column.

    Returns:
        A list of three 8-element lists: [min_list, mean_list, max_list],
        one entry per district in the same order as draw.draw_bar's x-axis
        labels. min_list stays at index 0 of the result, so existing
        callers that read `all_list[0]` keep working.
    """
    print("开始spark分析")
    spark = SparkSession.builder.master("local").appName("wubash").getOrCreate()
    df = spark.read.csv(filename, header=True)
    # Drop '面议' (price negotiable) rows, then cast price to int so the
    # min/mean/max aggregations operate on numbers instead of strings.
    df = df.filter(df.price != '面议').withColumn("price", df.price.cast(IntegerType()))
    # District order matches the x-axis labels used by draw.draw_bar.
    districts = ["浦东", "徐汇", "长宁", "静安", "普陀", "松江", "虹口", "杨浦"]
    min_list = [0] * len(districts)
    mean_list = [0.0] * len(districts)
    max_list = [0] * len(districts)
    for i, district in enumerate(districts):
        sub = df.filter(df.area == district)
        # agg(...).first() returns a Row whose value is None when the
        # district has no rows; keep the placeholder in that case so the
        # chart still renders.
        mn = sub.agg({"price": "min"}).first()['min(price)']
        avg = sub.agg({"price": "mean"}).first()['avg(price)']
        mx = sub.agg({"price": "max"}).first()['max(price)']
        if mn is not None:
            min_list[i] = mn
        if avg is not None:
            mean_list[i] = avg
        if mx is not None:
            max_list[i] = mx
    # Previously mean_list/max_list were computed and then discarded; return
    # all three (min_list first, preserving the original index-0 contract).
    all_list = [min_list, mean_list, max_list]
    print("结束spark分析")
    return all_list
def spark_analyse2(filename, shuru):
    """Print the title and price of every listing in the district `shuru`.

    Args:
        filename: path to the listings CSV (header row expected).
        shuru: district name to match against the `area` column.
    """
    print("开始spark分析2")
    session = (
        SparkSession.builder
        .master("local")
        .appName("wubash")
        .getOrCreate()
    )
    listings = session.read.csv(filename, header=True)
    matched = listings.filter(listings.area == shuru)
    matched.select("title", "price").show()
def spark_analyse3(filename):
    """Compute the mean price per construction year (2015 down to 2008).

    Args:
        filename: path to a CSV with a header row containing at least a
            `price` column (string; may be '面议') and a `time` column with
            values like "2015年建造".

    Returns:
        A list containing one 8-element list of mean prices, ordered to
        match draw.draw_bar2's x-axis labels ("2015" ... "2008").
    """
    print("开始spark分析3")
    spark = SparkSession.builder.master("local").appName("wubash").getOrCreate()
    df = spark.read.csv(filename, header=True)
    # Drop '面议' rows and cast price to int before averaging.
    df = df.filter(df.price != '面议').withColumn("price", df.price.cast(IntegerType()))
    # Years match draw.draw_bar2's chart labels. The original filled only
    # index 0 — and with "2016年建造", a year the chart does not label —
    # leaving the other seven bars at a 1.2 placeholder; fixed to cover
    # every labelled year.
    years = ["2015", "2014", "2013", "2012", "2011", "2010", "2009", "2008"]
    mean_list = [0.0] * len(years)
    for i, year in enumerate(years):
        avg = df.filter(df.time == year + "年建造").agg({"price": "mean"}).first()['avg(price)']
        # Keep the 0.0 placeholder when a year has no listings.
        if avg is not None:
            mean_list[i] = avg
    mj_list = [mean_list]
    return mj_list
三、画图
from pyecharts import Bar
def draw_bar(all_list):
    """Render a bar chart of minimum second-hand prices per district.

    Args:
        all_list: list whose first element is an 8-value sequence of
            minimum prices, ordered to match the district labels below.
    """
    print("开始绘图")
    district_labels = ["浦东", "徐汇", "长宁", "静安", "普陀", "松江", "虹口", "杨浦"]
    minimums = all_list[0]
    chart = Bar("上海市二手房房价概况")
    chart.add("最小值", district_labels, minimums, is_stack=True)
    chart.render()
    print("结束绘图")
def draw_bar2(mj_list):
    """Render a bar chart of mean price per construction year.

    Args:
        mj_list: list whose first element is an 8-value sequence of mean
            prices, ordered to match the year labels below.
    """
    print("开始绘图")
    year_labels = ["2015", "2014", "2013", "2012", "2011", "2010", "2009", "2008"]
    averages = mj_list[0]
    chart = Bar("time & price")
    chart.add("平均值", year_labels, averages, is_stack=True)
    chart.render()
    print("结束绘图")
四、运行
#12
import wubash
import draw

if __name__ == '__main__':
    print("开始总程序")
    # NOTE(review): ".csv" looks like a placeholder path — confirm the real data file.
    data_path = ".csv"
    # input() already returns str, so no extra conversion is needed.
    region = input("输入地区: ")
    wubash.spark_analyse2(data_path, region)
    draw.draw_bar(wubash.spark_analyse(data_path))
    print("结束总程序")
#3
import wubash
import draw

if __name__ == '__main__':
    print("开始总程序")
    # NOTE(review): ".csv" looks like a placeholder path — confirm the real data file.
    data_path = ".csv"
    draw.draw_bar2(wubash.spark_analyse3(data_path))
    print("结束总程序")



