您可以将 from_json() 与 schema_of_json() 一起使用，以从样例字符串推断 JSON 模式。例如：
from pyspark.sql import functions as F

# A sample JSON string taken from the first row.
edges_json_sample = data[0].edges
# or: edges_json_sample = df.select('edges').first()[0]
# >>> edges_json_sample
# '[{"distance":4.382441320292239,"duration":1.5,"speed":2.9,"nodeIDs":{"nodeA":954752475,"nodeB":1665827480}},{"distance":14.48582171131768,"duration":2.6,"speed":5.6,"nodeIDs":{"nodeA":1665827480,"nodeB":3559056131}}]'

# Infer the schema from the sample string.
schema = df.select(F.schema_of_json(edges_json_sample)).first()[0]
# >>> schema
# u'array<struct<distance:double,duration:double,nodeIDs:struct<nodeA:bigint,nodeB:bigint>,speed:double>>'

# Convert the JSON string into a data structure, explode the array, and
# then pull the nested fields up to top-level columns.
new_df = (
    df.withColumn('data', F.explode(F.from_json('edges', schema)))
      .select('*', 'data.*', 'data.nodeIDs.*')
      .drop('data', 'nodeIDs', 'edges')
)
# >>> new_df.show()
# +-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+
# |count|level|           timestamp|trace_uuid|         distance|duration|speed|     nodeA|     nodeB|
# +-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+
# |  156|   36|2019-05-20T10:36:...|      aaaa|4.382441320292239|     1.5|  2.9| 954752475|1665827480|
# |  156|   36|2019-05-20T10:36:...|      aaaa|14.48582171131768|     2.6|  5.6|1665827480|3559056131|
# |  179|  258|2019-05-20T11:36:...|      bbbb|              0.0|     0.0|  0.0| 520686131| 520686216|
# |  179|  258|2019-05-20T11:36:...|      bbbb|8.654358326561642|     3.1|  2.8| 520686216| 506361795|
# +-----+-----+--------------------+----------+-----------------+--------+-----+----------+----------+

# Expected result.
data_reshaped = new_df.rdd.collect()


