XactLogCommitRecord的两条调用链:FinishPreparedTransaction --> RecordTransactionCommitPrepared --> XactLogCommitRecord;CommitTransaction --> RecordTransactionCommit --> XactLogCommitRecord
事务首先会在自己的私有空间中构建出需要被插入的元组,然后将元组插入缓存页面,并将这个缓存页面标记为脏页。xl_xact_commit.xact_time设置为形参commit_time。xl_xact_xinfo.xinfo可以为如下标志位的组合(XACT_COMPLETION_UPDATE_RELCACHE_FILE、XACT_COMPLETION_FORCE_SYNC_COMMIT、XACT_XINFO_HAS_AE_LOCKS、XACT_COMPLETION_APPLY_FEEDBACK、XACT_XINFO_HAS_DBINFO、XACT_XINFO_HAS_SUBXACTS、XACT_XINFO_HAS_RELFILENODES、XACT_XINFO_HAS_INVALS、XACT_XINFO_HAS_TWOPHASE、XACT_XINFO_HAS_GID、XACT_XINFO_HAS_ORIGIN)。xl_xact_dbinfo.dbId设置为MyDatabaseId,xl_xact_dbinfo.tsId设置为MyDatabaseTableSpace。
/*
 * XactLogCommitRecord
 *
 * Build and insert the WAL record for a transaction commit.
 *
 * The record always starts with an xl_xact_commit carrying commit_time.
 * Optional sections follow, each announced by a bit in xl_xact_xinfo.xinfo,
 * and are registered in this fixed order: xinfo, dbinfo, subxacts,
 * relfilenodes, invals, twophase (+ gid), origin.
 *
 * Parameters:
 *   commit_time    stored in xl_xact_commit.xact_time
 *   nsubxacts      number of entries in subxacts; section emitted when > 0
 *   subxacts       array of nsubxacts TransactionIds
 *   nrels          number of entries in rels; section emitted when > 0
 *   rels           array of nrels RelFileNodes
 *   nmsgs          number of entries in msgs; section emitted when > 0
 *   msgs           array of nmsgs SharedInvalidationMessages
 *   relcacheInval  sets XACT_COMPLETION_UPDATE_RELCACHE_FILE when true
 *   forceSync      NOTE(review): not read by this function; the
 *                  XACT_COMPLETION_FORCE_SYNC_COMMIT bit is driven by
 *                  forceSyncCommit, which is declared outside this excerpt.
 *                  Confirm that this is intended.
 *   xactflags      only XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK is tested
 *   twophase_xid   a valid xid selects XLOG_XACT_COMMIT_PREPARED and adds
 *                  the twophase section; otherwise XLOG_XACT_COMMIT is used
 *   twophase_gid   must be non-NULL when twophase_xid is valid; written
 *                  (including its terminating NUL) only when
 *                  XLogLogicalInfoActive() is true
 *
 * Returns whatever XLogInsert(RM_XACT_ID, info) returns.
 *
 * The caller must already be inside a critical section (asserted below).
 */
