linux 线程间通信消息队列实现
时间: 2023-05-24 13:06:04 浏览: 205
在 Linux 中,线程间通信可以使用多种方法实现,其中包括消息队列。消息队列是一种基于内核的 IPC(进程间通信)机制,允许一个进程向另一个进程发送一个消息。
以下是在 Linux 中实现线程间通信消息队列的步骤:
1. 创建消息队列,使用mq_open()函数来创建一个消息队列,需要指定消息队列的名称以及一些其他的属性。
2. 发送消息到消息队列,使用mq_send()函数来向消息队列发送一个消息。该函数需要指定要发送的消息队列的描述符,以及要发送的消息内容。
3. 接收消息,使用mq_receive()函数从消息队列中接收消息。该函数需要指定要接收消息的消息队列的描述符,以及接收消息的缓冲区。
4. 关闭消息队列,使用mq_close()函数来关闭一个消息队列。当不需要在使用消息队列时,需要调用此函数来关闭消息队列。
5. 删除消息队列,使用mq_unlink()函数来删除已经创建的消息队列。当不需要使用消息队列时,需要调用此函数来删除消息队列。
以上是在 Linux 中实现线程间通信消息队列的基本步骤,需要结合具体的应用场景进行具体的设计和实现。在实际应用中,需要注意消息的大小和类型,以及消息队列的最大大小等细节问题。
相关问题
linux 线程间消息队列实现
在 Linux 中实现线程间的消息队列,我们可以使用 POSIX 消息队列(POSIX message queues)和 SysV 队列(System V Message Queue)。
1. POSIX 消息队列
POSIX 消息队列是 POSIX 线程提供的一个进程间通信的方式。一个消息队列包含一个标识符、一个最大队列长度和一个消息队列的缓存区。它类似于管道和消息队列,但它与进程无关。
POSIX 消息队列提供了两个函数 mq_open() 和 mq_send() 来创建队列和发送消息。mq_open() 函数返回类型为 mqd_t(消息队列描述符),它是一个整数,用于后续操作中参考队列。mq_send() 函数用于向队列中发送消息,它需要传递指定的描述符以便发送消息。
以下是 POSIX 消息队列的实现示例:
```
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#define QUEUE_NAME "/my_message_queue"
#define QUEUE_PERMS ((mode_t) 0600)
#define MAX_MSG_SIZE 2048
#define MAX_MSG_NUM 10
void die(char *s){
perror(s);
exit(EXIT_FAILURE);
}
int main(int argc, char *argv[]){
mqd_t qd;
struct mq_attr attr;
attr.mq_flags = 0; //设置队列标志为 0
attr.mq_maxmsg = MAX_MSG_NUM; //设置队列中允许最大的消息数
attr.mq_msgsize = MAX_MSG_SIZE; //设置队列中允许最大的消息大小
/*打开或创建一个新的消息队列*/
qd = mq_open(QUEUE_NAME, O_RDWR | O_CREAT, QUEUE_PERMS, &attr);
if(qd == -1) die("mq_open");
char *msg = "Hello, world.";
size_t msg_len = strlen(msg);
unsigned int priority = 1;
/*将消息发送到队列中*/
if(mq_send(qd, msg, msg_len, priority) == -1) die("mq_send");
mq_close(qd); //关闭消息队列
return 0;
}
```
2. SysV 队列
SysV 队列是 System V IPC 提供的一种线程间通信方式。它提供了 msgget()、msgctl()、msgrcv() 这三个函数来创建队列、控制队列和接收消息。
以下是 SysV 消息队列的实现示例:
```
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define QUEUE_PERMS 0600
#define BUF_SIZE 256
struct message{
long mtype;
char mtext[BUF_SIZE];
};
int main(int argc, char *argv[]){
key_t key;
int qid;
struct message msg;
/*创建共享目录*/
if((key = ftok(".", 'a')) == -1){
perror("ftok");
exit(1);
}
/*创建消息队列*/
if((qid = msgget(key, QUEUE_PERMS | IPC_CREAT)) == -1){
perror("msgget");
exit(1);
}
/*向消息队列中发送消息*/
msg.mtype = 1;
strncpy(msg.mtext, "Hello, world.", BUF_SIZE-1);
if(msgsnd(qid, &msg, strlen(msg.mtext), IPC_NOWAIT) == -1){
perror("msgsnd");
exit(1);
}
/*释放消息队列*/
if(msgctl(qid, IPC_RMID, NULL) == -1){
perror("msgctl");
exit(1);
}
return 0;
}
```
linux 使用POSIX 消息队列实现线程间通信的例子
以下是一个使用 POSIX 消息队列实现线程间通信的例子:
```
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <mqueue.h>
#define MSG_SIZE 256
#define MAX_MSG 10
mqd_t mqd;
pthread_t tid[2];
pthread_attr_t attr;
// 线程1:发送消息
void *send_func(void *arg) {
char msg[MSG_SIZE];
int i;
for (i = 0; i < MAX_MSG; i++) {
memset(msg, 0, MSG_SIZE);
sprintf(msg, "Message %d from thread 1", i);
if (mq_send(mqd, msg, strlen(msg) + 1, 0) == -1) {
perror("mq_send");
exit(1);
}
printf("Thread 1 sent: %s\n", msg);
sleep(1);
}
}
// 线程2:接收消息
void *recv_func(void *arg) {
char msg[MSG_SIZE];
unsigned int prio;
int i;
for (i = 0; i < MAX_MSG; i++) {
memset(msg, 0, MSG_SIZE);
if (mq_receive(mqd, msg, MSG_SIZE, &prio) == -1) {
perror("mq_receive");
exit(1);
}
printf("Thread 2 received: %s\n", msg);
sleep(1);
}
}
int main() {
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = MSG_SIZE;
attr.mq_curmsgs = 0;
if ((mqd = mq_open("/test_mq", O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, &attr)) == -1) {
perror("mq_open");
exit(1);
}
if (pthread_attr_init(&attr) != 0) {
perror("pthread_attr_init");
exit(1);
}
if (pthread_create(&tid[0], &attr, send_func, NULL) != 0) {
perror("pthread_create");
exit(1);
}
if (pthread_create(&tid[1], &attr, recv_func, NULL) != 0) {
perror("pthread_create");
exit(1);
}
if (pthread_join(tid[0], NULL) != 0) {
perror("pthread_join");
exit(1);
}
if (pthread_join(tid[1], NULL) != 0) {
perror("pthread_join");
exit(1);
}
if (mq_close(mqd) == -1) {
perror("mq_close");
exit(1);
}
if (mq_unlink("/test_mq") == -1) {
perror("mq_unlink");
exit(1);
}
return 0;
}
```
该程序创建了一个 POSIX 消息队列 `/test_mq`,其中维护了最大消息数为 10,每条消息为 256 字节。程序启动两个线程,一个用于发送消息,一个用于接收消息,它们都可以同时操作消息队列。发送线程每秒钟向队列中发送一条消息,接收线程每秒钟从队列中接收一条消息并打印出来。程序使用 `pthread_create()` 创建线程,使用 `pthread_join()` 以等待线程完成,使用 `mq_send()` 发送消息,使用 `mq_receive()` 接收消息。最后程序清理了 POSIX 消息队列。