栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

PostgreSQL数据库SI Message——Cache同步

PostgreSQL数据库SI Message——Cache同步

在PostgreSQL中,每个进程都有属于自己的Cache。换句话说,同一个系统表在不同的进程中都有对应的Cache来缓存它的元组(对于RelCache来说缓存的是一个RelationData结构)。同一个系统表的元组可能同时被多个进程的Cache所缓存,当其中某个Cache中的一个元组被删除或更新时,需要通知其他进程对其Cache进行同步。在PostgreSQL的实现中,会记录下已被删除的无效元组,并通过SI Message方式(即共享消息队列方式)在进程之间传递这一消息。收到无效消息的进程将同步地把无效元组(或RelationData结构)从自己的Cache中删除。
为了实现SI Message这一功能,PostgreSQL在共享内存中开辟了shmInvalBuffer记录系统中所发出的所有无效消息以及所有进程处理无消息的进度。shmInvalBuffer是一个全局变量,其数据类型为SISeg。

typedef struct SISeg {
	
	int			minMsgNum;		
	int			maxMsgNum;		
	int			nextThreshold;	
	int			lastBackend;	
	int			maxBackends;	
	slock_t		msgnumLock;		
	
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];
	
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
static SISeg *shmInvalBuffer;	

在shmInvalBuffer中,无效消息存储在由Buffer字段指定的定长数组中(其长度MAXNUMMESSAGES预定义为4096)。该数组中每个元素存储一个无效消息,也可以称该数组为无效消息队列。无效消息队列实际是一个环状结构,最初数组为空时,新来的无效消息从前向后依次存放在数组的元素中,当数组被放满之后,新的无效消息将回到Buffer数组的头部开始插入。minMsgNum字段记录Buffer中还未被所有进程处理的无效消息编号中的最小值,maxMsgNum字段记录下一个可以用于存放新无效消息的数组元素下标。实际上,minMsgNum指出了Buffer中还没有被所有进程处理的无效消息的下界,而maxMsgNum则指出了上界,即编号比minMsgNum小的无效消息是已经被所有进程处理完的,而编号大于等于maxMsgNum的无效消息是还没有产生的,而两者之间的无效消息则是至少还有一个进程没有对其进行处理。因此在无效消息队列构成的环中,除了minMsgNum和maxMsgNum之间的位置之外,其他位置都可以用来存放新增加的无效消息。

typedef union
{
	int8		id;				
	SharedInvalCatcacheMsg cc;
	SharedInvalCatalogMsg cat;
	SharedInvalRelcacheMsg rc;
	SharedInvalSmgrMsg sm;
	SharedInvalRelmapMsg rm;
	SharedInvalSnapshotMsg sn;
} SharedInvalidationMessage;

PostgreSQL在shmInvalBuffer中用一个ProcState数组(procState字段)来存储正在读取无效消息的进程的读取进度,该数组的大小与系统允许的最大进程数MaxBackends有关,在默认情况下这个数组的大小为100(系统的默认最大进程数为100,可在postgresql.conf中修改)。ProcState记录了PID为procPid的进程读取无效消息的状态,其中nextMsgNum的值介于shmInvalBuffer的minMsgNum值和maxMsgNum值之间。

typedef struct ProcState {
	
	pid_t		procPid;		
	PGPROC	   *proc;			
	
	int			nextMsgNum;		
	bool		resetState;		
	bool		signaled;		
	bool		hasMessages;	
	
	bool		sendOnly;		
	
	LocalTransactionId nextLXID;
} ProcState;
初始化

SInvalShmemSize函数计算SInalShmem的大小为SISeg的大小加上其成员ProcState*MaxBackends的大小。

Size SInvalShmemSize(void) {
	Size		size;
	size = offsetof(SISeg, procState);
	size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
	return size;
}

void CreateSharedInvalidationState(void) {
	int			i;
	bool		found;
	
	shmInvalBuffer = (SISeg *) ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
	if (found) return;
	
	shmInvalBuffer->minMsgNum = 0;
	shmInvalBuffer->maxMsgNum = 0;
	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
	shmInvalBuffer->lastBackend = 0;
	shmInvalBuffer->maxBackends = MaxBackends;
	SpinLockInit(&shmInvalBuffer->msgnumLock);
	
	
	for (i = 0; i < shmInvalBuffer->maxBackends; i++){
		shmInvalBuffer->procState[i].procPid = 0;	
		shmInvalBuffer->procState[i].proc = NULL;
		shmInvalBuffer->procState[i].nextMsgNum = 0;	
		shmInvalBuffer->procState[i].resetState = false;
		shmInvalBuffer->procState[i].signaled = false;
		shmInvalBuffer->procState[i].hasMessages = false;
		shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
	}
}
void SharedInvalBackendInit(bool sendOnly) {
	int			index;
	ProcState  *stateP = NULL;
	SISeg	   *segP = shmInvalBuffer;
	
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	
	for (index = 0; index < segP->lastBackend; index++){
		if (segP->procState[index].procPid == 0)	{
			stateP = &segP->procState[index];
			break;
		}
	}
	if (stateP == NULL){
		if (segP->lastBackend < segP->maxBackends){
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->procPid == 0);
			segP->lastBackend++;
		}else{
			
			MyBackendId = InvalidBackendId;
			LWLockRelease(SInvalWriteLock);
			ereport(FATAL,(errcode(ERRCODE_TOO_MANY_CONNECTIONS),errmsg("sorry, too many clients already")));
		}
	}
	MyBackendId = (stateP - &segP->procState[0]) + 1;
	
	MyProc->backendId = MyBackendId;
	
	nextLocalTransactionId = stateP->nextLXID;
	
	stateP->procPid = MyProcPid;
	stateP->proc = MyProc;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendonly = sendOnly;
	LWLockRelease(SInvalWriteLock);
	
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
	elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/283074.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号