XLogRecPtr
XactLogCommitRecord(TimestampTz commit_time,
                    int nsubxacts, TransactionId *subxacts,
                    int nrels, RelFileNode *rels,
                    int nmsgs, SharedInvalidationMessage *msgs,
                    bool relcacheInval, bool forceSync,
                    int xactflags, TransactionId twophase_xid,
                    const char *twophase_gid)
{
    xl_xact_commit       xlrec;
    xl_xact_xinfo        xl_xinfo;
    xl_xact_dbinfo       xl_dbinfo;
    xl_xact_subxacts     xl_subxacts;
    xl_xact_relfilenodes xl_relfilenodes;
    xl_xact_invals       xl_invals;
    xl_xact_twophase     xl_twophase;
    xl_xact_origin       xl_origin;
    uint8                info;
    uint32               sections;

    Assert(CritSectionCount > 0);

    /* ---- Phase 1: decide the record type and fill in the pieces ---- */

    info = TransactionIdIsValid(twophase_xid) ? XLOG_XACT_COMMIT_PREPARED
                                              : XLOG_XACT_COMMIT;

    xlrec.xact_time = commit_time;

    xl_xinfo.xinfo = 0;

    if (relcacheInval)
    {
        xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
    }

    if (forceSyncCommit)
    {
        xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
    }

    if ((xactflags & XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK))
    {
        xl_xinfo.xinfo |= XACT_XINFO_HAS_AE_LOCKS;
    }

    if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
    {
        xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
    }

    /* Database identity accompanies invalidations or logical decoding. */
    if (nmsgs > 0 || XLogLogicalInfoActive())
    {
        xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
        xl_dbinfo.dbId = MyDatabaseId;
        xl_dbinfo.tsId = MyDatabaseTableSpace;
    }

    if (nsubxacts > 0)
    {
        xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
        xl_subxacts.nsubxacts = nsubxacts;
    }

    if (nrels > 0)
    {
        xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
        xl_relfilenodes.nrels = nrels;
    }

    if (nmsgs > 0)
    {
        xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
        xl_invals.nmsgs = nmsgs;
    }

    if (TransactionIdIsValid(twophase_xid))
    {
        xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
        xl_twophase.xid = twophase_xid;
        Assert(twophase_gid != NULL);

        if (XLogLogicalInfoActive())
        {
            xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
        }
    }

    if (replorigin_session_origin != InvalidRepOriginId)
    {
        xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
        xl_origin.origin_lsn = replorigin_session_origin_lsn;
        xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
    }

    /* All flags are final from here on; work from a snapshot. */
    sections = xl_xinfo.xinfo;

    if (sections != 0)
    {
        info |= XLOG_XACT_HAS_INFO;
    }

    /* ---- Phase 2: register the pieces, in on-disk order ---- */

    XLogBeginInsert();

    XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit));

    if (sections != 0)
    {
        XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
    }

    if (sections & XACT_XINFO_HAS_DBINFO)
    {
        XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
    }

    if (sections & XACT_XINFO_HAS_SUBXACTS)
    {
        /* fixed-size header first, then the xid array itself */
        XLogRegisterData((char *) (&xl_subxacts), MinSizeOfXactSubxacts);
        XLogRegisterData((char *) subxacts, nsubxacts * sizeof(TransactionId));
    }

    if (sections & XACT_XINFO_HAS_RELFILENODES)
    {
        XLogRegisterData((char *) (&xl_relfilenodes), MinSizeOfXactRelfilenodes);
        XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
    }

    if (sections & XACT_XINFO_HAS_INVALS)
    {
        XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
        XLogRegisterData((char *) msgs, nmsgs * sizeof(SharedInvalidationMessage));
    }

    if (sections & XACT_XINFO_HAS_TWOPHASE)
    {
        XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));

        if (sections & XACT_XINFO_HAS_GID)
        {
            /* the +1 keeps the GID's terminating NUL in the record */
            XLogRegisterData(unconstify(char *, twophase_gid), strlen(twophase_gid) + 1);
        }
    }

    if (sections & XACT_XINFO_HAS_ORIGIN)
    {
        XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
    }

    XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

    return XLogInsert(RM_XACT_ID, info);
}
/*
 * Fixed leading part of a commit WAL record.  XactLogCommitRecord registers
 * it first, before any optional section.
 */
typedef struct xl_xact_commit {
/* commit timestamp; XactLogCommitRecord fills it from its commit_time argument */
TimestampTz xact_time;
} xl_xact_commit;
/*
 * Optional flag word of a commit record.  XactLogCommitRecord writes it only
 * when at least one bit is set, and then also sets XLOG_XACT_HAS_INFO in the
 * record's info byte.
 */
typedef struct xl_xact_xinfo {
/* bitwise OR of XACT_XINFO_* and XACT_COMPLETION_* flags */
uint32 xinfo;
} xl_xact_xinfo;
/*
 * Optional database-identity section of a commit record, present when
 * XACT_XINFO_HAS_DBINFO is set in xinfo.
 */
