如何解决InheritableThreadLocal和线程池搭配使用的问题
时间: 2024-06-16 20:08:36 浏览: 160
在使用线程池时,可能会遇到InheritableThreadLocal无法正确继承的问题。这是因为线程池在执行任务时会重用之前创建的线程,而这些线程可能已经绑定了旧的InheritableThreadLocal值,导致新任务继承错误的值。解决这个问题的办法是使用ThreadPoolExecutor而不是ThreadPool来创建线程池,并覆盖它的`ThreadFactory`方法,以创建一个新的线程并正确地继承InheritableThreadLocal值。具体来说,可以创建一个实现`ThreadFactory`接口的类,并覆盖`newThread`方法,如下所示: [^2]
```python
import threading
from concurrent.futures import ThreadPoolExecutor, _base
class MyThreadLocal(_base.Executor):
def __init__(self, thread_local):
self.thread_local = thread_local
def submit(self, fn, *args, **kwargs):
return super().submit(self.wrapper(fn), *args, **kwargs)
def wrapper(self, fn):
local = self.thread_local.copy()
def wrapped_fn(*args, **kwargs):
with local:
return fn(*args, **kwargs)
return wrapped_fn
def map(self, fn, *iterables, timeout=None, chunksize=1):
return list(self._map_async(fn, *iterables).result(timeout=timeout))
def shutdown(self, wait=True):
pass
class MyThreadFactory(ThreadPoolExecutor):
def __init__(self, thread_local, *args, **kwargs):
self.thread_local = thread_local
super().__init__(*args, **kwargs)
def new_thread(self, executor, task):
t = threading.Thread(target=executor._worker, args=(task,), daemon=True)
t.daemon = False
t.name = None
t._Thread__ident = None
t._target = None
t._args = None
t._kwargs = None
t._state = threading.S
t._thread_local = self.thread_local
return t
# 使用示例:
import random
import time
def test_inheritable_thread_local(thread_local, pool):
thread_local.value = random.randint(0, 100)
pool.submit(worker, thread_local.copy())
def worker(thread_local_copy):
print(thread_local_copy.value)
time.sleep(1)
print(thread_local_copy.value)
if __name__ == '__main__':
thread_local = threading.local()
pool = MyThreadLocal(thread_local)
factory = MyThreadFactory(thread_local, 5)
pool._threads = set()
pool._max_workers = 5
pool._thread_name_prefix = 'ThreadPoolExecutor-'
pool._initializer = None
pool._initargs = ()
pool._queue = queue.Queue()
pool._task_counter = itertools.count()
pool._shutdown = False
pool._results = {}
pool._work_ids = set()
pool._threads_lock = threading.Lock()
pool._threads_recreate_lock = threading.Lock()
pool._pending_work_items_lock = threading.Lock()
pool._wake_up_mutex = threading.Lock()
pool._not_responsive_workers = set()
pool._shutdown_thread = None
pool._shutdown_lock = threading.Lock()
pool._shutdown_cond = threading.Condition(pool._shutdown_lock)
pool._workers = {}
pool._done_with_recreate = threading.Condition()
pool._threads_recreate_override = False
pool._threads_recreate_count = 0
pool._threads_recreate_next_id = 0
pool._threads_recreate_idle_time = 0.0
pool._threads_recreate_rate = 0.5
pool._threads_recreate_max = 5
pool._threads_recreate_last = 0.0
pool._threads_recreate_reset = False
pool._kill_workers = False
pool._force_workers = set()
pool._shutdown_lock = threading.Lock()
pool._shutdown_cond = threading.Condition(pool._shutdown_lock)
pool._stop_f = None
pool._threads_recreate_condition = threading.Condition(pool._threads_recreate_lock)
pool._thread_factory = factory
test_inheritable_thread_local(thread_local, pool)
pool.shutdown()
```
阅读全文