
PySpark basic setup

1. Commonly used packages and parameters

import math

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans
from sklearn import metrics
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OrdinalEncoder

import os
import datetime
import time
import findspark

# Spark environment configuration
os.environ["SPARK_HOME"] = '/usr/hdp/3.0.1.0-187/spark2'   # HDP Spark2 install path
os.environ["HADOOP_USER_NAME"] = "hive"                    # user for HDFS/Hive access
os.environ["LOCAL_DIRS"] = "/data/tmp"                     # local scratch directory
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"          # Python for the executors
findspark.init(edit_rc=True)  # locate Spark; edit_rc persists settings to the shell rc file

from pyspark import SparkConf
from pyspark.sql import SparkSession, HiveContext
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import random
from pyspark.sql.functions import col, lower, sort_array, desc, row_number, lit
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, FloatType, ArrayType
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, PCA
# kept commented out to avoid clashing with sklearn's KMeans imported above
# from pyspark.ml.clustering import KMeans
# from pyspark.ml.clustering import KMeansModel
# SparkSession
class DmpSession:
    def __init__(self, num_executors="10", executor_cores="2", executor_memory="4g", driver_cores="6",
                 driver_memory="10g", over_load="2g", mode="client"):
        self.mode = mode
        self.executor_cores = executor_cores
        self.executor_memory = executor_memory
        self.over_load = over_load

        # YARN cluster mode
        self.num_executors = num_executors
        self.driver_cores = driver_cores
        self.driver_memory = driver_memory

        # YARN client mode
        self.executor_instance = num_executors
        self.am_cores = driver_cores
        self.am_memory = driver_memory

    @property
    def spark(self):
        conf = (
            SparkConf()
            .setAppName("zhiyi_shakerate_analysis")
            .setMaster("yarn")
            .set("hive.metastore.uris", "thrift://10.0.101.54:9083")
            .set("spark.sql.parquet.writeLegacyFormat", "true")
            .set("spark.hadoop.hive.support.quoted.identifiers", "none")
            .set("spark.debug.maxToStringFields", "100")
            .set("spark.local.dir", "file:///data/tmp")
            .set("spark.sql.shuffle.partitions", "1200")
            .set("spark.executor.memoryOverhead", self.over_load)
            .set("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
            .set("spark.executor.cores", self.executor_cores)
            .set("spark.executor.memory", self.executor_memory)
            .set("spark.sql.hive.filesourcePartitionFileCacheSize", "2621440000")
            .set("spark.driver.maxResultSize", "8g")
            .set("spark.network.timeout", "300")
            .set("spark.shuffle.io.maxRetries", "10")
            .set("spark.sql.autoBroadcastJoinThreshold", "104857600")
            .setSparkHome("/usr/hdp/3.0.1.0-187/spark2")
        )
        # alternative settings kept from the original:
        # .set("yarn.nodemanager.local-dirs", "file:///data/tmp")
        # .set("spark.sql.shuffle.partitions", "100")

        # YARN cluster mode
        conf.set("spark.num.executors", self.num_executors) \
            .set("spark.driver.cores", self.driver_cores) \
            .set("spark.driver.memory", self.driver_memory)
        # YARN client mode
        conf.set("spark.executor.instances", self.executor_instance) \
            .set("spark.yarn.am.cores", self.am_cores) \
            .set("spark.yarn.am.memory", self.am_memory)
        return SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

    @property
    def sc(self):
        return self.spark.sparkContext

    @property
    def hiveContext(self):
        return HiveContext(self.sc)
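
One note on the factory above: because the spark property ends in getOrCreate(), accessing it repeatedly reuses the one active session rather than spawning new ones, and sc and hiveContext are derived from it on demand. A minimal check:

session = DmpSession()
spark = session.spark
assert spark is session.spark  # getOrCreate() returns the same active session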

2. Job sizing

dmpSession = DmpSession(num_executors="10",
                        executor_cores="2",
                        executor_memory="6g",
                        driver_cores="4",
                        driver_memory="8g",
                        over_load="8g")
spark = dmpSession.spark
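
To verify that the session actually picked up these sizes, you can dump the effective configuration; this sketch uses the standard SparkConf.getAll() API:

# print the executor/driver settings the live session is using
for key, value in spark.sparkContext.getConf().getAll():
    if key.startswith(("spark.executor", "spark.driver")):
        print(key, "=", value)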

3. Spark SQL

sqls = '''select distinct material_id,material_class_id_small,material_class_desc_small ,
                material_class_id_mid,material_class_desc_mid, 
                material_class_id_large,material_class_desc_large 
            from dmp_fm.dim_material_base_info_cur '''
data = spark.sql(sqls)
# collect to the driver as a pandas DataFrame and save to CSV
datas = data.toPandas()
datas.to_csv('./商品分类明细.csv', index=False)
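
toPandas() pulls the entire result onto the driver, which is fine for a small dimension table like this one. For larger results, writing back to Hive in parallel avoids the driver bottleneck; a sketch using the standard DataFrameWriter API (the target table name here is hypothetical):

# hypothetical target table; writes in parallel from the executors
data.write.mode("overwrite").format("parquet") \
    .saveAsTable("dmp_fm.material_class_snapshot")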
# stop the session when finished
spark.stop()

4. Applying UDFs

# extract the hour, assuming trd_tm is an HHMMSS-style integer
def get_hours(trd_tm):
    return str(trd_tm // 10000)

# map an hour string to a time-of-day bucket via the time_d dict
def get_time_d(hour):
    return time_d[hour]

get_hours_udf = udf(get_hours, returnType=StringType())
get_time_d_udf = udf(get_time_d, returnType=StringType())

# apply the UDF to add an hour column
data_time = data.withColumn('hour', get_hours_udf(data.trd_tm))
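
The snippet above references time_d without defining it. A minimal sketch that completes the chain, assuming time_d buckets hour strings into day parts (the bucket names are hypothetical):

# hypothetical hour -> day-part mapping assumed by get_time_d
time_d = {str(h): ('night' if h < 6 else
                   'morning' if h < 12 else
                   'afternoon' if h < 18 else
                   'evening')
          for h in range(24)}

# apply the second UDF on top of the derived hour column
data_time = data_time.withColumn('time_d', get_time_d_udf(data_time.hour))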

5. pyecharts
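
The body of this section is missing in the source. As a placeholder, here is a minimal sketch that charts the category counts from section 3 with pyecharts (v1-style API; the grouping column and output filename are assumptions):

from pyecharts import options as opts
from pyecharts.charts import Bar

# count distinct materials per large category in the pandas DataFrame from section 3
counts = datas.groupby('material_class_desc_large')['material_id'].nunique()

bar = (
    Bar()
    .add_xaxis(counts.index.tolist())
    .add_yaxis("material count", counts.values.tolist())
    .set_global_opts(title_opts=opts.TitleOpts(title="Materials per category"))
)
bar.render("material_categories.html")  # writes an interactive HTML chart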
