启动命令
seata-server.sh -p 8091
执行Server.java中的main方法
/**
 * Entry point of the Seata server (excerpt; parts of the original are omitted).
 */
public class Server {
/**
 * Initializes session storage through SessionHolder, creates and initializes
 * the DefaultCoordinator, then terminates the JVM with exit status 0.
 *
 * @param args command-line arguments (not read in the visible excerpt)
 * @throws IOException declared by the signature; the statement that may throw it
 *                     is not visible in this excerpt
 */
public static void main(String[] args) throws IOException {
// Part of the code is omitted...
// SessionHolder is responsible for the persistent storage of transaction logs.
// Set the store mode; three types are available: file, db, redis.
SessionHolder.init(parameterParser.getStoreMode());
// Create the transaction coordinator.
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
// Part of the code is omitted...
System.exit(0);
}
}
SessionHolder其实就是4个SessionManager的集合
// 保存了所有的GlobalSession private static SessionManager ROOT_SESSION_MANAGER; // 需要异步commit的GlobalSession private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER; // 需要重试commit的GlobalSession private static SessionManager RETRY_COMMITTING_SESSION_MANAGER; // 需要重试roollback的GlobalSession private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
在init方法中通过SPI的方式实例化对应的类
/**
 * Initializes the four session managers for the requested store mode and
 * then calls {@code reload} with the resolved mode.
 *
 * @param mode store mode name; when blank, the value configured under
 *             {@code ConfigurationKeys.STORE_MODE} is used instead
 */