typedef struct xl_xact_dbinfo {
/* set from MyDatabaseId by XactLogCommitRecord */
Oid dbId;
/* set from MyDatabaseTableSpace by XactLogCommitRecord */
Oid tsId;
} xl_xact_dbinfo;
InitXLogInsert
在数据库启动之后,PostgreSQL在InitXLogInsert函数(InitPostgres–>RecoveryInProgress–>InitXLOGAccess–>InitXLogInsert)中初始化了几个进程级变量,用来临时保存日志的中间状态(rdatas、mainrdata_head、mainrdata_last、mainrdata_len、hdr_scratch、registered_buffers)。
rdatas是长度为XLR_NORMAL_RDATAS的数组,用来保存日志数据信息。
/* Pool of XLogRecData slots; allocated by InitXLogInsert and handed out one at a time by XLogRegisterData. */
static XLogRecData *rdatas;
/* Number of slots in rdatas already handed out; reset to 0 by XLogResetInsertion. */
static int num_rdatas;
/* Capacity of rdatas; XLogRegisterData reports an error once num_rdatas reaches it. */
static int max_rdatas;
/*
 * One link in the chain of data pieces that make up a WAL record.
 * XLogRegisterData fills in data/len and appends the entry to the chain
 * tracked by mainrdata_last.
 */
typedef struct XLogRecData {
/* next entry in the chain */
struct XLogRecData *next;
/* start of this piece of data (points at caller-owned memory, not copied by XLogRegisterData) */
char *data;
/* length of this piece in bytes */
uint32 len;
} XLogRecData;
registered_buffers为长度为XLR_NORMAL_MAX_BLOCK_ID+1的数组,用于注册被修改的页面信息,由XLogRegisterBuffer注册页面时,会在其中占用一个槽位。
/*
 * Per-block bookkeeping slot for a page registered for the WAL record under
 * construction.  InitXLogInsert allocates an array of these
 * (registered_buffers); XLogResetInsertion marks the slots unused again.
 *
 * NOTE(review): the code that fills in most of these fields is not part of
 * this excerpt; the field notes below marked "presumably" are inferred from
 * names and should be confirmed against XLogRegisterBuffer and
 * XLogRecordAssemble.
 */
typedef struct {
/* slot is occupied; cleared by XLogResetInsertion */
bool in_use;
/* presumably the flags passed when the buffer was registered - TODO confirm */
uint8 flags;
/* presumably the relation file, fork and block number identifying the page - TODO confirm */
RelFileNode rnode;
ForkNumber forkno;
BlockNumber block;
/* presumably the page contents - TODO confirm */
Page page;
/* presumably the total length of the data chained via rdata_head/rdata_tail - TODO confirm */
uint32 rdata_len;
/* presumably head and tail of the per-block data chain - TODO confirm */
XLogRecData *rdata_head;
XLogRecData *rdata_tail;
/* presumably scratch entries used when a backup image of the page is included - TODO confirm */
XLogRecData bkp_rdatas[2];
/* scratch buffer of PGLZ_MAX_BLCKSZ bytes, presumably for a compressed page image - TODO confirm */
char compressed_page[PGLZ_MAX_BLCKSZ];
} registered_buffer;
hdr_scratch是组装日志时使用的临时内存,长度为HEADER_SCRATCH_SIZE,用于保存日志记录的Header部分,其声明为:static char *hdr_scratch = NULL;
InitXLogInsert函数首先创建内存上下文xloginsert_cxt,然后在xloginsert_cxt内存上下文中依次分配registered_buffers数组、rdatas数组和hdr_scratch缓冲区。每一步都只在对应变量尚未初始化(为NULL)时执行,因此重复调用是安全的。
/* Memory context holding the WAL-construction work areas; created by InitXLogInsert under TopMemoryContext. */
static MemoryContext xloginsert_cxt;
/*
 * InitXLogInsert
 *
 * Set up the work areas used while constructing a WAL record: the
 * memory context, the registered_buffers array, the rdatas array and the
 * header scratch buffer.
 *
 * Each piece is created only if its pointer is still NULL, so calling this
 * function again leaves already-initialized state untouched.
 */
