从kafka消费的数据落到Hbase中,源表的数据量较大,因此采用了mutator缓存1000条,或者累积1秒后,一起put。避免短时间内大量访问hbase regionServer,把hbase 干废了。
由于mutator的flush操作是在invoke()方法中触发的,而invoke方法只在数据达到sink算子时,才会被触发。因此在夜间数据比较稀疏时,数据的时效性取决于两条数据到达的时间间隔。
为解决这个问题,在open函数中初始化了一个Timer定时器,设置每秒中调用一次Invoke方法。这样除了数据达到会触发invoke方法,定时器也会触发invoke。
@Override
public void open(Configuration parameters) thorw Exceptions{
initialConnection();
initialMuator();
timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run(){
invoke(new PutVo(), context);
}
},0L,1000L);
}
然而出现了丢数问题。6万笔数据丢了三千多笔。
看日志发现,因为Timer起了并发线程,如下图。理想情况下两个并发的线程顺序执行,互不干扰。然而事实上他们共享了puts对象,出现了在第一个线程中刚把数据加到puts对象中,第二个线程把puts给清空了。
如此以来,出现了丢数。
解决方法:目前采用对自定义的sink类继承CheckpointedFunction接口,实现其中的snapshotState方法。
checkpointedFunction 是实现operator state 的核心方法,其中定义了两个方法:snapshotState 和 initialState
snapshotState 在checkpoint的时候会被调用,用于快照状态,通常用于flush、commit、synchronize外部系统initializeState 在从状态中恢复时会被调用。
计划在每次checkpoint的时候,flush一次。由于每次进行Checkpoint前,都需要暂停处理新流入数据,然后再开始执行快照。所以不会出现刚刚把数据放到puts里,然后puts被清空的情况。
public class HbaseMuatorSink extends RichSinkFunctionimplements Serializable, CheckpoitedFunction{ private List puts = new ArrayList<>(); private BufferedMutator mutator = null; @Override public void open() throws Exception{ initialConnection(); initialMuator(); } @Override public void invoke(PutVo putVo, Context context){ puts.add(getPut(putVo)); count +=1; if(count > 1000){ mutator.mutate(); mutator.flush(); puts.clear(); count = 0; } } @Override public void close() throws Exception{ } @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw Exception{ mutator.mutate(puts); mutator.flush(); puts.clear(); count = 0; } @Override public void initializeState(FunctionInitializationContext context) throws Exception { } }
reference: 聊聊flink的CheckpointedFunction - 简书



