1.代码:
/*
 * Receiver side of the basic message-queue demo.
 *
 * Opens (creating if necessary) the System V message queue with key 1234,
 * prints every message it receives, and stops when a message whose text
 * starts with "end" arrives.  The queue is removed before the program exits.
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct my_msg_st {
    long int my_msg_type;       /* a System V message must start with a long type */
    char some_text[BUFSIZ];
};

int main(void)
{
    struct my_msg_st some_data;
    long int msg_to_receive = 0;    /* 0 = take the first message, whatever its type */
    int running = 1;

    /* First, set up the message queue. */
    int msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        fprintf(stderr, "msgget failed with error: %d\n", errno);
        exit(EXIT_FAILURE);
    }

    /* Then retrieve messages until an "end" message is encountered. */
    while (running) {
        ssize_t received = msgrcv(msgid, &some_data, sizeof some_data.some_text,
                                  msg_to_receive, 0);
        if (received == -1) {
            fprintf(stderr, "msgrcv failed with error: %d\n", errno);
            exit(EXIT_FAILURE);
        }

        /* Do not rely on the sender having included a terminating NUL. */
        size_t end = (size_t)received < sizeof some_data.some_text
                         ? (size_t)received
                         : sizeof some_data.some_text - 1;
        some_data.some_text[end] = '\0';

        printf("You wrote: %s", some_data.some_text);
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            running = 0;
        }
    }

    /* Lastly, delete the message queue. */
    if (msgctl(msgid, IPC_RMID, 0) == -1) {
        fprintf(stderr, "msgctl(IPC_RMID) failed\n");
        exit(EXIT_FAILURE);
    }
    exit(EXIT_SUCCESS);
}
/*
 * Sender side of the basic message-queue demo.
 *
 * Reads lines from stdin and posts each one as a type-1 message on the
 * System V message queue with key 1234.  Stops after sending a line that
 * starts with "end".  Run the receiver and this sender in two terminals.
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MAX_TEXT 512

struct my_msg_st {
    long int my_msg_type;
    char some_text[MAX_TEXT];
};

int main(void)
{
    struct my_msg_st some_data = {0};
    int running = 1;

    int msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        fprintf(stderr, "msgget failed with error: %d\n", errno);
        exit(EXIT_FAILURE);
    }

    while (running) {
        printf("Enter some text:");
        fflush(stdout);

        /*
         * Read straight into the message body, bounded by its real size.
         * (Copying a BUFSIZ-sized line into the MAX_TEXT-sized field with
         * strcpy would overflow on long input.)  On EOF or a read error,
         * behave as if the user typed "end" so the receiver shuts down too.
         */
        if (fgets(some_data.some_text, sizeof some_data.some_text, stdin) == NULL) {
            strcpy(some_data.some_text, "end\n");
        }
        some_data.my_msg_type = 1;

        if (msgsnd(msgid, &some_data, MAX_TEXT, 0) == -1) {
            fprintf(stderr, "msgsnd failed\n");
            exit(EXIT_FAILURE);
        }
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            running = 0;
        }
    }
    exit(EXIT_SUCCESS);
}
执行结果如图:
2)熟悉消息队列相关的系统调用
1.头文件: #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> 2.操作: 创建队列: msgget((key_t)10086,0666|IPC_CREAT); 发送信息: msgsnd(mq,(void*)&over2,MAX_SIZE,0) 接收信息: msgrcv(mq,(void*)&buf,1024,0,0) 指定操作: msgctl(mq,IPC_RMID,&t)
3)尝试多个发送进程和接收进程,观察进程的并发执行情况,并解释
现象: 多个发送信息的终端在输入后,信息依次轮流出现在各个接收端进程中
解释:消息队列(MQ)由内核提供同步,互斥机制,可以翻阅msgsnd,msgrcv linux底层源代码一探究竟
链接
如图:
二.三个线程(两个发送者,一个接收者)并发执行。通过消息队列,分别进行消息的发送和接收
1.代码:
/*
 * Three threads (two senders, one receiver) talking through one System V
 * message queue, with POSIX semaphores ordering prompts, sends and receives.
 *
 * Each sender reads lines from stdin and posts them to the queue; a line
 * starting with "exit" ends that sender.  The receiver prints ordinary
 * messages, answers each "exit" with an "overN" reply, and removes the queue
 * once both senders have finished.
 */
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE 1   /* glibc only declares usleep() with this (or an XSI macro) in strict ISO C mode */
#endif
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define QUEUE_ID 10086
#define MAX_SIZE 1024
#define MSG_STOP "exit"

