在实际项目应用上,某些需求会有更新静态规则表的情况,如消息过滤规则、风控规则等。通常这样的表数据量不会大,在spark中使用广播变量的形式使用,而广播变量是不支持更新的,怎样在流处理过程中更新,下面分别论述Spark streaming和Structured streaming的场景。
一、Spark streaming
可以利用单例模式定时的删除已经广播的值,同时获取新的变量值重新广播,假如要广播的是RDS中的表,代码示例如下:
注意事项:
- spark streaming会为每一个流创建job,为了不同job间互不影响,需在foreachRDD、transform算子内进行变量的广播操作此方法仅适用于spark streaming,structured streaming需使用其他方法做广播变量的更新
@Slf4j
public class JDBCBroadcastPeriodicUpdater {
private static final int PERIOD = 30 * 1000; //更新周期,秒
private static volatile JDBCBroadcastPeriodicUpdater instance;
private Broadcast>> broadcast;
private long lastUpdate = 0L;
private JDBCBroadcastPeriodicUpdater() {}
public static JDBCBroadcastPeriodicUpdater getInstance() {
if (instance == null) {
synchronized (JDBCBroadcastPeriodicUpdater.class) {
if (instance == null) {
instance = new JDBCBroadcastPeriodicUpdater();
}
}
}
return instance;
}
public Broadcast>> updateAndGet(SparkSession spark, DruidDataSource dataSource, String sql) {
SparkContext sc = spark.sparkContext();
long now = System.currentTimeMillis();
long offset = now - lastUpdate;
if (offset > PERIOD || null == broadcast) {
if (broadcast != null) {
// 删除已获取的广播变量值
broadcast.unpersist();
}
lastUpdate = now;
// 重新广播新的变量值
List



