栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark流处理中定时更新广播变量值

Spark流处理中定时更新广播变量值

Spark流处理中定时更新广播变量值

在实际项目应用上,某些需求会有更新静态规则表的情况,如消息过滤规则、风控规则等。通常这样的表数据量不会大,在spark中使用广播变量的形式使用,而广播变量是不支持更新的,怎样在流处理过程中更新,下面分别论述Spark streaming和Structured streaming的场景。


一、Spark streaming

可以利用单例模式定时的删除已经广播的值,同时获取新的变量值重新广播,假如要广播的是RDS中的表,代码示例如下:

注意事项:

    spark streaming会为每一个流创建job,为了不同job间互不影响,需在foreachRDD、transform算子内进行变量的广播操作此方法仅适用于spark streaming,structured streaming需使用其他方法做广播变量的更新
@Slf4j
public class JDBCBroadcastPeriodicUpdater {
    private static final int PERIOD = 30 * 1000; //更新周期,秒
    private static volatile JDBCBroadcastPeriodicUpdater instance;

    private Broadcast>> broadcast;
    private long lastUpdate = 0L;

    private JDBCBroadcastPeriodicUpdater() {}

    public static JDBCBroadcastPeriodicUpdater getInstance() {
        if (instance == null) {
            synchronized (JDBCBroadcastPeriodicUpdater.class) {
                if (instance == null) {
                    instance = new JDBCBroadcastPeriodicUpdater();
                }
            }
        }
        return instance;
    }

    
    public Broadcast>> updateAndGet(SparkSession spark, DruidDataSource dataSource, String sql) {
        SparkContext sc = spark.sparkContext();
        long now = System.currentTimeMillis();
        long offset = now - lastUpdate;
        if (offset > PERIOD || null == broadcast) {
            if (broadcast != null) {
                // 删除已获取的广播变量值
                broadcast.unpersist();
            }
            lastUpdate = now;
            // 重新广播新的变量值
            List> value = fetchBroadcastData(dataSource, sql);
            broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
        }
        return broadcast;
    }

    
    @SneakyThrows
    private List> fetchBroadcastData(DruidDataSource dataSource, String sql) {
        List> result = new ArrayList<>();

        DruidPooledConnection conn = dataSource.getConnection();
        PreparedStatement ps = conn.prepareStatement(sql);
        ResultSet data = ps.executeQuery();
        ResultSetmetaData metaData = data.getmetaData();
        int colCount = metaData.getColumnCount();

        while (data.next()) {
            HashMap row = new HashMap<>();
            for (int i=0; i 

二、Structured streaming

Structured streaming使用trigger触发每个批次的数据处理,但由于使用了Spark sql engine,代码是优化后执行的,只有在首次触发trigger时才会获取广播变量的值,故前述在Spark streaming中使用的方法并不能达到更新变量的目的。

2.1 借用Listener的特性

Spark使用微批的形式处理流数据,而每个流的运行都会伴随着Listener监控任务的执行状态。在Structured streaming中,Listener有三个方法:onQueryStarted、onQueryProgress、onQueryTerminated,分别在程序开始运行、每批次数据处理完毕、程序结束时调用,需要注意的是onQueryProgress是异步调用的。

变量的广播操作需要在driver上执行的,而Listener的调用也是在driver端,我们正好可以利用这一点,在onQueryProgress方法中进行广播变量的更新操作。具体就是使用.unpersist()删除广播变量再重新广播。

以上理论上可以实现所需功能,但spark是支持static与stream做join的,而且在执行时每次触发trigger都会去重新获取static的df,故比起上面的方法,以下方法更为推荐。

2.2 使用SQL Hints

如前所述,实际操作中可以分别在代码中获取static df和stream df,创建临时视图做join操作,并用Hints语法标识需要做广播的表。

此法利用了static df在每个批次都会重新读取的特点更新规则数据,又利用Hints语法使实际的数据处理完全用SQL完成,对比自定义Listener的方式更佳简单易用,利于维护。

代码示例:

SELECt  * FROM records r JOIN src s ON r.key = s.key

关于Spark SQL Hints,参考官方文档:Hints - Spark 3.1.2 documentation (apache.org)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/728881.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号