/* Message types: who is talking to whom. */
#define snd_to_rcv1 1
#define snd_to_rcv2 2
#define rcv_to_snd1 3
#define rcv_to_snd2 4

/* Abort with function name, line and perror() text when the condition is false. */
#define CHECK(x) \
    do { \
        if (!(x)) { \
            fprintf(stderr, "%s:%d:", __func__, __LINE__); \
            perror(#x); \
            exit(-1); \
        } \
    } while (0)

#define P(x) sem_wait(&(x))
#define V(x) sem_post(&(x))

struct msg_st {
    long int message_type;
    char buffer[MAX_SIZE + 1];
};

void *sender1(void *arg);
void *sender2(void *arg);
void *receiver(void *arg);

/*
 * w_mutex : only one sender at a time runs its prompt/read/send sequence
 * empty   : free slots the senders may still fill (starts at 10)
 * full    : messages waiting for the receiver
 * over    : receiver has queued the reply to an "exit"
 * snd_dp  : a sender may print its prompt (previous message was displayed)
 * rcv_dp  : a prompt has been printed, receiver may display the message
 */
static sem_t w_mutex, empty, full, over, rcv_dp, snd_dp;

/*
 * Shared body of both sender threads.
 *
 * prompt      text printed before each read
 * send_type   message type used for outgoing messages
 * reply_type  message type of the receiver's final reply to wait for
 * echo_reply  non-zero to print the reply text
 */
static void *run_sender(const char *prompt, long send_type, long reply_type,
                        int echo_reply)
{
    struct msg_st buf = {0};

    /* open the message queue */
    int mq = msgget((key_t)QUEUE_ID, 0666 | IPC_CREAT);
    CHECK(mq != -1);

    do {
        P(w_mutex);
        P(snd_dp);
        printf("%s", prompt);
        V(rcv_dp);
        fflush(stdout);

        /*
         * Only MAX_SIZE bytes are sent, so read at most MAX_SIZE - 1
         * characters: the terminating NUL then always travels with the
         * message.  On EOF, act as if the user typed the stop word so the
         * shutdown handshake still runs.
         */
        if (fgets(buf.buffer, MAX_SIZE, stdin) == NULL) {
            strcpy(buf.buffer, MSG_STOP "\n");
        }
        buf.message_type = send_type;

        /* send the message */
        P(empty);
        CHECK(0 <= msgsnd(mq, &buf, MAX_SIZE, 0));
        V(full);
        V(w_mutex);
        usleep(100);    /* give the other sender a chance to take w_mutex */
    } while (strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP)) != 0);

    /* wait for the receiver's reply to our "exit" */
    P(over);
    ssize_t bytes_read = msgrcv(mq, &buf, MAX_SIZE, reply_type, 0);
    CHECK(bytes_read >= 0);
    if (echo_reply) {
        printf("%s", buf.buffer);
    }
    printf("--------------------------\n");
    V(snd_dp);
    return NULL;
}

/* First sender thread: prompt "sender1>", prints the receiver's reply. */
void *sender1(void *arg)
{
    (void)arg;
    return run_sender("sender1>", snd_to_rcv1, rcv_to_snd1, 1);
}

/* Second sender thread: prompt "sender2>", does not print the reply text. */
void *sender2(void *arg)
{
    (void)arg;
    return run_sender("sender2>", snd_to_rcv2, rcv_to_snd2, 0);
}

/*
 * Receiver thread: displays ordinary messages, replies to each sender's
 * "exit", and removes the queue after both senders have stopped.
 */
