栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

PostgreSQL数据库并发事务——XactLogCommitRecord向XLOG添加记录

PostgreSQL数据库并发事务——XactLogCommitRecord向XLOG添加记录

FinishPreparedTransaction --> RecordTransactionCommitPrepared --> XactLogCommitRecord
CommitTransaction --> RecordTransactionCommit --> XactLogCommitRecord

事务首先会在自己的私有空间中构建出需要被插入的元组,然后将元组插入缓存页面,并将这个缓存页面标记为脏页。xl_xact_commit.xact_time为形参commit_time。xl_xact_xinfo.xinfo可以为如下组合(XACT_COMPLETION_UPDATE_RELCACHE_FILE、XACT_COMPLETION_FORCE_SYNC_COMMIT、XACT_XINFO_HAS_AE_LOCKS、XACT_COMPLETION_APPLY_FEEDBACK、XACT_XINFO_HAS_DBINFO、XACT_XINFO_HAS_SUBXACTS、XACT_XINFO_HAS_RELFILENODES、XACT_XINFO_HAS_INVALS、XACT_XINFO_HAS_TWOPHASE、XACT_XINFO_HAS_GID、XACT_XINFO_HAS_ORIGIN)。xl_xact_dbinfo.dbId设置为MyDatabaseId,xl_xact_dbinfo.tsId设置为MyDatabaseTableSpace。

/*
 * XactLogCommitRecord
 *
 * Assemble a transaction commit WAL record and insert it through
 * XLogInsert() under RM_XACT_ID.
 *
 * The record type is XLOG_XACT_COMMIT when twophase_xid is not a valid
 * TransactionId, and XLOG_XACT_COMMIT_PREPARED otherwise.
 *
 *	commit_time		stored in xl_xact_commit.xact_time
 *	nsubxacts, subxacts	count and array of TransactionIds; logged when nsubxacts > 0
 *	nrels, rels		count and array of RelFileNodes; logged when nrels > 0
 *	nmsgs, msgs		count and array of SharedInvalidationMessages; logged
 *					when nmsgs > 0
 *	relcacheInval	sets XACT_COMPLETION_UPDATE_RELCACHE_FILE when true
 *	forceSync		not read by this function (see NOTE below)
 *	xactflags		only XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK is examined
 *	twophase_xid	selects the record type; logged when valid
 *	twophase_gid	must be non-NULL when twophase_xid is valid; logged
 *					(including its terminating NUL) only when
 *					XLogLogicalInfoActive() is true
 *
 * Returns the value returned by XLogInsert().
 *
 * NOTE(review): the forceSync parameter is never referenced; the code tests
 * forceSyncCommit instead, which is not declared in this block (presumably a
 * file-level variable).  Confirm that ignoring the parameter is intended
 * before changing either side.
 */
XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, int xactflags, TransactionId twophase_xid, const char *twophase_gid) {
	xl_xact_commit xlrec;
	xl_xact_xinfo xl_xinfo;
	xl_xact_dbinfo xl_dbinfo;
	xl_xact_subxacts xl_subxacts;
	xl_xact_relfilenodes xl_relfilenodes;
	xl_xact_invals xl_invals;
	xl_xact_twophase xl_twophase;
	xl_xact_origin xl_origin;
	uint8		info;
	/* Callers are expected to already be inside a critical section. */
	Assert(CritSectionCount > 0);
	xl_xinfo.xinfo = 0;

	/* Choose the record type: plain commit vs. commit of a prepared xact. */
	if (!TransactionIdIsValid(twophase_xid)) info = XLOG_XACT_COMMIT;
	else info = XLOG_XACT_COMMIT_PREPARED;

	/* First figure out which optional pieces are needed and fill them in. */
	xlrec.xact_time = commit_time;
	if (relcacheInval) xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
	if (forceSyncCommit) xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
	if ((xactflags & XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK)) xl_xinfo.xinfo |= XACT_XINFO_HAS_AE_LOCKS;
	/* Flag the record when synchronous_commit is at least REMOTE_APPLY. */
	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
	/* Database info is included with invalidation messages or logical info. */
	if (nmsgs > 0 || XLogLogicalInfoActive()) {
		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
		xl_dbinfo.dbId = MyDatabaseId;
		xl_dbinfo.tsId = MyDatabaseTableSpace;
	}
	if (nsubxacts > 0) {
		xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
		xl_subxacts.nsubxacts = nsubxacts;
	}
	if (nrels > 0) {
		xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
		xl_relfilenodes.nrels = nrels;
	}
	if (nmsgs > 0) {
		xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
		xl_invals.nmsgs = nmsgs;
	}
	if (TransactionIdIsValid(twophase_xid)) {
		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
		xl_twophase.xid = twophase_xid;
		Assert(twophase_gid != NULL);
		if (XLogLogicalInfoActive()) xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
	}
	/* Record the replication origin only when a session origin is set. */
	if (replorigin_session_origin != InvalidRepOriginId) {
		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
		xl_origin.origin_lsn = replorigin_session_origin_lsn;
		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
	}
	/* Any xinfo bit at all means the xinfo word itself must be logged. */
	if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO;

	/*
	 * Then register the pieces.  The order of the XLogRegisterData() calls
	 * below is the order of the data in the record, so it must not change.
	 */
	XLogBeginInsert();
	XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit));
	if (xl_xinfo.xinfo != 0) XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
	/* Array-carrying pieces: MinSizeOf* bytes of header, then the caller's array. */
	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) {
		XLogRegisterData((char *) (&xl_subxacts), MinSizeOfXactSubxacts);
		XLogRegisterData((char *) subxacts, nsubxacts * sizeof(TransactionId));
	}
	if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES) {
		XLogRegisterData((char *) (&xl_relfilenodes), MinSizeOfXactRelfilenodes);
		XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
	}
	if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS) {
		XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
		XLogRegisterData((char *) msgs, nmsgs * sizeof(SharedInvalidationMessage));
	}
	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) {
		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) XLogRegisterData(unconstify(char *, twophase_gid), strlen(twophase_gid) + 1);
	}
	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
	/* Always request XLOG_INCLUDE_ORIGIN for this record. */
	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
	return XLogInsert(RM_XACT_ID, info);
}

