栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark

spark

This is a sample Python script. Press Shift+F10 to execute it or replace it with your code. Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.

import pyspark.sql.types
from pyspark.sql import SparkSession
from pyecharts import Bar, Page
from pyecharts import Map
import requests
import json
from pyecharts import Funnel,EffectScatter,Line, Overlap,Grid,Parallel,Pie,configure
from pyspark.sql.types import DoubleType

import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql.functions import split
import os
def hubeirent(filename):
    """Summarize rent prices across Hubei cities (analyse Hubei regional rents).

    Reads every CSV in the directory *filename* (expected files: 武汉.csv,
    宜昌.csv, 黄石.csv, each with a `价格` price column) and computes per-city
    min / max / mean / approximate-median rent.

    Parameters:
        filename: path of the directory holding the per-city CSV files.

    Returns:
        [min_list, max_list, mean_list, mid_list] — each inner list indexed
        by city slot: wuhan 0, yichang 1, huangshi 2.
    """
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    files = os.listdir(filename)
    # city file -> slot index: wuhan 0, yichang 1, huangshi 2
    mapp = {"武汉.csv": 0, "宜昌.csv": 1, "黄石.csv": 2}
    min_list = [0 for _ in range(3)]
    max_list = [0 for _ in range(3)]
    mean_list = [0 for _ in range(3)]
    mid_list = [0 for _ in range(3)]

    all_list = []
    for file in files:
        path = filename + '/' + file
        df = spark.read.csv(path, header=True)
        i = mapp[file]
        # cast the price column to double so the aggregates are numeric
        df = df.withColumn('价格', df["价格"].cast('double'))
        min_list[i] = df.agg({"价格": "min"}).first()['min(价格)']
        max_list[i] = df.agg({"价格": "max"}).first()['max(价格)']
        mean_list[i] = df.agg({"价格": "mean"}).first()['avg(价格)']
        # approximate median with 1% relative error
        mid_list[i] = df.approxQuantile("价格", [0.5], 0.01)[0]

    all_list.append(min_list)
    all_list.append(max_list)
    all_list.append(mean_list)
    all_list.append(mid_list)

    return all_list

def shenghuirent(filename):
    """Summarize rent prices for Wuhan versus the four municipalities.

    Reads every CSV in the directory *filename* (expected files: 武汉.csv,
    上海.csv, 重庆.csv, 北京.csv, 天津.csv, each with a `价格` price column)
    and computes per-city min / max / mean / approximate-median rent.

    Parameters:
        filename: path of the directory holding the per-city CSV files.

    Returns:
        [min_list, max_list, mean_list, mid_list] — each inner list indexed
        by city slot: wuhan 0, shanghai 1, chongqing 2, beijing 3, tianjin 4.
    """
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    files = os.listdir(filename)
    # city file -> slot index
    mapp = {"武汉.csv": 0, "上海.csv": 1, "重庆.csv": 2, "北京.csv": 3, "天津.csv": 4}
    min_list = [0 for _ in range(5)]
    max_list = [0 for _ in range(5)]
    mean_list = [0 for _ in range(5)]
    mid_list = [0 for _ in range(5)]

    all_list = []
    for file in files:
        path = filename + '/' + file
        df = spark.read.csv(path, header=True)
        i = mapp[file]
        # cast the price column to double so the aggregates are numeric
        df = df.withColumn('价格', df["价格"].cast('double'))
        min_list[i] = df.agg({"价格": "min"}).first()['min(价格)']
        max_list[i] = df.agg({"价格": "max"}).first()['max(价格)']
        mean_list[i] = df.agg({"价格": "mean"}).first()['avg(价格)']
        # approximate median with 1% relative error
        mid_list[i] = df.approxQuantile("价格", [0.5], 0.01)[0]

    all_list.append(min_list)
    all_list.append(max_list)
    all_list.append(mean_list)
    all_list.append(mid_list)

    return all_list

def wuhandiqurent(filename):
    """Compute the average rent per district of Wuhan (compare district rents).

    Parameters:
        filename: path to 武汉.csv (must have `地址` and `价格` columns).

    Returns:
        (qu, price) — parallel lists of district name and its mean price.
    """
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.withColumn('价格', df["价格"].cast('double'))
    rows = df.groupby('地址').agg({'价格': 'mean'}).collect()

    qu = []
    price = []
    for row in rows:
        qu.append(row[0])
        price.append(row[1])

    return qu, price