void *receiver(void *arg)
{
    (void)arg;
    struct msg_st buf = {0}, over1 = {0}, over2 = {0};
    struct msqid_ds t;
    int must_stop = 2;      /* number of senders that still have to say "exit" */

    over1.message_type = rcv_to_snd1;
    strcpy(over1.buffer, "over1\n");
    over2.message_type = rcv_to_snd2;
    strcpy(over2.buffer, "over2\n");

    /* open the message queue */
    int mq = msgget((key_t)QUEUE_ID, 0666 | IPC_CREAT);
    CHECK(mq != -1);

    do {
        /* receive the next message from either sender */
        P(full);
        ssize_t bytes_read = msgrcv(mq, &buf, MAX_SIZE, 0, 0);
        V(empty);
        CHECK(bytes_read >= 0);

        if (strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP)) == 0) {
            /* stop request: answer the sender that issued it */
            if (buf.message_type == snd_to_rcv1) {
                CHECK(msgsnd(mq, &over1, MAX_SIZE, 0) >= 0);
                V(over);
                must_stop--;
            } else if (buf.message_type == snd_to_rcv2) {
                CHECK(msgsnd(mq, &over2, MAX_SIZE, 0) >= 0);
                V(over);
                must_stop--;
            }
        } else {
            /*
             * Ordinary message: display it, then release snd_dp so the next
             * prompt can be printed.  This branch must belong to the outer
             * "is it a stop request" test; nested inside it, normal messages
             * are never shown and the senders block forever on snd_dp.
             */
            P(rcv_dp);
            printf("Received %ld:%s", buf.message_type, buf.buffer);
            printf("------------------------------------------\n");
            V(snd_dp);
        }
    } while (must_stop);

    /* clean up once the last sender has consumed its reply */
    P(snd_dp);
    CHECK(!msgctl(mq, IPC_RMID, &t));
    return NULL;
}

int main(void)
{
    pthread_t recv_thread, snd1_thread, snd2_thread;
    int state;

    /* pshared = 0: the semaphores are shared between threads of this process only */
    CHECK(sem_init(&snd_dp, 0, 1) == 0);
    CHECK(sem_init(&rcv_dp, 0, 0) == 0);
    CHECK(sem_init(&empty, 0, 10) == 0);
    CHECK(sem_init(&full, 0, 0) == 0);
    CHECK(sem_init(&w_mutex, 0, 1) == 0);
    CHECK(sem_init(&over, 0, 0) == 0);

    state = pthread_create(&recv_thread, NULL, receiver, NULL);
    CHECK(state == 0);
    state = pthread_create(&snd1_thread, NULL, sender1, NULL);
    CHECK(state == 0);
    state = pthread_create(&snd2_thread, NULL, sender2, NULL);
    CHECK(state == 0);

    pthread_join(snd1_thread, NULL);
    pthread_join(snd2_thread, NULL);
    pthread_join(recv_thread, NULL);
    return 0;
}
注意:运行后,两次均输入exit,才能满足receive解锁条件,实现同步
运行如图:
2)熟悉消息队列相关的系统调用
1.头文件:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
2.操作:
创建队列: msgget((key_t)10086,0666|IPC_CREAT);
发送信息: msgsnd(mq,(void*)&over2,MAX_SIZE,0)
接收信息: msgrcv(mq,(void*)&buf,1024,0,0)
指定操作: msgctl(mq,IPC_RMID,&t)
3)回顾posix线程控制和信号量相关的函数
1.头文件: #include <semaphore.h> 2.操作: 声明: sem_t sem1 初始: sem_init(&sem1,1,1) p操作: sem_wait(&sem1) v操作: sem_post(&sem1) 3.互斥/同步: pv/vp
4)删除信号量的并发控制,观察混乱情况
代码中注释掉所有P操作
5)理清并发线程中同步和互斥关系
1.操作:去掉原文代码所有注释 作用:在所有P操作后进行输出,来观察进程并发执行情况
运行如图:
流程图:并发线程同步,互斥关系:(sender1与sender2互斥)
移动端图片看不清,可以用pc端浏览;欢迎访问gitee仓库
ZUCC_操作系统原理实验_Lab9进程的通信消息队列
三.编写程序
1.修改例程1,模仿例程2,在原有的MSG1向MSG2发送消息的基础上,实现MSG2也能向MSG1发送消息,即两并发进程能通过消息队列,实现双向对话。
构造核心:
1.使用System V信号量实现两个进程间的通信
2.使用消息队列实现两个进程间消息的接收与发送
3.确定流程图顺序,从而确定信号量个数与初值的设定,同时流程图拆分确定两个进程的PV操作和核心代码
代码:
/*
 * Two-way chat over one message queue: the side that speaks first.
 *
 * Per round this process sends one line and then receives two lines from the
 * peer.  Four System V semaphores (indices 0..3) hand the turn back and
 * forth: this side posts 0 and 2 and waits on 1 and 3.  A line starting with
 * "end", sent or received, finishes the conversation.  Run this program and
 * its peer in two terminals.
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/sem.h>

struct my_msg_st {
    long int my_msg_type;
    char some_text[BUFSIZ];
};

union semun {
    int val;                    /* value for SETVAL */
    struct semid_ds *buf;       /* buffer for IPC_STAT, IPC_SET */
    unsigned short int *array;  /* array for GETALL, SETALL */
    struct seminfo *_buf;       /* buffer for IPC_INFO */
};

