常用hive字符串拼接函数,转json等操作
import pyspark
from pyspark.sql import SparkSession
# Create (or reuse) a SparkSession via the .builder entry point:
# .appName("...") names the application; .getOrCreate() either creates a new
# session or returns the one that is already running.
spark = SparkSession.builder.appName("pysaprk").getOrCreate()
# NOTE(review): imported but not referenced anywhere in this snippet.
import pyspark.sql.functions
# Sample data; age and createtime_str are deliberately plain strings, which is
# why the collect_list/to_json examples further down show string-typed elements.
# Fix: the API is createDataFrame (capital F) — `createDataframe` raises
# AttributeError on a SparkSession.
df = spark.createDataFrame(
    [(1, "male", "18", "2019-01-01 11:45:50"),
     (2, "female", "37", "2019-01-02 11:55:50"),
     (3, "male", "18", "2019-01-21 11:45:50"),
     (4, "female", "44", "2019-02-01 12:45:50"),
     (5, "male", "39", "2019-01-15 10:40:50")],
    ["id", "sex", "age", "createtime_str"])
df.show(20, truncate=False)
+---+------+---+-------------------+ |id |sex |age|createtime_str | +---+------+---+-------------------+ |1 |male |18 |2019-01-01 11:45:50| |2 |female|37 |2019-01-02 11:55:50| |3 |male |18 |2019-01-21 11:45:50| |4 |female|44 |2019-02-01 12:45:50| |5 |male |39 |2019-01-15 10:40:50| +---+------+---+-------------------+
# Register the DataFrame as a temporary view so it can be queried via spark.sql.
df.createOrReplaceTempView("temp")
struct
结构体,转json的中间过程
# named_struct packs columns into a struct column — the intermediate
# representation used before serializing to JSON below.
struct_sql = """
select named_struct("id",id) as col_struct
from temp
"""
df_struct = spark.sql(struct_sql)
df_struct.show()
+----------+ |col_struct| +----------+ | [1]| | [2]| | [3]| | [4]| | [5]| +----------+
# Inspect the schema: col_struct is a struct with a single long field `id`.
df_struct.printSchema()
root |-- col_struct: struct (nullable = false) | |-- id: long (nullable = true)

转json
利用to_json把上面的结构体直接转成json字符串
# to_json serializes the named_struct directly into a JSON string column.
json_sql = """
select to_json(named_struct("id",id)) as json
from temp
"""
df_json = spark.sql(json_sql)
df_json.show()
+--------+
| json|
+--------+
|{"id":1}|
|{"id":2}|
|{"id":3}|
|{"id":4}|
|{"id":5}|
+--------+
# The result of to_json is a plain string column.
df_json.printSchema()
root |-- json: string (nullable = true)

concat
concat常规的字符串拼接函数,没啥,看个例子
# concat: plain string concatenation of literals and columns.
concat_sql = """
select concat("id",":",id) as col_concat
from temp
"""
spark.sql(concat_sql).show()
+----------+ |col_concat| +----------+ | id:1| | id:2| | id:3| | id:4| | id:5| +----------+

concat_ws
concat_ws(separator, string s1, string s2…)
采取分隔符,把各个字段连接起来
# concat_ws(separator, s1, s2, ...) joins fields with the given separator;
# shown side by side with the equivalent plain concat.
concat_ws_sql = """
select concat_ws(",",id,sex) as col_concat_ws,
concat(id,",",sex) as next
from temp
"""
spark.sql(concat_ws_sql).show()
+-------------+--------+ |col_concat_ws| next| +-------------+--------+ | 1,male| 1,male| | 2,female|2,female| | 3,male| 3,male| | 4,female|4,female| | 5,male| 5,male| +-------------+--------+

collect_list和collect_set
和group by连用,把分组中某一列转成一个数组。collect_list不去重,collect_set去重
# Group by sex and collect ages into arrays:
# collect_set de-duplicates, collect_list keeps every occurrence.
df_collect = spark.sql(
    """ select sex ,collect_set(age) as set_col ,collect_list(age) as list_age from temp group by sex """
)
df_collect.show()
+------+--------+------------+ | sex| set_col| list_age| +------+--------+------------+ |female|[44, 37]| [37, 44]| | male|[39, 18]|[18, 18, 39]| +------+--------+------------+
# Both collected columns come back as array<string> (ages were strings).
df_collect.printSchema()
root |-- sex: string (nullable = true) |-- set_col: array (nullable = true) | |-- element: string (containsNull = true) |-- list_age: array (nullable = true) | |-- element: string (containsNull = true)
可以看到collect之后以array格式储存
# collect_list + concat_ws: join the grouped array back into a single
# comma-separated string per group.
spark.sql(
    """
select sex
,collect_list(age) as list_age
,concat_ws(",",collect_list(age)) as age_str
from temp
group by sex
""").show(truncate=False)
+------+------------+--------+ |sex |list_age |age_str | +------+------------+--------+ |female|[37, 44] |37,44 | |male |[18, 18, 39]|18,18,39| +------+------------+--------+

利用collect_list拼接嵌套json
# Nested JSON via collect_list: json_str2 embeds the already-serialized
# json_str1 as a string-valued "data" field (so in real output the inner
# quotes come back escaped).
nested_json_sql = """
select sex
,to_json(named_struct("age_list",list_age)) as json_str1
,to_json(named_struct("sex",sex,"data",to_json(named_struct("age_list",list_age)) )) as json_str2
from
(select sex
,collect_list(age) as list_age
,concat_ws(",",collect_list(age)) as age_str
from temp
group by sex ) as t
"""
df_json2 = spark.sql(nested_json_sql)
df_json2.show(truncate=False)
+------+-----------------------------+-------------------------------------------------------------+
|sex |json_str1 |json_str2 |
+------+-----------------------------+-------------------------------------------------------------+
|female|{"age_list":["37","44"]}     |{"sex":"female","data":"{\"age_list\":[\"37\",\"44\"]}"}     |
|male  |{"age_list":["18","18","39"]}|{"sex":"male","data":"{\"age_list\":[\"18\",\"18\",\"39\"]}"}|
+------+-----------------------------+-------------------------------------------------------------+
# Both JSON columns are plain strings in the schema.
df_json2.printSchema()
root |-- sex: string (nullable = true) |-- json_str1: string (nullable = true) |-- json_str2: string (nullable = true)
2022-02-16 于南京市江宁区九龙湖



