The error:
Caused by: java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 9384 because the size after growing exceeds size limitation 2147483632
Reference:
https://docs.microsoft.com/zh-cn/azure/databricks/kb/sql/cannot-grow-bufferholder-exceeds-size
Locating the problem:
The requirement is to group by a unique key and, for each key, keep all clicks if the key has any clicks, otherwise keep all impressions. That led me to collect_list. Once the data volume grew, the DataFrame produced by the groupBy became very large, and the row vectors that collect_list feeds into the UDF grew very long, eventually exceeding the buffer size limit.
The key problematic code:
schema_getdataCols = ['newid'] + data_origin_columns1
df_HDFS_gp = df_HDFS_A.groupBy('newid').agg(
    fn.collect_list('suuid').alias('suuid'),
    fn.collect_list('aid').alias('aid'),
    fn.collect_list('slotid').alias('slotid'),
    fn.collect_list('adfrom').alias('adfrom'),
    fn.collect_list('appkey').alias('appkey'),
    fn.collect_list('appname').alias('appname'),
    fn.collect_list('battery').alias('battery'),
    fn.collect_list('brand').alias('brand'),
    fn.collect_list('channel').alias('channel'),
    fn.collect_list('hardware').alias('hardware'),
    fn.collect_list('product').alias('product'),
    fn.collect_list('screensize').alias('screensize'),
    fn.collect_list('manufacturer').alias('manufacturer'),
    fn.collect_list('model').alias('model'),
    fn.collect_list('nettype').alias('nettype'),
    fn.collect_list('operator').alias('operator'),
    fn.collect_list('os').alias('os'),
    fn.collect_list('city').alias('city'),
    fn.collect_list('actname').alias('actname'),
).rdd.map(row_dataID_druid_ad_behavior).toDF(schema=schema_getdataCols)
Attempted fixes:
1. Increase the serializer buffer size: .config('spark.kryoserializer.buffer.max', 5120)
2. Split the DataFrame into several smaller DataFrames for downstream processing.
The final solution:
My thinking had gotten stuck: as soon as grouping came up, I fixated on groupBy. Going back to the requirement, keep all clicks for keys that have clicks and all impressions for keys that don't, the key code is:
'''Concatenate the triples and assign labels'''
# First select the rows that have clicks
df_have_click = df.filter(df['actname'] == 'ckads')
# Keep the triple ids (newid) that have clicks
click_ids = df_have_click.select('newid').collect()
click_ids = [i[0] for i in click_ids]
click_ids = list(set(click_ids))
# Select the impression rows, excluding ids that already have clicks
df_have_display = df.filter(df['actname'] == 'exads').filter(~df['newid'].isin(click_ids))
# Concatenate the two
df_HDFS_res = df_have_click.unionAll(df_have_display)
Problem solved.