void
InitXLogInsert(void)
{
    const int nbuffers = XLR_NORMAL_MAX_BLOCK_ID + 1;

    /* Everything below is allocated inside this context. */
    if (xloginsert_cxt == NULL)
    {
        xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
                                               "WAL record construction",
                                               ALLOCSET_DEFAULT_SIZES);
    }

    /* Zero-filled, so every slot starts with in_use == false. */
    if (registered_buffers == NULL)
    {
        registered_buffers = (registered_buffer *)
            MemoryContextAllocZero(xloginsert_cxt,
                                   sizeof(registered_buffer) * nbuffers);
        max_registered_buffers = nbuffers;
    }

    /* Not zeroed: XLogRegisterData fills a slot's data/len when it hands it out. */
    if (rdatas == NULL)
    {
        rdatas = MemoryContextAlloc(xloginsert_cxt,
                                    sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
        max_rdatas = XLR_NORMAL_RDATAS;
    }

    /* Zero-filled scratch space of HEADER_SCRATCH_SIZE bytes. */
    if (hdr_scratch == NULL)
    {
        hdr_scratch = MemoryContextAllocZero(xloginsert_cxt, HEADER_SCRATCH_SIZE);
    }
}
mainrdata_head、mainrdata_last、mainrdata_len用于管理日志数据链表:mainrdata_head为链表头,mainrdata_last指向链表的最后一个节点,mainrdata_len记录已注册数据的总长度。XLogRegisterData函数注册数据时,从rdatas数组中获取槽位,并通过mainrdata系列变量串联起来。
事务日志不直接写入WAL Buffer,而是先组成XLogRecData链表,然后将这个链表转化为一条事务日志。
XLogBeginInsert一方面通过设置begininsert_called标志防止递归调用日志生成函数;另一方面,它通过XLogInsertAllowed函数和一些Assert做代码安全检查工作。
XLogRegisterData
XLogRegisterData函数主要负责注册生成日志记录的数据,每调用一次就会在rdatas数组中占用一个槽位。
void XLogRegisterData(char *data, int len) {
XLogRecData *rdata;
Assert(begininsert_called);
if (num_rdatas >= max_rdatas) elog(ERROR, "too much WAL data");
rdata = &rdatas[num_rdatas++];
rdata->data = data;
rdata->len = len;
mainrdata_last->next = rdata;
mainrdata_last = rdata;
mainrdata_len += len;
}
XLogSetRecordFlags
XLogSetRecordFlags函数设置正在进行的插入的标志curinsert_flags,有如下选项:
- XLOG_INCLUDE_ORIGIN replication origin需要包含在该record中
- XLOG_MARK_UNIMPORTANT 该记录对于持久化不重要,允许避免触发WAL archiving和其他后台工作
/* Flags for the insertion in progress: set by XLogSetRecordFlags, passed to XLogInsertRecord by XLogInsert, cleared by XLogResetInsertion. */
static uint8 curinsert_flags = 0;
/*
 * XLogSetRecordFlags
 *
 * Record the flags to use for the WAL insertion currently being built.
 *
 *   flags  new value for curinsert_flags
 *
 * This is a plain assignment, so the given value replaces whatever was set
 * by an earlier call for the same insertion.  XLogBeginInsert must have been
 * called first (asserted).
 */
