栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

OpenGauss数据库中事务管理源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

OpenGauss数据库中事务管理源码解析

一、事务
  1. 事务的定义

事务是数据库操作的执行单位,需要满足最基本的ACID(原子性、一致性、隔离性、持久性)属性。

(1) 原子性:一个事务提交之后要么全部执行,要么全部不执行。

(2) 一致性:事务的执行不能破坏数据库的完整性和一致性。

(3) 隔离性:事务的隔离性是指在并发中,一个事务的执行不能被其他事务干扰。

(4) 持久性:一旦事务完成提交,那么它对数据库的状态变更就会永久保存在数据库中。

本章主要介绍openGauss事务模块是如何实现数据库事务的基本属性,使用户数据不丢不错、修改不乱、查询无错误。

   2.事务管理器

事务管理器:事务系统的中枢,它的实现是一个有限循环状态机,通过接受外部系统的命令并根据当前事务所处的状态决定事务的下一步执行过程。

在openGauss中,事务的实现与存储引擎的实现有很强关联,代码主要集中在src/gausskernel/storage/access/transam及src/gausskernel/storage/lmgr下。

(1) 事务管理器:事务系统的中枢,它的实现是一个有限循环状态机,通过接受外部系统的命令并根据当前事务所处的状态决定事务的下一步执行过程。

(2) 日志管理器:用来记录事务执行的状态以及数据变化的过程,包括事务提交日志(CLOG)、事务提交序列日志(CSNLOG)以及事务日志(XLOG)。其中CLOG日志只用来记录事务执行的结果状态,CSNLOG记录日志提交的顺序,用于可见性判断;XLOG是数据的redo日志,用于恢复及持久化。

(3) 线程管理机制:通过一片内存区域记录所有线程的事务信息,任何一个线程可以通过访问该区域获取其他事务的状态信息。

(4) MVCC机制:openGauss系统中,事务执行读流程结合各事务提交的CSN序列号,采用了多版本并发控制机制,实现了元组的读和写互不阻塞。详细可见性判断方法见“5.2 事务并发控制”。

(5) 锁管理器:实现系统的写并发控制,通过锁机制来保证事务写流程的隔离性。

二、Opengauss中的事务

   1.层次

openGauss将事务系统分为上层(事务块TBlockState)以及底层(TransState)两个层次。

通过分层的设计,在处理上层业务时可以屏蔽具体细节,实现灵活支持客户端各类事务执行语句(BEGIN/START TRANSACTION/COMMIT/ROLLBACK/END)。

(1) 事务块TBlockState:客户端query的状态,用于提高用户操作数据的灵活性,用事务块的形式支持在一个事务中执行多条query语句。

(2) 底层事务TransState:内核端视角,记录了整个事务当前处于的具体状态。

   2. 具体代码

事务块上层状态机结构体代码如下:

底层事务块代码:

    3.事务状态转换相关函数

        

改变事务块上层的状态

处理函数,根据相应的状态机调用子函数,在每个query处理前后,或检测到错误后被postgres.c 调用

根据当前事务上层状态机,对事务的资源进行相应的申请、回收及清理

事务处理子函数

子函数

说明

StartTransaction

开启事务,对内存及变量进行初始化操作,完成后将底层事务状

态置为TRANS_INPROGRESS

CommitTransaction

当前的底层状态机为TRANS_INPROGRESS,然后置为TRANS_COMMIT,

最后将底层状态机置为TRANS_DEFAULT

AbortTransaction

释放LWLock、UnlockBuffers、LockErrorCleanup,当前底层状态为

TRANS_INPROGRESS,设置为TRANS_ABORT,记录相应的CLOG日志,

清空事务槽位信息,释放各类资源

CleanupTransaction

当前底层状态机应为TRANS_ABORT,继续清理一些资源,一般紧接

着AbortTransaction调用

StartSubTransaction

开启子事务

CommitSubTransaction

提交子事务

AbortSubTransaction

回滚子事务

CleanupSubTransaction

清理子事务的一些资源信息,类似于CleanupTransaction

事务执行函数

函数

说明

StartTransactionCommand

事务开始时根据上层状态机调用相应的事务执行函数

CommitTransactionCommand

事务结束时根据上层状态机调用相应的事务执行函数

AbortCurrentTransaction

事务内部出错,长跳转longjump调用,提前清理掉相应的资源,

并将事务上层状态机置为TBLOCK_ABORT

上层事务状态机控制函数

函数

说明

BeginTransactionBlock

开启一个显式事务时,将上层事务状态机变为TBLOCK_BEGIN

EndTransactionBlock

显式提交一个事务时,将上层事务状态机变为TBLOCK_END

