在PostgreSQL中,每个进程都有属于自己的Cache。换句话说,同一个系统表在不同的进程中都有对应的Cache来缓存它的元组(对于RelCache来说缓存的是一个RelationData结构)。同一个系统表的元组可能同时被多个进程的Cache所缓存,当其中某个Cache中的一个元组被删除或更新时,需要通知其他进程对其Cache进行同步。在PostgreSQL的实现中,会记录下已被删除的无效元组,并通过SI Message方式(即共享消息队列方式)在进程之间传递这一消息。收到无效消息的进程将同步地把无效元组(或RelationData结构)从自己的Cache中删除。
为了实现SI Message这一功能,PostgreSQL在共享内存中开辟了shmInvalBuffer记录系统中所发出的所有无效消息以及所有进程处理无效消息的进度。shmInvalBuffer是一个全局变量,其数据类型为SISeg。
/*
 * SISeg: shared-memory state for the shared-invalidation (SI) message
 * queue.  A single instance lives in shared memory (pointed to by
 * shmInvalBuffer below); buffer[] is the circular message queue and
 * procState[] tracks each backend's read progress.
 */
typedef struct SISeg {
int minMsgNum;	/* oldest message number not yet consumed by every backend */
int maxMsgNum;	/* next message number to assign (one past the newest message) */
int nextThreshold;	/* queue fill level that triggers cleanup (init'd to CLEANUP_MIN) — see CreateSharedInvalidationState */
int lastBackend;	/* procState[] entries 0..lastBackend-1 are the in-use range */
int maxBackends;	/* allocated length of procState[]; set from MaxBackends */
slock_t msgnumLock;	/* spinlock; per its name, guards the message counters — NOTE(review): confirm exact protection scope */
SharedInvalidationMessage buffer[MAXNUMMESSAGES];	/* circular buffer of invalidation messages */
ProcState procState[FLEXIBLE_ARRAY_MEMBER];	/* per-backend read state, one slot per backend */
} SISeg;
/* Process-local pointer to the SISeg instance in shared memory. */
static SISeg *shmInvalBuffer;
在shmInvalBuffer中,无效消息存储在由Buffer字段指定的定长数组中(其长度MAXNUMMESSAGES预定义为4096)。该数组中每个元素存储一个无效消息,也可以称该数组为无效消息队列。无效消息队列实际是一个环状结构,最初数组为空时,新来的无效消息从前向后依次存放在数组的元素中,当数组被放满之后,新的无效消息将回到Buffer数组的头部开始插入。minMsgNum字段记录Buffer中还未被所有进程处理的无效消息编号中的最小值,maxMsgNum字段记录下一个可以用于存放新无效消息的数组元素下标。实际上,minMsgNum指出了Buffer中还没有被所有进程处理的无效消息的下界,而maxMsgNum则指出了上界,即编号比minMsgNum小的无效消息是已经被所有进程处理完的,而编号大于等于maxMsgNum的无效消息是还没有产生的,而两者之间的无效消息则是至少还有一个进程没有对其进行处理。因此在无效消息队列构成的环中,除了minMsgNum和maxMsgNum之间的位置之外,其他位置都可以用来存放新增加的无效消息。
/*
 * SharedInvalidationMessage: one entry of the SI message queue.  A
 * discriminated union: the id byte selects which member is valid
 * (each member type presumably begins with the same int8 tag —
 * NOTE(review): confirm against the member struct definitions).
 */
typedef union
{
int8 id;	/* type discriminator for the union */
SharedInvalCatcacheMsg cc;	/* catalog-cache (catcache) invalidation */
SharedInvalCatalogMsg cat;	/* whole-catalog invalidation */
SharedInvalRelcacheMsg rc;	/* relation-cache (relcache) invalidation */
SharedInvalSmgrMsg sm;	/* storage-manager (smgr) invalidation */
SharedInvalRelmapMsg rm;	/* relation-map invalidation */
SharedInvalSnapshotMsg sn;	/* snapshot invalidation */
} SharedInvalidationMessage;
PostgreSQL在shmInvalBuffer中用一个ProcState数组(procState字段)来存储正在读取无效消息的进程的读取进度,该数组的大小与系统允许的最大进程数MaxBackends有关,在默认情况下这个数组的大小为100(系统的默认最大进程数为100,可在postgresql.conf中修改)。ProcState记录了PID为procPid的进程读取无效消息的状态,其中nextMsgNum的值介于shmInvalBuffer的minMsgNum值和maxMsgNum值之间。
/*
 * ProcState: one backend's progress in consuming the SI message queue.
 * Stored in SISeg.procState[]; a slot with procPid == 0 is free and
 * may be reclaimed by a new backend (see SharedInvalBackendInit).
 */
