Processing Hudi Incremental Data with Spark and Aggregating It (with Detailed Approach and Code)

Contents

1 Background and Results
2 Approach
3 Code
3.1 Method 1
3.2 Method 2
3.3 Method 3
4 Bonus: Script to Launch the PySpark Shell

1 Background and Results

Aggregating the full dataset on every run is very time-consuming. If we instead process only the rows related to the incremental data, the volume of data handled shrinks dramatically and the job finishes far sooner.

We tested the three methods below on 10,865 rows of base data and 2,800 incremental rows (covering 900 distinct aggregation groups in total), and obtained:

Method 1: 437.8 s
Method 2: 471.3 s
Method 3: 6.2 s

2 Approach

Three methods are used here to handle the increment:

In every case, we first use the incremental DataFrame to extract each incremental record (as a Python list).

Method 1: filter the full table's DataFrame (which includes the incremental data) on each incremental record's key and aggregate the matching rows.
Method 2: register the full table's DataFrame (including the incremental data) as a temporary view and, for each incremental record, run a SQL aggregation against that view.
Method 3: join the incremental data with the full data to obtain every affected row in the full dataset, then aggregate only that subset.

3 Code

Load the full dataset:

read_hudi_file_path = '/test_hudi/PROD/hadoop/ods/fyk_test_02_fyk_test_02/fyk_test_02/mysql_10.20.3.88_3306_mf_test4/jk8_mor*/*'

# Snapshot read of the Hudi table
original_tablet_DF = spark \
    .read \
    .format("hudi") \
    .load(read_hudi_file_path)
# Hudi metadata columns that are irrelevant to the aggregation
columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key', '_hoodie_partition_path', '_hoodie_file_name', 'hudi_delta_streamer_ingest_date', 'hudi_delta_streamer_update_time', '_hoodie_is_deleted']

original_tablet_DF = original_tablet_DF.drop(*columns_to_drop)                   
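As a quick sanity check, the snapshot can be inspected before any aggregation (these two lines are illustrative, not part of the original pipeline):

original_tablet_DF.printSchema()   # confirm the Hudi metadata columns were dropped
print(original_tablet_DF.count())  # 10865 rows in the test described above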

Load the incremental data:

# Commit instant from which the incremental read starts
write_incremental_begin_time = '20220119171247'

read_hudi_file_base_path = '/test_hudi/PROD/hadoop/ods/fyk_test_02_fyk_test_02/fyk_test_02/mysql_10.20.3.88_3306_mf_test4/jk8_mor/'


# Hudi incremental query: return only commits after the begin instant
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': write_incremental_begin_time,
}


incremental_table_df = spark.read.format("hudi").options(**incremental_read_options).load(read_hudi_file_base_path)



incremental_table_df = incremental_table_df.drop(*columns_to_drop)
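If only a bounded window of commits is needed, Hudi's incremental query also accepts an end instant; a minimal sketch (the end instant value below is a placeholder, not from the original):

bounded_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': write_incremental_begin_time,
    'hoodie.datasource.read.end.instanttime': '20220120000000',  # placeholder upper bound
}
bounded_df = spark.read.format("hudi").options(**bounded_read_options).load(read_hudi_file_base_path)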
3.1 Method 1

Extract each incremental record (shared by Methods 1 and 2):

# Deduplicate on the aggregation key and keep only the key columns
aggregate_field = ['sub_trans_code']

incremental_df = incremental_table_df.dropDuplicates(aggregate_field).select(*aggregate_field)

# Collect each deduplicated key as a plain Python list
detail_data_aggregate_list = list()

for d in incremental_df.collect():
    detail_data_aggregate_list.append(list(d))

print('Each record stored as a list:')
print(detail_data_aggregate_list[0])

# Aggregations to apply: column name -> aggregate function
agg_params = {'sub_total_trans_cost': 'sum', 'sub_total_trans_price': 'sum'}
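Note that dict-style agg() auto-names its output columns (e.g. sum(sub_total_trans_cost)). An equivalent aggregation with explicit aliases, shown here as a sketch, keeps the original column names:

from pyspark.sql import functions as F

# Same aggregation, but with stable output column names
explicit_agg = [
    F.sum('sub_total_trans_cost').alias('sub_total_trans_cost'),
    F.sum('sub_total_trans_price').alias('sub_total_trans_price'),
]
# usage: df.groupBy(*aggregate_field).agg(*explicit_agg)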

Process the DataFrame:

write_data_list = list()
write_title_list = list()

# For each incremental key, build a filter predicate (quoting string values),
# then aggregate the matching rows of the full table
for i in range(len(detail_data_aggregate_list)):
    for j in range(len(aggregate_field)):
        if j != 0:
            if isinstance(detail_data_aggregate_list[i][j], str):
                filter_statement += f" AND {aggregate_field[j]}='{detail_data_aggregate_list[i][j]}'"
            else:
                filter_statement += f" AND {aggregate_field[j]}={detail_data_aggregate_list[i][j]}"
        else:
            if isinstance(detail_data_aggregate_list[i][j], str):
                filter_statement = f"{aggregate_field[j]}='{detail_data_aggregate_list[i][j]}'"
            else:
                filter_statement = f"{aggregate_field[j]}={detail_data_aggregate_list[i][j]}"
    print(filter_statement)
    temp_write_df = original_tablet_DF.filter(filter_statement).groupBy(*aggregate_field).agg(agg_params)
    write_data_list.append(tuple(temp_write_df.collect()[0]))  # variant B: store rows in a list first, convert to a DataFrame after the loop

    # variant A: build the output DataFrame incrementally via union
    # if i != 0:
    #     write_df = temp_write_df.union(write_df)
    # else:
    #     write_df = temp_write_df
    # union/unionAll return the union of two datasets, keeping duplicate rows
    # intersect returns the intersection of two datasets, without duplicate rows
    # except (minus) returns the difference of two datasets, without duplicate rows

# variant B: assemble the output DataFrame from the collected rows
write_title_list.extend(aggregate_field)
accumulate_field = ['sub_total_trans_cost', 'sub_total_trans_price']
write_title_list.extend(accumulate_field)
write_df = spark.createDataFrame(write_data_list, write_title_list)
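As an aside, variant A can be written without the in-loop if/else by folding the per-record DataFrames with functools.reduce; a minimal sketch, assuming the filter predicates built above were kept in a list called filter_statements (a hypothetical name):

from functools import reduce
from pyspark.sql import DataFrame

# One aggregated DataFrame per incremental record, then a single fold to union them
per_record_dfs = [
    original_tablet_DF.filter(f).groupBy(*aggregate_field).agg(agg_params)
    for f in filter_statements  # hypothetical: the predicates from the loop above
]
write_df = reduce(DataFrame.union, per_record_dfs)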
3.2 Method 2
# Register the full table as a temporary view so it can be queried with SQL
original_tablet_DF.createOrReplaceTempView('original_table_name')

aggregate_field_list_str = ','.join(aggregate_field)

write_data_list = list()

for i in range(len(detail_data_aggregate_list)):
    sql_statement = f"SELECT {aggregate_field_list_str}"
    sql_statement += ",SUM(sub_total_trans_cost), SUM(sub_total_trans_price) "
    sql_statement += f" FROM original_table_name WHERe"
    for j in range(len(aggregate_field)):
        if j != 0:
            if isinstance(detail_data_aggregate_list[i][j], str):
                sql_statement += " AND {0}='{1}'".format(aggregate_field[j], detail_data_aggregate_list[i][j])
            else:
                sql_statement += " AND {0}={1}".format(aggregate_field[j], detail_data_aggregate_list[i][j])
        else:
            if isinstance(detail_data_aggregate_list[i][j], str):
                sql_statement += " {0}='{1}'".format(aggregate_field[j], detail_data_aggregate_list[i][j])
            else:
                sql_statement += " {0}={1}".format(aggregate_field[j], detail_data_aggregate_list[i][j])
    sql_statement += f" GROUP BY {aggregate_field_list_str}"
    print(sql_statement)
    write_data_list.append(tuple(spark.sql(sql_statement).collect()[0]))

    
    
print('write_data_list:')
print(write_data_list)
write_title_list = list()
write_title_list.extend(aggregate_field)
accumulate_field = ['sub_total_trans_cost', 'sub_total_trans_price']
write_title_list.extend(accumulate_field)

# print(write_title_list)
write_df = spark.createDataFrame(write_data_list, write_title_list)
write_df.show(5)
print(write_df.count())
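The example stops once write_df is assembled; as a sketch, the result could then be upserted back into a Hudi table like this (the table name, output path, and precombine field below are placeholder assumptions, not from the original):

hudi_write_options = {
    'hoodie.table.name': 'agg_result',  # placeholder table name
    'hoodie.datasource.write.recordkey.field': 'sub_trans_code',
    'hoodie.datasource.write.precombine.field': 'sub_total_trans_cost',  # placeholder choice
    'hoodie.datasource.write.operation': 'upsert',
}
write_df.write.format("hudi").options(**hudi_write_options).mode("append").save('/test_hudi/agg_result')  # placeholder path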
3.3 Method 3
# Semi-join the full table against the incremental keys: left_semi keeps only
# the rows of original_tablet_DF whose key appears in incremental_df, and adds
# no columns from the right side
write_df = original_tablet_DF.join(incremental_df, aggregate_field, 'left_semi')

# Aggregate just the affected rows
write_df = write_df.groupBy(*aggregate_field).agg(agg_params)
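A tiny self-contained illustration of left_semi semantics (the example data is made up):

df_a = spark.createDataFrame([('a', 1), ('a', 2), ('b', 3), ('c', 4)], ['k', 'v'])
df_b = spark.createDataFrame([('a',), ('b',)], ['k'])
df_a.join(df_b, 'k', 'left_semi').show()
# keeps ('a', 1), ('a', 2), ('b', 3); 'c' is dropped and no columns from df_b are added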
4 Bonus: Script to Launch the PySpark Shell
#!/usr/bin/env bash
/software/spark-3.1.2-bin-hadoop2.7/bin/pyspark \
  --jars /root/yl/hudi-0.10.0/docker/hoodie/hadoop/hive_base/target/hoodie-spark-bundle.jar \
  --driver-class-path /software/hadoop-2.10.1/etc/hadoop:/software/apache-hive-2.3.8-bin/conf/:/software/member/config/mysql-connector-java-5.1.49-bin.jar \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 1G \
  --num-executors 3 \
  --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2