def wuhanhudiqufangyuan(filename):
    """Count listings per Wuhan district and keep the top 10.

    Parameters:
        filename: path to 武汉.csv (must have a `地址` column).

    Returns:
        (diqu, shu) — parallel lists of district name and listing count
        for the 10 districts with the most listings, descending.
    """
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.groupby('地址').count()
    # sort by count descending, then take the first 10 rows
    rows = df.orderBy(-df['count']).collect()
    diqu = []
    shu = []

    for row in rows[:10]:
        diqu.append(row[0])
        shu.append(row[1])

    return diqu, shu

def wuhandiquhuxing(filename):
    """Rank Wuhan layouts by price-per-area (性价比 = 价格 / 面积), top 10.

    Parameters:
        filename: path to 武汉.csv (columns `房型`, `价格`, `面积` where the
        area looks like "85㎡").

    Returns:
        (huxing, danjia, jiage, mianji) — layout name plus the three averaged
        columns of the aggregate, in the row order Spark returns them.
        NOTE(review): which of columns 1-3 is avg(性价比)/avg(面积)/avg(价格)
        depends on Spark's agg-dict column ordering — verify against the
        caller before relying on the names.
    """
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.filter(df['面积'].isNotNull())
    df = df.withColumn('价格', df["价格"].cast('double'))
    # strip the trailing ㎡ unit so the area can be cast to a number
    df = df.withColumn('面积', split(df['面积'], "㎡")[0])
    # FIX: the original called withColumn without assigning the result,
    # so the cast to double was silently discarded.
    df = df.withColumn('面积', df["面积"].cast('double'))
    df = df.withColumn('性价比', df['价格'] / df["面积"])
    df = df.groupby('房型').agg({'性价比': 'mean', '面积': 'mean', '价格': 'mean'})
    # sort by average price-per-area, descending; keep top 10 layouts
    rows = df.orderBy(-df['avg(性价比)']).collect()
    huxing = []
    danjia = []
    jiage = []
    mianji = []
    for row in rows[:10]:
        huxing.append(row[0])
        danjia.append(row[1])
        jiage.append(row[2])
        mianji.append(row[3])

    return huxing, danjia, jiage, mianji

def yichangdiqufangyuan(filename):
    """Count listings per Yichang district, all districts, descending.

    Parameters:
        filename: path to 宜昌.csv (must have a `地址` column).

    Returns:
        (diqu, fangyuan) — parallel lists of district name and listing count.
    """
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.groupby('地址').count()
    rows = df.orderBy(-df['count']).collect()
    diqu = []
    fangyuan = []

    for row in rows:
        diqu.append(row[0])
        fangyuan.append(row[1])

    return diqu, fangyuan

def yichangdiqujiage(filename):
    """Min / max / mean rent per Yichang district.

    Parameters:
        filename: path to 宜昌.csv (columns `地址` and `价格`).

    Returns:
        (diqu, min_list, max_list, mean_list) — parallel lists, one entry
        per district.
    """
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.withColumn('价格', df["价格"].cast('double'))
    # FIX: the original ran three independent groupBy().collect() actions
    # and zipped them by index — Spark does not guarantee the same row
    # order across separate actions, so a district's min/max/mean could be
    # mismatched. One multi-aggregate keeps the three stats on one row.
    rows = df.groupby('地址').agg(
        F.min('价格').alias('min_price'),
        F.max('价格').alias('max_price'),
        F.avg('价格').alias('avg_price'),
    ).collect()
    diqu = []
    min_list = []
    max_list = []
    mean_list = []
    for row in rows:
        diqu.append(row['地址'])
        min_list.append(row['min_price'])
        max_list.append(row['max_price'])
        mean_list.append(row['avg_price'])

    return diqu, min_list, max_list, mean_list