typedef struct ProcState {
pid_t procPid;	/* owning backend's PID; 0 means the slot is free */
PGPROC *proc;	/* owning backend's PGPROC; NULL when the slot is free */
int nextMsgNum;	/* next message number to read; stays between SISeg.minMsgNum and maxMsgNum */
bool resetState;	/* NOTE(review): presumably set when the backend fell too far behind and must reset its caches — confirm */
bool signaled;	/* NOTE(review): presumably "catchup signal already sent" — only seen initialized to false here; confirm */
bool hasMessages;	/* NOTE(review): presumably "unread messages may exist" — only seen initialized to false here; confirm */
bool sendOnly;	/* backend only sends invalidations, never reads them (set in SharedInvalBackendInit) */
LocalTransactionId nextLXID;	/* local-transaction-ID counter handed to the slot's next owner */
} ProcState;
初始化
SInvalShmemSize函数计算SI消息共享内存区(SInvalShmem)的大小:即SISeg结构本身的大小(不含柔性数组成员)加上MaxBackends个ProcState元素的大小。
/*
 * SInvalShmemSize
 *		Compute the shared-memory space needed for the SI message state:
 *		the fixed SISeg header (up to the flexible procState[] array)
 *		plus one ProcState slot per allowed backend.
 */
Size SInvalShmemSize(void) {
	return add_size(offsetof(SISeg, procState),
					mul_size(sizeof(ProcState), MaxBackends));
}
void CreateSharedInvalidationState(void) {
int i;
bool found;
shmInvalBuffer = (SISeg *) ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
if (found) return;
shmInvalBuffer->minMsgNum = 0;
shmInvalBuffer->maxMsgNum = 0;
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
shmInvalBuffer->lastBackend = 0;
shmInvalBuffer->maxBackends = MaxBackends;
SpinLockInit(&shmInvalBuffer->msgnumLock);
for (i = 0; i < shmInvalBuffer->maxBackends; i++){
shmInvalBuffer->procState[i].procPid = 0;
shmInvalBuffer->procState[i].proc = NULL;
shmInvalBuffer->procState[i].nextMsgNum = 0;
shmInvalBuffer->procState[i].resetState = false;
shmInvalBuffer->procState[i].signaled = false;
shmInvalBuffer->procState[i].hasMessages = false;
shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
}
}
/*
 * SharedInvalBackendInit
 *		Claim a ProcState slot in shmInvalBuffer for this backend and
 *		initialize it.  Sets MyBackendId to the 1-based slot index (and
 *		copies it into MyProc->backendId); reports FATAL with
 *		ERRCODE_TOO_MANY_CONNECTIONS if every slot is taken.
 *
 * sendOnly: if true, mark this backend as one that only sends
 * invalidation messages and never reads them.
 *
 * BUGFIX: the slot assignment previously wrote "stateP->sendonly",
 * which does not match the ProcState field name "sendOnly" (a compile
 * error); corrected below.
 */
void SharedInvalBackendInit(bool sendOnly) {
	int index;
	ProcState *stateP = NULL;
	SISeg *segP = shmInvalBuffer;

	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	/* First try to recycle a freed slot (procPid == 0) in the used range. */
	for (index = 0; index < segP->lastBackend; index++){
		if (segP->procState[index].procPid == 0) {
			stateP = &segP->procState[index];
			break;
		}
	}
	if (stateP == NULL){
		/* No free slot found: extend the used range, or fail if full. */
		if (segP->lastBackend < segP->maxBackends){
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->procPid == 0);
			segP->lastBackend++;
		}else{
			/* Release the lock before erroring out. */
			MyBackendId = InvalidBackendId;
			LWLockRelease(SInvalWriteLock);
			ereport(FATAL,(errcode(ERRCODE_TOO_MANY_CONNECTIONS),errmsg("sorry, too many clients already")));
		}
	}

	/* Backend IDs are 1-based slot indexes. */
	MyBackendId = (stateP - &segP->procState[0]) + 1;
	MyProc->backendId = MyBackendId;

	/* Resume the local-transaction-ID counter left by the slot's prior owner. */
	nextLocalTransactionId = stateP->nextLXID;

	/* Mark the slot as ours; start reading at the current queue tail. */
	stateP->procPid = MyProcPid;
	stateP->proc = MyProc;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendOnly = sendOnly;	/* was "sendonly": wrong field name */

	LWLockRelease(SInvalWriteLock);

	/* Ensure the slot is released again at backend exit. */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

	elog(DEBUG4, "my backend ID is %d", MyBackendId);
}



