请实现一个队列,队列的使用方有生产者(往队列里写数据),同时有消费者(从里面取数据);实现生产与消费的接口函数;需要考虑多线程环境,生产与消费可能同时进行的情况,导致数据不安全的问题;作为消费者,它如何能实时的知道队列里有数据而去进行消费?即消息或者事件实现实时通知。请写一段程序满足以上要求
时间: 2023-04-04 08:01:46 浏览: 77
以下是一个基于线程安全的队列实现,包含生产者和消费者的接口函数,以及实时通知的功能:
```c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#define QUEUE_SIZE 10
typedef struct {
int *buffer; // 队列缓冲区
int head; // 队列头指针
int tail; // 队列尾指针
int size; // 队列大小
sem_t *empty; // 空闲缓冲区信号量
sem_t *full; // 已用缓冲区信号量
pthread_mutex_t *mutex; // 互斥锁
} queue_t;
queue_t *queue_create() {
queue_t *queue = (queue_t *)malloc(sizeof(queue_t));
queue->buffer = (int *)malloc(sizeof(int) * QUEUE_SIZE);
queue->head = ;
queue->tail = ;
queue->size = ;
queue->empty = (sem_t *)malloc(sizeof(sem_t));
queue->full = (sem_t *)malloc(sizeof(sem_t));
queue->mutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
sem_init(queue->empty, , QUEUE_SIZE);
sem_init(queue->full, , );
pthread_mutex_init(queue->mutex, NULL);
return queue;
}
void queue_destroy(queue_t *queue) {
free(queue->buffer);
sem_destroy(queue->empty);
sem_destroy(queue->full);
pthread_mutex_destroy(queue->mutex);
free(queue->empty);
free(queue->full);
free(queue->mutex);
free(queue);
}
void queue_push(queue_t *queue, int value) {
sem_wait(queue->empty);
pthread_mutex_lock(queue->mutex);
queue->buffer[queue->tail] = value;
queue->tail = (queue->tail + 1) % QUEUE_SIZE;
queue->size++;
pthread_mutex_unlock(queue->mutex);
sem_post(queue->full);
}
int queue_pop(queue_t *queue) {
sem_wait(queue->full);
pthread_mutex_lock(queue->mutex);
int value = queue->buffer[queue->head];
queue->head = (queue->head + 1) % QUEUE_SIZE;
queue->size--;
pthread_mutex_unlock(queue->mutex);
sem_post(queue->empty);
return value;
}
int queue_size(queue_t *queue) {
return queue->size;
}
void *consumer_thread(void *arg) {
queue_t *queue = (queue_t *)arg;
while (1) {
int value = queue_pop(queue);
printf("Consumer thread: %d\n", value);
}
}
void queue_start_consumer(queue_t *queue) {
pthread_t thread;
pthread_create(&thread, NULL, consumer_thread, queue);
}
void queue_notify(queue_t *queue) {
sem_post(queue->full);
}
int main() {
queue_t *queue = queue_create();
queue_start_consumer(queue);
while (1) {
int value = rand() % 100;
queue_push(queue, value);
printf("Producer thread: %d\n", value);
if (queue_size(queue) > ) {
queue_notify(queue);
}
sleep(1);
}
queue_destroy(queue);
return ;
}
```
在主函数中,我们创建了一个队列,并启动了一个消费者线程。在生产者线程中,我们不断往队列中写入数据,并检查队列是否非空,如果非空则调用 `queue_notify` 函数通知消费者线程进行消费。消费者线程中,我们不断从队列中取出数据并打印出来。
在队列的实现中,我们使用了信号量和互斥锁来保证线程安全。`sem_wait` 和 `sem_post` 函数用于操作信号量,`pthread_mutex_lock` 和 `pthread_mutex_unlock` 函数用于操作互斥锁。在 `queue_push` 和 `queue_pop` 函数中,我们使用了信号量来控制空闲缓冲区和已用缓冲区的数量,使用互斥锁来保证队列头指针、尾指针和大小的更新是原子操作。
在 `queue_start_consumer` 函数中,我们创建了一个消费者线程,并将队列作为参数传递给线程函数。在消费者线程中,我们不断从队列中取出数据并打印出来。
在 `queue_notify` 函数中,我们使用了 `sem_post` 函数来通知消费者线程进行消费。这个函数可以在生产者线程中任意位置调用,因为它只是通知消费者线程进行消费,并不会阻塞当前线程。
需要注意的是,这个队列实现并不是最优的,它只是一个简单的示例。在实际应用中,可能需要考虑更多的因素,比如队列的容量、生产者和消费者的速度差异、消费者的数量等等。
阅读全文