public static void init(String mode) {
    // Fall back to the configured store mode when the caller passed none.
    String effectiveMode = StringUtils.isBlank(mode)
        ? CONFIG.getConfig(ConfigurationKeys.STORE_MODE)
        : mode;
    StoreMode storeMode = StoreMode.get(effectiveMode);

    if (StoreMode.DB.equals(storeMode)) {
        // Every SessionManager is loaded through SPI. The root manager is loaded
        // without extra arguments; the other three receive their task name.
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(
            SessionManager.class,
            StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(
            SessionManager.class,
            StoreMode.DB.getName(),
            new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(
            SessionManager.class,
            StoreMode.DB.getName(),
            new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(
            SessionManager.class,
            StoreMode.DB.getName(),
            new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    } else if (StoreMode.FILE.equals(storeMode)) {
        // Loading logic for the other store modes is omitted in this excerpt.
    }

    // Reload sessions for the resolved store mode (per the original note,
    // this removes GlobalSessions that have already finished).
    reload(storeMode);
}
4种类型的SessionManager都是同一个实现类的实例(并不是同一个对象),只是调用的构造方法不同而已。以DatabaseSessionManager为例,ROOT_SESSION_MANAGER调用了无参数构造函数,而其他SessionManager传入了taskName属性,SessionManager会根据其taskName返回不同的GlobalSession
我们来看一下SessionManager的继承关系
SessionLifecycleListener看接口名字就是基于观察者模式设计的,当对应的GlobalSession状态发生改变的时候,就会回调相应的方法
SessionManager则定义了GlobalSession状态发生变化时应该执行的动作方法
AbstractSessionManager则是一个抽象类,当SessionLifecycleListener接口方法被回调时,调用SessionManager定义的动作方法
/**
 * Base class for session managers (excerpt; parts of the original are omitted).
 *
 * <p>It implements both {@code SessionManager} and {@code SessionLifecycleListener}:
 * the listener callbacks shown here forward to the matching {@code SessionManager}
 * operation, and each operation records itself through {@code writeSession}.
 */
public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {

    /** Transaction store manager; not referenced by the methods visible in this excerpt. */
    protected TransactionStoreManager transactionStoreManager;

    // Part of the code is omitted.

    /**
     * SessionManager operation: records the addition of a global session.
     *
     * @param session the global session being added
     * @throws TransactionException declared by the signature; presumably raised by writeSession
     */
    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            String message = "MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD;
            LOGGER.debug(message);
        }
        writeSession(LogOperation.GLOBAL_ADD, session);
    }

    /**
     * SessionManager operation: records an update of a global session.
     *
     * @param session the global session being updated
     * @param status  the new status; not read by this method itself
     * @throws TransactionException declared by the signature; presumably raised by writeSession
     */
    @Override
    public void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            String message = "MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_UPDATE;
            LOGGER.debug(message);
        }
        writeSession(LogOperation.GLOBAL_UPDATE, session);
    }

    /**
     * SessionLifecycleListener callback: forwards to {@link #addGlobalSession}.
     *
     * @param globalSession the global session that has begun
     * @throws TransactionException propagated from addGlobalSession
     */
    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }

    /**
     * SessionLifecycleListener callback: forwards to {@link #updateGlobalSessionStatus}.
     *
     * @param globalSession the global session whose status changed
     * @param status        the new status
     * @throws TransactionException propagated from updateGlobalSessionStatus
     */
    @Override
    public void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException {
        updateGlobalSessionStatus(globalSession, status);
    }
}
可以看到,上面列出的方法本身并不包含具体的存储逻辑,只是统一调用writeSession;AbstractSessionManager并没有完整实现SessionManager接口(省略的部分方法未给出实现或直接抛出异常),说明具体的存储逻辑交给子类来实现了
AbstractSessionManager有3个实现类,说明seata支持3种存储模式(file、db、redis)。具体追到这3个实现类中,可以看到最终存储的逻辑是交给TransactionStoreManager来实现的。以db模式为例,事务日志保存在下面两张表中:
-- Table for global transaction records (presumably one row per global transaction).
-- Created only if it does not already exist.
CREATE TABLE IF NOT EXISTS `global_table`
(
-- Primary key of this table.
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
-- Per the accompanying text, the values correspond to the GlobalStatus enum.
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
-- Composite secondary index on (gmt_modified, status).
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
-- Secondary index on transaction_id.
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- Table for branch transaction records (presumably one row per branch transaction).
-- Created only if it does not already exist.
CREATE TABLE IF NOT EXISTS `branch_table`
(
-- Primary key of this table.
`branch_id` BIGINT NOT NULL,
-- Presumably the xid of the owning row in global_table; no foreign key is declared.
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
-- NOTE(review): value set is not shown here; confirm which enum this maps to.
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
-- Unlike global_table, the timestamps here use DATETIME(6) (fractional seconds).
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
-- Secondary index on xid.
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
global_table中status字段的取值对应枚举类GlobalStatus,一个全局事务的所有状态如下
| 枚举值 | status值 | 含义 | 备注 |
|---|---|---|---|
| UnKnown | 0 | Unknown global status. | |
| Begin | 1 | The Begin. | PHASE 1: can accept new branch registering. |
| Committing | 2 | Committing. | PHASE 2: Running Status: may be changed any time. |
| CommitRetrying | 3 | The Commit retrying. | |
| Rollbacking | 4 | Rollbacking global status. | |
| RollbackRetrying | 5 | The Rollback retrying. | |
| TimeoutRollbacking | 6 | The Timeout rollbacking. | |
| TimeoutRollbackRetrying | 7 | The Timeout rollback retrying. | |
| AsyncCommitting | 8 | All branches can be async committed. The committing is NOT done yet, but it can be seen as committed for TM/RM client | |
| Committed | 9 | Finally: global transaction is successfully committed. | PHASE 2: Final Status: will NOT change any more. |
| CommitFailed | 10 | The Commit failed. | |
| Rollbacked | 11 | The Rollbacked. | |
| RollbackFailed | 12 | The Rollback failed. | |
| TimeoutRollbacked | 13 | The Timeout rollbacked. | |
| TimeoutRollbackFailed | 14 | The Timeout rollback failed. | |
| Finished | 15 | The Finished. | |
而DefaultCoordinator#init方法,初始化5个定时任务
/**
 * Schedules the coordinator's periodic tasks (excerpt; only the rollback-retry
 * and commit-retry tasks are shown).
 *
 * <p>Each task is scheduled at a fixed rate with an initial delay of 0 and a
 * period given in milliseconds. A run does its work only when the matching
 * SessionHolder lock call returns true, and releases that lock in a
 * {@code finally} block. Exceptions from the handler are caught and logged
 * at info level rather than propagated.
 */
public void init() {
    // Periodic task: retry rollbacks.
    retryRollbacking.scheduleAtFixedRate(() -> {
        if (!SessionHolder.retryRollbackingLock()) {
            // Lock not obtained: skip this run.
            return;
        }
        try {
            handleRetryRollbacking();
        } catch (Exception e) {
            LOGGER.info("Exception retry rollbacking ... ", e);
        } finally {
            SessionHolder.unRetryRollbackingLock();
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Periodic task: retry commits.
    retryCommitting.scheduleAtFixedRate(() -> {
        if (!SessionHolder.retryCommittingLock()) {
            // Lock not obtained: skip this run.
            return;
        }
        try {
            handleRetryCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception retry committing ... ", e);
        } finally {
            SessionHolder.unRetryCommittingLock();
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Part of the code is omitted.
}
参考博客