static int sem_id = 0;

static int set_semvalue(int semnum, int sem_value);
static int semaphore_p(int semnum);
static int semaphore_v(int semnum);

/* Report a failed system call together with errno and terminate. */
static void fail(const char *call)
{
    fprintf(stderr, "%s failed with error: %d\n", call, errno);
    exit(EXIT_FAILURE);
}

/* Prompt for a line and send it; returns non-zero if it starts with "end". */
static int send_line(int msgid)
{
    struct my_msg_st msg = {0};

    printf("Enter some text:");
    fflush(stdout);
    /* On EOF, behave as if the user typed "end" so the peer terminates too. */
    if (fgets(msg.some_text, sizeof msg.some_text, stdin) == NULL) {
        strcpy(msg.some_text, "end\n");
    }
    msg.my_msg_type = 1;
    if (msgsnd(msgid, &msg, sizeof msg.some_text, 0) == -1) {
        fail("msgsnd");
    }
    return strncmp(msg.some_text, "end", 3) == 0;
}

/* Receive one message and print it; returns non-zero if it starts with "end". */
static int recv_line(int msgid)
{
    struct my_msg_st msg;

    ssize_t got = msgrcv(msgid, &msg, sizeof msg.some_text, 0, 0);
    if (got == -1) {
        fail("msgrcv");
    }
    /* Do not rely on the peer having included a terminating NUL. */
    size_t end = (size_t)got < sizeof msg.some_text ? (size_t)got
                                                    : sizeof msg.some_text - 1;
    msg.some_text[end] = '\0';

    printf("You wrote: %s", msg.some_text);
    return strncmp(msg.some_text, "end", 3) == 0;
}

int main(void)
{
    int msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        fail("msgget");
    }
    /* Message queues and semaphore sets have separate key spaces, so
     * reusing key 1234 for both does not conflict. */
    sem_id = semget((key_t)1234, 4, 0666 | IPC_CREAT);
    if (sem_id == -1) {
        fail("semget");
    }

    /* All four turn-taking semaphores start at 0: nobody may proceed yet. */
    for (int i = 0; i < 4; i++) {
        if (!set_semvalue(i, 0)) {
            exit(EXIT_FAILURE);
        }
    }

    for (;;) {
        /* our turn: read a line and send it */
        int done = send_line(msgid);
        if (!semaphore_v(0)) {
            exit(EXIT_FAILURE);
        }
        if (done) {
            break;
        }

        /* peer's first reply */
        if (!semaphore_p(1)) {
            exit(EXIT_FAILURE);
        }
        if (recv_line(msgid)) {
            break;      /* peer is shutting down; it will not wait on semaphore 2 */
        }
        if (!semaphore_v(2)) {
            exit(EXIT_FAILURE);
        }

        /* peer's second reply */
        if (!semaphore_p(3)) {
            exit(EXIT_FAILURE);
        }
        if (recv_line(msgid)) {
            break;
        }
    }
    exit(EXIT_SUCCESS);
}

/* Initialise one semaphore of the set; must be done before it is used.
 * Returns 1 on success, 0 on failure. */
static int set_semvalue(int semnum, int sem_value)
{
    union semun sem_union;

    sem_union.val = sem_value;
    if (semctl(sem_id, semnum, SETVAL, sem_union) == -1) {
        fprintf(stderr, "Failed to set the semaphore\n");
        return 0;
    }
    return 1;
}

