当插入、删除等变更动作发生时,PG会生成对应动作的WAL记录,然后写入到内存中的WAL缓冲区。满足以下任意条件时,WAL记录会从缓冲区写入到段文件,以确保数据安全:
- 一个运行中的事务提交或中止
- WAL缓冲区被写入的元组填满(WAL缓冲区大小可由参数wal_buffers控制)
- WalWriter周期性的执行写操作
- checkpoint
当数据库进行恢复时,会从重做点(redo point)开始。重做点是指最近一次checkpoint开始时WAL记录写入的位置。数据库启动时,PG会判断是否需要进行恢复:例如数据库是以immediate模式关闭的,此时就需要进入恢复模式,从WAL中的重做点开始重放WAL记录。WAL记录的位置由LSN标识,LSN是一个64位的整数,记录了WAL记录在WAL段中的位置。
WAL缓冲区在数据库启动时,会从系统中申请一份WAL的共享内存,入口函数是XLOGShmemInit(void)(ControlFile也在这里初始化共享内存)。使用XLogCtlData结构体来管理WAL共享内存。
| 类型 | 属性名 | 描述 |
|---|---|---|
| XLogwrtRqst | LogwrtRqst | 表示当前请求写入系统缓冲区或同步写入磁盘的日志位置 |
| XLogRecPtr | asyncXactLSN | 最近需要异步提交的日志位置 |
| XLogwrtResult | LogwrtResult | 当前已经写入系统缓冲区或者同步写入磁盘的日志位置 |
| XLogRecPtr* | xlblocks | LSN数组 |
| int | XLogCacheBlck | WAL缓冲区中最大的页下标,初始化为XLOGbuffers - 1(缓冲区共XLOGbuffers页) |
| char* | pages | 指向WAL缓存Buffer的首地址 |
/*
 * XLogCtlData: control structure for the WAL (XLOG) shared memory.
 *
 * XLOGShmemInit() obtains one instance through
 * ShmemInitStruct("XLOG Ctl", ...) and zero-fills it. The xlblocks array,
 * the WAL insertion locks and the WAL page buffers are carved out of the
 * same allocation, directly after this struct.
 *
 * NOTE(review): members without a comment below are not described by the
 * surrounding text or by XLOGShmemInit(); confirm their meaning against the
 * upstream header before relying on the names alone.
 */
typedef struct XLogCtlData {
/* Insertion-side state; holds WALInsertLocks and the insertpos_lck spinlock. */
XLogCtlInsert Insert;
/* Log position currently requested to be written to the OS buffer or synced to disk. */
XLogwrtRqst LogwrtRqst;
/* NOTE(review): presumably a shared copy of the current redo point — confirm. */
XLogRecPtr RedoRecPtr;
FullTransactionId ckptFullXid;
/* Log position of the most recent transaction that needs an asynchronous commit. */
XLogRecPtr asyncXactLSN;
XLogRecPtr replicationSlotMinLSN;
XLogSegNo lastRemovedSegNo;
XLogRecPtr unloggedLSN;
/* Spinlock, initialized in XLOGShmemInit(). Presumably guards unloggedLSN — TODO confirm. */
slock_t ulsn_lck;
pg_time_t lastSegSwitchTime;
XLogRecPtr lastSegSwitchLSN;
/* Log position already written to the OS buffer or synced to disk. */
XLogwrtResult LogwrtResult;
XLogRecPtr InitializedUpTo;
/* Start of the WAL page buffers: XLOGbuffers pages of XLOG_BLCKSZ bytes, XLOG_BLCKSZ-aligned. */
char *pages;
/* Array of XLOGbuffers LSNs, placed immediately after this struct and zeroed at init. */
XLogRecPtr *xlblocks;
/* Highest WAL buffer page index; XLOGShmemInit() sets it to XLOGbuffers - 1. */
int XLogCacheBlck;
TimeLineID ThisTimeLineID;
TimeLineID PrevTimeLineID;
/* Starts out as RECOVERY_STATE_CRASH. */
RecoveryState SharedRecoveryState;
/* Starts out false. */
bool SharedHotStandbyActive;
/* Starts out false. */
bool WalWriterSleeping;
/* Shared latch, set up with InitSharedLatch() in XLOGShmemInit(). */
Latch recoveryWakeupLatch;
XLogRecPtr lastCheckPointRecPtr;
XLogRecPtr lastCheckPointEndPtr;
CheckPoint lastCheckPoint;
XLogRecPtr lastReplayedEndRecPtr;
TimeLineID lastReplayedTLI;
XLogRecPtr replayEndRecPtr;
TimeLineID replayEndTLI;
TimestampTz recoveryLastXTime;
TimestampTz currentChunkStartTime;
bool recoveryPause;
XLogRecPtr lastFpwDisableRecPtr;
/* Spinlock, initialized in XLOGShmemInit(). NOTE(review): which fields it protects is not shown here. */
slock_t info_lck;
} XLogCtlData;
XLOGShmemInit
XLOGShmemInit函数用于在共享内存中为XLogCtlData和ControlFileData申请内存。
/* Process-local pointer to the shared XLogCtlData; assigned by XLOGShmemInit(). */
static XLogCtlData *XLogCtl = NULL;
/*
 * Pointer to the control-file data. XLOGShmemInit() repoints it at the
 * "Control File" shared-memory struct; a non-NULL value held before that call
 * is treated as a local copy, which XLOGShmemInit() copies into the new
 * shared struct (when it is freshly created) and then pfree()s.
 */
