1. Common packages and parameters
import os
import math
import time
import random
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture as GMM
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans
from sklearn import metrics
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OrdinalEncoder
import findspark

# Spark environment configuration
os.environ['SPARK_HOME'] = '/usr/hdp/3.0.1.0-187/spark2'
os.environ["HADOOP_USER_NAME"] = "hive"
os.environ["LOCAL_DIRS"] = "/data/tmp"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
findspark.init(edit_rc=True)

from pyspark import SparkConf
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lower, sort_array, desc, row_number, lit, udf
from pyspark.sql.types import IntegerType, StringType, FloatType, ArrayType
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer, PCA
# from pyspark.ml.clustering import KMeans
# from pyspark.ml.clustering import KMeansModel
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# SparkSession
class DmpSession:
    def __init__(self, num_executors="10", executor_cores="2", executor_memory="4g", driver_cores="6",
                 driver_memory="10g", over_load="2g", mode="client"):
        self.mode = mode
        self.executor_cores = executor_cores
        self.executor_memory = executor_memory
        self.over_load = over_load
        # yarn cluster mode
        self.num_executors = num_executors
        self.driver_cores = driver_cores
        self.driver_memory = driver_memory
        # yarn client mode
        self.executor_instance = num_executors
        self.am_cores = driver_cores
        self.am_memory = driver_memory

    @property
    def spark(self):
        conf = (
            SparkConf()
            .setAppName("zhiyi_shakerate_analysis")
            .setMaster("yarn")
            .set("hive.metastore.uris", "thrift://10.0.101.54:9083")
            .set("spark.sql.parquet.writeLegacyFormat", "true")
            .set("spark.hadoop.hive.support.quoted.identifiers", "none")
            .set("spark.debug.maxToStringFields", "100")
            .set("spark.local.dir", "file:///data/tmp")
            .set("spark.sql.shuffle.partitions", "1200")
            .set("spark.executor.memoryOverhead", self.over_load)
            .set("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
            .set("spark.executor.cores", self.executor_cores)
            .set("spark.executor.memory", self.executor_memory)
            .set("spark.sql.hive.filesourcePartitionFileCacheSize", "2621440000")
            .set("spark.driver.maxResultSize", "8g")
            .set("spark.network.timeout", "300")
            .set("spark.shuffle.io.maxRetries", "10")
            .set("spark.sql.autoBroadcastJoinThreshold", "104857600")
            .setSparkHome("/usr/hdp/3.0.1.0-187/spark2")
        )
        # .set("yarn.nodemanager.local-dirs", "file:///data/tmp")
        # .set("spark.sql.shuffle.partitions", "100")
        # .set("spark.network.timeout", "300")
        # .set("spark.shuffle.io.maxRetries", "10")

        # yarn cluster mode
        conf.set("spark.num.executors", self.num_executors) \
            .set("spark.driver.cores", self.driver_cores) \
            .set("spark.driver.memory", self.driver_memory)
        # yarn client mode
        conf.set("spark.executor.instances", self.executor_instance) \
            .set("spark.yarn.am.cores", self.am_cores) \
            .set("spark.yarn.am.memory", self.am_memory)
        return SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

    @property
    def sc(self):
        return self.spark.sparkContext

    @property
    def hiveContext(self):
        return HiveContext(self.sc)
2. Job sizing
dmpSession = DmpSession(num_executors="10",
                        executor_cores="2",
                        executor_memory="6g",
                        driver_cores="4",
                        driver_memory="8g",
                        over_load="8g")
spark = dmpSession.spark
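As a quick sanity check (a minimal sketch, not part of the original flow), the effective sizing can be read back from the live session:
# print the executor/driver/AM settings the session actually picked up
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    if key.startswith(("spark.executor", "spark.driver", "spark.yarn.am")):
        print(key, "=", value)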
3. sparksql
sqls = '''select distinct material_id,material_class_id_small,material_class_desc_small ,
material_class_id_mid,material_class_desc_mid,
material_class_id_large,material_class_desc_large
from dmp_fm.dim_material_base_info_cur '''
data = spark.sql(sqls)
# collect the result to a pandas DataFrame and export it as CSV
datas = data.toPandas()
datas.to_csv('./商品分类明细.csv', index=False)
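If the result is too large to collect with toPandas(), a distributed write avoids pulling everything to the driver (a sketch; the output path is hypothetical):
# alternative for large results: have Spark write the CSV directly to HDFS/local storage
data.coalesce(1).write.mode("overwrite").option("header", "true").csv("/tmp/dim_material_base_info_csv")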
# stop the Spark session when finished
spark.stop()
4. apply udf
# hour extraction: trd_tm looks like an HHMMSS-style integer, so integer
# division by 10000 keeps the hour digits (e.g. 143025 -> '14')
def get_hours(trd_tm):
    return str(trd_tm // 10000)

# map an hour string to a day-part label via the time_d dict (defined elsewhere)
def get_time_d(hour):
    return time_d[hour]

get_hours_udf = udf(get_hours, returnType=StringType())
get_time_d_udf = udf(get_time_d, returnType=StringType())

# apply the UDF to add an 'hour' column
data_time = data.withColumn('hour', get_hours_udf(data.trd_tm))
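time_d is not defined in this snippet; the sketch below assumes it is a plain dict keyed by the hour string (the keys and labels here are hypothetical) and shows how the second UDF would be applied:
# hypothetical day-part mapping; the project's real time_d is defined elsewhere
time_d = {str(h): ('night' if h < 6 else 'morning' if h < 12 else 'afternoon' if h < 18 else 'evening')
          for h in range(24)}
# apply the second UDF on the derived 'hour' column
data_time = data_time.withColumn('time_d', get_time_d_udf(data_time.hour))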
5. pyecharts
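A minimal sketch for this section, assuming the pyecharts 1.x API and the datas DataFrame exported in section 3: render distinct material counts per large class as a bar chart.
from pyecharts.charts import Bar
from pyecharts import options as opts

# count distinct materials per large class from the pandas DataFrame exported above
counts = datas.groupby('material_class_desc_large')['material_id'].nunique().sort_values(ascending=False)

bar = (
    Bar()
    .add_xaxis(counts.index.tolist())
    .add_yaxis("material count", counts.values.tolist())
    .set_global_opts(title_opts=opts.TitleOpts(title="Materials per large class"),
                     xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=45)))
)
bar.render("material_class_bar.html")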



