Task* ThreadPool::GetTask(Thread* self) { MutexLock mu(self, task_queue_lock_); while (!IsShuttingDown()) { const size_t thread_count = GetThreadCount(); // Ensure that we don't use more threads than the maximum active workers. const size_t active_threads = thread_count - waiting_count_; // <= since self is considered an active worker. if (active_threads <= max_active_workers_) { Task* task = TryGetTaskLocked(); if (task != nullptr) { return task; } } ++waiting_count_; if (waiting_count_ == GetThreadCount() && !HasOutstandingTasks()) { // We may be done, lets broadcast to the completion condition. completion_condition_.Broadcast(self); } const uint64_t wait_start = kMeasureWaitTime ? NanoTime() : 0; task_queue_condition_.Wait(self); if (kMeasureWaitTime) { const uint64_t wait_end = NanoTime(); total_wait_time_ += wait_end - std::max(wait_start, start_time_); } --waiting_count_; } // We are shutting down, return null to tell the worker thread to stop looping. return nullptr; }
时间: 2024-04-19 14:28:05 浏览: 124
linux_threadpool.zip_epoll select _epoll thread_epoll编程_thread
这段代码是一个线程池的实现,用于获取任务并分配给线程执行。函数`GetTask`用于获取任务,具体流程如下:
1. 首先获取线程池的任务队列锁,确保线程安全。
2. 在循环中,判断线程池是否正在关闭。如果正在关闭,则直接返回nullptr,告诉工作线程停止循环。
3. 获取当前线程数量和活跃线程数量,确保不超过最大活跃工作线程数。
4. 尝试从任务队列中获取任务,如果成功获取到任务,则返回该任务。
5. 如果无法获取任务,则增加等待计数,并检查是否所有线程都处于等待状态且没有未完成的任务。如果是,则向完成条件发送信号。
6. 记录等待开始时间,并等待任务队列条件变量的唤醒。
7. 如果开启了等待时间测量,则记录等待结束时间,并计算总等待时间。
8. 减少等待计数。
9. 如果线程池正在关闭,返回nullptr,告诉工作线程停止循环。
10. 返回获取到的任务。
这段代码的作用是从线程池的任务队列中获取任务,并将其分配给工作线程进行执行。
阅读全文