def huangshimianji(filename):
    """Average price per distinct floor area in Huangshi, split by area band.

    Parameters:
        filename: path to 黄石.csv (columns `面积` like "85㎡" and `价格`).

    Returns:
        (resulta, resultb, resultc, resultd, resulte) — [area, avg_price]
        pairs for the bands <50, [50,100), [100,150), [150,200), >=200.
    """
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)

    # strip the trailing ㎡ unit so the area can be cast to a number
    df = df.withColumn('面积', split(df['面积'], "㎡")[0])
    # FIX: the original called withColumn without assigning the result, so
    # the area stayed a string and the < / >= filters below compared
    # lexicographically instead of numerically.
    df = df.withColumn('面积', df["面积"].cast('double'))

    def _band(lo, hi):
        # average price grouped by exact area, restricted to [lo, hi)
        sub = df
        if lo is not None:
            sub = sub.filter(sub['面积'] >= lo)
        if hi is not None:
            sub = sub.filter(sub['面积'] < hi)
        return [[row[0], row[1]]
                for row in sub.groupby('面积').agg({'价格': 'avg'}).collect()]

    resulta = _band(None, 50)
    resultb = _band(50, 100)
    resultc = _band(100, 150)
    resultd = _band(150, 200)
    resulte = _band(200, None)

    return resulta, resultb, resultc, resultd, resulte

def draw_hubeirent(list):
    """Render a stacked bar chart of Hubei rent stats to 湖北.html.

    Parameters:
        list: [min, max, mean, median] lists as produced by hubeirent(),
              each indexed wuhan 0, yichang 1, huangshi 2.
              (Name shadows the builtin `list`; kept for caller compatibility.)
    """
    attr = ["武汉", "宜昌", "黄石"]
    v0 = list[0]
    v1 = list[1]
    v2 = list[2]
    v3 = list[3]

    bar = Bar("湖北省租房租金概况")
    bar.add("最小值", attr, v0, is_stack=True)
    bar.add("最大值", attr, v1, is_stack=True)
    bar.add("平均值", attr, v2, is_stack=True)
    bar.add("中位数", attr, v3, is_stack=True)
    bar.render("湖北.html")

def draw_shenghuirent(list):
    """Render a stacked bar chart comparing Wuhan with the municipalities,
    written to 省会.html.

    Parameters:
        list: [min, max, mean, median] lists as produced by shenghuirent().
              (Name shadows the builtin `list`; kept for caller compatibility.)
    """
    attr = ["武汉", "上海", "重庆", "北京", "天津"]
    v0 = list[0]
    v1 = list[1]
    v2 = list[2]
    v3 = list[3]

    bar = Bar("武汉市和直辖市租房租金概况")
    bar.add("最小值", attr, v0, is_stack=True)
    bar.add("最大值", attr, v1, is_stack=True)
    bar.add("平均值", attr, v2, is_stack=True)
    bar.add("中位数", attr, v3, is_stack=True)
    bar.render("省会.html")

def draw_wuhandiqurent(list1, list2):
    """Render a smoothed, filled line chart of district -> price,
    written to 武汉小区租金对比.html.

    Parameters:
        list1: district names (x axis).
        list2: matching average prices (y axis).
    """
    line = Line('地区价格图', background_color='white', title_text_size=25)
    line.add('地区', list1, list2, is_label_show=False, is_smooth=True, is_fill=True)
    line.render("武汉小区租金对比.html")

def draw_wuhanhudiqufangyuan(list1, list2):
    """Render a funnel of the top-10 Wuhan districts by listing count,
    written to 武汉房源top10.html.

    Parameters:
        list1: district names.
        list2: matching listing counts.
    """
    funnel = Funnel("武汉小区房源top10")
    funnel.add("地址", list1, list2, is_label_show=True, label_pos="inside", label_text_color="#fff")
    funnel.render("武汉房源top10.html")
def draw_yichangdiqufangyuan(list1, list2):
    """Render a choropleth map of listing counts over Yichang districts,
    written to 宜昌地区房源对比图.html.

    Parameters:
        list1: district names (must match the 宜昌 map's region names).
        list2: matching listing counts; the visual range spans 0..max.
    """
    map = Map("宜昌地区房源对比图", "宜昌市")
    map.add("宜昌", list1, list2, visual_range=[0, max(list2)], maptype='宜昌', is_visualmap=True)
    map.render("宜昌地区房源对比图.html")
