【前言】
最近接触了一个项目:对数据库的数据进行监听,当监听到数据发生增、删、改时,就更新 ES 或者做一些业务处理。这时候就有一个疑问:如果在项目运行过程中,我只想关掉这个服务的监听,而不是杀死整个服务,该怎么处理呢?
【过程】
@Component public class SpringFinishedListener implements InitializingBean, ApplicationListener{//实现对spring的启动器进行监听 private static final Logger logger = LoggerFactory.getLogger(SpringFinishedListener.class); private final ReentrantLock lock = new ReentrantLock(); private volatile boolean started = false;//用变量控制开关 private final Thread receiveThread = new Thread(new ReceiveTask(), "TunnelReceiveThread-logical-slot"); @Autowired private EventParserService eventParserService; @Autowired private PublisherService publisherService; @Autowired private WhiteListPublisherService whiteListPublisherService; //做业务处理 @Override public void onApplicationEvent(ContextRefreshedEvent event) { try { createRplConn(); createRplSlot(); createRplStream(); started = true; receiveThread.start(); Thread.sleep(10*1000); //队列消费 publisherService.consume(); //白名单队列消费 whiteListPublisherService.consume(); logger.info("Startup RplStream Success"); } catch (Exception e) { logger.warn("Startup RplStream Failure", e); shutdown(); } } //当手动掉下面关闭的方法后 这里就不执行了 private class ReceiveTask implements Runnable { @Override public void run() { while (started) { try { receiveStream(); } catch (Exception e) { logger.warn("receiveStream msg failure,try to recover.", e); recover(); } } } } public void shutdown() { started = false; closeClosable(this.stream); closeClosable(this.connection); } private void createRplConn() throws SQLException { //String url = "jdbc:postgresql://192.168.1.31:5432/boss_crm"; Properties props = new Properties(); PGProperty.USER.set(props, this.username); PGProperty.PASSWORD.set(props, SecurityUtils.rc4Decrypt(this.password)); PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, this.version); PGProperty.REPLICATION.set(props, "database"); PGProperty.PREFER_QUERY_MODE.set(props, "simple"); this.connection = DriverManager.getConnection(url, props); this.rplConnection = this.connection.unwrap(PGConnection.class); logger.info("GetRplConnection success,slot:{}", this.slotName); } private void 
createRplSlot() throws SQLException { try { this.rplConnection.getReplicationAPI() .createReplicationSlot() .logical() .withSlotName(slotName) .withOutputPlugin("test_decoding") .make(); logger.info("GetRplSlot success,slot:{}", this.slotName); } catch (SQLException e) { logger.error("ERROR: replication slot " + slotName + " already exists", e); } } private void createRplStream() throws SQLException { this.stream = this.rplConnection.getReplicationAPI() .replicationStream() .logical() .withSlotName(slotName) .withSlotOption("include-xids", true) .withSlotOption("skip-empty-xacts", true) .withSlotOption("include-timestamp", true) .withStatusInterval(5, TimeUnit.SECONDS) .start(); logger.info("GetRplStream success,slot:{}", this.slotName); } private void recover() { this.lock.lock(); try { long s = System.currentTimeMillis(); closeClosable(stream); closeClosable(connection); while (true) { try { createRplConn(); createRplSlot(); createRplStream(); break; } catch (Exception e) { logger.warn("Recover Streaming Occurred Error", e); closeClosable(stream); closeClosable(connection); TimeUtils.sleepInMills(500); } } long e = System.currentTimeMillis(); logger.info("recover logical replication success,slot:{},cost:{}ms", slotName, e - s); } finally { this.lock.unlock(); } } private static void closeClosable(AutoCloseable closeable) { if (closeable != null) { try { closeable.close(); } catch (Exception e) { logger.info("closeClosable err", e); } } } private void receiveStream() throws SQLException { assert !stream.isClosed(); assert !connection.isClosed(); //non blocking receive message ByteBuffer msg = stream.readPending(); if (msg == null) { return; } int offset = msg.arrayOffset(); byte[] source = msg.array(); int length = source.length - offset; if (length > 65536) { String longStr = new String(source, offset, 4000); logger.info("String length is too long longStr={}", longStr); logger.info("String length is too long length={}", length); LogSequenceNumber lsn = 
stream.getLastReceiveLSN(); stream.setAppliedLSN(lsn); stream.setFlushedLSN(lsn); return; } String message = new String(source, offset, length); //System.out.println("message=====================" +message); //logger.info("message=" + message); LogSequenceNumber lsn = stream.getLastReceiveLSN(); InvokeContext ctx = new InvokeContext(); ctx.setMessage(message); eventParserService.parse(ctx); //feedback stream.setAppliedLSN(lsn); stream.setFlushedLSN(lsn); } //手动掉这个方法可关闭 public void closeReplication() { try { started = false; } catch (Exception e) { logger.info("closeReplication err", e); } } }
【总结】
思路大概是:项目启动完成后,把开关变量设置为"开"并启动接收线程;当手动调用关闭方法后,开关变量被置为"关",接收线程(注意是单个线程,而不是线程池)在下一次循环检查到该变量时就停止运行。



