从本博客开始,我们用三篇博客的篇幅集中介绍Linux内核对Unix系统V进程间通信机制的继承和实现。
早期Unix系统的进程间通信(IPC)机制主要有两种,就是管道和信号。后来,针对普通管道只能在近亲进程之间建立的缺点,又有了命名管道。但是,对于一个现代的操作系统以及日益发展的各种应用来说,这些机制虽然很重要,但也确实存在明显的不足。首先,信号所能载送的信息量很小,单独使用时不适合信息量要求比较大的场合。而管道,即使是命名管道,虽然可用于信息量较大的场合,但是对于不同的应用而言还是有许多缺点,主要有:
所载送的信息是无格式的字节流。设想如果一个进程要发送两段文字给另一个进程,每段文字都是一个小小的字符文件,那么对接收方进程而言,这两段文字连成了一片,怎样知道这两段文件的分界在哪儿呢?一个管道(命名的或者无名的)就好像是一条通信线路,在这条通信线路上要有一些起码的、低层的通信规程,例如报文的划分,才能满足较高层规程的要求。从这个角度来说,无格式字节流只是一种最原始的通信手段。由于所载送的是无格式字节流,就缺乏一些控制手段,如报文的优先级别等。管道机制的缓冲区大小是有限的、静态的。当发送者写满了缓冲区而接收者没有来得及从缓冲区读走,发送者就只好停下来睡眠,这就强化了管道机制的同步性要求。虽然这种同步性往往本来就是应用程序所需要的,但在某些应用中、某些场合下却成为一个缺陷。固然,可以通过使用O_NONBLOCK标志让发送者在缓冲区一满就返回(而不至于进入睡眠),但是那样会增加应用程序的复杂性,还会进一步降低效率。再说,在管道机制中也无法让发送者预先知道缓冲区中还有多少可写空间。从运行效率看,管道机制的开销也不小,尤其是当发送的信息量比较小时,平均每个直接所耗费的代价就相当高了。每个管道都要占用一个开大文件号,一般的应用程序中这还不至于成为问题,但在某些特殊的应用中是有可能会造成问题的。
上述的多数缺陷都可以在应用软件中采取一些措施加以克服或减轻,但是那样只会使应用软件更复杂、效率更低。而操作系统的作用和目的本来就在于使应用软件更简单、更安全、更高效。这样,针对各种应用的要求,就提出了改进早起Unix IPC机制的任务。另一方面,当时在操作系统以及一些有关的领域的理论研究有了较大的发展而且日趋成熟,概括出了对IPC的几种抽象,包括报文(message)传递、共享内存以及进程同步。其中进程同步又包括了信号了(semaphore)、互斥量(mutex)以及约会(rendezvous)等形式。在这样的历史条件下,at&t在其Unix系统V版本中增加了报文传递、共享内存以及信号量三种IPC机制。这三种机制合在一起统称为系统V进程间通信机制。与此同时,BSD也在早期Unix IPC机制的基础上做了扩充,形成了基于socket的IPC机制,使得IPC成为系统V与BSD版本之间的主要区别之一。而Linux则兼收并蓄,把两者都继承了下来。我们将在下一个系列再介绍BSD所扩充的IPC机制,而本博客以及后面的两篇则集中与at&t的系统V IPC机制在Linux内核中的具体实现。
Linux内核为系统V IPC提供了一个统一的系统调用ipc,也许应该称为sys_ipc。其应用程序设计界面(API)为:
int syscall(SYS_ipc, unsigned int call, int first,
unsigned long second, unsigned long third, void *ptr,
long fifth);
其中第一个参数call为具体的操作码,定义于include/asm-i386/ipc.h
#define SEMOP 1 #define SEMGET 2 #define SEMCTL 3 #define MSGSND 11 #define MSGRCV 12 #define MSGGET 13 #define MSGCTL 14 #define SHMAT 21 #define SHMDT 22 #define SHMGET 23 #define SHMCTL 24
操作码中凡由SEM开头的都是为信号量而设,由MSG开头的都是为报文传递而设,而由SHM开头的则都是为共享内存区而设。其余参数的使用则因具体操作的不同而异。不过,为便于使用,在C语言库函数中分别提供了如semget、msgget、msgsnd等等库函数,这些库函数把用户程序对它们的调用转换成统一的系统调用ipc,却使用户感觉好像内核提供了这么一些不同的系统调用一样。
内核中的入口为sys_ipc,其代码如下:
asmlinkage int sys_ipc (uint call, int first, int second,
int third, void *ptr, long fifth)
{
int version, ret;
version = call >> 16;
call &= 0xffff;
switch (call) {
case SEMOP:
return sys_semop (first, (struct sembuf *)ptr, second);
case SEMGET:
return sys_semget (first, second, third);
case SEMCTL: {
union semun fourth;
if (!ptr)
return -EINVAL;
if (get_user(fourth.__pad, (void **) ptr))
return -EFAULT;
return sys_semctl (first, second, third, fourth);
}
case MSGSND:
return sys_msgsnd (first, (struct msgbuf *) ptr,
second, third);
case MSGRCV:
switch (version) {
case 0: {
struct ipc_kludge tmp;
if (!ptr)
return -EINVAL;
if (copy_from_user(&tmp,
(struct ipc_kludge *) ptr,
sizeof (tmp)))
return -EFAULT;
return sys_msgrcv (first, tmp.msgp, second,
tmp.msgtyp, third);
}
default:
return sys_msgrcv (first,
(struct msgbuf *) ptr,
second, fifth, third);
}
case MSGGET:
return sys_msgget ((key_t) first, second);
case MSGCTL:
return sys_msgctl (first, second, (struct msqid_ds *) ptr);
case SHMAT:
switch (version) {
default: {
ulong raddr;
ret = sys_shmat (first, (char *) ptr, second, &raddr);
if (ret)
return ret;
return put_user (raddr, (ulong *) third);
}
case 1:
if (!segment_eq(get_fs(), get_ds()))
return -EINVAL;
return sys_shmat (first, (char *) ptr, second, (ulong *) third);
}
case SHMDT:
return sys_shmdt ((char *)ptr);
case SHMGET:
return sys_shmget (first, second, third);
case SHMCTL:
return sys_shmctl (first, second,
(struct shmid_ds *) ptr);
default:
return -EINVAL;
}
}
函数sys_ipc的结构很简单:根据调用参数操作码的不同,分别处理三种进程间通信机制的11项不同的操作。我们分三节介绍相关的代码,本届先介绍报文传递。
每一个进程都可以通过库函数调用msgget(即通过操作码为MSGGET的ipc系统调用,下同)建立报文队列。有的系统中把这样的队列称为信箱(mail box)。报文队列不是通过文件名,而是通过一个键值(key)加以标识。一旦建立以后,其它进程就可以使用相同的键值通过msgget取得对已建立报文队列的访问途径。然后,发送报文的进程就可以通过msgsnd发送报文到指定的队列中,而接收进程则可以通过msgrcv从指定的队列中接收报文。从概念上说,这类似于命名管道,但报文队列所传递的不再是完全无结构的字节流了,每个报文都有一定的低层结构而互相区分。此外,还可以通过msgctl调用对指针报文队列进行一些控制(包括撤销该队列)。由于报文对了并不纳入文件系统的范畴,所以并不占用打开文件号。内核中实现报文传递机制的代码基本上都在文件ipc/msg.c中。
库函数msgget--创建报文队列先来看报文队列的建立和取得:
sys_ipc=>sys_msgget
asmlinkage long sys_msgget (key_t key, int msgflg)
{
int id, ret = -EPERM;
struct msg_queue *msq;
down(&msg_ids.sem);
if (key == IPC_PRIVATE)
ret = newque(key, msgflg);
else if ((id = ipc_findkey(&msg_ids, key)) == -1) {
if (!(msgflg & IPC_CREAT))
ret = -ENOENT;
else
ret = newque(key, msgflg);
} else if (msgflg & IPC_CREAT && msgflg & IPC_EXCL) {
ret = -EEXIST;
} else {
msq = msg_lock(id);
if(msq==NULL)
BUG();
if (ipcperms(&msq->q_perm, msgflg))
ret = -EACCES;
else
ret = msg_buildid(id, msq->q_perm.seq);
msg_unlock(id);
}
up(&msg_ids.sem);
return ret;
}
操作码MSGGET,从而是sys_msgget可用于两个不同的目的:一是用一个给定的键值创建一个报文队列;二是给定一个键值,找到已经建立的报文队列。当调用参数msgflg中的IPC_CREAT标志为1时表示创建,为0时则为寻找,二者均返回保温队列的标识号。内核中有个全局的数据结构msg_ids,专门用来管理报文队列。
static struct ipc_ids msg_ids;
其中类型 ipc_ids在ipc/util.h中定义:
struct ipc_ids {
int size;
int in_use;
int max_id;
unsigned short seq;
unsigned short seq_max;
struct semaphore sem;
spinlock_t ary;
struct ipc_id* entries;
};
结构中的指针entries指向一个结构数组,其类型定义为:
struct ipc_id {
struct kern_ipc_perm* p;
};
数组中的元素都是指向kern_ipc_perm数据结构指针,而kern_ipc_perm数据结构则是在include/linux/ipc.h中定义的:
struct kern_ipc_perm
{
key_t key;
uid_t uid;
gid_t gid;
uid_t cuid;
gid_t cgid;
mode_t mode;
unsigned long seq;
};
数据结构msg_ids是全局的,显然必须置于内核中信号量机制的保护之下。
键的类型为key_t,实际上是个整数。前面讲过,报文队列以键值而不是文件名来标识,所以每个保温队列的键值必须是唯一的。不过,键值0,也就是IPC_PRIVATE,是一种特殊情况。每个进程都可以用键值0建立一个专供其私用(自己发送自己接收)的报文队列。所以这个特殊键值并不是唯一的。但是,正由于这种队列是私用的,所以不存在要通过键值找打一个队列的问题。
正因为这样,当键值为IPC_PRIVATE时(309行),就无条件地调用newque建立一个报文队列(310行),否则就要先通过ipc_findkey找一下相应的保温队列是否已经存在。
函数newque的代码如下:
sys_ipc=>sys_msgget=>newque
static int newque (key_t key, int msgflg)
{
int id;
struct msg_queue *msq;
msq = (struct msg_queue *) kmalloc (sizeof (*msq), GFP_KERNEL);
if (!msq)
return -ENOMEM;
id = ipc_addid(&msg_ids, &msq->q_perm, msg_ctlmni);
if(id == -1) {
kfree(msq);
return -ENOSPC;
}
msq->q_perm.mode = (msgflg & S_IRWXUGO);
msq->q_perm.key = key;
msq->q_stime = msq->q_rtime = 0;
msq->q_ctime = CURRENT_TIME;
msq->q_cbytes = msq->q_qnum = 0;
msq->q_qbytes = msg_ctlmnb;
msq->q_lspid = msq->q_lrpid = 0;
INIT_LIST_HEAD(&msq->q_messages);
INIT_LIST_HEAD(&msq->q_receivers);
INIT_LIST_HEAD(&msq->q_senders);
msg_unlock(id);
return msg_buildid(id,msq->q_perm.seq);
}
每个报文队列都有个队列头,那就是msg_queue数据结构,定义于ipc/msg.c中:
struct msg_queue {
struct kern_ipc_perm q_perm;
time_t q_stime;
time_t q_rtime;
time_t q_ctime;
unsigned long q_cbytes;
unsigned long q_qnum;
unsigned long q_qbytes;
pid_t q_lspid;
pid_t q_lrpid;
struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
};
反之,每个msg_queue数据结构也唯一地对应着一个报文队列。这些数据结构以及结构之间的关系可以总结如下:
全局变量ipc_ids数据结构msg_ids是系统中所有报文队列的总根。数据结构msg_ids中的指针entries指向一个ipc_id结构数组,数组中的每个元素都是ipc_id数据结构,结构中有个指针p,指向一个kern_ipc_perm数据结构。由于kern_ipc_perm数据结构是报文队列头msg_queue数据结构内部的第一个成分,上述数组元素中的指针p实际上指向一个报文队列,数组的大小决定了已经或可以建立的报文队列数量。
每个已建立的报文队列由一个标识号来代表,与打开文件号相似。但是,打开文件号只局限于每个进程局部,而报文队列的标识号却是全局的,所以必须保证在全局范围中的唯一性。标识号由ipc_addid分配,其代码如下:
sys_ipc=>sys_msgget=>newque=>ipc_addid
int ipc_addid(struct ipc_ids* ids, struct kern_ipc_perm* new, int size)
{
int id;
size = grow_ary(ids,size);
for (id = 0; id < size; id++) {
if(ids->entries[id].p == NULL)
goto found;
}
return -1;
found:
ids->in_use++;
if (id > ids->max_id)
ids->max_id = id;
new->cuid = new->uid = current->euid;
new->gid = new->cgid = current->egid;
new->seq = ids->seq++;
if(ids->seq > ids->seq_max)
ids->seq = 0;
spin_lock(&ids->ary);
ids->entries[id].p = new;
return id;
}
如果分配标识号成功,就要将代表报文队列的报文队列头与ipc_ids结构msg_ids挂上钩。如前所述,该结构的指针entries指向以标识号为下标的ipc_id结构数组,而ipc_id结构的内容只是一个指针,指向一个kern_ipc_perm结构。同时,每个报文队列头结构中的第一个成分就是一个kern_ipc_perm数据结构,其起始地址与整个msg_queue结构的起始地址相同。所谓将某个报文队列头与msg_ids挂上钩,就是把特定msg_queue结构中kern_ipc_perm数据结构的起始地址根据标识号填入相应ipc_id结构中(见169行),并返回该标识号。
然而,既然是数组,就有大小,ipc_ids结构的字段size就记录着它的大小。可是这个大小怎样确定呢?再说,再大的数组也有可能会用完,那时候又怎么办?显然,最好是能根据实际的需要加以调整,这就是ipc_addid的代码中调用grow_ary的目的:
sys_ipc=>sys_msgget=>newque=>ipc_addid=>grow_ary
static int grow_ary(struct ipc_ids* ids, int newsize)
{
struct ipc_id* new;
struct ipc_id* old;
int i;
if(newsize > IPCMNI)
newsize = IPCMNI;
if(newsize <= ids->size)
return newsize;
new = ipc_alloc(sizeof(struct ipc_id)*newsize);
if(new == NULL)
return ids->size;
memcpy(new, ids->entries, sizeof(struct ipc_id)*ids->size);
for(i=ids->size;iary);
old = ids->entries;
ids->entries = new;
i = ids->size;
ids->size = newsize;
spin_unlock(&ids->ary);
ipc_free(old, sizeof(struct ipc_id)*i);
return ids->size;
}
参数newsize表示新的数组大小,如果其数值大于数组当前的大小就另外分配一块空间来取代原有的数组。不过,数组的大小只会扩展而不会缩小(见113行)。另一方面,数组的扩展也有个上限IPCMNI,该常数在include/linux/ipc.h中定义为32768。
#define IPCMNI 32768
在newque中调用ipc_addid时将一个全局变量msg_ctlmni作为参数传了下来,就是这里的newsize。这个全局变量的处置为MSGMNI,而MSGMNI在msg.h中定义为16。
最后,newque还要将这个标识号转换成一个一体化的标识号。代码中的msg_buildid是个宏定义:
#define msg_buildid(id, seq) ipc_buildid(&msg_ids, id, seq)
而ipc_buildid的定义则在ipc/util.h中:
extern inline int ipc_buildid(struct ipc_ids* ids, int id, int seq)
{
return SEQ_MULTIPLIER*seq + id;
}
为什么要做这样的转换呢?从ipc_addid的代码中可以看出,由它分配的标识号实际上是msg_ids结构数组中的数组下标。这个下标是重复使用的。如果一个进程建立了一个报文队列,然后又撤销了,而后来又有另一个进程要建立一个报文队列,就有可能又将同一个下标分配给这个新的队列。这样,虽然这种标识号的使用在任何一个特定的时间点上是唯一的,但是如果观察一个合理长的时间跨度就不一定是唯一的了。为了克服这个问题,在msg_ids中设了一个序号seq;每分配使用一个标识号时就递增这个序号(见ipc_addid中的第164行),并且将这个序号与下标编码在一起形成一个一体化的标识号。这么一来,即使在一段时间以后下标又重复了,但由于序号在递增,所以一体化的标识号在相当长的一段时期里都能保证唯一性。
还要注意,键值与标号是两码事。用文件系统打个比方,则键值类似与文件名,而标识号类似于打开文件号。
回到sys_msgget中,如果键值不是IPC_PRIVATE,那就先寻找以给定键值建立的报文队列是否已经存在。函数ipc_findkey的代码在ipc/util.c中:
sys_ipc=>sys_msgget=>ipc_findkey
int ipc_findkey(struct ipc_ids* ids, key_t key)
{
int id;
struct kern_ipc_perm* p;
for (id = 0; id <= ids->max_id; id++) {
p = ids->entries[id].p;
if(p==NULL)
continue;
if (key == p->key)
return id;
}
return -1;
}
寻找的结果对于调用sys_msgget的不同目的(创建或寻找)有着不同的意义,因此要分不同的情况来处理:1如果找不到,那么对于寻找的目的来说是一次失败,而对于创建的目的来说,却是好事,可以进而调用newque了。2如果找到了,那么对于独占性的创建(IPC_EXCL标志为1),这是一次失败,而对于寻找或可共享的创建来说却是好事。不过,一个已经创建的报文队列并不是谁都可以来使用或共享的。一般情况下,只有与创建者属于同一用户并且同一组或者属于超级用户的进程才有资格来使用或共享。所以在sys_msgget中,要调用ipcperms先检查一下访问权限是否相符。最后,通哟啊那个还要通过msg_buildid将实际上是十足下标的标识号转换成一体化的标识号。
函数ipcperms的代码在ipc/util.c中:
sys_ipc=>sys_msgget=>ipcperms
int ipcperms (struct kern_ipc_perm *ipcp, short flag)
{
int requested_mode, granted_mode;
requested_mode = (flag >> 6) | (flag >> 3) | flag;
granted_mode = ipcp->mode;
if (current->euid == ipcp->cuid || current->euid == ipcp->uid)
granted_mode >>= 6;
else if (in_group_p(ipcp->cgid) || in_group_p(ipcp->gid))
granted_mode >>= 3;
if ((requested_mode & ~granted_mode & 0007) &&
!capable(CAP_IPC_OWNER))
return -1;
return 0;
}
可见,报文队列访问权限的管理与文件系统访问权限的管理相似,读者可参考文件系统系列的有关内容。
库函数msgsnd--报文发送参与通信的双方在通过msgget创建了一个报文队列或取得了该队列的标识号以后,就可以向该队列发送或接收报文了。先来看报文的发送,这个函数比较大,所以我们还是分段来阅读(ipc/msg.c)。
sys_ipc=>sys_msgsnd
asmlinkage long sys_msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)
{
struct msg_queue *msq;
struct msg_msg *msg;
long mtype;
int err;
if (msgsz > msg_ctlmax || (long) msgsz < 0 || msqid < 0)
return -EINVAL;
if (get_user(mtype, &msgp->mtype))
return -EFAULT;
if (mtype < 1)
return -EINVAL;
msg = load_msg(msgp->mtext, msgsz);
if(IS_ERR(msg))
return PTR_ERR(msg);
msg->m_type = mtype;
msg->m_ts = msgsz;
msq = msg_lock(msqid);
err=-EINVAL;
if(msq==NULL)
goto out_free;
首先是对参数做一些检查并将报文从用户控件复制过来,其中msgp是指向一个msgbuf结构的指针,这个数据结构是在include/linux/msg.h中定义的:
struct msgbuf {
long mtype;
char mtext[1];
};
虽然这个指针已经作为系统调用的参数传了过来,数据结构本身却还在用户空间,所以分别通过get_user和load_msg从用户空间复制到系统空间。其中load_msg还要在系统空间为此分配缓冲区。系统空间中使用的msg_msg结构与用户空间中使用的msgbuf结构不同:
struct msg_msg {
struct list_head m_list;
long m_type;
int m_ts;
struct msg_msgseg* next;
};
当报文本身的大小加上这个数据结构的大小仍小于一个页面时,msg_msg结构与报文本身在同一页面中,而报文本身紧跟在msg_msg数据结构后面。否则,当报文本身和msg_msg结构不能容纳在同一页面中时,则要将报文分段,然后将分布于不同页面中的报文段链接起来。此时除第一个页面的开头仍是msg_msg结构以外,其它各个页面的开头都是一个msg_msgseg结构,而每个报文段的大小则为页面大小减去msg_msgseg结构的大小:
struct msg_msgseg {
struct msg_msgseg* next;
};
函数load_msg的代码也在ipc/msg.c中,我们把它留给读者自己阅读。
sys_ipc=>sys_msgsnd=>load_msg
static struct msg_msg* load_msg(void* src, int len)
{
struct msg_msg* msg;
struct msg_msgseg** pseg;
int err;
int alen;
alen = len;
if(alen > DATALEN_MSG)
alen = DATALEN_MSG;
msg = (struct msg_msg *) kmalloc (sizeof(*msg) + alen, GFP_KERNEL);
if(msg==NULL)
return ERR_PTR(-ENOMEM);
msg->next = NULL;
if (copy_from_user(msg+1, src, alen)) {
err = -EFAULT;
goto out_err;
}
len -= alen;
src = ((char*)src)+alen;
pseg = &msg->next;
while(len > 0) {
struct msg_msgseg* seg;
alen = len;
if(alen > DATALEN_SEG)
alen = DATALEN_SEG;
seg = (struct msg_msgseg *) kmalloc (sizeof(*seg) + alen, GFP_KERNEL);
if(seg==NULL) {
err=-ENOMEM;
goto out_err;
}
*pseg = seg;
seg->next = NULL;
if(copy_from_user (seg+1, src, alen)) {
err = -EFAULT;
goto out_err;
}
pseg = &seg->next;
len -= alen;
src = ((char*)src)+alen;
}
return msg;
out_err:
free_msg(msg);
return ERR_PTR(err);
}