UserAbortTransactionBlock

显式回滚一个事务时,将上层事务状态机变为TBLOCK_ABORT_PENDING/

TBLOCK_ABORT_END

PrepareTransactionBlock

显式执行PREPARE语句,将上层事务状态机变为TBLOCK_PREPARE

DefineSavepoint

执行savepoint语句,通过调用PushTransaction将子事务上层事务状态机变为

TBLOCK_SUBBEGIN

ReleaseSavepoint

执行release savepoint语句,将子事务上层状态机转变为TBLOCK_SUBRELEASE

RollbackToSavepoint

执行“rollback to”语句,将所有子事务上层状态机转变为

TBLOCK_SUBABORT_PENDING/ TBLOCK_SUBABORT_END,顶层事务的上层状态机转变

为TBLOCK_SUBABORT_RESTART

   对其中一个代码进行详细分析,其他代码类似,篇幅原因不再赘述

StartTransaction:

static void StartTransaction(bool begin_on_gtm)

{

    TransactionState s;

    VirtualTransactionId vxid;

    GTM_Timestamp gtm_timestamp;

    gstrace_entry(GS_TRC_ID_StartTransaction);

   

    ForgetRegisterStreamSnapshots();

   

    s = &TopTransactionStateData;

    CurrentTransactionState = s;

    t_thrd.xact_cxt.bInAbortTransaction = false;

    t_thrd.xact_cxt.handlesDestroyedInCancelQuery = false;

    StmtRetrySetTransactionCommitFlag(false);

   

    if (s->state != TRANS_DEFAULT) {

        ereport(WARNING, (errmsg("StartTransaction while in %s state", TransStateAsString(s->state))));

    }

   

    DestroyCstoreAlterReg();

   

    t_thrd.storage_cxt.EnlargeDeadlockTimeout = false;

   

    gs_memprot_reset_beyondchunk();

   

    s->state = TRANS_START;

#ifdef PGXC

    s->isLocalParameterUsed = false;

#endif

    s->transactionId = InvalidTransactionId;

    ResetUndoActionsInfo();

   

    if (RecoveryInProgress()) {

        s->startedInRecovery = true;

        u_sess->attr.attr_common.XactReadOnly = true;

    } else {

        s->startedInRecovery = false;

        u_sess->attr.attr_common.XactReadOnly = u_sess->attr.attr_storage.DefaultXactReadOnly;

#ifdef PGXC

        if (!u_sess->attr.attr_common.xc_maintenance_mode)

            u_sess->attr.attr_common.XactReadOnly = u_sess->attr.attr_common.XactReadOnly || IsPGXCNodeXactReadOnly();

#endif

       

        if (u_sess->libpq_cxt.IsConnFromCmAgent) {

            u_sess->attr.attr_common.XactReadOnly = false;

        }

    }

    u_sess->attr.attr_storage.XactDeferrable = u_sess->attr.attr_storage.DefaultXactDeferrable;

#ifdef PGXC

    if (u_sess->attr.attr_common.DefaultXactIsoLevel == XACT_SERIALIZABLE)

        u_sess->attr.attr_common.DefaultXactIsoLevel = XACT_REPEATABLE_READ;

#endif

    u_sess->utils_cxt.XactIsoLevel = u_sess->attr.attr_common.DefaultXactIsoLevel;

    t_thrd.xact_cxt.forceSyncCommit = false;

    t_thrd.xact_cxt.MyXactAccessedTempRel = false;

    t_thrd.xact_cxt.MyXactAccessedRepRel = false;

    t_thrd.xact_cxt.XactLocalNodePrepared = false;

    t_thrd.xact_cxt.XactLocalNodeCanAbort = true;

    t_thrd.xact_cxt.XactPrepareSent = false;

    t_thrd.xact_cxt.AlterCoordinatorStmt = false;

    t_thrd.utils_cxt.pRelatedRel = NULL;

   

    s->subTransactionId = TopSubTransactionId;

    t_thrd.xact_cxt.currentSubTransactionId = TopSubTransactionId;

    t_thrd.xact_cxt.currentCommandId = FirstCommandId;

    t_thrd.xact_cxt.currentCommandIdUsed = false;

#ifdef PGXC

   

    t_thrd.xact_cxt.isCommandIdReceived = false;

    if (IsConnFromCoord()) {

        SetReceivedCommandId(FirstCommandId);

        SetSendCommandId(false);

    }

#endif

   

    t_thrd.xact_cxt.nUnreportedXids = 0;

    s->didLogXid = false;

   

    AtStart_Memory();

    AtStart_ResourceOwner();

    vxid.backendId = t_thrd.proc_cxt.MyBackendId;

    vxid.localTransactionId = GetNextLocalTransactionId();

    VirtualXactLockTableInsert(vxid);

    Assert(t_thrd.proc->backendId == vxid.backendId);

    t_thrd.proc->lxid = vxid.localTransactionId;

    TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);

   

    s->subXactLock = false;

    s->nestingLevel = 1;

    s->gucNestLevel = 1;

    s->childXids = NULL;

    s->nChildXids = 0;

    s->maxChildXids = 0;

    GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);

   

    Assert(s->prevSecContext == 0);

    t_thrd.xact_cxt.xactStartTimestamp = t_thrd.xact_cxt.stmtStartTimestamp;

    t_thrd.xact_cxt.xactStopTimestamp = 0;

    s->txnKey.txnHandle = InvalidTransactionHandle;

    s->txnKey.txnTimeline = InvalidTransactionTimeline;

    bool normal_working = begin_on_gtm && IsNormalProcessingMode() &&

                          !(IsAutoVacuumWorkerProcess() && (t_thrd.pgxact->vacuumFlags & PROC_IN_VACUUM));

    bool update_xact_time = !GTM_FREE_MODE && !u_sess->attr.attr_common.xc_maintenance_mode && normal_working &&

                            IS_PGXC_COORDINATOR && !IsConnFromCoord();

    if (update_xact_time) {

        if (GTM_MODE) {

            s->txnKey = BeginTranGTM(>m_timestamp);

            t_thrd.xact_cxt.GTMxactStartTimestamp = (TimestampTz)gtm_timestamp;

        } else {

            t_thrd.xact_cxt.GTMxactStartTimestamp = t_thrd.xact_cxt.xactStartTimestamp;

        }

        SetCurrentGTMDeltaTimestamp();

        SetCurrentStmtTimestamp();

        SetStmtSysGTMDeltaTimestamp();

    }

   

    update_xact_time = (GTM_FREE_MODE && IS_PGXC_COORDINATOR && !IsConnFromCoord()) ||

                       (!t_thrd.xact_cxt.timestamp_from_cn && IS_PGXC_DATANODE && normal_working);

    if (update_xact_time) {

        t_thrd.xact_cxt.GTMxactStartTimestamp = t_thrd.xact_cxt.xactStartTimestamp;

        t_thrd.xact_cxt.GTMdeltaTimestamp = 0;

        SetCurrentStmtTimestamp();

    }

   

    t_thrd.xact_cxt.timestamp_from_cn = false;