static ControlFileData *ControlFile = NULL;
/*
 * XLOGShmemInit
 *		Create, or attach to, the shared-memory areas holding XLogCtlData
 *		and ControlFileData.
 *
 * Takes no arguments and returns nothing.  On return XLogCtl, ControlFile and
 * WALInsertLocks point into shared memory.
 *
 * If both shared structs already exist, only the process-local pointers are
 * set up.  Otherwise the "XLOG Ctl" allocation is laid out, in this order:
 *
 *		XLogCtlData | xlblocks[XLOGbuffers] | padding |
 *		WALInsertLocks[NUM_XLOGINSERT_LOCKS] | padding |
 *		pages[XLOGbuffers * XLOG_BLCKSZ]
 *
 * The order and the padding steps must stay in sync with XLOGShmemSize(),
 * which supplies the allocation size.
 */
void XLOGShmemInit(void) {
	bool		foundCFile,
				foundXLog;
	char	   *allocptr;
	int			i;
	ControlFileData *localControlFile;

	XLogCtl = (XLogCtlData *)
		ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);

	/* Remember whatever ControlFile pointed at before it is repointed. */
	localControlFile = ControlFile;
	ControlFile = (ControlFileData *)
		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);

	if (foundCFile || foundXLog) {
		/* Both structs should be present, or neither. */
		Assert(foundCFile && foundXLog);

		/* Attach case: just refresh the local copy of the lock array pointer. */
		WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
		LWLockRegisterTranche(LWTRANCHE_WAL_INSERT, "wal_insert");

		/* The shared copy is authoritative; drop the local one. */
		if (localControlFile) {
			pfree(localControlFile);
		}
		return;
	}

	/* First-time initialization: start from an all-zero control struct. */
	memset(XLogCtl, 0, sizeof(XLogCtlData));

	/* Move any previously held local control-file copy into shared memory. */
	if (localControlFile) {
		memcpy(ControlFile, localControlFile, sizeof(ControlFileData));
		pfree(localControlFile);
	}

	/* xlblocks[] starts immediately after the XLogCtlData struct. */
	allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
	XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;

	/*
	 * WAL insertion locks, aligned to the full padded lock size.  Note that
	 * this step advances by a whole sizeof(WALInsertLockPadded) when allocptr
	 * is already aligned; do not "optimize" it without checking that
	 * XLOGShmemSize() reserves matching slack.
	 */
	allocptr += sizeof(WALInsertLockPadded) -
		((uintptr_t) allocptr) % sizeof(WALInsertLockPadded);
	WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
		(WALInsertLockPadded *) allocptr;
	allocptr += sizeof(WALInsertLockPadded) * NUM_XLOGINSERT_LOCKS;

	LWLockRegisterTranche(LWTRANCHE_WAL_INSERT, "wal_insert");
	for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++) {
		LWLockInitialize(&WALInsertLocks[i].l.lock, LWTRANCHE_WAL_INSERT);
		WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
		WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
	}

	/* Page buffers begin on an XLOG_BLCKSZ boundary. */
	allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
	XLogCtl->pages = allocptr;
	memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);

	/* Basic initialization of the remaining shared fields. */
	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;	/* highest page index */
	XLogCtl->SharedRecoveryState = RECOVERY_STATE_CRASH;
	XLogCtl->SharedHotStandbyActive = false;
	XLogCtl->WalWriterSleeping = false;

	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
	SpinLockInit(&XLogCtl->info_lck);
	SpinLockInit(&XLogCtl->ulsn_lck);
	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
}