/*
 * Fixed leading part of a commit record.  XactLogCommitRecord() registers
 * this struct first and then appends the optional pieces listed below, in
 * this order, each only when the corresponding flag is set.
 */
typedef struct xl_xact_commit {
	TimestampTz xact_time;		/* commit time (set from commit_time) */
	/* xl_xact_xinfo.xinfo follows if any xinfo bit is set (XLOG_XACT_HAS_INFO) */
	/* xl_xact_dbinfo follows if XACT_XINFO_HAS_DBINFO */
	/* xl_xact_subxacts header + TransactionId array follow if XACT_XINFO_HAS_SUBXACTS */
	/* xl_xact_relfilenodes header + RelFileNode array follow if XACT_XINFO_HAS_RELFILENODES */
	/* xl_xact_invals header + invalidation messages follow if XACT_XINFO_HAS_INVALS */
	/* xl_xact_twophase follows if XACT_XINFO_HAS_TWOPHASE */
	/* twophase_gid follows, as a NUL-terminated string, if XACT_XINFO_HAS_GID */
	/* xl_xact_origin follows if XACT_XINFO_HAS_ORIGIN */
} xl_xact_commit;
/* Optional flag word of a commit record; only the xinfo member is logged. */
typedef struct xl_xact_xinfo {
	/* Bitwise OR of XACT_COMPLETION_* and XACT_XINFO_HAS_* flags. */
	uint32		xinfo;
} xl_xact_xinfo;
/* Database information logged when XACT_XINFO_HAS_DBINFO is set. */
typedef struct xl_xact_dbinfo {
	Oid			dbId;			/* set from MyDatabaseId */
	Oid			tsId;			/* set from MyDatabaseTableSpace */
} xl_xact_dbinfo;
InitXLogInsert

在数据库启动之后,PostgreSQL在InitXLogInsert函数(InitPostgres --> RecoveryInProgress --> InitXLOGAccess --> InitXLogInsert)中初始化了几个进程内变量,用来临时保存日志的中间状态(rdatas、mainrdata_head、mainrdata_last、mainrdata_len、hdr_scratch、registered_buffers)。
rdatas是长度为XLR_NORMAL_RDATAS的数组,用来保存日志数据信息。

static XLogRecData *rdatas;     /* array of slots, allocated by InitXLogInsert() */
static int	num_rdatas;			/* slots handed out so far by XLogRegisterData() */
static int	max_rdatas;			/* allocated number of slots in rdatas */
/* One element of the chain of data pieces that make up a WAL record. */
typedef struct XLogRecData {
	struct XLogRecData *next;	/* next piece in the chain */
	char	   *data;			/* start of this piece's data */
	uint32		len;			/* length of this piece's data, in bytes */
} XLogRecData;

registered_buffers为长度为XLR_NORMAL_MAX_BLOCK_ID+1的数组,用于注册被修改的页面信息,由XLogRegisterBuffer注册页面时,会在其中占用一个槽位。

