Task* ThreadPool::GetTask(Thread* self) { MutexLock mu(self, task_queue_lock_); while (!IsShuttingDown()) { const size_t thread_count = GetThreadCount(); // Ensure that we don't use more threads than the maximum active workers. const size_t active_threads = thread_count - waiting_count_; // <= since self is considered an active worker. if (active_threads <= max_active_workers_) { Task* task = TryGetTaskLocked(); if (task != nullptr) { return task; } } ++waiting_count_; if (waiting_count_ == GetThreadCount() && !HasOutstandingTasks()) { // We may be done, lets broadcast to the completion condition. completion_condition_.Broadcast(self); } const uint64_t wait_start = kMeasureWaitTime ? NanoTime() : 0; task_queue_condition_.Wait(self); if (kMeasureWaitTime) { const uint64_t wait_end = NanoTime(); total_wait_time_ += wait_end - std::max(wait_start, start_time_); } --waiting_count_; } // We are shutting down, return null to tell the worker thread to stop looping. return nullptr; }
时间: 2024-04-19 10:28:05 浏览: 13
这段代码是一个线程池的实现,用于获取任务并分配给线程执行。函数`GetTask`用于获取任务,具体流程如下:
1. 首先获取线程池的任务队列锁,确保线程安全。
2. 在循环中,判断线程池是否正在关闭。如果正在关闭,则直接返回nullptr,告诉工作线程停止循环。
3. 获取当前线程数量和活跃线程数量,确保不超过最大活跃工作线程数。
4. 尝试从任务队列中获取任务,如果成功获取到任务,则返回该任务。
5. 如果无法获取任务,则增加等待计数,并检查是否所有线程都处于等待状态且没有未完成的任务。如果是,则向完成条件发送信号。
6. 记录等待开始时间,并等待任务队列条件变量的唤醒。
7. 如果开启了等待时间测量,则记录等待结束时间,并计算总等待时间。
8. 减少等待计数。
9. 如果线程池正在关闭,返回nullptr,告诉工作线程停止循环。
10. 返回获取到的任务。
这段代码的作用是从线程池的任务队列中获取任务,并将其分配给工作线程进行执行。
相关问题
调用函数 std::unique_lock<std::mutex>::unlock 之前,调用方保留锁定 pool->m_ThreadPool 失败
在调用 std::unique_lock<std::mutex>::unlock 之前,调用方必须保留锁定 pool->m_ThreadPool,否则会导致未定义的行为。因为在解锁之前释放互斥量可能会导致其他线程在未锁定互斥量的情况下访问共享资源。因此,在调用 std::unique_lock<std::mutex>::unlock 之前,调用方应该始终确保已经获得了互斥量的锁定。
#include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <functional> #include <future> class ThreadPool { public: ThreadPool(size_t threads) : stop(false) { for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } }); } } ~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread &worker : workers) worker.join(); } template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> res = task->get_future(); { std_lock<std::mutex> lock(queue_mutex); if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task] { (*task)(); }); } condition.notify_one(); return res; } private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; };
这是一个 C++ 实现的线程池类。它可以在创建时指定线程数,然后通过 `enqueue` 方法将任务提交到线程池中执行。线程池会自动创建多个线程来执行任务,并且可以动态调整线程池大小,以适应任务负载的变化。这个实现使用了 C++11 中的标准库,包括 `std::thread`、`std::mutex`、`std::condition_variable` 等。它还提供了一个 `std::future` 对象,可以用来获取任务执行的结果。