c中linux中进程间通信IPC之消息队列MessageQueue在实战中的应用??
参考~未验证??:https://blog.csdn.net/yang_yulei/article/details/19772649
创建消息队列msgget:
//创建一个消息队列,在创建时指定队列的最大消息数和每个消息的最大大小
//本程序使用说明:
//./create -e 路径名
#include <unistd.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h> //包含ftok()
#include <fcntl.h> //包含getopt() 及相关的外部变量\
#include <stdio.h>
#include <stdlib.h>
int
main(int argc, char** argv)
{
int c, flags ;
int mqid ; //消息队列id
flags = IPC_CREAT ; //设置消息队列的标记
//从命令行读取 队列的最大消息数 消息的最大大小
while ((c = getopt(argc, argv, "e")) != -1)
{
switch(c)
{
case 'e':
flags |= IPC_EXCL ; //排他性创建:若已存在此名的消息队列,则返回错误
break;
}
}
//若用户没有指定选项
if (optind != argc -1)
puts("请按格式输入:[-e] [-m maxmsg] [-z msgsize] <name>") ;
//创建消息队列
mqid = msgget(ftok(argv[optind], 0), flags) ;
exit(0) ;
}
向消息队列发送消息msgsnd:
//向消息队列发送消息(把一个指定了长度和类型的消息放置到某个队列中)
//我们必须在不同进程中约定好 标识消息队列的 路径名和工程ID,
//可把此两者放置到一个头文件中,让用消息队列通信的进程代码都包含此头文件。
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h> //包含ftok
#include <sys/msg.h>
#include <sys/types.h>
#include <sys/fcntl.h>
#define MSG_W 0200
#define BUF_SIZE 512
typedef struct msgbuf
{
long mtype ;
char mdata[BUF_SIZE] ;
} mymsg_t ;
int
main(int argc, char** argv)
{
int mqid ; //消息队列的描述符
size_t msglen ; //消息的长度
long msgtype ; //消息的类型
mymsg_t* ptr ; //消息结构的指针
//用户未按格式输入
if (argc != 3)
puts("usage: send <pathname> <type>") ;
msgtype = atoi(argv[2]) ;
//获取已存在消息队列的描述符
mqid = msgget(ftok(argv[1], 0), MSG_W) ;
//构造一条消息
ptr = calloc(sizeof(long) + msglen, sizeof(char)) ;
ptr->mtype = msgtype ;
snprintf(ptr->mdata, BUF_SIZE, "Hi,Boy~") ;
//发送消息
msglen = strlen(ptr->mdata) ;
msgsnd(mqid, ptr, msglen, 0) ;
exit(0);
}
从消息队列中读取消息msgrcv:
//从一个消息队列中读出一个消息, -n命令行选项指定非阻塞 -t命令行选项指定接收的消息类型
//
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h> //包含ftok
#include <sys/msg.h>
#include <sys/types.h>
#include <sys/fcntl.h>
#include <sys/stat.h>
#define MSG_R 0400
#define MSG_BUF_SIZE (512 + sizeof(long))
typedef struct msgbuf
{
long mtype ;
char mdata[MSG_BUF_SIZE-sizeof(long)] ;
} mymsg_t ;
int
main(int argc, char** argv)
{
int c = 0 ;
int recvCntlFlag = 0 ;
int mqid = 0 ;
long msgtype = 0 ;
ssize_t recvBytes = 0 ; //已接收的字节数
struct msgbuf* buf = NULL ; //存储接收消息的缓冲区的指针
//读取用户输入的命令行
while ((c = getopt(argc, argv, "nt:")) != -1)
{
switch(c)
{
case 'n' :
recvCntlFlag |= IPC_NOWAIT ;
break ;
case 't' :
msgtype = atol(optarg) ;
break ;
}
}
if (optind != argc-1)
puts("usage:msgrcv [-n] [-t typeno] <pathname>") ;
//获取要接收队列的id
mqid = msgget(ftok(argv[optind], 0), MSG_R) ;
//开辟缓冲区 接收消息
buf = malloc(MSG_BUF_SIZE) ;
recvBytes = msgrcv(mqid, buf, MSG_BUF_SIZE, msgtype, recvCntlFlag ) ;
//输出消息内容
printf("recv:%d bytes type=%ld %s",recvBytes, buf->mtype, buf->mdata) ;
exit(0);
}
消息队列上的控制操作msgctl(删除队列):
#include <sys/msg.h>
int msgctl (int msqid, in cmd, struct msqid_ds * buff) ;
参数cmd说明对由msqid指定的队列要执行的命令:
IPC_STAT :取此队列的msqid_ds结构,并将它存放在buf指向的结构中。
IPC_SET :按由buf指向结构中的值,设置与此队列相关结构中的字段。
IPC_RMID:从系统中删除该消息队列以及仍在该队列中的所有数据。
(这三条命令也可用于信号量和共享存储)
//删除一个队列,我们以IPC_RMID命令调用msgctl
//
#include <stdio.h>
#incldue <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int
main(int argc, char** argv)
{
int mqid ;
if (argc != 2)
puts("usage:remove <pathname>") ;
mqid = msgget(ftok(argv[1], 0), 0) ;
msgctl(mqid, IPC_RMID, NULL) ;
exit(0) ;
}
./create 任意一路径名
./send 路径名 消息类型(正整数)
./recv 路径名 ------或:./recv -t 消息类型 路径名
./remove 路径名
客户-服务器模型:
客户端服务器端发送接收消息模型,如下图所示:
客户服务器模型实例
//-------------------头文件msgqueue.h ------------------
#ifndef _MAGQUEUE_H_
#define _MAGQUEUE_H_
#include <sys/ipc.h> //包含ftok
#include <sys/msg.h>
#include <sys/types.h>
//消息队列的读 写模式掩码
#define MSG_W 0200
#define MSG_R 0400
//定义众所周知的消息队列键
#define MQ_KEY1 128L
#define DATA_SIZE 512
typedef struct msgbuf
{
long mtype ;
char mdata[DATA_SIZE] ;
} mymsg_t ;
#endif
//-----------------客户端进程-----------------
#include "msgqueue.h"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
void client(int, int) ;
int
main(int argc, char** argv)
{
int msgqid ;
//打开消息队列
msgqid = msgget(MQ_KEY1, 0) ;
if (msgqid < 0)
{
puts("Open msg queue error!\n") ;
exit(0) ;
}
client(msgqid, msgqid) ;
exit(0) ;
}
void
client(int readfd, int writefd)
{
mymsg_t msgToServer ;
mymsg_t msgFromServer ;
char* writePtr ;
ssize_t pidLen ;
ssize_t dataLen ;
ssize_t recvBytes ;
int pid ;
//-------构造一条消息-----
//消息类型为1
msgToServer.mtype = 1 ;
//在消息头部放本进程ID和空格
pid = getpid() ;
snprintf(msgToServer.mdata, DATA_SIZE, "%ld ", pid) ;
pidLen = strlen(msgToServer.mdata) ;
writePtr = msgToServer.mdata + pidLen ;
//从标准输入读入文件路径
fgets(writePtr, DATA_SIZE - pidLen, stdin) ;
dataLen = strlen(msgToServer.mdata) ;
if (msgToServer.mdata[dataLen-1] == '\n') //删除换行符
{
msgToServer.mdata[dataLen-1] = '\0' ;
}
//发送消息
if (msgsnd(writefd, &msgToServer, strlen(msgToServer.mdata), 0) == -1)
{
puts("Send Error!");
exit(0) ;
}
//-----接收来自服务器的消息
while ((recvBytes = msgrcv(readfd, &msgFromServer, DATA_SIZE, pid, 0)) > 0)
{
write(STDOUT_FILENO, msgFromServer.mdata, recvBytes) ;
}
}
//---------------服务器端进程---------------
//消息队列是双向通信的,故用单个队列就够用。
//我们用每个消息的类型来标识该消息是从客户到服务器,还是从服务器到客户。
//客户向队列发类型为1、PID和路径名。
//服务器向队列发类型为客户进程ID的文件内容。
//
//小心死锁隐患:
//客户们可以填满消息队列,妨碍服务器发送应答,于是客户被阻塞在发送中,服务器也被阻塞。
//避免的方法是:约定服务器对消息队列总是使用非阻塞写。
#include "msgqueue.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
void server(int, int) ;
int
main(int argc, char** argv)
{
int msgqid;
//创建消息队列
msgqid = msgget(MQ_KEY1, IPC_CREAT) ;
if (msgqid < 0)
{
puts("Create msg queue error!\n") ;
exit(0) ;
}
server(msgqid, msgqid) ;
exit(0) ;
}
void
server(int readfd, int writefd)
{
FILE* fp ;
pid_t clientPid ;
mymsg_t* msgPtr ;
ssize_t recvBytes ;
char* pathStr ;
for ( ; ; )
{
//从消息队列中读取来自客户的请求文件路径
msgPtr = malloc(DATA_SIZE + sizeof(long)) ;
recvBytes = msgrcv(readfd, msgPtr, DATA_SIZE, 1, 0) ; //阻塞读
if (recvBytes <= 0)
{
puts("pathname missing") ;
continue ;
}
msgPtr->mdata[recvBytes] = '\0' ;
//分析消息,提取客户PID,文件路径
if ((pathStr = strchr(msgPtr->mdata, ' ')) == NULL)
{
puts("bogus request!") ;
continue ;
}
*pathStr++ = 0 ;
clientPid = atol(msgPtr->mdata) ;
//读取文件内容 返回给客户
msgPtr->mtype = clientPid ; //msgPtr既作为接收消息 又用作发送消息
if ((fp = fopen(pathStr, "r")) == NULL)
{
//读取文件失败,返回给客户失败信息(在原消息内容后 添加错误信息)
snprintf(msgPtr->mdata + recvBytes, sizeof(msgPtr->mdata) -recvBytes,
": can't open!") ;
if (msgsnd(writefd, msgPtr, strlen(msgPtr->mdata), IPC_NOWAIT) == -1)
{
puts("Send Error!");
exit(0);
}
}
else
{ //copy文件内容 发给客户
while (fgets(msgPtr->mdata, DATA_SIZE, fp) != NULL)
{
msgsnd(writefd, msgPtr, strlen(msgPtr->mdata), IPC_NOWAIT) ; //非阻塞写
}
}
}//for()
}