#ifdef PGXC

    pgstat_report_xact_timestamp(t_thrd.xact_cxt.GTMxactStartTimestamp);

#else

    pgstat_report_xact_timestamp(t_thrd.xact_cxt.xactStartTimestamp);

#endif

   

    AtStart_GUC();

    AtStart_Inval();

    AtStart_Cache();

    AfterTriggerBeginXact();

#ifdef ENABLE_MULTIPLE_NODES

    reset_searchlet_id();

#endif

    ResetBCMArray();

   

    InitNodeGroupStatus();

   

    s->state = TRANS_INPROGRESS;

#ifdef ENABLE_MOT

    CallXactCallbacks(XACT_EVENT_START);

#endif

    if (module_logging_is_on(MOD_TRANS_XACT)) {

        ereport(LOG, (errmodule(MOD_TRANS_XACT),

                      errmsg("start transaction succ. In Node %s, trans state: %s -> %s.",

                             g_instance.attr.attr_common.PGXCNodeName, TransStateAsString(TRANS_START),

                             TransStateAsString(TRANS_INPROGRESS))));

    }

    ShowTransactionState("StartTransaction");

    gstrace_exit(GS_TRC_ID_StartTransaction);

}

    4. 事务状态机

在无异常情形下,一个事务块的状态机上图所示按照

默认(TBLOCK_DEFAULT) ->  已开始(TBLOCK_STARTED) ->  事务块开启(TBLOCK_BEGIN)  ->  事务块运行中(TBLOCK_INPROGRESS)  ->  事务块结束(TBLOCK_END)  ->  默认(TBLOCK_DEFAULT)循环。

剩余的状态机是在上述正常场景下的各个状态点的异常处理分支。

 在进入事务块运行中(TBLOCK_INPROGRESS)之前出错,因为事务还没有开启,直接报错并回滚,清理资源回到默认(TBLOCK_DEFAULT)状态。

          在事务块运行中(TBLOCK_INPROGRESS)出错分为2种情形。

         事务执行失败:

事务块运行中(TBLOCK_INPROGRESS)   ->   回滚(TBLOCK_ABORT)->  回滚结束(TBLOCK_ABORT_END)  ->   默认(TBLOCK_DEFAULT);

         用户手动回滚执行成功的事务:

事务块运行中(TBLOCK_INPROGRESS)  ->  回滚等待(TBLOCK_ABORT_PENDING)  ->  默认(TBLOCK_DEFAULT)。

在用户执行COMMIT语句时出错:

事务块结束(TBLOCK_END)  ->   默认(TBLOCK_DEFAULT)。

由图5-2可以看出,事务开始后离开默认(TBLOCK_DEFAULT)状态,事务完全结束后回到默认(TBLOCK_DEFAULT)状态。

openGauss同时还支持隐式事务块,当客户端执行单条SQL语句时可以自动提交,其状态机相对比较简单:

默认(TBLOCK_DEFAULT) -> 已开始(TBLOCK_STARTED) -> 默认(TBLOCK_DEFAULT)循环。

    5. 内核内部底层状态

         底层状态机的描述见结构体TransState。

(1) 在事务开启前事务状态为TRANS_DEFAULT。

(2) 事务开启过程中事务状态为TRANS_START。

(3) 事务成功开启后一直处于TRANS_INPROGRESS。

(4) 事务结束/回滚的过程中为TARNS_COMMIT/ TRANS_ABORT。

(5) 事务结束后事务状态回到TRANS_DEFAULT。

    三、实例

在客户端执行SQL语句:

BEGIN;

SELECt * FROM TABLE1;

END;

整体过程:

任何语句的执行总是先进入事务处理接口事务块中,然后调用事务底层函数处理具体命令,最后返回到事务块中。具体过程见下图

BEGIN执行流程,如图所示。

(1) 入口函数exec_simple_query处理begin命令。

(2) start_xact_command函数开始一个query命令,调用StartTransactionCommand函数,此时事务块上层状态未TBLOCK_DEFAULT,继续调用StartTransaction函数,设置事务底层状态TRANS_START,完成内存、缓存区、锁资源的初始化后将事务底层状态设为TRANS_INPROGRESS,最后在StartTransactionCommand函数中设置事务块上层状态为TBLOCK_STARTED。

(3) PortalRun函数处理begin语句,依次向下调用函数,最后调用BeginTransactionBlock函数转换事务块上层状态为TBLOCK_BEGIN。

(4) finish_xact_command函数结束一个query命令,调用CommitTransactionCommand函数设置事务块上层状态从TBLOCK_BEGIN变为TBLOCK_INPROGRESS,并等待读取下一条命令。

  

SELECt执行流程,如图5-6所示。

(1) 入口函数exec_simple_query处理“SELECT * FROM table1;”命令。

(2) start_xact_command函数开始一个query命令,调用StartTransactionCommand函数,由于当前上层事务块状态为TBLOCK_INPROGRESS,说明已经在事务块内部,则直接返回,不改变事务上层以及底层的状态。

(3) PortalRun执行SELECt语句,依次向下调用函数ExecutorRun根据执行计划执行最优路径查询。

(4) finish_xact_command函数结束一条query命令,调用CommitTransactionCommand函数,当前事务块上层状态仍为TBLOCK_INPROGESS,不改变当前事务上层以及底层的状态。

END执行流程,如图5-7所示。

(1) 入口函数exec_simple_query处理end命令。

(2) start_xact_command函数开始一个query命令,调用StartTransactionCommand函数,当前上层事务块状态为TBLOCK_INPROGESS,表明事务仍然在进行,此时也不改变任何上层及底层事务状态。

(3) PortalRun函数处理end语句,依次调用processUtility函数,最后调用EndTransactionBlock函数对当前上层事务块状态机进行转换,设置事务块上层状态为TBLOCK_END。

(4) Finish_xact_command函数结束query命令,调用CommitTransactionCommand函数,当前事务块状态TBLOCK_END;继续调用CommitTransaction函数提交事务,设置事务底层状态为TRANS_COMMIT,进行事务提交流程并且清理事务资源;清理后设置底层事务状态为TRANS_DEFAULT,返回CommitTansactionCommand函数;设置事务块上层状态为TBLOCK_DEFAULT,整个事务块结束。

  四、其他问题
  1. 多条指令例如:

BEGIN

         SELECT * FROM foo

         INSERT INTO foo VALUES (...)

COMMIT

调用CommitTransactionCommand后再调用CommandCounterIncrement

  1. 子事务块(subtransaction)问题代码位于src/gausskernel/storage/access/transam/xact.c 

子事务块的状态机转换同父事务类似。父子事务的关系类似于一个栈的实现,父事务的子事务相较于父事务后开始先结束。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/644029.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号