- 一、先用window函数尝试
- 二、获得对应的collection_list
- 三、用withColumn和split进行分隔
- Reference
pyspark.sql和mysql一眼有window窗口函数:
# 数据的分组聚合,找到每个用户最近的3次收藏beat(用window开窗函数)
from pyspark.sql.window import Window
import pyspark.sql.functions as F
window_out = Window.partitionBy("user_id")
.orderBy(F.desc("collect_time"))
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate = False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out))
.where(F.col('rank') <= 3)
user_feed_test.show(7)
# .where(F.col('row_number') <= 3).select("user_id","beat_id","collect_type","rank")
这样就能得到每个用户最近三次使用的item了(根据时间进行排序,在最后一列),然后把一些beat_id异常值去掉:
# 对异常的beat_id的处理,去掉所有值为0的表项
user_feed_test = user_feed_test.filter(F.col('beat_id') > 0)
user_feed_test.show(30)
"""
+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type| collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272| 4|2021-08-22 04:54:...| 1|
|10065188| 885812| 5|2020-10-23 18:53:...| 2|
|10068979|1069390| 5|2021-06-20 07:44:...| 1|
|10074915|1122682| 4|2021-09-07 14:26:...| 2|
|10075397| 947751| 4|2022-01-30 07:30:...| 1|
|10075397| 336641| 5|2022-01-30 07:23:...| 2|
|10075397| 886179| 4|2022-01-05 10:35:...| 3|
+--------+-------+------------+--------------------+----+
"""
二、获得对应的collection_list
# 这里要注意创建视图,否则上面的user_feed_test只是一个变量
user_feed_test.createOrReplaceTempView("user_feed_test")
# 把每个用户前3收藏的装入set中
df = spark.sql("select user_id,
concat_ws(',', collect_set(beat_id)) as collection_list
FROM user_feed_test
GROUP BY user_id")
df.show(5)
结果为:
+--------+--------------------+ | user_id| collection_list| +--------+--------------------+ |10065188| 827272,885812| |10068979| 1069390| |10074915| -2,1122682| |10075397|336641,886179,947751| |10454402|888649,884711,108...| +--------+--------------------+三、用withColumn和split进行分隔
withColumn增加对应列:
# 用withColumn和split进行分隔
from pyspark.sql.functions import instr, split, when
# instr表示找到在字符串中对应字符第一次出现的下标位置
#df = df.withColumn('separator_if_exists',
# (instr(col('collection_list'),',') > 0) & instr(col('collection_list'),',').isNotNull())
df = df.withColumn('separator_if_exists',
(split(col('collection_list'),',')[0] > 0).isNotNull())
df = df.withColumn('collect_beat1',
when(col('separator_if_exists') == True,
split(col('collection_list'),',')[0]).otherwise(None))
# .drop('separator_if_exists')
df = df.withColumn('collect_beat2',
when(col('separator_if_exists') == True,
split(col('collection_list'),',')[1]).otherwise(None))
df = df.withColumn('collect_beat3',
when(col('separator_if_exists') == True,
split(col('collection_list'),',')[2]).otherwise(None))
df.show(truncate=False)
结果为:
+--------+----------------------+-------------------+-------------+-------------+-------------+ |user_id |collection_list |separator_if_exists|collect_beat1|collect_beat2|collect_beat3| +--------+----------------------+-------------------+-------------+-------------+-------------+ |10065188|827272,885812 |true |827272 |885812 |null | |10068979|1069390 |true |1069390 |null |null | |10074915|-2,1122682 |true |-2 |1122682 |null | |10364341|1070218 |true |1070218 |null |null | +--------+----------------------+-------------------+-------------+-------------+-------------+Reference
[1] https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.window.html?highlight=window#pyspark.sql.functions.window



