您需要在阅读器中返回一个检查点值,该值 checkpointInfo()
将 open()
在重新启动时传递到阅读器的方法中。阅读器和批处理容器通过这种方式协调以在重新启动时提供检查点。
因此,您可能会遇到类似(查找 CHECKPOINT 注释)的情况:
public class DBItemReader implements ItemReader { // ... // CHECKPOINT field defined private String checkpoint = null; @Override public void open(Serializable checkpoint) throws NamingException, SQLException { // CHECKPOINT-based positioning through query value. // Initial position = whereclauseFrom, on restart set to checkpoint String queryVal = (String)(checkpoint == null ? whereclauseFrom : checkpoint); if(Integer.parseInt(whereclauseFrom) == 5){ sql = "SELECt * FROM " + tableName + " WHERe CAST(REC AS INTEGER) <= "+ queryVal; }else if(Integer.parseInt(whereclauseFrom) == 6){ sql = "SELECt * FROM " + tableName + " WHERe CAST(REC AS INTEGER) >= "+ queryVal; } // .. } @Override public Object readItem() throws SQLException { if (listRecObj.size() == 0) { return null; } else { RecObj rec =null; Iterator<RecObj> iter =listRecObj.iterator(); while (iter.hasNext()) { rec = iter.next(); // CHECKPOINT updated checkpoint = rec.getRec(); if (Integer.parseInt(rec.getRec()) == 7) { throw new IllegalStateException("Thrown Error"); } } } // ... } @Override public Serializable checkpointInfo() { // CHECKPOINT returned at end of chunk return checkpoint; }}