/*
 * P operation: decrement the semaphore, blocking until that is possible.
 * Returns 1 on success, 0 on failure.
 *
 * SEM_UNDO is deliberately not used: these semaphores pass a signal from one
 * process to the other, and an undo adjustment applied when this process
 * exits would change counts the peer still depends on.
 */
static int semaphore_p(int semnum)
{
    struct sembuf sem_b;

    sem_b.sem_num = (unsigned short)semnum;
    sem_b.sem_op = -1;
    sem_b.sem_flg = 0;
    if (semop(sem_id, &sem_b, 1) == -1) {
        fprintf(stderr, "semaphore_p failed\n");
        return 0;
    }
    return 1;
}

/* V operation: increment the semaphore, releasing a waiter.
 * Returns 1 on success, 0 on failure.  No SEM_UNDO, see semaphore_p(). */
static int semaphore_v(int semnum)
{
    struct sembuf sem_b;

    sem_b.sem_num = (unsigned short)semnum;
    sem_b.sem_op = 1;
    sem_b.sem_flg = 0;
    if (semop(sem_id, &sem_b, 1) == -1) {
        fprintf(stderr, "semaphore_v failed\n");
        return 0;
    }
    return 1;
}
/*
 * Two-way chat over one message queue: the side that listens first.
 *
 * Per round this process receives one line from the peer and then sends two
 * lines back.  Four System V semaphores (indices 0..3) hand the turn back
 * and forth: this side waits on 0 and 2 and posts 1 and 3.  A line starting
 * with "end", sent or received, finishes the conversation; this process then
 * removes the message queue and the semaphore set.
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/sem.h>

union semun {
    int val;                    /* value for SETVAL */
    struct semid_ds *buf;       /* buffer for IPC_STAT, IPC_SET */
    unsigned short int *array;  /* array for GETALL, SETALL */
    struct seminfo *_buf;       /* buffer for IPC_INFO */
};

static int sem_id = 0;

static void del_semvalue(int semnum);
static int semaphore_p(int semnum);
static int semaphore_v(int semnum);

struct my_msg_st {
    long int my_msg_type;
    char some_text[BUFSIZ];
};

/* Report a failed system call together with errno and terminate. */
static void fail(const char *call)
{
    fprintf(stderr, "%s failed with error: %d\n", call, errno);
    exit(EXIT_FAILURE);
}

/* Prompt for a line and send it; returns non-zero if it starts with "end". */
static int send_line(int msgid)
{
    struct my_msg_st msg = {0};

    printf("Enter some text:");
    fflush(stdout);
    /* On EOF, behave as if the user typed "end" so the peer terminates too. */
    if (fgets(msg.some_text, sizeof msg.some_text, stdin) == NULL) {
        strcpy(msg.some_text, "end\n");
    }
    msg.my_msg_type = 1;
    if (msgsnd(msgid, &msg, sizeof msg.some_text, 0) == -1) {
        fail("msgsnd");
    }
    return strncmp(msg.some_text, "end", 3) == 0;
}

/* Receive one message and print it; returns non-zero if it starts with "end". */
static int recv_line(int msgid)
{
    struct my_msg_st msg;

    ssize_t got = msgrcv(msgid, &msg, sizeof msg.some_text, 0, 0);
    if (got == -1) {
        fail("msgrcv");
    }
    /* Do not rely on the peer having included a terminating NUL. */
    size_t end = (size_t)got < sizeof msg.some_text ? (size_t)got
                                                    : sizeof msg.some_text - 1;
    msg.some_text[end] = '\0';

    printf("You wrote: %s", msg.some_text);
    return strncmp(msg.some_text, "end", 3) == 0;
}