void
XLogSetRecordFlags(uint8 flags)
{
    Assert(begininsert_called);

    curinsert_flags = flags;
}
XLogInsert
XLogInsert函数根据指定的RMID和info插入一条XLOG记录,记录的数据来自此前通过XLogRegister*函数注册的缓存。该函数返回记录末尾的XLOG指针,即下一条记录的起始位置。
/* WAL position, represented as an unsigned 64-bit integer; this is the type XLogInsert returns. */
typedef uint64 XLogRecPtr;
/*
 * XLogInsert
 *
 * Insert a WAL record built from the data registered since XLogBeginInsert.
 *
 *   rmid  resource manager the record belongs to
 *   info  info byte; only bits within XLR_RMGR_INFO_MASK,
 *         XLR_SPECIAL_REL_UPDATE and XLR_CHECK_CONSISTENCY may be set
 *
 * Returns the XLogRecPtr produced by XLogInsertRecord.  In bootstrap mode,
 * for any rmid other than RM_XLOG_ID, nothing is inserted and
 * SizeOfXLogLongPHD is returned instead.
 *
 * Reports elog(ERROR) if XLogBeginInsert was not called and elog(PANIC) if
 * info carries a bit outside the allowed set.  The registration state is
 * reset before returning on both normal paths.
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
    XLogRecPtr EndPos;

    if (!begininsert_called)
        elog(ERROR, "XLogBeginInsert was not called");

    if ((info & ~(XLR_RMGR_INFO_MASK |
                  XLR_SPECIAL_REL_UPDATE |
                  XLR_CHECK_CONSISTENCY)) != 0)
        elog(PANIC, "invalid xlog info mask %02X", info);

    TRACE_POSTGRESQL_WAL_INSERT(rmid, info);

    /* Bootstrap mode: only RM_XLOG_ID records are really inserted. */
    if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
    {
        XLogResetInsertion();
        return SizeOfXLogLongPHD;
    }

    /*
     * Assemble and insert.  XLogInsertRecord signals "try again" by
     * returning InvalidXLogRecPtr, in which case the full-page-write
     * settings are re-read and the record is assembled afresh.
     */
    for (;;)
    {
        XLogRecPtr   RedoRecPtr;
        bool         doPageWrites;
        XLogRecPtr   fpw_lsn;
        XLogRecData *rdt;

        GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

        rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, &fpw_lsn);

        EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);

        if (EndPos != InvalidXLogRecPtr)
            break;
    }

    XLogResetInsertion();

    return EndPos;
}
在bootstrap模式下且rmid不等于RM_XLOG_ID时,调用XLogResetInsertion函数,并返回SizeOfXLogLongPHD(对应XLogLongPageHeaderData的大小)。也就是说,在bootstrap模式下只处理XLOG资源管理器的记录,其他记录不会真正写入任何日志。XLogResetInsertion函数将registered_buffers中已使用的槽位标记为未使用,并重置用来临时保存日志的中间状态(num_rdatas、max_registered_block_id、mainrdata_len、mainrdata_last、curinsert_flags、begininsert_called)。
/*
 * XLogResetInsertion
 *
 * Discard everything registered for the WAL record under construction and
 * return the insertion state to "nothing in progress".
 *
 * Marks registered_buffers[0 .. max_registered_block_id - 1] as unused,
 * empties the rdatas pool and the main data chain, and clears the record
 * flags and the begininsert_called marker.
 */
void
XLogResetInsertion(void)
{
    int slot = 0;

    /* Must run before max_registered_block_id is zeroed below. */
    while (slot < max_registered_block_id)
    {
        registered_buffers[slot].in_use = false;
        slot++;
    }
    max_registered_block_id = 0;

    num_rdatas = 0;

    /* Empty chain: the tail pointer refers back to the head itself. */
    mainrdata_last = (XLogRecData *) &mainrdata_head;
    mainrdata_len = 0;

    curinsert_flags = 0;
    begininsert_called = false;
}
首先调用GetFullPageWriteInfo函数获取full-page write是否开启。日志的注册主要是将WAL日志所需的信息保存在内存中(通过mainrdata_last串联),随后由XLogRecordAssemble函数组装成一条记录(主要处理日志记录中与页面Block相关的部分,即对在registered_buffers数组中的数据进行二次加工,例如判断是否需要做Full Page Write,是否需要压缩页面等),最后由XLogInsertRecord函数完成插入。如果XLogInsertRecord返回InvalidXLogRecPtr,则重新执行上述步骤。
/*
 * Excerpt from XLogInsert (rmid, info and EndPos come from the enclosing
 * function): assemble the record and try to insert it, repeating for as
 * long as XLogInsertRecord returns InvalidXLogRecPtr.
 */
do {
XLogRecPtr RedoRecPtr;
bool doPageWrites;
XLogRecPtr fpw_lsn;
XLogRecData *rdt;
/* Re-read on every iteration, so a retry sees the current settings. */
GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
/* Turn the registered data into a single chain describing the record. */
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, &fpw_lsn);
EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
} while (EndPos == InvalidXLogRecPtr);



