
Adding an on/off switch at Spring startup


【Preface】

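        I recently worked on a project that listens for changes to database data. Whenever it detects an insert, update, or delete, it updates ES (Elasticsearch) or runs some business logic. That raised a question: while the project is running, what if I want to turn off this listener without killing the service?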

【Process】

     

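import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.LogSequenceNumber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

// Listens for the Spring context-refreshed event and starts the replication listener.
// Note: this excerpt omits the project-specific imports, the configuration fields
// (url, username, password, version, slotName), the connection, rplConnection and
// stream fields, and the afterPropertiesSet() method required by InitializingBean.
@Component
public class SpringFinishedListener implements InitializingBean, ApplicationListener<ContextRefreshedEvent> {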

    private static final Logger logger = LoggerFactory.getLogger(SpringFinishedListener.class);

    
    private final ReentrantLock lock = new ReentrantLock();

    private volatile boolean started = false; // flag that acts as the on/off switch

    private final Thread receiveThread = new Thread(new ReceiveTask(), "TunnelReceiveThread-logical-slot");


    @Autowired
    private EventParserService eventParserService;

    @Autowired
    private PublisherService publisherService;

    @Autowired
    private WhiteListPublisherService whiteListPublisherService;

   
    // Business logic: open the replication stream and start receiving once the context is ready
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            createRplConn();
            createRplSlot();
            createRplStream();
            started = true;
            receiveThread.start();
            Thread.sleep(10*1000);
            // consume the queue
            publisherService.consume();
            // consume the whitelist queue
            whiteListPublisherService.consume();
            logger.info("Startup RplStream Success");
        } catch (Exception e) {
            logger.warn("Startup RplStream Failure", e);
            shutdown();
        }
    }
    // Once the close method below has been called manually, this loop stops running
    private class ReceiveTask implements Runnable {

        @Override
        public void run() {
            while (started) {
                try {
                    receiveStream();
                } catch (Exception e) {
                    logger.warn("receiveStream msg failure,try to recover.", e);
                    recover();
                }
            }
        }
    }

    public void shutdown() {
        started = false;
        closeClosable(this.stream);
        closeClosable(this.connection);
    }

    private void createRplConn() throws SQLException {
        //String url = "jdbc:postgresql://192.168.1.31:5432/boss_crm";
        Properties props = new Properties();
        PGProperty.USER.set(props, this.username);
        PGProperty.PASSWORD.set(props, SecurityUtils.rc4Decrypt(this.password));
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, this.version);
        PGProperty.REPLICATION.set(props, "database");
        PGProperty.PREFER_QUERY_MODE.set(props, "simple");

        this.connection = DriverManager.getConnection(url, props);
        this.rplConnection = this.connection.unwrap(PGConnection.class);
        logger.info("GetRplConnection success,slot:{}", this.slotName);
    }

    private void createRplSlot() throws SQLException {
        try {
            this.rplConnection.getReplicationAPI()
                    .createReplicationSlot()
                    .logical()
                    .withSlotName(slotName)
                    .withOutputPlugin("test_decoding")
                    .make();
            logger.info("GetRplSlot success,slot:{}", this.slotName);
        } catch (SQLException e) {
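            // Any SQLException lands here, not only "slot already exists", so the message stays general
            logger.error("Failed to create replication slot {} (it may already exist)", slotName, e);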
        }
    }

    private void createRplStream() throws SQLException {
        this.stream = this.rplConnection.getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName(slotName)
                .withSlotOption("include-xids", true)
                .withSlotOption("skip-empty-xacts", true)
                .withSlotOption("include-timestamp", true)
                .withStatusInterval(5, TimeUnit.SECONDS)
                .start();
        logger.info("GetRplStream success,slot:{}", this.slotName);
    }

    private void recover() {
        this.lock.lock();
        try {
            long s = System.currentTimeMillis();
            closeClosable(stream);
            closeClosable(connection);

            while (true) {
                try {
                    createRplConn();
                    createRplSlot();
                    createRplStream();
                    break;
                } catch (Exception e) {
                    logger.warn("Recover Streaming Occurred Error", e);
                    closeClosable(stream);
                    closeClosable(connection);
                    TimeUtils.sleepInMills(500);
                }
            }
            long e = System.currentTimeMillis();
            logger.info("recover logical replication success,slot:{},cost:{}ms", slotName, e - s);
        } finally {
            this.lock.unlock();
        }
    }

    private static void closeClosable(AutoCloseable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
                logger.info("closeClosable err", e);
            }
        }
    }

    private void receiveStream() throws SQLException {

        assert !stream.isClosed();
        assert !connection.isClosed();

        //non blocking receive message
        ByteBuffer msg = stream.readPending();

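        if (msg == null) {
            // Nothing pending: pause briefly so the loop in ReceiveTask does not spin at full CPU
            TimeUtils.sleepInMills(10);
            return;
        }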
        int offset = msg.arrayOffset();
        byte[] source = msg.array();
        int length = source.length - offset;
        if (length > 65536) {
            String longStr = new String(source, offset, 4000);
            logger.info("String length is too long longStr={}", longStr);
            logger.info("String length is too long length={}", length);
            LogSequenceNumber lsn = stream.getLastReceiveLSN();
            stream.setAppliedLSN(lsn);
            stream.setFlushedLSN(lsn);
            return;
        }
        String message = new String(source, offset, length);

        //System.out.println("message=====================" +message);
        //logger.info("message=" + message);

        LogSequenceNumber lsn = stream.getLastReceiveLSN();

        InvokeContext ctx = new InvokeContext();
        ctx.setMessage(message);
        eventParserService.parse(ctx);

        //feedback
        stream.setAppliedLSN(lsn);
        stream.setFlushedLSN(lsn);
    }

    // Call this method manually to stop listening: the loop in ReceiveTask exits once it sees the flag.
    // Unlike shutdown(), it does not close the stream or the connection.
    public void closeReplication() {
        started = false;
    }
}
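
One limitation of the class above is that `receiveThread` is a single `final` `Thread`, and a Java `Thread` cannot be started twice, so the switch can be turned off but not back on. `ContextRefreshedEvent` may also be published more than once (for example when a child context refreshes), which would call `receiveThread.start()` a second time. The following is a minimal sketch, not the author's code, of a switch that can be toggled repeatedly and ignores duplicate start calls. `RestartableSwitch`, `pollOnce`, and `pollCount` are hypothetical names, and `pollOnce` stands in for `receiveStream()`.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: a polling loop that can be switched on and off repeatedly. */
class RestartableSwitch {

    /** One run of the loop; each start() creates a fresh instance with its own flag. */
    private final class Loop implements Runnable {
        volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                pollOnce.run();               // stand-in for receiveStream()
                pollCount.incrementAndGet();
                try {
                    Thread.sleep(10);         // idle pause so the loop does not spin
                } catch (InterruptedException e) {
                    return;                   // stop() interrupts the sleep to exit promptly
                }
            }
        }
    }

    private final Runnable pollOnce;
    private final AtomicLong pollCount = new AtomicLong();
    private Loop current;   // guarded by "this"
    private Thread thread;  // guarded by "this"

    RestartableSwitch(Runnable pollOnce) {
        this.pollOnce = pollOnce;
    }

    /** Starts a new worker thread; returns false if one is already running. */
    synchronized boolean start() {
        if (current != null) {
            return false;
        }
        current = new Loop();
        thread = new Thread(current, "restartable-switch-worker");
        thread.start();
        return true;
    }

    /** Stops the worker and waits up to two seconds for it to exit; returns false if none was running. */
    synchronized boolean stop() {
        if (current == null) {
            return false;
        }
        current.running = false;
        thread.interrupt();
        try {
            thread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        current = null;
        thread = null;
        return true;
    }

    synchronized boolean isRunning() {
        return current != null;
    }

    long pollCount() {
        return pollCount.get();
    }

    public static void main(String[] args) {
        RestartableSwitch sw = new RestartableSwitch(() -> { });
        System.out.println("start: " + sw.start());
        System.out.println("start again: " + sw.start());
        System.out.println("stop: " + sw.stop());
        System.out.println("restart: " + sw.start());
        sw.stop();
    }
}
```

Giving each run its own `running` flag means a worker that is slow to exit can never be revived by a later `start()`. In the listener, `start()` would also have to reopen the connection and stream, which this sketch leaves out.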

【Summary】

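        The idea is roughly this: once the project has started, the flag is set to on. When it is switched off manually, the receive thread sees the change and stops running.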
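
Stripped of the replication details, the idea can be sketched as below. This is a simplified illustration under stated assumptions, not the class from this post: `FlagControlledWorker` and its helper methods are hypothetical names, and the loop body is a placeholder for `receiveStream()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the listener: a worker loop guarded by a volatile flag. */
class FlagControlledWorker {

    // volatile so that a write from the controlling thread is visible to the worker thread
    private volatile boolean started = false;
    private final CountDownLatch firstPoll = new CountDownLatch(1);
    private final Thread worker = new Thread(this::runLoop, "flag-controlled-worker");

    /** Turns the switch on and starts the loop (the role of onApplicationEvent in the post). */
    void start() {
        started = true;
        worker.start();
    }

    /** Turns the switch off (the role of closeReplication in the post). */
    void stop() {
        started = false;
    }

    private void runLoop() {
        while (started) {
            firstPoll.countDown();                 // placeholder for receiveStream()
            try {
                TimeUnit.MILLISECONDS.sleep(10);   // avoid spinning when there is nothing to read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Waits until the loop has run at least once; returns false on timeout. */
    boolean awaitFirstPoll(long timeoutMillis) {
        try {
            return firstPoll.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Waits for the worker thread to exit; returns true if it is no longer alive. */
    boolean awaitStopped(long timeoutMillis) {
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        FlagControlledWorker w = new FlagControlledWorker();
        w.start();
        w.awaitFirstPoll(1000);
        w.stop();
        System.out.println("stopped: " + w.awaitStopped(1000));
    }
}
```

The flag is only checked between iterations, so the worker stops after its current pass through the loop, not in the middle of one.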

When reposting, please credit the source: reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/305734.html