/*
 * One slot of the registered_buffers array, describing a page registered for
 * the WAL record under construction.
 */
typedef struct {
	bool		in_use;			/* slot occupied; cleared by XLogResetInsertion() */
	uint8		flags;			/* presumably registration flags -- TODO confirm */
	RelFileNode rnode;			/* rnode, forkno and block presumably identify the page -- TODO confirm */
	ForkNumber	forkno;
	BlockNumber block;
	Page		page;			/* presumably the page contents -- TODO confirm */
	uint32		rdata_len;		/* presumably total length of the rdata chain below -- TODO confirm */
	XLogRecData *rdata_head;	/* presumably head of the per-block data chain -- TODO confirm */
	XLogRecData *rdata_tail;	/* presumably tail of the per-block data chain -- TODO confirm */
	XLogRecData bkp_rdatas[2];	/* NOTE(review): looks like scratch entries for XLogRecordAssemble -- confirm */
	/* NOTE(review): presumably holds the compressed page image -- confirm */
	char		compressed_page[PGLZ_MAX_BLCKSZ];
} registered_buffer;

hdr_scratch是组装日志时使用的临时内存,长度为HEADER_SCRATCH_SIZE,用于保存日志记录的Header部分,其声明为static char *hdr_scratch = NULL;。
InitXLogInsert函数首先创建xloginsert_cxt内存上下文,然后依次在该内存上下文中分配registered_buffers数组、rdatas数组和hdr_scratch缓冲区;每一项只在尚未分配时才会分配。

static MemoryContext xloginsert_cxt;	/* context for the WAL-construction working areas; created by InitXLogInsert() */

/*
 * InitXLogInsert
 *
 * Allocate the working areas used while constructing a WAL record.  Each
 * piece is created only if it does not exist yet, so calling this function
 * again leaves already-allocated pieces alone.
 */
void
InitXLogInsert(void)
{
	/* Dedicated memory context, parented to TopMemoryContext. */
	if (!xloginsert_cxt)
	{
		xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
											   "WAL record construction",
											   ALLOCSET_DEFAULT_SIZES);
	}

	/* Zero-filled array of slots for registered buffers. */
	if (!registered_buffers)
	{
		registered_buffers = (registered_buffer *)
			MemoryContextAllocZero(xloginsert_cxt,
								   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
		max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
	}

	/* Array of XLogRecData slots; deliberately not zeroed. */
	if (!rdatas)
	{
		rdatas = MemoryContextAlloc(xloginsert_cxt,
									sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
		max_rdatas = XLR_NORMAL_RDATAS;
	}

	/* Zero-filled scratch area of HEADER_SCRATCH_SIZE bytes. */
	if (!hdr_scratch)
	{
		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
											 HEADER_SCRATCH_SIZE);
	}
}

mainrdata_head、mainrdata_last、mainrdata_len用于记录日志数据信息:mainrdata_head为链表头,mainrdata_last指向链表尾,mainrdata_len累计已注册数据的总长度。XLogRegisterData函数注册数据时,从rdatas数组中获取槽位,并通过mainrdata系列变量串联起来。

事务日志不直接写入WAL Buffer,而是先组成XLogRecData链表,然后将这个链表转化为一条事务日志。

XLogBeginInsert

一方面,它通过设置begininsert_called标志防止递归调用日志生成函数;另一方面,它通过XLogInsertAllowed函数和一些Assert做代码安全检查工作。

XLogRegisterData

XLogRegisterData函数主要负责注册生成日志记录的数据,每调用一次就会在rdatas数组中占用一个槽位。

void XLogRegisterData(char *data, int len) {
	XLogRecData *rdata;
	Assert(begininsert_called);
	if (num_rdatas >= max_rdatas) elog(ERROR, "too much WAL data");
	rdata = &rdatas[num_rdatas++];
	rdata->data = data;
	rdata->len = len;
	
	mainrdata_last->next = rdata;
	mainrdata_last = rdata;
	mainrdata_len += len;
}

XLogSetRecordFlags

XLogSetRecordFlags函数设置正在进行的插入的标志curinsert_flags,有如下选项:

  • XLOG_INCLUDE_ORIGIN replication origin需要包含在该record中
  • XLOG_MARK_UNIMPORTANT 该记录对于持久化不重要,允许避免触发WAL archiving和其他后台工作
static uint8 curinsert_flags = 0;	/* flags for the insertion in progress; set by XLogSetRecordFlags() */

/*
 * XLogSetRecordFlags
 *
 * Set the flags for the insertion in progress.  The new value replaces any
 * flags set earlier; it is not OR'ed in.  Must be called after
 * XLogBeginInsert(), as asserted below.
 */
