It took me a while to find the answer, so I'll share it. Spark DataFrames (from pyspark.sql) do not currently support the newAPIHadoopFile() method; moreover, df.rdd.saveAsNewAPIHadoopFile() was also giving me errors. The trick is to convert the DataFrame into an RDD of (id, JSON string) tuples via the following function:
def transform(doc):
    import json
    import hashlib

    _json = json.dumps(doc)

    # Drop fields whose value is the literal string 'null' or 'None'
    for key in list(doc.keys()):
        if doc[key] == 'null' or doc[key] == 'None':
            del doc[key]

    # Use the document's own id if present; otherwise derive one
    # from a SHA-224 hash of the original JSON
    if 'id' not in doc:  # dict.has_key() no longer exists in Python 3
        id = hashlib.sha224(_json.encode('utf-8')).hexdigest()
        doc['id'] = id
    else:
        id = doc['id']

    _json = json.dumps(doc)
    return (id, _json)

So my JSON workflow is:
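As a quick sanity check (the sample document below is hypothetical, not from the answer), transform drops 'null'-valued fields and backfills an id from a SHA-224 hash when one is missing:

```python
import json
import hashlib

def transform(doc):
    # Same function as above, repeated here so this snippet runs standalone
    _json = json.dumps(doc)
    for key in list(doc.keys()):
        if doc[key] == 'null' or doc[key] == 'None':
            del doc[key]
    if 'id' not in doc:
        id_ = hashlib.sha224(_json.encode('utf-8')).hexdigest()
        doc['id'] = id_
    else:
        id_ = doc['id']
    return (id_, json.dumps(doc))

doc_id, doc_json = transform({'title': 'hello', 'comment': 'null'})
print(doc_json)  # 'comment' is gone, a generated 'id' is present
```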
1:
df = spark.read.json('XXX.json')

2:
rdd_mapped = df.rdd.map(lambda y: y.asDict())
3:
final_rdd = rdd_mapped.map(transform)
4:
final_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource": "<INDEX>/<INDEX>",
        "es.mapping.id": "id",
        "es.input.json": "true",
        "es.net.http.auth.user": "elastic",
        "es.write.operation": "index",
        "es.nodes.wan.only": "false",
        "es.net.http.auth.pass": "changeme",
        "es.nodes": "<NODE1>, <NODE2>, <NODE3>...",
        "es.port": "9200"
    })

For more on the ES parameters, see here (scroll down to "Configuration").
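For reference, the conf keys above are standard elasticsearch-hadoop settings. A minimal sketch, with a hypothetical index name and a single local node, might look like this:

```python
# Minimal elasticsearch-hadoop configuration (hypothetical values)
es_conf = {
    "es.resource": "my_index/my_type",  # write target as <index>/<type>
    "es.mapping.id": "id",              # use the doc's 'id' field as the ES _id
    "es.input.json": "true",            # values are pre-serialized JSON strings
    "es.write.operation": "index",      # overwrite existing docs ('upsert' is an option)
    "es.nodes": "localhost",            # comma-separated list of ES nodes
    "es.port": "9200",
    "es.nodes.wan.only": "false",       # set to "true" for cloud/WAN setups
}
print(sorted(es_conf))
```

Because es.input.json is "true", the connector treats each RDD value as a raw JSON document rather than a MapWritable, which is why the (id, JSON string) tuples from transform can be written directly.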