栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

pyspark

pyspark

pyspark

一、数据
二、分析

# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType


def spark_analyse(filename):
    """Compute per-district price statistics for second-hand housing listings.

    Reads the CSV at *filename* (header row expected; must contain at least
    ``price`` and ``area`` columns), drops rows whose price is the literal
    '面议' ("negotiable", not castable to int), casts ``price`` to integer,
    and fills slot 0 of the min/mean/max lists with the statistics for the
    浦东 district.

    Returns:
        list: ``[min_list]`` — a single-element list holding the per-district
        minimum prices, which is exactly what ``draw_bar`` consumes as its
        "最小值" series. ``mean_list`` and ``max_list`` are computed but,
        as in the original design, not returned.
    """
    # Local import: pyspark.sql is already a dependency of this module.
    from pyspark.sql import functions as F

    print("开始spark分析")
    spark = SparkSession.builder.master("local").appName("wubash").getOrCreate()
    df = spark.read.csv(filename, header=True)

    # One slot per district; only index 0 is filled below.
    # NOTE(review): the original comment listed the slots as
    # 黄浦/徐汇/长宁/静安/普陀/闸北/虹口/杨浦, but the code actually
    # filters on 浦东 — confirm the intended district-to-slot mapping.
    max_list = [0] * 8
    mean_list = [1.2] * 8  # 1.2 is an arbitrary placeholder from the original
    min_list = [0] * 8

    # '面议' rows would become NULL on the int cast, so filter them out first.
    df = df.filter(df.price != '面议').withColumn("price", df.price.cast(IntegerType()))

    # Single Spark job for all three aggregates instead of three separate
    # filter+agg passes over the data.
    stats = df.filter(df.area == "浦东").agg(
        F.min("price"), F.mean("price"), F.max("price")
    ).first()
    min_list[0] = stats['min(price)']
    mean_list[0] = stats['avg(price)']   # F.mean() names its column avg(price)
    max_list[0] = stats['max(price)']

    all_list = []
    all_list.append(min_list)

    print("结束spark分析")

    return all_list



def spark_analyse2(filename, shuru):
    """Print the title and price of every listing located in district *shuru*.

    Side effect only: results are shown on stdout via DataFrame.show();
    nothing is returned.
    """
    print("开始spark分析2")
    session = SparkSession.builder.master("local").appName("wubash").getOrCreate()
    listings = session.read.csv(filename, header=True)
    # Restrict to the requested district, then project the two display columns.
    in_district = listings.filter(listings.area == shuru)
    in_district.select("title", "price").show()


def spark_analyse3(filename):
    """Compute the mean price of listings built in 2016.

    Reads the listings CSV, drops non-numeric ('面议') prices, casts
    ``price`` to int, and fills slot 0 of an 8-slot list with the mean
    price of rows whose ``time`` column equals "2016年建造".

    Returns:
        list: ``[mean_list]`` — the shape expected by ``draw_bar2``.
        NOTE(review): only slot 0 is computed here, yet draw_bar2 labels
        eight different years — confirm whether the remaining slots were
        meant to be filled too.
    """
    print("开始spark分析3")
    session = SparkSession.builder.master("local").appName("wubash").getOrCreate()
    frame = session.read.csv(filename, header=True)
    # '面议' prices cannot be cast to int; remove them before casting.
    frame = frame.filter(frame.price != '面议').withColumn("price", frame.price.cast(IntegerType()))

    averages = [1.2] * 8  # arbitrary placeholder values; only slot 0 is computed
    built_2016 = frame.filter(frame.time == "2016年建造")
    averages[0] = built_2016.agg({"price": "mean"}).first()['avg(price)']

    return [averages]

三、画图

from pyecharts import Bar


def draw_bar(all_list):
    """Render a stacked bar chart of per-district minimum prices.

    Uses the pyecharts v0.x API; Bar.render() writes render.html into the
    current working directory.

    Args:
        all_list: list whose element 0 is the 8-value minimum-price series
            produced by spark_analyse().
    """
    print("开始绘图")
    districts = ["浦东", "徐汇", "长宁", "静安", "普陀", "松江", "虹口", "杨浦"]
    minima = all_list[0]

    chart = Bar("上海市二手房房价概况")
    chart.add("最小值", districts, minima, is_stack=True)
    chart.render()
    print("结束绘图")

def draw_bar2(mj_list):
    """Render a stacked bar chart of mean price by construction year.

    Uses the pyecharts v0.x API; Bar.render() writes render.html into the
    current working directory.

    Args:
        mj_list: list whose element 0 is the 8-value mean-price series
            produced by spark_analyse3().
    """
    print("开始绘图")
    years = ["2015", "2014", "2013", "2012", "2011", "2010", "2009", "2008"]
    means = mj_list[0]

    chart = Bar("time & price")
    chart.add("平均值", years, means, is_stack=True)
    chart.render()
    print("结束绘图")

四、运行

#12
import wubash
import draw
if __name__ == '__main__':
    # Entry point: show listings for an interactively chosen district,
    # then chart the per-district minimum prices.
    print("开始总程序")
    # NOTE(review): ".csv" looks like a placeholder path — confirm the real
    # data file name before running.
    csv_path = ".csv"
    district = str(input("输入地区: "))
    wubash.spark_analyse2(csv_path, district)
    draw.draw_bar(wubash.spark_analyse(csv_path))
    print("结束总程序")

#3
import wubash
import draw
if __name__ == '__main__':
    # Entry point: chart mean price by construction year.
    print("开始总程序")
    # NOTE(review): ".csv" looks like a placeholder path — confirm the real
    # data file name before running.
    csv_path = ".csv"
    draw.draw_bar2(wubash.spark_analyse3(csv_path))
    print("结束总程序")
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/700578.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号