c++写一个完善的线程池,并附带注释
时间: 2024-05-04 14:16:30 浏览: 118
好的,我了解了你的问题。以下是一个基本的线程池实现示例,附带注释。
```c
#include <pthread.h>
#define THREADPOOL_MAX_THREADS 50
#define THREADPOOL_MAX_QUEUE 1000
// 任务的结构体
typedef struct {
void (*function)(void *); // 任务函数指针
void *argument; // 任务函数的参数
} threadpool_task_t;
// 线程池的结构体
struct threadpool_t {
pthread_mutex_t lock; // 互斥锁,用于线程池的同步
pthread_cond_t notify; // 线程条件变量,用于通知等待任务的线程
pthread_t *threads; // 线程数组,存储所有的线程 ID
threadpool_task_t *queue; // 任务队列
int thread_count; // 线程总数
int queue_size; // 队列的当前长度
int head; // 任务队列的队头索引
int tail; // 任务队列的队尾索引
int shutdown; // 标志位,表示线程池是否已经关闭
int started; // 标志位,表示线程池是否已经启动
};
// 初始化线程池
threadpool_t *threadpool_create(int thread_count, int queue_size)
{
// 不要让线程数或队列大小为 0
if (thread_count <= 0 || thread_count > THREADPOOL_MAX_THREADS || queue_size <= 0 || queue_size > THREADPOOL_MAX_QUEUE) {
return NULL;
}
threadpool_t *pool;
int i;
// 创建线程池结构体
if ((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
return NULL;
}
// 初始化线程池变量
pool->thread_count = 0;
pool->queue_size = queue_size;
pool->head = pool->tail = pool->queue_size = 0;
pool->shutdown = pool->started = 0;
// 分配线程和任务队列的空间
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
pool->queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_size);
// 初始化互斥锁和线程条件变量
if ((pthread_mutex_init(&(pool->lock), NULL) != 0) || (pthread_cond_init(&(pool->notify), NULL) != 0)) {
threadpool_destroy(pool);
return NULL;
}
// 创建线程
for (i = 0; i < thread_count; i++) {
if (pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool) != 0) {
threadpool_destroy(pool);
return NULL;
}
pool->thread_count++;
pool->started++;
}
return pool;
}
// 添加任务到线程池的任务队列
int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument)
{
// 确定下一个任务的索引
int next;
if (pool == NULL || function == NULL) {
return -1;
}
if (pthread_mutex_lock(&(pool->lock)) != 0) {
return -1;
}
// 计算下一个队列索引
next = pool->tail + 1;
if (next == pool->queue_size) {
next = 0;
}
// 队列已满,添加任务失败
if (pool->queue_size == pool->queue_size) {
pthread_mutex_unlock(&(pool->lock));
return -1;
}
// 将新任务添加到任务队列
pool->queue[pool->tail].function = function;
pool->queue[pool->tail].argument = argument;
pool->tail = next;
pool->queue_size++;
// 通知等待的线程有新任务到来
if (pthread_cond_signal(&(pool->notify)) != 0) {
pthread_mutex_unlock(&(pool->lock));
return -1;
}
pthread_mutex_unlock(&(pool->lock));
return 0;
}
// 线程池的线程函数
void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
for (;;) {
// 等待新任务到来
pthread_mutex_lock(&(pool->lock));
// 如果没有新任务,并且线程池被关闭了,就结束该线程
while (pool->queue_size == 0 && !pool->shutdown) {
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
// 如果线程池被关闭并且没有新任务了,就结束该线程
if (pool->shutdown && pool->queue_size == 0) {
break;
}
// 获取队头任务
task.function = pool->queue[pool->head].function;
task.argument = pool->queue[pool->head].argument;
pool->head += 1;
pool->queue_size -= 1;
// 计算下一个队列索引
if (pool->head == pool->queue_size) {
pool->head = 0;
}
// 通知其他线程有新任务可以执行
pthread_mutex_unlock(&(pool->lock));
// 执行任务函数
(*(task.function))(task.argument);
}
// 退出线程
pool->started--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
return NULL;
}
// 关闭线程池
int threadpool_destroy(threadpool_t *pool)
{
int i;
if (pool == NULL) {
return -1;
}
if (pthread_mutex_lock(&(pool->lock)) != 0) {
return -1;
}
// 如果线程池已经被关闭了,就返回 -1
if (pool->shutdown) {
pthread_mutex_unlock(&(pool->lock));
return -1;
}
// 标志线程池要关闭
pool->shutdown = 1;
// 唤醒所有等待的线程
if (pthread_cond_broadcast(&(pool->notify)) != 0) {
pthread_mutex_unlock(&(pool->lock));
return -1;
}
pthread_mutex_unlock(&(pool->lock));
// 等待正在执行的线程结束
for (i = 0; i < pool->thread_count; i++) {
if (pthread_join(pool->threads[i], NULL) != 0) {
return -1;
}
}
// 释放线程和任务队列的空间
free(pool->threads);
free(pool->queue);
// 销毁互斥锁和线程条件变量
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->notify));
free(pool);
return 0;
}
```
希望我这个示例能够帮到你!
阅读全文