栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

seata源码解析:seata server如何存储事务执行状态的?

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

seata源码解析:seata server如何存储事务执行状态的?

介绍

启动命令

seata-server.sh -p 8091

执行Server.java中的main方法

public class Server {

    public static void main(String[] args) throws IOException {

        // 省略部分代码...

        // SessionHolder负责事务日志的持久化存储
        // 设置存储模式,有三种可选类型,file,db,redis
        SessionHolder.init(parameterParser.getStoreMode());

        // 创建事务协调器
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        coordinator.init();

        // 省略部分代码...
        System.exit(0);
    }
}

SessionHolder其实就是4个SessionManager的集合

// 保存了所有的GlobalSession
private static SessionManager ROOT_SESSION_MANAGER;
// 需要异步commit的GlobalSession
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 需要重试commit的GlobalSession
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 需要重试roollback的GlobalSession
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

在init方法中通过SPI的方式实例化对应的类

public static void init(String mode) {
    if (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
    }
    StoreMode storeMode = StoreMode.get(mode);
    if (StoreMode.DB.equals(storeMode)) {
        // 通过spi加载SessionManager
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    } else if (StoreMode.FILE.equals(storeMode)) {
        // 省略其他存储方式的加载逻辑
    }
    // 删除已经完成的GlobalSession
    reload(storeMode);
}

4种类型的SessionManager都是同一个实例,只是调用的构造方法不同而已。以DatabaseSessionManager为例,ROOT_SESSION_MANAGER调用了无参数构造函数,而其他SessionManager传入了taskName属性,SessionManager会根据其taskName返回不同的GlobalSession

我们来看一下SessionManager的继承关系


SessionLifecycleListener看接口名字就是基于观察者模式设计的,当对应的GlobalSession状态发生改变的时候,就会回调相应的方法

SessionManager则定义了GlobalSession状态发生变化时应该执行的动作方法

AbstractSessionManager则是一个抽象类,当SessionLifecycleListener接口方法被回调时,调用SessionManager定义的动作方法

public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {

    protected TransactionStoreManager transactionStoreManager;

	// 省略部分代码
    // 重写了SessionManager接口方法
    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        writeSession(LogOperation.GLOBAL_ADD, session);
    }

    // 重写了SessionManager接口方法
    @Override
    public void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_UPDATE);
        }
        writeSession(LogOperation.GLOBAL_UPDATE, session);
    }


    // 重写了SessionLifecycleListener接口方法
    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }

    // 重写了SessionLifecycleListener接口方法
    @Override
    public void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException {
        updateGlobalSessionStatus(globalSession, status);
    }

}

可以看到AbstractSessionManager并没有实现SessionManager接口的方法,而是直接抛出异常,说明具体的存储逻辑交给子类来实现了

AbstractSessionManager有3个实现类,说明seata支持3种存储模式。具体追到这3个实现类中,可以看到最终存储的逻辑是交给TransactionStoreManager来实现的

CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

status对应的值为枚举类GlobalStatus,即一个全局事务的所有状态为

枚举值status值含义备注
UnKnown0Un known global status.
Begin1The Begin.PHASE 1: can accept new branch registering.
Committing2Committing.PHASE 2: Running Status: may be changed any time.
CommitRetrying3The Commit retrying.
Rollbacking4Rollbacking global status.
RollbackRetrying5The Rollback retrying.
TimeoutRollbacking6The Timeout rollbacking.
TimeoutRollbackRetrying7The Timeout rollback retrying.
AsyncCommitting8All branches can be async committed. The committing is NOT done yet, but it can be seen as committed for TM/RM client
Committed9Finally: global transaction is successfully committed.PHASE 2: Final Status: will NOT change any more.
CommitFailed10The Commit failed.
Rollbacked11The Rollbacked.
RollbackFailed12The Rollback failed.
TimeoutRollbacked13The Timeout rollbacked.
TimeoutRollbackFailed14The Timeout rollback failed.
Finished15The Finished.

而DefaultCoordinator#init方法,初始化5个定时任务

public void init() {
    // 重试rollback定时任务
    retryRollbacking.scheduleAtFixedRate(() -> {
        boolean lock = SessionHolder.retryRollbackingLock();
        if (lock) {
            try {
                handleRetryRollbacking();
            } catch (Exception e) {
                LOGGER.info("Exception retry rollbacking ... ", e);
            } finally {
                SessionHolder.unRetryRollbackingLock();
            }
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // 重试commit定时任务
    retryCommitting.scheduleAtFixedRate(() -> {
        boolean lock = SessionHolder.retryCommittingLock();
        if (lock) {
            try {
                handleRetryCommitting();
            } catch (Exception e) {
                LOGGER.info("Exception retry committing ... ", e);
            } finally {
                SessionHolder.unRetryCommittingLock();
            }
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // 省略部分代码
}

参考博客
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/342946.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号