int main(void)
{
    int msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        fail("msgget");
    }
    /* Message queues and semaphore sets have separate key spaces, so
     * reusing key 1234 for both does not conflict. */
    sem_id = semget((key_t)1234, 4, 0666 | IPC_CREAT);
    if (sem_id == -1) {
        fail("semget");
    }

    for (;;) {
        /* peer's turn: wait for its message and show it */
        if (!semaphore_p(0)) {
            exit(EXIT_FAILURE);
        }
        if (recv_line(msgid)) {
            break;
        }

        /* our first reply */
        int done = send_line(msgid);
        if (!semaphore_v(1)) {
            exit(EXIT_FAILURE);
        }
        if (done) {
            break;
        }

        /* our second reply, once the peer has read the first */
        if (!semaphore_p(2)) {
            exit(EXIT_FAILURE);
        }
        done = send_line(msgid);
        if (!semaphore_v(3)) {
            exit(EXIT_FAILURE);
        }
        if (done) {
            break;
        }
    }

    /* Leave the peer time to read a final "end" before the queue disappears. */
    sleep(1);
    if (msgctl(msgid, IPC_RMID, 0) == -1) {
        fprintf(stderr, "msgctl(IPC_RMID) failed\n");
    }
    del_semvalue(0);
    exit(EXIT_SUCCESS);
}

/* Remove the semaphore set (semnum is passed through but IPC_RMID always
 * removes the whole set). */
static void del_semvalue(int semnum)
{
    if (semctl(sem_id, semnum, IPC_RMID) == -1) {
        fprintf(stderr, "Failed to delete the semaphore\n");
    }
}

/*
 * P operation: decrement the semaphore, blocking until that is possible.
 * Returns 1 on success, 0 on failure.
 *
 * SEM_UNDO is deliberately not used: these semaphores pass a signal from one
 * process to the other, and an undo adjustment applied when a process exits
 * would change counts the peer still depends on.
 */
static int semaphore_p(int semnum)
{
    struct sembuf sem_b;

    sem_b.sem_num = (unsigned short)semnum;
    sem_b.sem_op = -1;
    sem_b.sem_flg = 0;
    if (semop(sem_id, &sem_b, 1) == -1) {
        fprintf(stderr, "semaphore_p failed\n");
        return 0;
    }
    return 1;
}

/* V operation: increment the semaphore, releasing a waiter.
 * Returns 1 on success, 0 on failure.  No SEM_UNDO, see semaphore_p(). */
static int semaphore_v(int semnum)
{
    struct sembuf sem_b;

    sem_b.sem_num = (unsigned short)semnum;
    sem_b.sem_op = 1;
    sem_b.sem_flg = 0;
    if (semop(sem_id, &sem_b, 1) == -1) {
        fprintf(stderr, "semaphore_v failed\n");
        return 0;
    }
    return 1;
}
sender,receiver进程并发控制的流程图,代码构成
运行如图:
四.消息队列常用模块
/*
 * Message-queue skeleton: open the queue, then alternate between sending a
 * line typed by the user and receiving/printing one message, until a line
 * starting with "end" is sent.
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct my_msg_st {
    long int my_msg_type;
    char some_text[BUFSIZ];
};

int main(void)
{
    struct my_msg_st some_data = {0};
    long int msg_to_receive = 0;    /* 0 = take the first message, whatever its type */

    /* create / open the queue */
    int msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
    if (msgid == -1) {
        fprintf(stderr, "msgget failed with error: %d\n", errno);
        exit(EXIT_FAILURE);
    }

    for (;;) {
        /* read a line and send it */
        printf("Enter some text:");
        fflush(stdout);
        /* On EOF, behave as if the user typed "end". */
        if (fgets(some_data.some_text, sizeof some_data.some_text, stdin) == NULL) {
            strcpy(some_data.some_text, "end\n");
        }
        some_data.my_msg_type = 1;
        /* The send step must call msgsnd; calling msgrcv here would
         * overwrite the line that was just read instead of sending it. */
        if (msgsnd(msgid, &some_data, sizeof some_data.some_text, 0) == -1) {
            fprintf(stderr, "msgsnd failed with error: %d\n", errno);
            exit(EXIT_FAILURE);
        }
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            break;
        }

        /* receive a message and print it */
        ssize_t received = msgrcv(msgid, &some_data, sizeof some_data.some_text,
                                  msg_to_receive, 0);
        if (received == -1) {
            fprintf(stderr, "msgrcv failed with error: %d\n", errno);
            exit(EXIT_FAILURE);
        }
        /* Do not rely on the sender having included a terminating NUL. */
        size_t end = (size_t)received < sizeof some_data.some_text
                         ? (size_t)received
                         : sizeof some_data.some_text - 1;
        some_data.some_text[end] = '\0';
        printf("You wrote: %s", some_data.some_text);
    }
    exit(EXIT_SUCCESS);
}
1



