from pyspark import SparkContext, SQLContext, SparkConf
from pyspark.sql import SparkSession
import warnings
from pyspark.sql import functions as fn
from pyspark.sql.types import StructField, LongType
# sc = SparkContext(appName="AAA")
# NOTE(review): `sc` must already exist (e.g. provided by pyspark shell /
# notebook) since its creation above is commented out — confirm before running
# this as a standalone script.
sqlContext = SQLContext(sc)
# Build (or reuse) the session. FIX: the original chained `.config(...)` calls
# on bare continuation lines (a SyntaxError) and used `SparkSession(sc).builder`
# instead of the documented `SparkSession.builder` entry point.
ss = (
    SparkSession.builder
    .config('spark.sql.shuffle.partitions', 2000)
    .config('spark.executor.memoryOverhead', 8192)
    .config('spark.driver.memoryOverhead', 8192)
    .config('spark.dynamicAllocation.enabled', 'true')
    .getOrCreate()
)
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Keep the console quiet: only errors from Spark's own logging.
sc.setLogLevel('ERROR')
# Toy input: columns A/B are labels, C is a repeat count per row.
# FIX: the method is `createDataFrame` (capital F); `createDataframe`
# raises AttributeError at runtime.
data = ss.createDataFrame([
    {'A': 'X', 'B': 'X', 'C': 3}, {'A': 'Y', 'B': 'Y', 'C': 1}, {'A': 'L', 'B': 'L', 'C': 1}
])
data.show()
def row_dealwith(row):
    """Expand one (A, B, C) row into comma-joined repetitions.

    A and B are repeated C times and joined with commas; the count column
    becomes C copies of '1'. E.g. ('X', 'X', 3) -> ('X,X,X', 'X,X,X', '1,1,1').

    FIX: the original returned the raw int `c` in the c == 1 branch while
    every other row produced joined strings, giving the downstream
    `toDF(...)` inconsistent column types; all branches now return strings.
    """
    a, b, c = row[0], row[1], row[2]
    if c == 1:
        # Fast path: a single repetition needs no joining.
        return (str(a), str(b), str(c))
    return (
        ','.join([str(a)] * c),
        ','.join([str(b)] * c),
        ','.join(['1'] * c),
    )
# Expand each row so that a count C becomes C comma-joined copies per column.
newdata = data.rdd.map(row_dealwith).toDF(schema=['A', 'B', 'C'])
newdata.show()
# split + explode turns each comma-joined string into one row per token,
# yielding three single-column DataFrames with equal row counts.
dfA = newdata.withColumn('A', fn.explode(fn.split(newdata.A, ','))).select('A')
dfB = newdata.withColumn('B', fn.explode(fn.split(newdata.B, ','))).select('B')
dfC = newdata.withColumn('C', fn.explode(fn.split(newdata.C, ','))).select('C')
# NOTE(review): mkdf_tojoin (and flat, which it uses) are defined LATER in
# this file — running the file top-to-bottom raises NameError here. Move those
# function definitions above this point, or only run this in an environment
# where they are already loaded.
dfA = mkdf_tojoin(dfA, ss)
dfB = mkdf_tojoin(dfB, ss)
dfC = mkdf_tojoin(dfC, ss)
# Re-align the three exploded columns by their zipWithIndex position ('tmpid')
# to restore row correspondence, then drop the helper index column.
dfres = dfA.join(dfB, on=['tmpid'], how='left').join(dfC, on=['tmpid'], how='left').drop('tmpid')
dfres.show()
def flat(l):
    """Recursively yield the leaf elements of a nested list/tuple structure.

    Non-sequence items are yielded as-is; lists and tuples are flattened
    depth-first, so flat([1, [2, (3,)]]) yields 1, 2, 3.
    """
    for item in l:
        if isinstance(item, (list, tuple)):
            # Descend into nested containers and re-emit their leaves.
            yield from flat(item)
        else:
            yield item
def mkdf_tojoin(df, ss):
    """Return `df` with an extra LongType 'tmpid' column holding each row's
    zipWithIndex position, so same-length DataFrames can be joined row-wise.

    FIX: the original called `ss.createDataframe`, which does not exist —
    the method is `createDataFrame` (capital F) and the typo raises
    AttributeError at runtime.
    """
    # Extend the incoming schema with the index column produced below.
    schema = df.schema.add(StructField("tmpid", LongType()))
    # zipWithIndex yields (Row, index) pairs; flatten each pair into one
    # flat record matching the extended schema.
    rdd = df.rdd.zipWithIndex()
    rdd = rdd.map(lambda x: list(flat(x)))
    df = ss.createDataFrame(rdd, schema)
    return df