src/backend/storage/ipc/pmsignal.c
Postmaster拥有BackendList后端列表,用于存放标识postgres子进程的Backend。AssignPostmasterChildSlot函数为新的postmaster子进程选择未使用的slot,并将其状态设置为ASSIGNED,返回slot number。如果PMSignalState->next_child_flag为0,进入for循环从PMSignalState->num_child_flags数组尾部遍历,–slot为-1小于0,slot更新为PMSignalState->num_child_flags数组元素数量-1。如果该slot未使用则使用PM_CHILD_ASSIGNED,将PMSignalState->next_child_flag赋值为slot,并返回slot+1。
如果PMSignalState->next_child_flag不为0,进入for循环从PMSignalState->num_child_flags数组尾部遍历,–slot不小于0。如果该slot未使用则使用PM_CHILD_ASSIGNED,将PMSignalState->next_child_flag赋值为slot,并返回slot+1。
SendPostmasterSignal函数从子进程向postmaster发送信号,通过kill函数向postmaster发送SIGUSR1信号。支持如下PMSignalReason原由通知postmaster。
void SendPostmasterSignal(PMSignalReason reason) {
if (!IsUnderPostmaster) return;
PMSignalState->PMSignalFlags[reason] = true;
kill(PostmasterPid, SIGUSR1);
}
typedef enum {
PMSIGNAL_RECOVERY_STARTED,
PMSIGNAL_BEGIN_HOT_STANDBY,
PMSIGNAL_WAKEN_ARCHIVER,
PMSIGNAL_ROTATE_LOGFILE,
PMSIGNAL_START_AUTOVAC_LAUNCHER,
PMSIGNAL_START_AUTOVAC_WORKER,
PMSIGNAL_BACKGROUND_WORKER_CHANGE,
PMSIGNAL_START_WALRECEIVER,
PMSIGNAL_ADVANCE_STATE_MACHINE,
NUM_PMSIGNALS
} PMSignalReason;
在实现中,后台进程是这样通知Postmaster的:
- 首先在共享内存中开辟一个数组PMSignalFlags,数组中的每一位对应于一个信号。
- 然后如果后台进程希望向Postmaster发送一个信号,那么后台首先将信号在数组PMSignalFlags中相应的元素置1(逻辑真),然后调用kill函数向postmaster发送SIGUSR1信号。
- 当Postmaster收到SIGUSR1信号后首先检测共享存储中PMSingnalFlags,确认具体的信号是什么。同时将信号在数组PMSignalFlags中相应的元素置0(逻辑假)然后作出相应反应。
src/backend/storage/ipc/procsignal.c
通过传入MyBackendId获取ProcSignalSlots数组中的对应的元素槽,将信号槽中的pss_pid设置为MyProcPid,最后将该槽的指针赋值给MyProcSignalSlot。也就是一个后端一个slot,和门铃一样,其他进程可以敲击这个门铃slot来通知信号。
typedef struct {
pid_t pss_pid;
sig_atomic_t pss_signalFlags[NUM_PROCSIGNALS];
} ProcSignalSlot;
static ProcSignalSlot *ProcSignalSlots = NULL;
void ProcSignalInit(int pss_idx) {
volatile ProcSignalSlot *slot;
Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots);
slot = &ProcSignalSlots[pss_idx - 1];
if (slot->pss_pid != 0)
elog(LOG, "process %d taking over ProcSignal slot %d, but it's not empty",MyProcPid, pss_idx);
MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t));
slot->pss_pid = MyProcPid;
MyProcSignalSlot = slot;
on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx));
}
在共享内存中创建ProcSignalSlot数组
Size ProcSignalShmemSize(void) {
return NumProcSignalSlots * sizeof(ProcSignalSlot);
}
void ProcSignalShmemInit(void) {
Size size = ProcSignalShmemSize();
bool found;
ProcSignalSlots = (ProcSignalSlot *)ShmemInitStruct("ProcSignalSlots", size, &found);
if (!found) MemSet(ProcSignalSlots, 0, size);
}
SendProcSignal向一个Postgres进程发送信号,如果信号发送出去了,即成功,返回0;错误返回-1并设置errno.
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId){
volatile ProcSignalSlot *slot;
if (backendId != InvalidBackendId){
slot = &ProcSignalSlots[backendId - 1];
if (slot->pss_pid == pid){
slot->pss_signalFlags[reason] = true;
return kill(pid, SIGUSR1);
}
}else{
int i;
for (i = NumProcSignalSlots - 1; i >= 0; i--){
slot = &ProcSignalSlots[i];
if (slot->pss_pid == pid){
slot->pss_signalFlags[reason] = true;
return kill(pid, SIGUSR1);
}
}
}
errno = ESRCH;
return -1;
}
信号处理函数
void
procsignal_sigusr1_handler(SIGNAL_ARGS)
{
int save_errno = errno;
if (CheckProcSignal(PROCSIG_CATCHUP_INTERRUPT))
HandleCatchupInterrupt();
if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
HandleNotifyInterrupt();
if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE))
HandleParallelMessageInterrupt();
if (CheckProcSignal(PROCSIG_WALSND_INIT_STOPPING))
HandleWalSndInitStopping();
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATAbase))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATAbase);
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_TABLESPACE))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOCK))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOCK);
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
SetLatch(MyLatch);
latch_sigusr1_handler();
errno = save_errno;
}