def draw_yichangdiqujiage(list1, list2, list3, list4):
    """Render a parallel-coordinates chart of Yichang district rents,
    written to 宜昌主要地区租金对比图.html.

    Parameters:
        list1: axis/dimension names (districts) for Parallel.config.
        list2: per-district minimum prices.
        list3: per-district maximum prices.
        list4: per-district mean prices.
    """
    parallel = Parallel("宜昌主要地区租金对比图", width=600, height=400,)
    parallel.config(list1)
    # FIX: the original used max(list2, list3, list4), which compares the
    # lists lexicographically and passes a *list* as the range bound; the
    # intended bound is the largest value across all three series.
    upper = max(max(list2), max(list3), max(list4))
    parallel.add("宜昌", [list2, list3, list4], visual_range=[0, upper], is_roam=True)
    parallel.render(path="宜昌主要地区租金对比图.html")

def draw_wuhandiquhuxing(list1, list2, list3, list4):
    """Render bars (租金, 面积) overlapped with a line (单价) per layout,
    written to 户型top10对比图.html.

    Parameters:
        list1: layout names (x axis).
        list2: series labelled 租金.
        list3: series labelled 面积.
        list4: series labelled 单价.
        NOTE(review): the caller passes (huxing, danjia, jiage, mianji) —
        单价/租金/面积 — which does not obviously line up with these labels;
        verify the intended pairing before trusting the chart legends.
    """
    bar = Bar(title="户型对比")
    line = Line()
    bar.add("租金", list1, list2, mark_line=["average"], mark_point=["max", "min"],
            is_label_show=True)  # add the series to the chart
    bar.add("面积", list1, list3, mark_line=["average"], mark_point=["max", "min"],
            is_label_show=True)  # add the series to the chart
    line.add("单价", list1, list4, is_smooth=True)  # add the series to the chart

    overlap = Overlap()
    overlap.add(bar)
    overlap.add(line)
    overlap.render("户型top10对比图.html")

def draw_huangshimianji(list1, list2, list3, list4, list5):
    """Render one EffectScatter chart per Huangshi area band plus a pie of
    band sizes (黄石房源面积对比图.html).

    Parameters:
        list1..list5: [area, avg_price] pairs for the bands
        <50, [50,100), [100,150), [150,200), >=200, as returned by
        huangshimianji(). Each band is rendered to "<band title>.html".
    """
    # (data, chart title == series name == html stem, scatter symbol)
    bands = [
        (list1, '面积<50', "rect"),
        (list2, '50<=面积<100', "roundRect"),
        (list3, '100<=面积<150', "triangle"),
        (list4, '150<=面积<200', "diamond"),
        (list5, '200<=面积', "pin"),
    ]
    for data, title, symbol in bands:
        key = [pair[0] for pair in data]
        value = [pair[1] for pair in data]
        es = EffectScatter(title)
        es.add(title, key, value, symbol=symbol)
        es.render(title + '.html')

    # pie of how many distinct listings fall in each band
    key = [title for _, title, _ in bands]
    value = [len(data) for data, _, _ in bands]

    pie = Pie('黄石房源面积对比图')
    pie.add("房源面积数量", key, value)
    pie.render("黄石房源面积对比图.html")

# Entry point: run every analysis over the 链家 data sets and render charts.
if __name__ == '__main__':  # FIX: original lost the dunder underscores (name/'main')
    files = "链家/湖北"
    all_list = hubeirent(files)
    draw_hubeirent(all_list)
    files2 = "链家/省会"
    all_list2 = shenghuirent(files2)
    draw_shenghuirent(all_list2)
    files3 = "链家/湖北/武汉.csv"
    qu, price = wuhandiqurent(files3)
    draw_wuhandiqurent(qu, price)

    diqu, shu = wuhanhudiqufangyuan(files3)
    draw_wuhanhudiqufangyuan(diqu, shu)
    huxing, danjia, jiage, mianji = wuhandiquhuxing(files3)
    draw_wuhandiquhuxing(huxing, danjia, jiage, mianji)
    file4 = "链家/湖北/宜昌.csv"
    diqu, fangyuan = yichangdiqufangyuan(file4)
    draw_yichangdiqufangyuan(diqu, fangyuan)
    diqu, min_list, max_list, mean_list = yichangdiqujiage(file4)
    draw_yichangdiqujiage(diqu, min_list, max_list, mean_list)
    files5 = "链家/湖北/黄石.csv"
    resulta, resultb, resultc, resultd, resulte = huangshimianji(files5)
    draw_huangshimianji(resulta, resultb, resultc, resultd, resulte)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/700406.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号