用C++实现线程池示例
时间: 2023-02-15 20:53:38 浏览: 55
代码:
```
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#define MAX_THREAD_NUM 20
#define MAX_QUEUE_SIZE 100
typedef struct task {
void *(*run)(void *args);
void *arg;
struct task *next;
} task_t;
typedef struct threadpool {
pthread_mutex_t lock;
pthread_cond_t notify;
pthread_t *threads;
task_t *queue;
int thread_count;
相关问题
使用c++ 11实现线程池
以下是使用C++11实现线程池的示例代码:
```cpp
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
class ThreadPool {
public:
ThreadPool(size_t num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
threads_.emplace_back([this] {
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !tasks_.empty() || stop_; });
if (stop_ && tasks_.empty())
return;
auto task = std::move(tasks_.front());
tasks_.pop();
lock.unlock();
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
}
condition_.notify_all();
for (auto& thread : threads_)
thread.join();
}
template<class F>
void Enqueue(F&& task) {
{
std::unique_lock<std::mutex> lock(mutex_);
tasks_.emplace(std::forward<F>(task));
}
condition_.notify_one();
}
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable condition_;
bool stop_ = false;
};
int main() {
ThreadPool pool(4);
for (int i = 0; i < 8; ++i) {
pool.Enqueue([i] {
std::cout << "Task " << i << " is running" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Task " << i << " is done" << std::endl;
});
}
return 0;
}
```
此示例实现了一个线程池,其中包含一个任务队列和一组工作线程。构造函数创建指定数量的工作线程,并启动它们以侦听任务队列。Enqueue()方法用于将新任务添加到队列中。每个工作线程将从队列中获取任务并执行它。当线程池被销毁时,所有工作线程都将被停止。
使用c++11实现线程池
以下是一个简单的C11线程池实现,包含基本的初始化、添加任务和销毁线程池的函数:
```c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
// 最大任务数
#define MAX_TASKS 1024
// 任务结构体
typedef struct {
void (*func)(void*); // 任务函数指针
void* arg; // 任务参数
} task_t;
// 线程池结构体
typedef struct {
pthread_t* threads; // 线程数组
atomic_int num_threads; // 线程数
atomic_int num_tasks; // 任务数
atomic_bool shutdown; // 关闭标志
pthread_mutex_t lock; // 互斥锁
pthread_cond_t task_cond; // 条件变量
task_t tasks[MAX_TASKS]; // 任务数组
} thread_pool_t;
// 执行任务
static void* thread_func(void* arg) {
thread_pool_t* pool = (thread_pool_t*)arg;
while (1) {
// 等待任务
pthread_mutex_lock(&pool->lock);
while (pool->num_tasks == 0 && !pool->shutdown) {
pthread_cond_wait(&pool->task_cond, &pool->lock);
}
// 关闭
if (pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
// 取出任务
task_t task = pool->tasks[--pool->num_tasks];
pthread_mutex_unlock(&pool->lock);
// 执行任务
task.func(task.arg);
}
}
// 初始化线程池
int thread_pool_init(thread_pool_t* pool, int num_threads) {
// 初始化互斥锁和条件变量
if (pthread_mutex_init(&pool->lock, NULL) != 0) {
return -1;
}
if (pthread_cond_init(&pool->task_cond, NULL) != 0) {
pthread_mutex_destroy(&pool->lock);
return -1;
}
// 初始化线程数组
pool->threads = (pthread_t*)malloc(num_threads * sizeof(pthread_t));
if (pool->threads == NULL) {
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->task_cond);
return -1;
}
// 初始化变量
atomic_init(&pool->num_threads, num_threads);
atomic_init(&pool->num_tasks, 0);
atomic_init(&pool->shutdown, 0);
// 创建线程
for (int i = 0; i < num_threads; i++) {
if (pthread_create(&pool->threads[i], NULL, thread_func, pool) != 0) {
thread_pool_destroy(pool);
return -1;
}
}
return 0;
}
// 添加任务
int thread_pool_add_task(thread_pool_t* pool, void (*func)(void*), void* arg) {
if (atomic_load(&pool->shutdown)) {
return -1;
}
// 添加任务
pthread_mutex_lock(&pool->lock);
if (pool->num_tasks == MAX_TASKS) {
pthread_mutex_unlock(&pool->lock);
return -1;
}
task_t task = { func, arg };
pool->tasks[pool->num_tasks++] = task;
pthread_cond_signal(&pool->task_cond);
pthread_mutex_unlock(&pool->lock);
return 0;
}
// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool) {
if (atomic_exchange(&pool->shutdown, 1)) {
return;
}
// 唤醒所有等待任务的线程
pthread_mutex_lock(&pool->lock);
pthread_cond_broadcast(&pool->task_cond);
pthread_mutex_unlock(&pool->lock);
// 等待所有线程退出
for (int i = 0; i < atomic_load(&pool->num_threads); i++) {
pthread_join(pool->threads[i], NULL);
}
// 释放资源
free(pool->threads);
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->task_cond);
}
```
使用示例:
```c
void task_func(void* arg) {
int* num = (int*)arg;
printf("Task %d executed by thread %ld\n", *num, pthread_self());
}
int main() {
thread_pool_t pool;
thread_pool_init(&pool, 4);
int nums[10];
for (int i = 0; i < 10; i++) {
nums[i] = i + 1;
thread_pool_add_task(&pool, task_func, &nums[i]);
}
thread_pool_destroy(&pool);
return 0;
}
```