void
XLogSetRecordFlags(uint8 flags)
{
	Assert(begininsert_called);

	curinsert_flags = flags;
}
XLogInsert

XLogInsert函数根据指定的RMID和info插入一条XLOG记录,记录的数据来自此前通过XLogRegister*函数注册的内容。该函数返回该记录末尾的XLOG位置,即下一条记录的起始位置。

typedef uint64 XLogRecPtr;	/* WAL position, as returned by XLogInsert() */

/*
 * XLogInsert
 *
 * Insert a WAL record with the given resource manager id and info byte,
 * built from whatever was registered since XLogBeginInsert().
 *
 * Returns the position produced by XLogInsertRecord().  In bootstrap mode,
 * for any rmid other than RM_XLOG_ID, nothing is inserted and
 * SizeOfXLogLongPHD is returned instead.
 *
 * The insertion working state is reset before returning.
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
	XLogRecPtr	result;

	/* XLogBeginInsert() must have been called first. */
	if (!begininsert_called)
		elog(ERROR, "XLogBeginInsert was not called");

	/*
	 * Callers may only set the rmgr bits, XLR_SPECIAL_REL_UPDATE and
	 * XLR_CHECK_CONSISTENCY; anything else is rejected.
	 */
	if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE | XLR_CHECK_CONSISTENCY)) != 0)
		elog(PANIC, "invalid xlog info mask %02X", info);

	TRACE_POSTGRESQL_WAL_INSERT(rmid, info);

	/* In bootstrap mode only RM_XLOG_ID records are actually inserted. */
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
	{
		XLogResetInsertion();
		return SizeOfXLogLongPHD;
	}

	/* Assemble and insert; retry while the insert reports an invalid position. */
	for (;;)
	{
		XLogRecPtr	redo_ptr;
		bool		page_writes;
		XLogRecPtr	fpw_lsn;
		XLogRecData *chain;

		GetFullPageWriteInfo(&redo_ptr, &page_writes);
		chain = XLogRecordAssemble(rmid, info, redo_ptr, page_writes, &fpw_lsn);
		result = XLogInsertRecord(chain, fpw_lsn, curinsert_flags);

		if (result != InvalidXLogRecPtr)
			break;
	}

	XLogResetInsertion();

	return result;
}

在bootstrap模式下且rmid不等于RM_XLOG_ID时,XLogInsert调用XLogResetInsertion函数,并直接返回SizeOfXLogLongPHD(对应XLogLongPageHeaderData的大小),不写入任何日志。也就是说,在bootstrap模式下,除XLOG资源管理器(RM_XLOG_ID)的记录外,其他记录都不会被写入日志。XLogResetInsertion函数清除registered_buffers中已使用槽位的in_use标志,并重置用来临时保存日志的中间状态(num_rdatas、max_registered_block_id、mainrdata_len、mainrdata_last、curinsert_flags、begininsert_called)。

/*
 * XLogResetInsertion
 *
 * Discard the state of the WAL record under construction: free the
 * registered-buffer slots below max_registered_block_id, empty the main data
 * chain, and clear the record flags and the "insertion begun" marker.
 */
void
XLogResetInsertion(void)
{
	int			slot = 0;

	/* Mark every slot below max_registered_block_id as free. */
	while (slot < max_registered_block_id)
	{
		registered_buffers[slot].in_use = false;
		slot++;
	}

	num_rdatas = 0;
	max_registered_block_id = 0;

	/* An empty chain has its tail pointing back at the head. */
	mainrdata_len = 0;
	mainrdata_last = (XLogRecData *) &mainrdata_head;

	curinsert_flags = 0;
	begininsert_called = false;
}

调用GetFullPageWriteInfo函数获取full-page write是否开启,日志的注册主要是将WAL日志所需的信息保存在内存中(mainrdata_last),由XLogRecordAssemble函数处理完成(主要处理日志记录中与页面Block相关的部分,即对在registered_buffers数组中的数据进行二次加工,例如判断是否需要做Full Page Write,是否需要压缩页面等)。

	/*
	 * Excerpt of the retry loop in XLogInsert(): assemble the record and try
	 * to insert it, repeating while the insert reports InvalidXLogRecPtr.
	 */
	do {
		XLogRecPtr	RedoRecPtr;
		bool		doPageWrites;
		XLogRecPtr	fpw_lsn;
		XLogRecData *rdt;
		/* Fetch the full-page-write information used for assembly. */
		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, &fpw_lsn);
		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
	} while (EndPos == InvalidXLogRecPtr);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350691.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号