
I. Python Multithreading — Complete API Reference Manual
Python's threading module is a high-level threading API built on top of the low-level _thread module. Because of the GIL (Global Interpreter Lock), threads do not achieve true CPU parallelism for pure Python code — but they excel at IO-bound tasks such as network requests, file operations, and database calls. This manual covers every public API with runnable examples.

1. Thread — Core Thread Object
threading.Thread is the fundamental building block. A thread can be created by passing a callable target or by subclassing and overriding run().

1) Constructor

threading.Thread(
    group=None,    # reserved, always None
    target=None,   # callable to run in thread
    name=None,     # thread name string
    args=(),       # positional args tuple for target
    kwargs=None,   # keyword args dict for target
    daemon=None,   # True → daemon thread
)

2) Thread.start() — Launch the thread
Schedules the thread for execution. Must be called exactly once per Thread object.
import threading
import time

def worker(name, delay):
    time.sleep(delay)
    print(f"[{name}] finished after {delay}s")

t1 = threading.Thread(target=worker, args=("Alpha", 1))
t2 = threading.Thread(target=worker, args=("Beta", 2))

t1.start()  # ← launches t1
t2.start()  # ← launches t2 concurrently

print("Main thread continues immediately")
# Output order (non-deterministic):
# Main thread continues immediately
# [Alpha] finished after 1s
# [Beta] finished after 2s

Calling start() twice on the same Thread raises RuntimeError. If you need to rerun a task, create a new Thread instance.

3) Thread.join(timeout=None) — Wait for completion
Blocks the calling thread until the target thread terminates, or until timeout seconds elapse.
import threading, time

def slow_task():
    print("Task started")
    time.sleep(3)
    print("Task done")

t = threading.Thread(target=slow_task)
t.start()

t.join(timeout=5)  # wait up to 5 seconds

if t.is_alive():
    print("Thread still running after timeout!")
else:
    print("Thread completed successfully")
# → Task started
# → Task done
# → Thread completed successfully

4) Thread.is_alive() — Check thread status
Returns True between start() and thread termination.
import threading, time

def task():
    time.sleep(2)

t = threading.Thread(target=task)
print(t.is_alive())  # → False (not started yet)
t.start()
print(t.is_alive())  # → True (running)
t.join()
print(t.is_alive())  # → False (terminated)

5) Thread.name / Thread.getName() / Thread.setName() — Thread name
import threading

def task():
    # Access name inside the thread
    print(f"Running as: {threading.current_thread().name}")

t = threading.Thread(target=task, name="WorkerThread-1")
print(t.name)       # → WorkerThread-1
t.setName("Renamed")
print(t.getName())  # → Renamed
t.start()
t.join()
# → Running as: Renamed

Note: getName() and setName() are deprecated aliases (they emit DeprecationWarning since Python 3.10); prefer the name attribute.

6) Thread.daemon — Daemon threads
A daemon thread is automatically killed when ALL non-daemon threads exit — it does NOT block program shutdown.
import threading, time

def background_monitor():
    while True:
        print("[Monitor] heartbeat")
        time.sleep(1)

# Must set daemon BEFORE start()
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()

print("Main: doing work")
time.sleep(2.5)
print("Main: exiting — monitor will be killed automatically")
# → [Monitor] heartbeat
# → Main: doing work
# → [Monitor] heartbeat
# → [Monitor] heartbeat
# → Main: exiting — monitor will be killed automatically

7) Thread.ident / Thread.native_id — Thread identifiers
import threading

def show_ids():
    t = threading.current_thread()
    print(f"ident={t.ident}, native_id={t.native_id}")

t = threading.Thread(target=show_ids)
t.start()
t.join()
# → ident=140234567890, native_id=12345

print(f"Main ident: {threading.main_thread().ident}")

8) Subclass Pattern — Override run()
import threading, time

class DownloadThread(threading.Thread):
    """Custom thread that downloads a resource."""

    def __init__(self, url: str):
        super().__init__(name=f"Download-{url}")
        self.url = url
        self.result = None

    def run(self):
        # Simulate download
        time.sleep(0.5)
        self.result = f"<html from {self.url}>"
        print(f"Downloaded: {self.url}")

threads = [DownloadThread(f"http://example.com/page{i}") for i in range(3)]

for t in threads:
    t.start()

for t in threads:
    t.join()
    print(f"Result: {t.result}")

2. Lock — Mutual Exclusion
1) Lock.acquire(blocking=True, timeout=-1) / Lock.release()
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        lock.acquire()   # ← blocks until lock is free
        counter += 1     # critical section
        lock.release()   # ← always release!

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()

print(f"Counter: {counter}")  # → Counter: 500000 (always correct)

Calling release() on an unlocked Lock raises RuntimeError. Always prefer the with context manager to guarantee release on exceptions.

2) Context Manager — with lock
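The exception-safety guarantee mentioned above can be seen directly: the lock is released on exit from the with block even when the body raises. A minimal sketch:

```python
import threading

lock = threading.Lock()

try:
    with lock:                       # acquired on entry
        raise RuntimeError("boom")   # failure inside the critical section
except RuntimeError:
    pass

print(lock.locked())  # → False (released despite the exception)
```

With bare acquire()/release(), the same failure would leave the lock held forever unless release() were wrapped in a try/finally.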
import threading

shared_list = []
lock = threading.Lock()

def safe_append(value):
    with lock:  # ← acquire on entry, release on exit (even on exception)
        shared_list.append(value)

threads = [threading.Thread(target=safe_append, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

print(sorted(shared_list))  # → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

3) Lock.acquire(blocking=False) — Non-blocking try
import threading, time

lock = threading.Lock()

def try_lock(name):
    acquired = lock.acquire(blocking=False)
    if acquired:
        print(f"[{name}] acquired the lock")
        time.sleep(2)
        lock.release()
    else:
        print(f"[{name}] could not acquire — skipping")

t1 = threading.Thread(target=try_lock, args=("T1",))
t2 = threading.Thread(target=try_lock, args=("T2",))
t1.start(); t2.start()
t1.join(); t2.join()
# → [T1] acquired the lock
# → [T2] could not acquire — skipping

4) Lock.acquire(timeout=N) — Timed wait
import threading, time

lock = threading.Lock()
lock.acquire()  # pre-lock it

def worker():
    result = lock.acquire(timeout=1.5)  # wait max 1.5s
    if result:
        print("Got the lock")
        lock.release()
    else:
        print("Timed out waiting for lock")

t = threading.Thread(target=worker)
t.start()
t.join()
# → Timed out waiting for lock (lock was never released)

5) Lock.locked() — Query state
import threading

lock = threading.Lock()
print(lock.locked())  # → False

lock.acquire()
print(lock.locked())  # → True

lock.release()
print(lock.locked())  # → False

3. RLock — Reentrant Lock
1) Basic RLock usage
import threading

rlock = threading.RLock()

def outer():
    with rlock:  # recursion count → 1
        print("outer acquired")
        inner()  # same thread acquires again
        print("outer releasing")
    # recursion count → 0 (fully released)

def inner():
    with rlock:  # recursion count → 2
        print("inner acquired")
    # recursion count → 1

t = threading.Thread(target=outer)
t.start(); t.join()
# → outer acquired
# → inner acquired
# → outer releasing

A plain Lock would DEADLOCK in the above pattern, because the same thread tries to acquire an already-locked lock. Use RLock whenever a method holding the lock may call another method that also needs the lock.

2) RLock in a class
import threading

class BankAccount:
    def __init__(self, balance: float):
        self.balance = balance
        self._lock = threading.RLock()

    def deposit(self, amount: float):
        with self._lock:
            self.balance += amount
            print(f"Deposited {amount:.2f} → balance={self.balance:.2f}")

    def withdraw(self, amount: float):
        with self._lock:
            self.balance -= amount
            print(f"Withdrew {amount:.2f} → balance={self.balance:.2f}")

    def transfer_in(self, amount: float):
        with self._lock:          # outer acquire
            self.deposit(amount)  # inner acquire (reentrant!)
            print("Transfer complete")

account = BankAccount(1000.0)
t = threading.Thread(target=account.transfer_in, args=(250.0,))
t.start(); t.join()
# → Deposited 250.00 → balance=1250.00
# → Transfer complete

4. Condition — Wait/Notify Pattern
1) Condition.wait() / notify() / notify_all()
import threading, time, collections

# Classic Producer-Consumer pattern
buffer = collections.deque()
MAX_SIZE = 3
condition = threading.Condition()

def producer():
    for i in range(6):
        with condition:
            while len(buffer) >= MAX_SIZE:
                print("Producer waiting — buffer full")
                condition.wait()  # ← releases lock, blocks
            buffer.append(i)
            print(f"Produced {i} | buffer={list(buffer)}")
            condition.notify_all()  # ← wake waiting consumers
        time.sleep(0.3)

def consumer(name):
    for _ in range(3):
        with condition:
            while not buffer:
                print(f"[{name}] waiting — buffer empty")
                condition.wait()  # ← releases lock, blocks
            item = buffer.popleft()
            print(f"[{name}] consumed {item} | buffer={list(buffer)}")
            condition.notify_all()  # ← wake waiting producer

threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("C1",)),
    threading.Thread(target=consumer, args=("C2",)),
]
for t in threads: t.start()
for t in threads: t.join()

2) Condition.wait(timeout=N) — Timed wait
import threading, time

condition = threading.Condition()

def waiter():
    with condition:
        result = condition.wait(timeout=2.0)  # wait max 2 seconds
        if result:
            print("Condition met!")
        else:
            print("Timed out — condition never triggered")

def notifier():
    time.sleep(5)  # too slow
    with condition:
        condition.notify()

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=notifier)
t1.start(); t2.start()
t1.join(); t2.join()
# → Timed out — condition never triggered

3) Condition.wait_for(predicate, timeout=None) — Predicate wait
import threading, time

items = []
cond = threading.Condition()

def consumer():
    with cond:
        # Block until at least 3 items are available
        cond.wait_for(lambda: len(items) >= 3)
        print(f"Got items: {items}")

def producer():
    for i in range(5):
        time.sleep(0.5)
        with cond:
            items.append(i)
            print(f"Added item {i}")
            cond.notify_all()

t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start(); t2.start()
t1.join(); t2.join()
# → Added item 0
# → Added item 1
# → Added item 2
# → Got items: [0, 1, 2]
# → Added item 3
# → Added item 4

5. Semaphore & BoundedSemaphore
A semaphore manages an internal counter: acquire() decrements it (blocking while it is zero); release() increments it. Perfect for limiting concurrent access to a resource pool.

1) Semaphore(value=1) — Connection pool simulation
import threading, time, random

# Allow max 3 simultaneous DB connections
db_semaphore = threading.Semaphore(3)

def use_db_connection(thread_id):
    print(f"Thread {thread_id}: waiting for DB connection")
    with db_semaphore:  # acquire (count -1)
        print(f"Thread {thread_id}: got connection")
        time.sleep(random.uniform(0.5, 1.5))
        print(f"Thread {thread_id}: released connection")
    # release (count +1) on exit

threads = [threading.Thread(target=use_db_connection, args=(i,)) for i in range(7)]
for t in threads: t.start()
for t in threads: t.join()
# At most 3 "got connection" lines active at any time

2) BoundedSemaphore — Prevent over-release
Warning: a plain Semaphore allows release() beyond the initial value — this is usually a bug. BoundedSemaphore raises ValueError if the count would exceed the initial value.
import threading

sem = threading.Semaphore(2)
bsem = threading.BoundedSemaphore(2)

# Plain Semaphore — silently over-releases
sem.release()  # count goes to 3 — no error (latent bug)
print("Semaphore value after over-release: OK (silent)")

# BoundedSemaphore — raises ValueError
try:
    bsem.release()  # count would exceed 2
except ValueError as e:
    print(f"BoundedSemaphore caught: {e}")
# → BoundedSemaphore caught: Semaphore released too many times

3) Rate limiter pattern
import threading, time

# Limit to 2 concurrent API calls
api_semaphore = threading.BoundedSemaphore(2)

def call_api(endpoint):
    with api_semaphore:
        print(f"Calling {endpoint}")
        time.sleep(1)  # simulate API latency
        print(f"Done {endpoint}")

endpoints = [f"/api/resource/{i}" for i in range(6)]
threads = [threading.Thread(target=call_api, args=(ep,)) for ep in endpoints]

for t in threads: t.start()
for t in threads: t.join()

6. Event — Simple Flag Signaling
An Event manages an internal boolean flag: threads wait() until the flag is set, and any thread can set() or clear() it.

1) Event.set() / Event.clear() / Event.wait() / Event.is_set()
import threading, time

start_event = threading.Event()

def worker(name):
    print(f"[{name}] waiting for start signal...")
    start_event.wait()  # blocks until event is set
    print(f"[{name}] GO! Starting work")

workers = [threading.Thread(target=worker, args=(f"W{i}",)) for i in range(4)]
for w in workers: w.start()

print("Main: preparing...")
time.sleep(2)
print("Main: firing start signal!")
start_event.set()  # wake ALL waiting threads at once

for w in workers: w.join()
# → [W0] waiting for start signal...
# → [W1] waiting for start signal...
# → [W2] waiting for start signal...
# → [W3] waiting for start signal...
# (2s pause)
# → Main: firing start signal!
# → [W0] GO! Starting work  (all 4 unblock simultaneously)

2) Event.wait(timeout=N) — Timed wait
import threading, time

ready = threading.Event()

def service():
    print("Service: initializing (takes 3s)...")
    time.sleep(3)
    ready.set()
    print("Service: ready!")

def client():
    if ready.wait(timeout=1.5):  # only wait 1.5s
        print("Client: connected!")
    else:
        print("Client: service not ready in time, aborting")

t1 = threading.Thread(target=service)
t2 = threading.Thread(target=client)
t1.start(); t2.start()
t1.join(); t2.join()
# → Service: initializing (takes 3s)...
# → Client: service not ready in time, aborting
# → Service: ready!

3) Stop signal pattern
import threading, time

stop_event = threading.Event()

def background_worker():
    count = 0
    while not stop_event.is_set():  # check flag each iteration
        print(f"Working... iteration {count}")
        count += 1
        time.sleep(0.5)
    print("Worker: received stop signal, exiting cleanly")

t = threading.Thread(target=background_worker)
t.start()

time.sleep(2)
print("Main: sending stop signal")
stop_event.set()
t.join()

7. Timer — Delayed Execution
threading.Timer is a subclass of Thread that executes a function after a specified delay. It can be cancelled before firing.

1) Basic Timer
import threading

def reminder(message):
    print(f"⏰ Reminder: {message}")

# Fire after 3 seconds
t = threading.Timer(3.0, reminder, args=("Meeting at 3pm!",))
t.start()

print("Timer set. Waiting...")
t.join()
# → Timer set. Waiting...
# (3s pause)
# → ⏰ Reminder: Meeting at 3pm!

2) Timer.cancel() — Cancel before firing
import threading, time

fired = False

def action():
    global fired
    fired = True
    print("Action fired!")

t = threading.Timer(5.0, action)
t.start()

time.sleep(1)
t.cancel()  # ← cancel within the window
t.join()

print(f"Action fired: {fired}")  # → Action fired: False

3) Repeating timer pattern
import threading, time

class RepeatingTimer:
    """Fires a function every `interval` seconds."""

    def __init__(self, interval: float, func, *args):
        self.interval = interval
        self.func = func
        self.args = args
        self._timer = None
        self._running = False

    def _run(self):
        self.func(*self.args)
        if self._running:
            self._schedule()

    def _schedule(self):
        self._timer = threading.Timer(self.interval, self._run)
        self._timer.daemon = True
        self._timer.start()

    def start(self):
        self._running = True
        self._schedule()

    def stop(self):
        self._running = False
        if self._timer:
            self._timer.cancel()

counter = [0]
def tick():
    counter[0] += 1
    print(f"Tick #{counter[0]}")

rt = RepeatingTimer(0.5, tick)
rt.start()
time.sleep(2.5)
rt.stop()
print(f"Total ticks: {counter[0]}")  # → Total ticks: 5 (the last tick races with stop(), so 4 is also possible)

8. Barrier — Thread Synchronization Point
1) Barrier(parties, action=None, timeout=None)
import threading, time, random

NUM_WORKERS = 4
barrier = threading.Barrier(NUM_WORKERS)

def phase_worker(name):
    # Phase 1
    duration = random.uniform(0.5, 2.0)
    print(f"[{name}] phase 1 working for {duration:.1f}s")
    time.sleep(duration)
    print(f"[{name}] phase 1 done — waiting at barrier")

    barrier.wait()  # ← all threads block here until all 4 arrive

    print(f"[{name}] phase 2 starting (all threads released together)")

threads = [threading.Thread(target=phase_worker, args=(f"W{i}",)) for i in range(NUM_WORKERS)]
for t in threads: t.start()
for t in threads: t.join()

2) Barrier with action callback
import threading, time

def setup_phase():
    """Runs ONCE when all threads reach the barrier, before release."""
    print(">>> All threads ready — running barrier action <<<")

barrier = threading.Barrier(3, action=setup_phase)

def worker(name):
    time.sleep(0.1)
    print(f"[{name}] arrived at barrier")
    barrier.wait()
    print(f"[{name}] past barrier")

threads = [threading.Thread(target=worker, args=(f"T{i}",)) for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()

3) Barrier.abort() / BrokenBarrierError
import threading, time

barrier = threading.Barrier(3)

def risky_worker(name, should_abort):
    try:
        if should_abort:
            time.sleep(0.2)
            print(f"[{name}] aborting barrier!")
            barrier.abort()  # breaks the barrier for everyone
        else:
            print(f"[{name}] waiting at barrier...")
            barrier.wait(timeout=2)
            print(f"[{name}] passed!")
    except threading.BrokenBarrierError:
        print(f"[{name}] barrier was broken — handling gracefully")

threads = [
    threading.Thread(target=risky_worker, args=("T0", False)),
    threading.Thread(target=risky_worker, args=("T1", False)),
    threading.Thread(target=risky_worker, args=("T2", True)),  # aborts
]
for t in threads: t.start()
for t in threads: t.join()

4) Barrier properties
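Alongside the read-only properties, Barrier also exposes reset() (listed in the quick-reference table), which returns the barrier to its initial empty state; any threads currently waiting at it receive BrokenBarrierError. A minimal sketch of clearing a broken barrier with no waiters:

```python
import threading

b = threading.Barrier(2)

b.abort()           # deliberately break the barrier
print(b.broken)     # → True

b.reset()           # back to the default, empty state
print(b.broken)     # → False
print(b.n_waiting)  # → 0
```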
import threading

b = threading.Barrier(5)
print(b.parties)    # → 5 (total threads needed)
print(b.n_waiting)  # → 0 (currently waiting)
print(b.broken)     # → False

9. local — Thread-local Storage
threading.local() creates an object where each thread has its own independent copy of every attribute. Ideal for thread-specific state like database connections or request contexts.

1) Basic thread-local usage
import threading, time

local_data = threading.local()

def worker(value):
    local_data.x = value  # each thread sets its own .x
    time.sleep(0.1)       # let other threads run
    print(f"Thread {threading.current_thread().name}: x = {local_data.x}")

threads = [threading.Thread(target=worker, args=(i*10,), name=f"T{i}") for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()
# → Thread T0: x = 0
# → Thread T1: x = 10
# → Thread T2: x = 20
# → Thread T3: x = 30
# (each thread sees only its own value — no interference)

2) Thread-local DB connection pattern
import threading
import sqlite3

_local = threading.local()

def get_connection(db_path: str) -> sqlite3.Connection:
    """Return a per-thread DB connection."""
    if not hasattr(_local, "conn"):
        _local.conn = sqlite3.connect(db_path)
        print(f"[{threading.current_thread().name}] created new connection")
    return _local.conn

def db_worker(db_path: str):
    conn = get_connection(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS t (v INTEGER)")
    conn.execute("INSERT INTO t VALUES (?)", (threading.get_ident(),))
    conn.commit()
    print(f"[{threading.current_thread().name}] inserted row")

threads = [threading.Thread(target=db_worker, args=(":memory:",), name=f"DB-{i}") for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()

3) Subclass local for initialization
import threading, time

class RequestContext(threading.local):
    """Thread-local request context with defaults."""
    def __init__(self):
        super().__init__()
        self.user_id = None
        self.request_id = None

ctx = RequestContext()

def handle_request(user_id, req_id):
    ctx.user_id = user_id
    ctx.request_id = req_id
    time.sleep(0.05)
    print(f"Processing request {ctx.request_id} for user {ctx.user_id}")

threads = [threading.Thread(target=handle_request, args=(f"user{i}", f"req-{i:03}")) for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()

10. Module-level Functions
1) threading.current_thread() — Get the current thread
import threading

def show_self():
    t = threading.current_thread()
    print(f"name={t.name}, ident={t.ident}, daemon={t.daemon}")

main_t = threading.current_thread()
print(f"Main thread: {main_t.name}")

t = threading.Thread(target=show_self, name="MyWorker")
t.start(); t.join()
# → Main thread: MainThread
# → name=MyWorker, ident=140..., daemon=False

2) threading.main_thread() — Get the main thread
import threading

def check_main():
    mt = threading.main_thread()
    ct = threading.current_thread()
    print(f"Main thread: {mt.name}")
    print(f"This thread: {ct.name}")
    print(f"Am I main? {ct is mt}")

t = threading.Thread(target=check_main)
t.start(); t.join()
# → Main thread: MainThread
# → This thread: Thread-1
# → Am I main? False

3) threading.active_count() — Count live threads
import threading, time

def slow():
    time.sleep(2)

print(threading.active_count())  # → 1 (main only)

threads = [threading.Thread(target=slow) for _ in range(3)]
for t in threads: t.start()

print(threading.active_count())  # → 4 (main + 3 workers)
for t in threads: t.join()
print(threading.active_count())  # → 1

4) threading.enumerate() — List all live threads
import threading, time

def task(n):
    time.sleep(n)

threads = [threading.Thread(target=task, args=(i,), name=f"T{i}") for i in range(1, 4)]
for t in threads: t.start()

for t in threading.enumerate():
    print(f"  alive: {t.name} | daemon={t.daemon}")
# →   alive: MainThread | daemon=False
# →   alive: T1 | daemon=False
# →   alive: T2 | daemon=False
# →   alive: T3 | daemon=False

for t in threads: t.join()

5) threading.settrace(func) / threading.setprofile(func) — Thread hooks
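threading.setprofile() is the companion hook to settrace(): it works the same way, except the profile function receives call, return, and C-function events rather than per-line tracing events. A minimal sketch (the profiler and task names here are illustrative):

```python
import threading

def profiler(frame, event, arg):
    # Profile hooks see 'call', 'return', 'c_call', 'c_return', 'c_exception'
    if event == "return":
        print(f"[PROFILE] {frame.f_code.co_name} returned {arg}")

def task():
    return sum(range(5))

threading.setprofile(profiler)  # applies to threads started after this call
t = threading.Thread(target=task)
t.start(); t.join()
threading.setprofile(None)      # remove the hook
# Output includes: [PROFILE] task returned 10
```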
import threading

def my_tracer(frame, event, arg):
    if event == "call":
        print(f"[TRACE] calling {frame.f_code.co_name}")
    return my_tracer

def task():
    x = 1 + 1
    return x

threading.settrace(my_tracer)  # set trace for ALL future threads
t = threading.Thread(target=task)
t.start(); t.join()
threading.settrace(None)  # remove tracer

6) threading.stack_size(size=0) — Set thread stack size
import threading

# Set stack size to 512 KB for all future threads
threading.stack_size(512 * 1024)
print(f"Stack size: {threading.stack_size()} bytes")

def task():
    print("Running with custom stack size")

t = threading.Thread(target=task)
t.start(); t.join()

threading.stack_size(0)  # reset to default

7) threading.excepthook — Handle uncaught thread exceptions
import threading

def custom_excepthook(args):
    print(f"Uncaught exception in thread [{args.thread.name}]:")
    print(f"  Type: {args.exc_type.__name__}")
    print(f"  Message: {args.exc_value}")

threading.excepthook = custom_excepthook

def buggy_task():
    raise ValueError("Something went wrong in thread!")

t = threading.Thread(target=buggy_task, name="BuggyThread")
t.start(); t.join()
# → Uncaught exception in thread [BuggyThread]:
# →   Type: ValueError
# →   Message: Something went wrong in thread!

8) threading.get_ident() / threading.get_native_id()
import threading

def show_ids():
    print(f"Python ident: {threading.get_ident()}")
    print(f"OS native id: {threading.get_native_id()}")

t = threading.Thread(target=show_ids)
t.start(); t.join()

11. queue Module — Thread-safe Queues
The queue module provides three thread-safe queue classes: Queue (FIFO), LifoQueue (LIFO/stack), and PriorityQueue. All use internal locks, so no external synchronization is needed.

1) Queue(maxsize=0) — FIFO Queue
from queue import Queue
import threading, time

q = Queue(maxsize=3)

def producer():
    for i in range(6):
        q.put(i)  # blocks if queue is full (maxsize reached)
        print(f"Put {i} | qsize={q.qsize()}")
        time.sleep(0.2)

def consumer():
    for _ in range(6):
        item = q.get()  # blocks if queue is empty
        print(f"Got {item}")
        q.task_done()
        time.sleep(0.5)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

2) Queue.put_nowait() / Queue.get_nowait() — Non-blocking
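Besides the _nowait variants, put() and get() also accept block and timeout arguments: a timed get() blocks up to the given number of seconds and then raises Empty (put() likewise raises Full). A minimal sketch:

```python
from queue import Queue, Empty

q = Queue()

try:
    q.get(timeout=0.5)  # block up to 0.5s, then raise Empty
except Empty:
    print("get() timed out")
# → get() timed out
```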
from queue import Queue, Full, Empty

q = Queue(maxsize=2)
q.put("item1")
q.put("item2")

try:
    q.put_nowait("item3")  # queue full!
except Full:
    print("Queue full — item3 dropped")

try:
    while True:
        print(q.get_nowait())
except Empty:
    print("Queue emptied")
# → Queue full — item3 dropped
# → item1
# → item2
# → Queue emptied

3) Queue.join() / Queue.task_done() — Work tracking
from queue import Queue
import threading

work_queue = Queue()

def worker():
    while True:
        task = work_queue.get()
        if task is None:
            break
        print(f"Processing: {task}")
        work_queue.task_done()  # signal this task is complete

# Start 3 workers
workers = [threading.Thread(target=worker, daemon=True) for _ in range(3)]
for w in workers: w.start()

# Enqueue tasks
for task in ["task_A", "task_B", "task_C", "task_D", "task_E"]:
    work_queue.put(task)

work_queue.join()  # blocks until ALL task_done() called
print("All tasks completed!")

4) LifoQueue — Stack (LIFO)
from queue import LifoQueue

stack = LifoQueue()
stack.put("first")
stack.put("second")
stack.put("third")

while not stack.empty():
    print(stack.get())
# → third
# → second
# → first

5) PriorityQueue — Priority-based processing
from queue import PriorityQueue

pq = PriorityQueue()

# (priority, task_name) — lower number = higher priority
pq.put((3, "low-priority task"))
pq.put((1, "URGENT task"))
pq.put((2, "medium task"))
pq.put((1, "another URGENT task"))

while not pq.empty():
    priority, task = pq.get()
    print(f"[priority={priority}] Processing: {task}")
# → [priority=1] Processing: URGENT task
# → [priority=1] Processing: another URGENT task
# → [priority=2] Processing: medium task
# → [priority=3] Processing: low-priority task

12. ThreadPoolExecutor — High-level Thread Pool
concurrent.futures.ThreadPoolExecutor provides a high-level, Future-based interface for thread pools. It is the recommended way to run IO-bound tasks in modern Python.

1) submit() → Future
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_data(url: str) -> str:
    time.sleep(1)  # simulate network call
    return f"<data from {url}>"

urls = [f"http://example.com/page{i}" for i in range(5)]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fetch_data, url) for url in urls]

    for future in futures:
        result = future.result()  # blocks until this future completes
        print(result)

2) map() — Parallel map
from concurrent.futures import ThreadPoolExecutor
import time

def square(n):
    time.sleep(0.2)
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(square, range(10)))

print(results)  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

3) Future API — done(), cancel(), add_done_callback()
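cancel() only succeeds while a future is still queued; once a worker thread has started executing it, cancellation fails and cancel() returns False. A minimal sketch (the slow helper is illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow():
    time.sleep(1)

with ThreadPoolExecutor(max_workers=1) as executor:
    f1 = executor.submit(slow)  # picked up by the single worker
    f2 = executor.submit(slow)  # still queued behind f1
    time.sleep(0.1)             # give the worker time to start f1

    print(f2.cancel())  # → True  (never started; removed from the queue)
    print(f1.cancel())  # → False (already running; cannot be cancelled)
```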
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n):
    time.sleep(n)
    return f"result-{n}"

def on_done(future):
    print(f"Callback: task finished → {future.result()}")

with ThreadPoolExecutor(max_workers=2) as executor:
    f1 = executor.submit(slow_task, 1)
    f2 = executor.submit(slow_task, 2)

    f1.add_done_callback(on_done)  # register callback
    f2.add_done_callback(on_done)

    print(f"f1 done: {f1.done()}")  # likely False (still running)
    time.sleep(1.5)
    print(f"f1 done: {f1.done()}")  # → True

4) as_completed() — Process in completion order
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, random

def task(n):
    delay = random.uniform(0.1, 1.0)
    time.sleep(delay)
    return (n, delay)

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(task, i): i for i in range(8)}

    for future in as_completed(futures):
        task_id = futures[future]  # look up the submission index
        n, delay = future.result()
        print(f"Task {n} finished in {delay:.2f}s")
# Tasks print in the order they complete, not submission order

5) Exception handling in futures
from concurrent.futures import ThreadPoolExecutor

def risky(x):
    if x == 3:
        raise ValueError(f"Bad input: {x}")
    return x * 2

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(risky, i) for i in range(5)]

for i, f in enumerate(futures):
    try:
        print(f"Result {i}: {f.result()}")
    except ValueError as e:
        print(f"Result {i}: ERROR — {e}")
# → Result 0: 0
# → Result 1: 2
# → Result 2: 4
# → Result 3: ERROR — Bad input: 3
# → Result 4: 8

13. Common Patterns & Pitfalls
1) Race condition example (竞态条件示例)
import threading

counter = 0  # UNSAFE shared state

def unsafe_increment():
    global counter
    for _ in range(100_000):
        counter += 1  # NOT atomic! (read-modify-write)

threads = [threading.Thread(target=unsafe_increment) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()

print("Expected: 500000")
print(f"Actual:   {counter}")  # likely LESS than 500000 — data race!

2) Deadlock example + fix
import threading, time

lock_a = threading.Lock()
lock_b = threading.Lock()

# ─── DEADLOCK version ────────────────────────────────
def thread1_deadlock():
    with lock_a:
        time.sleep(0.1)
        with lock_b:  # waits for lock_b
            print("T1: got both locks")

def thread2_deadlock():
    with lock_b:
        time.sleep(0.1)
        with lock_a:  # waits for lock_a → DEADLOCK
            print("T2: got both locks")

# ─── FIXED version: always acquire locks in the same order ──
def thread1_safe():
    with lock_a:      # acquire A first
        with lock_b:  # then B
            print("T1 safe: got both locks")

def thread2_safe():
    with lock_a:      # acquire A first (same order!)
        with lock_b:
            print("T2 safe: got both locks")

t1 = threading.Thread(target=thread1_safe)
t2 = threading.Thread(target=thread2_safe)
t1.start(); t2.start()
t1.join(); t2.join()
# → T1 safe: got both locks
# → T2 safe: got both locks

3) Thread-safe singleton
import threading

class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:  # first check (no lock)
            with cls._lock:
                if cls._instance is None:  # second check (with lock)
                    cls._instance = super().__new__(cls)
                    print("Singleton created")
        return cls._instance

def get_instance():
    s = Singleton()
    print(f"Got instance: {id(s)}")

threads = [threading.Thread(target=get_instance) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
# → Singleton created  (exactly once)
# → Got instance: 140...  (same id for all 5 threads)

14. Full API Quick Reference
| Class / Function | Key Methods | Purpose |
|---|---|---|
| Thread | start() join() is_alive() | Create and manage threads |
| Lock | acquire() release() locked() | Mutual exclusion |
| RLock | acquire() release() | Reentrant mutual exclusion |
| Condition | wait() wait_for() notify() notify_all() | Wait/notify synchronization |
| Semaphore | acquire() release() | Limit concurrent access |
| BoundedSemaphore | acquire() release() | Semaphore with over-release guard |
| Event | set() clear() wait() is_set() | Boolean flag signaling |
| Timer | start() cancel() | Delayed / cancellable execution |
| Barrier | wait() abort() reset() | N-thread rendezvous point |
| local | attribute access | Per-thread storage |
| Queue | put() get() task_done() join() | Thread-safe FIFO queue |
| LifoQueue | put() get() | Thread-safe stack |
| PriorityQueue | put() get() | Thread-safe priority queue |
| ThreadPoolExecutor | submit() map() shutdown() | High-level thread pool |
| current_thread() | — | Get current Thread object |
| active_count() | — | Count live threads |
| enumerate() | — | List all live threads |
| excepthook | — | Handle uncaught thread exceptions |
Python threading excels at IO-bound concurrency: use ThreadPoolExecutor for simple task pools, Queue for producer-consumer pipelines, Lock/RLock for shared state, Event for signaling, Semaphore for resource pools, and Barrier for multi-phase synchronization — and always protect shared mutable state to avoid race conditions and deadlocks.
_thread module. Because of the GIL (Global Interpreter Lock, 全局解释器锁), threads do not achieve true CPU parallelism for pure Python code — but they excel at IO-bound tasks (IO密集型任务) such as network requests, file operations, and database calls. This manual covers every public API with runnable examples. 1. Thread — Core Thread Object (核心线程对象)
threading.Thread is the fundamental building block. A thread can be created by passing a callable target or by subclassing and overriding run(). 1) Constructor (构造函数)
threading.Thread( group=None, # reserved, always None target=None, # callable to run in thread name=None, # thread name string args=(), # positional args tuple for target kwargs=None, # keyword args dict for target daemon=None # True → daemon thread (守护线程))2) Thread.start() — Launch the thread
Schedules the thread for execution. Must be called exactly once per Thread object.
import threadingimport time
def worker(name, delay): time.sleep(delay) print(f"[{name}] finished after {delay}s")
t1 = threading.Thread(target=worker, args=("Alpha", 1))t2 = threading.Thread(target=worker, args=("Beta", 2))
t1.start() # ← launches t1t2.start() # ← launches t2 concurrently
print("Main thread continues immediately")# Output order (non-deterministic):# Main thread continues immediately# [Alpha] finished after 1s# [Beta] finished after 2sstart() twice on the same Thread raises RuntimeError. If you need to rerun a task, create a new Thread instance.3) Thread.join(timeout=None) — Wait for completion (等待线程结束)
Blocks the calling thread until the target thread terminates, or until timeout seconds elapse.
import threading, time
def slow_task(): print("Task started") time.sleep(3) print("Task done")
t = threading.Thread(target=slow_task)t.start()
t.join(timeout=5) # wait up to 5 seconds
if t.is_alive(): print("Thread still running after timeout!")else: print("Thread completed successfully")# → Task started# → Task done# → Thread completed successfully4) Thread.is_alive() — Check thread status (检查线程状态)
Returns True between start() and thread termination.
import threading, time

def task():
    time.sleep(2)

t = threading.Thread(target=task)
print(t.is_alive())  # → False (not started yet)
t.start()
print(t.is_alive())  # → True (running)
t.join()
print(t.is_alive())  # → False (terminated)

5) Thread.name / Thread.getName() / Thread.setName() — Thread name (线程名)
import threading

def task():
    # Access the name inside the thread
    print(f"Running as: {threading.current_thread().name}")

t = threading.Thread(target=task, name="WorkerThread-1")
print(t.name)        # → WorkerThread-1
t.setName("Renamed")
print(t.getName())   # → Renamed
t.start()
t.join()
# → Running as: Renamed
Note: getName() and setName() are deprecated aliases (deprecated since Python 3.10); prefer the name attribute.

6) Thread.daemon — Daemon threads (守护线程)
A daemon thread is automatically killed when ALL non-daemon threads exit — it does NOT block program shutdown.
import threading, time

def background_monitor():
    while True:
        print("[Monitor] heartbeat")
        time.sleep(1)

# Must set daemon BEFORE start()
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()

print("Main: doing work")
time.sleep(2.5)
print("Main: exiting — monitor will be killed automatically")
# → [Monitor] heartbeat
# → Main: doing work
# → [Monitor] heartbeat
# → [Monitor] heartbeat
# → Main: exiting — monitor will be killed automatically

7) Thread.ident / Thread.native_id — Thread identifiers (线程标识符)
import threading

def show_ids():
    t = threading.current_thread()
    print(f"ident={t.ident}, native_id={t.native_id}")

t = threading.Thread(target=show_ids)
t.start()
t.join()
# → ident=140234567890, native_id=12345

print(f"Main ident: {threading.main_thread().ident}")

8) Subclass Pattern — Override run() (子类模式)
import threading, time

class DownloadThread(threading.Thread):
    """Custom thread that downloads a resource."""

    def __init__(self, url: str):
        super().__init__(name=f"Download-{url}")
        self.url = url
        self.result = None

    def run(self):
        # Simulate a download
        time.sleep(0.5)
        self.result = f"<html from {self.url}>"
        print(f"Downloaded: {self.url}")

threads = [DownloadThread(f"http://example.com/page{i}") for i in range(3)]

for t in threads:
    t.start()

for t in threads:
    t.join()
    print(f"Result: {t.result}")

2. Lock — Mutual Exclusion (互斥锁)
1) Lock.acquire(blocking=True, timeout=-1) / Lock.release()
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        lock.acquire()   # ← blocks until the lock is free
        counter += 1     # critical section (临界区)
        lock.release()   # ← always release!

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()

print(f"Counter: {counter}")  # → Counter: 500000 (always correct)
Calling release() without a matching acquire() raises RuntimeError. Always prefer the with context manager to guarantee release on exceptions.

2) Context Manager — with lock (上下文管理器)
import threading

shared_list = []
lock = threading.Lock()

def safe_append(value):
    with lock:  # ← acquire on entry, release on exit (even on exception)
        shared_list.append(value)

threads = [threading.Thread(target=safe_append, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

print(sorted(shared_list))  # → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

3) Lock.acquire(blocking=False) — Non-blocking try (非阻塞尝试)
import threading, time

lock = threading.Lock()

def try_lock(name):
    acquired = lock.acquire(blocking=False)
    if acquired:
        print(f"[{name}] acquired the lock")
        time.sleep(2)
        lock.release()
    else:
        print(f"[{name}] could not acquire — skipping")

t1 = threading.Thread(target=try_lock, args=("T1",))
t2 = threading.Thread(target=try_lock, args=("T2",))
t1.start(); t2.start()
t1.join(); t2.join()
# → [T1] acquired the lock
# → [T2] could not acquire — skipping

4) Lock.acquire(timeout=N) — Timed wait (超时等待)
import threading, time

lock = threading.Lock()
lock.acquire()  # pre-lock it

def worker():
    result = lock.acquire(timeout=1.5)  # wait at most 1.5s
    if result:
        print("Got the lock")
        lock.release()
    else:
        print("Timed out waiting for lock")

t = threading.Thread(target=worker)
t.start()
t.join()
# → Timed out waiting for lock (the lock was never released)

5) Lock.locked() — Query state (查询状态)
import threading

lock = threading.Lock()
print(lock.locked())  # → False

lock.acquire()
print(lock.locked())  # → True

lock.release()
print(lock.locked())  # → False

3. RLock — Reentrant Lock (可重入锁)
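Before the examples below, a minimal sketch (standard library only) of what "reentrant" buys you: a plain Lock cannot be re-acquired by the thread that already holds it, while an RLock tracks an owner and a recursion count.

```python
import threading

lock = threading.Lock()
lock.acquire()

# A second blocking acquire() by the same thread would hang forever;
# the non-blocking form shows the lock is unavailable even to its own holder.
reacquired = lock.acquire(blocking=False)
print(reacquired)  # → False
lock.release()

# RLock records the owning thread, so re-acquiring succeeds
rlock = threading.RLock()
rlock.acquire()
reentered = rlock.acquire(blocking=False)
print(reentered)  # → True
rlock.release()
rlock.release()  # one release per acquire
```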
1) Basic RLock usage
import threading

rlock = threading.RLock()

def outer():
    with rlock:               # recursion count → 1
        print("outer acquired")
        inner()               # the same thread acquires again
        print("outer releasing")
    # recursion count → 0 (fully released)

def inner():
    with rlock:               # recursion count → 2
        print("inner acquired")
    # recursion count → 1

t = threading.Thread(target=outer)
t.start(); t.join()
# → outer acquired
# → inner acquired
# → outer releasing
A plain Lock would DEADLOCK in the above pattern, because the same thread tries to acquire an already-locked lock. Use RLock whenever a method holding the lock may call another method that also needs the lock.

2) RLock in a class (类中使用RLock)
import threading

class BankAccount:
    def __init__(self, balance: float):
        self.balance = balance
        self._lock = threading.RLock()

    def deposit(self, amount: float):
        with self._lock:
            self.balance += amount
            print(f"Deposited {amount:.2f} → balance={self.balance:.2f}")

    def withdraw(self, amount: float):
        with self._lock:
            self.balance -= amount
            print(f"Withdrew {amount:.2f} → balance={self.balance:.2f}")

    def transfer_in(self, amount: float):
        with self._lock:          # outer acquire
            self.deposit(amount)  # inner acquire (reentrant!)
            print("Transfer complete")

account = BankAccount(1000.0)
t = threading.Thread(target=account.transfer_in, args=(250.0,))
t.start(); t.join()
# → Deposited 250.00 → balance=1250.00
# → Transfer complete

4. Condition — Wait/Notify Pattern (条件变量)
1) Condition.wait() / notify() / notify_all()
import threading, time, collections

# Classic Producer-Consumer (生产者-消费者) pattern
buffer = collections.deque()
MAX_SIZE = 3
condition = threading.Condition()

def producer():
    for i in range(6):
        with condition:
            while len(buffer) >= MAX_SIZE:
                print("Producer waiting — buffer full")
                condition.wait()        # ← releases the lock, blocks
            buffer.append(i)
            print(f"Produced {i} | buffer={list(buffer)}")
            condition.notify_all()      # ← wake waiting consumers
        time.sleep(0.3)

def consumer(name):
    for _ in range(3):
        with condition:
            while not buffer:
                print(f"[{name}] waiting — buffer empty")
                condition.wait()        # ← releases the lock, blocks
            item = buffer.popleft()
            print(f"[{name}] consumed {item} | buffer={list(buffer)}")
            condition.notify_all()      # ← wake the waiting producer

threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("C1",)),
    threading.Thread(target=consumer, args=("C2",)),
]
for t in threads: t.start()
for t in threads: t.join()

2) Condition.wait(timeout=N) — Timed wait
import threading, time

condition = threading.Condition()

def waiter():
    with condition:
        result = condition.wait(timeout=2.0)  # wait at most 2 seconds
        if result:
            print("Condition met!")
        else:
            print("Timed out — condition never triggered")

def notifier():
    time.sleep(5)  # too slow
    with condition:
        condition.notify()

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=notifier)
t1.start(); t2.start()
t1.join(); t2.join()
# → Timed out — condition never triggered

3) Condition.wait_for(predicate, timeout=None) — Predicate wait
import threading, time

items = []
cond = threading.Condition()

def consumer():
    with cond:
        # Block until at least 3 items are available
        cond.wait_for(lambda: len(items) >= 3)
        print(f"Got items: {items}")

def producer():
    for i in range(5):
        time.sleep(0.5)
        with cond:
            items.append(i)
            print(f"Added item {i}")
            cond.notify_all()

t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start(); t2.start()
t1.join(); t2.join()
# → Added item 0
# → Added item 1
# → Added item 2
# → Got items: [0, 1, 2]
# → Added item 3
# → Added item 4

5. Semaphore & BoundedSemaphore (信号量)
A semaphore maintains an internal counter: acquire() decrements it (blocking at zero); release() increments it. Perfect for limiting concurrent access to a resource pool.

1) Semaphore(value=1) — Connection pool simulation (连接池模拟)
import threading, time, random

# Allow at most 3 simultaneous DB connections
db_semaphore = threading.Semaphore(3)

def use_db_connection(thread_id):
    print(f"Thread {thread_id}: waiting for DB connection")
    with db_semaphore:  # acquire (count -1)
        print(f"Thread {thread_id}: got connection")
        time.sleep(random.uniform(0.5, 1.5))
        print(f"Thread {thread_id}: released connection")
    # release (count +1) on exit

threads = [threading.Thread(target=use_db_connection, args=(i,)) for i in range(7)]
for t in threads: t.start()
for t in threads: t.join()
# At most 3 "got connection" holders active at any time

2) BoundedSemaphore — Prevent over-release (防止超额释放)
Warning: a plain Semaphore allows release() beyond the initial value — this is usually a bug. BoundedSemaphore raises ValueError if the count would exceed the initial value.
import threading

sem = threading.Semaphore(2)
bsem = threading.BoundedSemaphore(2)

# Plain Semaphore — silently over-releases
sem.release()  # count goes to 3 — no error (潜在bug)
print("Semaphore value after over-release: OK (silent)")

# BoundedSemaphore — raises ValueError
try:
    bsem.release()  # count would exceed 2
except ValueError as e:
    print(f"BoundedSemaphore caught: {e}")
# → BoundedSemaphore caught: Semaphore released too many times

3) Rate limiter pattern (限速器模式)
import threading, time

# Limit to 2 concurrent API calls
api_semaphore = threading.BoundedSemaphore(2)

def call_api(endpoint):
    with api_semaphore:
        print(f"Calling {endpoint}")
        time.sleep(1)  # simulate API latency
        print(f"Done {endpoint}")

endpoints = [f"/api/resource/{i}" for i in range(6)]
threads = [threading.Thread(target=call_api, args=(ep,)) for ep in endpoints]

for t in threads: t.start()
for t in threads: t.join()

6. Event — Simple Flag Signaling (事件信号)
An Event manages an internal boolean flag: threads wait() until the flag is set, and any thread can set() or clear() it.

1) Event.set() / Event.clear() / Event.wait() / Event.is_set()
import threading, time

start_event = threading.Event()

def worker(name):
    print(f"[{name}] waiting for start signal...")
    start_event.wait()  # blocks until the event is set
    print(f"[{name}] GO! Starting work")

workers = [threading.Thread(target=worker, args=(f"W{i}",)) for i in range(4)]
for w in workers: w.start()

print("Main: preparing...")
time.sleep(2)
print("Main: firing start signal!")
start_event.set()  # wake ALL waiting threads at once

for w in workers: w.join()
# → [W0] waiting for start signal...
# → [W1] waiting for start signal...
# → [W2] waiting for start signal...
# → [W3] waiting for start signal...
# (2s pause)
# → Main: firing start signal!
# → [W0] GO! Starting work   (all 4 unblock simultaneously)

2) Event.wait(timeout=N) — Timed wait
import threading, time

ready = threading.Event()

def service():
    print("Service: initializing (takes 3s)...")
    time.sleep(3)
    ready.set()
    print("Service: ready!")

def client():
    if ready.wait(timeout=1.5):  # only wait 1.5s
        print("Client: connected!")
    else:
        print("Client: service not ready in time, aborting")

t1 = threading.Thread(target=service)
t2 = threading.Thread(target=client)
t1.start(); t2.start()
t1.join(); t2.join()
# → Service: initializing (takes 3s)...
# → Client: service not ready in time, aborting
# → Service: ready!

3) Stop signal pattern (停止信号模式)
import threading, time

stop_event = threading.Event()

def background_worker():
    count = 0
    while not stop_event.is_set():  # check the flag each iteration
        print(f"Working... iteration {count}")
        count += 1
        time.sleep(0.5)
    print("Worker: received stop signal, exiting cleanly")

t = threading.Thread(target=background_worker)
t.start()

time.sleep(2)
print("Main: sending stop signal")
stop_event.set()
t.join()

7. Timer — Delayed Execution (延迟执行)
threading.Timer is a subclass of Thread that executes a function after a specified delay. It can be cancelled before firing.

1) Basic Timer
import threading

def reminder(message):
    print(f"⏰ Reminder: {message}")

# Fire after 3 seconds
t = threading.Timer(3.0, reminder, args=("Meeting at 3pm!",))
t.start()

print("Timer set. Waiting...")
t.join()
# → Timer set. Waiting...
# (3s pause)
# → ⏰ Reminder: Meeting at 3pm!

2) Timer.cancel() — Cancel before firing
import threading, time

fired = False

def action():
    global fired
    fired = True
    print("Action fired!")

t = threading.Timer(5.0, action)
t.start()

time.sleep(1)
t.cancel()  # ← cancel within the window
t.join()

print(f"Action fired: {fired}")  # → Action fired: False

3) Repeating timer pattern (重复定时器模式)
import threading, time

class RepeatingTimer:
    """Fires a function every `interval` seconds."""

    def __init__(self, interval: float, func, *args):
        self.interval = interval
        self.func = func
        self.args = args
        self._timer = None
        self._running = False

    def _run(self):
        self.func(*self.args)
        if self._running:
            self._schedule()

    def _schedule(self):
        self._timer = threading.Timer(self.interval, self._run)
        self._timer.daemon = True
        self._timer.start()

    def start(self):
        self._running = True
        self._schedule()

    def stop(self):
        self._running = False
        if self._timer:
            self._timer.cancel()

counter = [0]
def tick():
    counter[0] += 1
    print(f"Tick #{counter[0]}")

rt = RepeatingTimer(0.5, tick)
rt.start()
time.sleep(2.5)
rt.stop()
print(f"Total ticks: {counter[0]}")  # → Total ticks: 5

8. Barrier — Thread Synchronization Point (屏障同步点)
1) Barrier(parties, action=None, timeout=None)
import threading, time, random

NUM_WORKERS = 4
barrier = threading.Barrier(NUM_WORKERS)

def phase_worker(name):
    # Phase 1
    duration = random.uniform(0.5, 2.0)
    print(f"[{name}] phase 1 working for {duration:.1f}s")
    time.sleep(duration)
    print(f"[{name}] phase 1 done — waiting at barrier")

    barrier.wait()  # ← all threads block here until all 4 arrive

    print(f"[{name}] phase 2 starting (all threads released together)")

threads = [threading.Thread(target=phase_worker, args=(f"W{i}",)) for i in range(NUM_WORKERS)]
for t in threads: t.start()
for t in threads: t.join()

2) Barrier with action callback
import threading, time

def setup_phase():
    """Runs ONCE when all threads reach the barrier, before release."""
    print(">>> All threads ready — running barrier action <<<")

barrier = threading.Barrier(3, action=setup_phase)

def worker(name):
    time.sleep(0.1)
    print(f"[{name}] arrived at barrier")
    barrier.wait()
    print(f"[{name}] past barrier")

threads = [threading.Thread(target=worker, args=(f"T{i}",)) for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()

3) Barrier.abort() / BrokenBarrierError
import threading, time

barrier = threading.Barrier(3)

def risky_worker(name, should_abort):
    try:
        if should_abort:
            time.sleep(0.2)
            print(f"[{name}] aborting barrier!")
            barrier.abort()  # breaks the barrier for everyone
        else:
            print(f"[{name}] waiting at barrier...")
            barrier.wait(timeout=2)
            print(f"[{name}] passed!")
    except threading.BrokenBarrierError:
        print(f"[{name}] barrier was broken — handling gracefully")

threads = [
    threading.Thread(target=risky_worker, args=("T0", False)),
    threading.Thread(target=risky_worker, args=("T1", False)),
    threading.Thread(target=risky_worker, args=("T2", True)),  # aborts
]
for t in threads: t.start()
for t in threads: t.join()

4) Barrier properties
import threading

b = threading.Barrier(5)
print(b.parties)    # → 5 (total threads needed)
print(b.n_waiting)  # → 0 (currently waiting)
print(b.broken)     # → False

9. local — Thread-local Storage (线程本地存储)
threading.local() creates an object where each thread has its own independent copy of every attribute. Ideal for thread-specific state like database connections or request contexts.

1) Basic thread-local usage
import threading, time

local_data = threading.local()

def worker(value):
    local_data.x = value  # each thread sets its own .x
    time.sleep(0.1)       # let the other threads run
    print(f"Thread {threading.current_thread().name}: x = {local_data.x}")

threads = [threading.Thread(target=worker, args=(i*10,), name=f"T{i}") for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()
# → Thread T0: x = 0
# → Thread T1: x = 10
# → Thread T2: x = 20
# → Thread T3: x = 30
# (each thread sees only its own value — no interference)

2) Thread-local DB connection pattern
import threading
import sqlite3

_local = threading.local()

def get_connection(db_path: str) -> sqlite3.Connection:
    """Return a per-thread DB connection (创建线程私有数据库连接)."""
    if not hasattr(_local, "conn"):
        _local.conn = sqlite3.connect(db_path)
        print(f"[{threading.current_thread().name}] created new connection")
    return _local.conn

def db_worker(db_path: str):
    conn = get_connection(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS t (v INTEGER)")
    conn.execute("INSERT INTO t VALUES (?)", (threading.get_ident(),))
    conn.commit()
    print(f"[{threading.current_thread().name}] inserted row")

threads = [threading.Thread(target=db_worker, args=(":memory:",), name=f"DB-{i}") for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()

3) Subclass local for initialization
import threading, time

class RequestContext(threading.local):
    """Thread-local request context with defaults."""
    def __init__(self):
        super().__init__()
        self.user_id = None
        self.request_id = None

ctx = RequestContext()

def handle_request(user_id, req_id):
    ctx.user_id = user_id
    ctx.request_id = req_id
    time.sleep(0.05)
    print(f"Processing request {ctx.request_id} for user {ctx.user_id}")

threads = [threading.Thread(target=handle_request, args=(f"user{i}", f"req-{i:03}")) for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()

10. Module-level Functions (模块级函数)
1) threading.current_thread() — Get the current thread
import threading

def show_self():
    t = threading.current_thread()
    print(f"name={t.name}, ident={t.ident}, daemon={t.daemon}")

main_t = threading.current_thread()
print(f"Main thread: {main_t.name}")

t = threading.Thread(target=show_self, name="MyWorker")
t.start(); t.join()
# → Main thread: MainThread
# → name=MyWorker, ident=140..., daemon=False

2) threading.main_thread() — Get the main thread
import threading

def check_main():
    mt = threading.main_thread()
    ct = threading.current_thread()
    print(f"Main thread: {mt.name}")
    print(f"This thread: {ct.name}")
    print(f"Am I main? {ct is mt}")

t = threading.Thread(target=check_main)
t.start(); t.join()
# → Main thread: MainThread
# → This thread: Thread-1
# → Am I main? False

3) threading.active_count() — Count live threads
import threading, time

def slow():
    time.sleep(2)

print(threading.active_count())  # → 1 (main only)

threads = [threading.Thread(target=slow) for _ in range(3)]
for t in threads: t.start()

print(threading.active_count())  # → 4 (main + 3 workers)
for t in threads: t.join()
print(threading.active_count())  # → 1

4) threading.enumerate() — List all live threads
import threading, time

def task(n):
    time.sleep(n)

threads = [threading.Thread(target=task, args=(i,), name=f"T{i}") for i in range(1, 4)]
for t in threads: t.start()

for t in threading.enumerate():
    print(f"  alive: {t.name} | daemon={t.daemon}")
# →   alive: MainThread | daemon=False
# →   alive: T1 | daemon=False
# →   alive: T2 | daemon=False
# →   alive: T3 | daemon=False

for t in threads: t.join()

5) threading.settrace(func) / threading.setprofile(func) — Thread hooks
import threading

def my_tracer(frame, event, arg):
    if event == "call":
        print(f"[TRACE] calling {frame.f_code.co_name}")
    return my_tracer

def task():
    x = 1 + 1
    return x

threading.settrace(my_tracer)  # set the trace function for ALL future threads
t = threading.Thread(target=task)
t.start(); t.join()
threading.settrace(None)  # remove the tracer

6) threading.stack_size(size=0) — Set thread stack size
import threading

# Set the stack size to 512 KB for all future threads
threading.stack_size(512 * 1024)
print(f"Stack size: {threading.stack_size()} bytes")

def task():
    print("Running with custom stack size")

t = threading.Thread(target=task)
t.start(); t.join()

threading.stack_size(0)  # reset to the platform default

7) threading.excepthook — Handle uncaught thread exceptions (未捕获异常处理)
import threading

def custom_excepthook(args):
    print(f"Uncaught exception in thread [{args.thread.name}]:")
    print(f"  Type: {args.exc_type.__name__}")
    print(f"  Message: {args.exc_value}")

threading.excepthook = custom_excepthook

def buggy_task():
    raise ValueError("Something went wrong in thread!")

t = threading.Thread(target=buggy_task, name="BuggyThread")
t.start(); t.join()
# → Uncaught exception in thread [BuggyThread]:
# →   Type: ValueError
# →   Message: Something went wrong in thread!

8) threading.get_ident() / threading.get_native_id()
import threading

def show_ids():
    print(f"Python ident: {threading.get_ident()}")
    print(f"OS native id: {threading.get_native_id()}")

t = threading.Thread(target=show_ids)
t.start(); t.join()

11. queue Module — Thread-safe Queues (线程安全队列)
The queue module provides three thread-safe queue classes: Queue (FIFO), LifoQueue (LIFO/stack), and PriorityQueue (优先队列). All use internal locks, so no external synchronization is needed.

1) Queue(maxsize=0) — FIFO Queue
from queue import Queue
import threading, time

q = Queue(maxsize=3)

def producer():
    for i in range(6):
        q.put(i)  # blocks if the queue is full (maxsize reached)
        print(f"Put {i} | qsize={q.qsize()}")
        time.sleep(0.2)

def consumer():
    for _ in range(6):
        item = q.get()  # blocks if the queue is empty
        print(f"Got {item}")
        q.task_done()
        time.sleep(0.5)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

2) Queue.put_nowait() / Queue.get_nowait() — Non-blocking
from queue import Queue, Full, Empty

q = Queue(maxsize=2)
q.put("item1")
q.put("item2")

try:
    q.put_nowait("item3")  # queue full!
except Full:
    print("Queue full — item3 dropped")

try:
    while True:
        print(q.get_nowait())
except Empty:
    print("Queue emptied")
# → Queue full — item3 dropped
# → item1
# → item2
# → Queue emptied

3) Queue.join() / Queue.task_done() — Work tracking
from queue import Queue
import threading

work_queue = Queue()

def worker():
    while True:
        task = work_queue.get()
        if task is None:  # sentinel → exit
            break
        print(f"Processing: {task}")
        work_queue.task_done()  # signal that this task is complete

# Start 3 workers
workers = [threading.Thread(target=worker, daemon=True) for _ in range(3)]
for w in workers: w.start()

# Enqueue tasks
for task in ["task_A", "task_B", "task_C", "task_D", "task_E"]:
    work_queue.put(task)

work_queue.join()  # blocks until task_done() was called for every task
print("All tasks completed!")

# Clean shutdown: one None sentinel per worker
for _ in workers:
    work_queue.put(None)

4) LifoQueue — Stack (栈/后进先出)
from queue import LifoQueue

stack = LifoQueue()
stack.put("first")
stack.put("second")
stack.put("third")

while not stack.empty():
    print(stack.get())
# → third
# → second
# → first

5) PriorityQueue — Priority-based processing (优先级队列)
from queue import PriorityQueue

pq = PriorityQueue()

# (priority, task_name) — lower number = higher priority
pq.put((3, "low-priority task"))
pq.put((1, "URGENT task"))
pq.put((2, "medium task"))
pq.put((1, "another URGENT task"))

while not pq.empty():
    priority, task = pq.get()
    print(f"[priority={priority}] Processing: {task}")
# → [priority=1] Processing: URGENT task
# → [priority=1] Processing: another URGENT task
# → [priority=2] Processing: medium task
# → [priority=3] Processing: low-priority task
Entries are ordered by tuple comparison, so equal priorities fall back to comparing the payloads; if the payloads are not comparable, insert a monotonic counter as the second tuple element.

12. ThreadPoolExecutor — High-level Thread Pool (高级线程池)
concurrent.futures.ThreadPoolExecutor provides a high-level, Future-based (Future对象) interface for thread pools. It is the recommended way to run IO-bound tasks in modern Python.

1) submit() → Future
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_data(url: str) -> str:
    time.sleep(1)  # simulate a network call
    return f"<data from {url}>"

urls = [f"http://example.com/page{i}" for i in range(5)]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fetch_data, url) for url in urls]

    for future in futures:
        result = future.result()  # blocks until this future completes
        print(result)

2) map() — Parallel map (并行映射)
from concurrent.futures import ThreadPoolExecutor
import time

def square(n):
    time.sleep(0.2)
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(square, range(10)))

print(results)  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

3) Future API — done(), cancel(), add_done_callback()
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n):
    time.sleep(n)
    return f"result-{n}"

def on_done(future):
    print(f"Callback: task finished → {future.result()}")

with ThreadPoolExecutor(max_workers=2) as executor:
    f1 = executor.submit(slow_task, 1)
    f2 = executor.submit(slow_task, 2)

    f1.add_done_callback(on_done)  # register completion callbacks
    f2.add_done_callback(on_done)

    # cancel() only succeeds while a task is still queued, not yet running
    f3 = executor.submit(slow_task, 1)
    print(f"f3 cancelled: {f3.cancel()}")  # → True (both workers are busy)

    print(f"f1 done: {f1.done()}")  # likely False (still running)
    time.sleep(1.5)
    print(f"f1 done: {f1.done()}")  # → True

4) as_completed() — Process in completion order (按完成顺序处理)
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, random

def task(n):
    delay = random.uniform(0.1, 1.0)
    time.sleep(delay)
    return (n, delay)

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(task, i): i for i in range(8)}

    for future in as_completed(futures):
        n, delay = future.result()
        print(f"Task {n} finished in {delay:.2f}s")
# Tasks print in the order they complete, not in submission order

5) Exception handling in futures (Future异常处理)
from concurrent.futures import ThreadPoolExecutor

def risky(x):
    if x == 3:
        raise ValueError(f"Bad input: {x}")
    return x * 2

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(risky, i) for i in range(5)]

for i, f in enumerate(futures):
    try:
        print(f"Result {i}: {f.result()}")
    except ValueError as e:
        print(f"Result {i}: ERROR — {e}")
# → Result 0: 0
# → Result 1: 2
# → Result 2: 4
# → Result 3: ERROR — Bad input: 3
# → Result 4: 8

13. Common Patterns & Pitfalls (常见模式与陷阱)
1) Race condition example (竞态条件示例)
import threading

counter = 0  # UNSAFE shared state

def unsafe_increment():
    global counter
    for _ in range(100_000):
        counter += 1  # NOT atomic! (read-modify-write)

threads = [threading.Thread(target=unsafe_increment) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()

print("Expected: 500000")
print(f"Actual:   {counter}")  # likely LESS than 500000 — data race!

2) Deadlock example + fix (死锁示例及修复)
import threading, time

lock_a = threading.Lock()
lock_b = threading.Lock()

# ─── DEADLOCK version ────────────────────────────────
def thread1_deadlock():
    with lock_a:
        time.sleep(0.1)
        with lock_b:  # waits for lock_b
            print("T1: got both locks")

def thread2_deadlock():
    with lock_b:
        time.sleep(0.1)
        with lock_a:  # waits for lock_a → DEADLOCK
            print("T2: got both locks")

# ─── FIXED version: always acquire locks in the same order ──
def thread1_safe():
    with lock_a:      # acquire A first
        with lock_b:  # then B
            print("T1 safe: got both locks")

def thread2_safe():
    with lock_a:      # acquire A first (same order!)
        with lock_b:
            print("T2 safe: got both locks")

t1 = threading.Thread(target=thread1_safe)
t2 = threading.Thread(target=thread2_safe)
t1.start(); t2.start()
t1.join(); t2.join()
# → T1 safe: got both locks
# → T2 safe: got both locks

3) Thread-safe singleton (线程安全单例)
import threading

class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:           # first check (no lock)
            with cls._lock:
                if cls._instance is None:   # second check (with lock)
                    cls._instance = super().__new__(cls)
                    print("Singleton created")
        return cls._instance

def get_instance():
    s = Singleton()
    print(f"Got instance: {id(s)}")

threads = [threading.Thread(target=get_instance) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
# → Singleton created (exactly once)
# → Got instance: 140... (same id for all 5 threads)

14. Full API Quick Reference (API速查表)
| Class / Function | Key Methods | Purpose |
|---|---|---|
| Thread | start() join() is_alive() | Create and manage threads |
| Lock | acquire() release() locked() | Mutual exclusion |
| RLock | acquire() release() | Reentrant mutual exclusion |
| Condition | wait() wait_for() notify() notify_all() | Wait/notify synchronization |
| Semaphore | acquire() release() | Limit concurrent access |
| BoundedSemaphore | acquire() release() | Semaphore with over-release guard |
| Event | set() clear() wait() is_set() | Boolean flag signaling |
| Timer | start() cancel() | Delayed / cancellable execution |
| Barrier | wait() abort() reset() | N-thread rendezvous point |
| local | attribute access | Per-thread storage |
| Queue | put() get() task_done() join() | Thread-safe FIFO queue |
| LifoQueue | put() get() | Thread-safe stack |
| PriorityQueue | put() get() | Thread-safe priority queue |
| ThreadPoolExecutor | submit() map() shutdown() | High-level thread pool |
| current_thread() | — | Get current Thread object |
| active_count() | — | Count live threads |
| enumerate() | — | List all live threads |
| excepthook | — | Handle uncaught thread exceptions |
Python threading excels at IO-bound concurrency (IO密集型并发): use ThreadPoolExecutor for simple task pools, Queue for producer-consumer pipelines, Lock/RLock for shared state, Event for signaling, Semaphore for resource pools, and Barrier for multi-phase synchronization — always protect shared mutable state to avoid Race Conditions (竞态条件) and Deadlocks (死锁).

II. When to Use Each API — Scenario Decision Guide (使用场景决策指南)
1. Thread — When to create raw threads (何时创建原始线程)
1) ✅ Use Thread directly when
1. You need full lifecycle control — start, monitor, join at precise moments.
2. The thread has long-running, stateful logic best expressed as a class with run().
3. You need to store a result on the thread object itself (self.result = ...).
4. You're building a daemon background service (heartbeat, log flusher, monitor).
# ✅ Scenario: long-lived stateful background service
import threading, time

class HeartbeatThread(threading.Thread):
    """Sends periodic heartbeats to a server."""
    def __init__(self, server_url, interval=5):
        super().__init__(daemon=True, name="Heartbeat")
        self.server_url = server_url
        self.interval = interval
        self._stop = threading.Event()

    def run(self):
        while not self._stop.is_set():
            print(f"[Heartbeat] ping → {self.server_url}")
            time.sleep(self.interval)

    def stop(self):
        self._stop.set()

hb = HeartbeatThread("http://api.example.com/health")
hb.start()
time.sleep(12)
hb.stop()

2) ❌ Do NOT use raw Thread when
× You just need to run many short tasks in parallel → use ThreadPoolExecutor instead.
× You need return values from many tasks → Future.result() is cleaner than t.result.
× You need CPU parallelism → use multiprocessing (the GIL blocks true parallelism).
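The first point above can be made concrete: a minimal sketch (the fetch function is illustrative) of the pool-based alternative, where one executor replaces N Thread objects plus the manual start()/join() bookkeeping.

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(n):
    # stand-in for a short IO-bound task
    return n * n

# Results come back from Futures, with no need to stash
# them on thread attributes or join threads by hand.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fetch, range(5)))

print(results)  # → [0, 1, 4, 9, 16]
```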
3) daemon=True — Specifically when
Use daemon threads for tasks that should not keep the program alive if the main thread exits:
| Scenario | daemon=True | daemon=False |
|---|---|---|
| Background log flusher | ✅ | — |
| Health monitor / watchdog | ✅ | — |
| Worker that must finish | — | ✅ |
| DB write that must commit | — | ✅ |
# ✅ Scenario: log flusher that should die with the app
import threading, time

log_buffer = []

def flush_logs():
    while True:
        if log_buffer:
            print(f"[Flush] writing {len(log_buffer)} log entries")
            log_buffer.clear()
        time.sleep(1)

flusher = threading.Thread(target=flush_logs, daemon=True)
flusher.start()

# The main thread does work; the flusher auto-dies when main exits
for i in range(5):
    log_buffer.append(f"event-{i}")
    time.sleep(0.5)
print("Main done — flusher daemon killed automatically")

2. Lock — When to use mutual exclusion (何时使用互斥锁)
1) ✅ Use Lock when
1. Multiple threads read AND write the same variable / data structure.
2. An operation that looks atomic is actually read-modify-write (e.g. counter += 1).
3. You're updating a shared list, dict, or custom object.
4. You need to protect a file write or database update.
```python
# ✅ Scenario: shared bank account balance — MUST use Lock
import threading, time

class Account:
    def __init__(self, balance):
        self.balance = balance
        self._lock = threading.Lock()

    def transfer(self, amount):
        with self._lock:  # critical section
            if self.balance >= amount:
                time.sleep(0.001)  # simulate DB latency
                self.balance -= amount
                return True
            return False

acc = Account(1000)
results = []

def try_withdraw():
    results.append(acc.transfer(100))

threads = [threading.Thread(target=try_withdraw) for _ in range(20)]
for t in threads: t.start()
for t in threads: t.join()

print(f"Balance: {acc.balance}")  # always ≥ 0
print(f"Successful: {results.count(True)}")
```

2) ❌ Do NOT use Lock when
× The same thread needs to acquire the lock twice → use RLock instead (a plain Lock deadlocks).
× You need to wait for a condition, not just exclusive access → use Condition.
× You only need to limit concurrency to N > 1 → use Semaphore.
3) Scenario matrix (场景矩阵)
| Situation | Correct primitive |
|---|---|
| 1 thread at a time, non-reentrant | Lock |
| 1 thread at a time, same thread re-acquires | RLock |
| N threads at a time | Semaphore(N) |
| Wait until data is ready | Condition |
| One-time go signal | Event |
3. RLock — When re-entrancy is needed (何时需要可重入锁)
1) ✅ Use RLock when
1. A method holding the lock calls another method that also acquires the same lock.
2. You're building a class with multiple synchronized methods that call each other.
3. You have recursive algorithms that need locking at each level.
```python
# ✅ Scenario: tree traversal where every node shares the same lock
import threading

class SafeTree:
    # One shared, re-entrant lock for the whole tree — the recursion
    # re-acquires it at each level (a per-node Lock would not exercise re-entrancy)
    _lock = threading.RLock()

    def __init__(self, value, children=None):
        self.value = value
        self.children = children or []

    def sum_values(self):
        with self._lock:  # acquire (depth 1)
            total = self.value
            for child in self.children:
                total += child.sum_values()  # same lock, deeper (depth 2+)
            return total

tree = SafeTree(1, [SafeTree(2), SafeTree(3, [SafeTree(4)])])
t = threading.Thread(target=lambda: print(f"Sum: {tree.sum_values()}"))
t.start(); t.join()
# → Sum: 10
```

2) ❌ Do NOT use RLock when
× Methods don't call each other — a plain Lock has slightly lower overhead.
× You want to detect accidental re-entry as a bug — Lock will surface it as a deadlock.
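The deadlock-on-re-entry behavior is easy to verify directly — a plain Lock blocks even its own holder on a second acquire, while an RLock tracks per-thread recursion depth. A minimal sketch, using a short timeout so the demo cannot hang:

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

def reacquire_ok(lock):
    """Try to acquire `lock` twice from the same thread; time out instead of hanging."""
    with lock:                               # first acquisition
        got_it = lock.acquire(timeout=0.2)   # second acquisition attempt
        if got_it:
            lock.release()
        return got_it

print(reacquire_ok(plain))      # False — a plain Lock blocks its own thread
print(reacquire_ok(reentrant))  # True  — RLock counts recursion depth per thread
```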
4. Condition — When threads must wait for state changes (何时等待状态变化)
1) ✅ Use Condition when
1. One thread must wait until another thread changes some data (not just unlocks).
2. Implementing producer-consumer patterns with a bounded buffer.
3. Threads need to coordinate in phases — e.g., "wait until queue has ≥ 3 items".
4. You need selective wakeup — notify only one waiter vs. all waiters.
```python
# ✅ Scenario: order fulfillment system
# Orders must wait until inventory is restocked
import threading, time, collections

inventory = collections.defaultdict(int)
cond = threading.Condition()

def restock_worker():
    items = [("apple", 50), ("banana", 30), ("cherry", 20)]
    for item, qty in items:
        time.sleep(1)
        with cond:
            inventory[item] += qty
            print(f"[Restock] {item} +{qty} → total={inventory[item]}")
            cond.notify_all()  # wake all waiting orders

def process_order(order_id, item, qty):
    with cond:
        cond.wait_for(lambda: inventory[item] >= qty)  # wait for stock
        inventory[item] -= qty
        print(f"[Order {order_id}] filled {qty}x {item} → remaining={inventory[item]}")

# Total apple demand (20 + 30) matches the restock of 50, so every order
# is eventually filled regardless of which waiter wakes first
threads = [
    threading.Thread(target=restock_worker),
    threading.Thread(target=process_order, args=(1, "apple", 20)),
    threading.Thread(target=process_order, args=(2, "banana", 15)),
    threading.Thread(target=process_order, args=(3, "apple", 30)),
]
for t in threads: t.start()
for t in threads: t.join()
```

2) notify() vs notify_all() — When to use which
| Situation | Use |
|---|---|
| Only one consumer can act (e.g. one slot freed) | notify() |
| All consumers might now be able to proceed | notify_all() |
| You added multiple items to the buffer at once | notify_all() |
| Only one thread is waiting (guaranteed) | notify() |
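The first two rows can be demonstrated with two waiters: notify() releases exactly one, and notify_all() releases the rest. An illustrative sketch — the sleeps only exist to keep the sequence deterministic:

```python
import threading, time

cond = threading.Condition()
items = []
consumed = []

def consumer(cid):
    with cond:
        cond.wait_for(lambda: items)        # wait until there is something to take
        consumed.append((cid, items.pop()))

workers = [threading.Thread(target=consumer, args=(i,)) for i in range(2)]
for w in workers: w.start()
time.sleep(0.2)                             # let both consumers reach wait_for()

with cond:
    items.append("item-A")
    cond.notify()                           # wakes exactly ONE of the two waiters

time.sleep(0.2)
with cond:
    items.append("item-B")
    cond.notify_all()                       # wakes every remaining waiter

for w in workers: w.join()
print(sorted(item for _, item in consumed))  # ['item-A', 'item-B']
```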
3) ❌ Do NOT use Condition when
× You just need a one-time signal → use Event (simpler API).
× The data flowing between threads is the signal → use Queue (built-in blocking).
5. Semaphore — When limiting concurrent access (何时限制并发访问数量)
1) ✅ Use Semaphore when
1. You have a resource pool with a fixed capacity: DB connections, HTTP clients, file handles.
2. You need rate limiting — at most N concurrent API calls.
3. Implementing a thread pool from scratch (though ThreadPoolExecutor is preferred).
4. A resource requires N permits to use (e.g. a GPU with N memory slots).
```python
# ✅ Scenario: limit concurrent external API calls to avoid 429 Too Many Requests
import threading, time, random

MAX_CONCURRENT = 3
api_semaphore = threading.BoundedSemaphore(MAX_CONCURRENT)

def call_external_api(request_id):
    print(f"[Req {request_id}] queued")
    with api_semaphore:
        print(f"[Req {request_id}] calling API...")
        time.sleep(random.uniform(0.5, 1.5))  # simulate API latency
        print(f"[Req {request_id}] done")

# Simulate 10 concurrent requests — only 3 run at once
threads = [threading.Thread(target=call_external_api, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()
```

2) Semaphore vs BoundedSemaphore — When to use which
| Situation | Use |
|---|---|
| Resource pool (connection pool, thread pool) | BoundedSemaphore — prevents logic bugs |
| Signaling between threads (producer increments) | Semaphore — counter can exceed initial |
| You want a runtime error on over-release | BoundedSemaphore |
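The difference shows up on an over-release: a plain Semaphore silently grows its counter past the initial value, while BoundedSemaphore raises ValueError. A minimal sketch:

```python
import threading

plain = threading.Semaphore(1)
plain.release()                           # over-release: counter silently becomes 2
assert plain.acquire(blocking=False)      # 1st acquire succeeds
assert plain.acquire(blocking=False)      # 2nd ALSO succeeds — the pool-size bug goes unnoticed

bounded = threading.BoundedSemaphore(1)
try:
    bounded.release()                     # over-release without a matching acquire
except ValueError:
    print("BoundedSemaphore caught the over-release")
```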
3) ❌ Do NOT use Semaphore when
× You only need to allow 1 thread at a time → use Lock (clearer intent).
× You need workers to process tasks from a queue → use ThreadPoolExecutor.
6. Event — When broadcasting a one-time signal (何时广播一次性信号)
1) ✅ Use Event when
1. One thread needs to signal multiple waiting threads simultaneously (broadcast).
2. Implementing a start gun — all workers blocked until a "ready" signal fires.
3. A graceful shutdown flag — workers poll stop_event.is_set() each iteration.
4. A service readiness probe — clients wait until the server is initialized.
5. One-shot notifications where the flag stays set permanently after firing.
```python
# ✅ Scenario: web server workers wait for config to load before serving
import threading, time

config_loaded = threading.Event()
config = {}

def load_config():
    print("[Config] loading from database...")
    time.sleep(2)
    config.update({"host": "0.0.0.0", "port": 8080, "debug": False})
    print("[Config] loaded!")
    config_loaded.set()  # broadcast to ALL waiting workers

def request_handler(worker_id):
    config_loaded.wait()  # block until config ready
    print(f"[Worker {worker_id}] serving on {config['host']}:{config['port']}")

threads = (
    [threading.Thread(target=load_config)]
    + [threading.Thread(target=request_handler, args=(i,)) for i in range(5)]
)
for t in threads: t.start()
for t in threads: t.join()
```

2) Event vs Condition — Decision rule
| Question | Answer → Use |
|---|---|
| Signal multiple threads with a permanent flag? | Event |
| Wait for a data condition that can change repeatedly? | Condition |
| Need to reset and re-arm the signal? | Event.clear() or Condition |
| One producer, many consumers woken at once? | Event |
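The clear() row deserves a caution: an Event can be re-armed, but any thread that calls wait() after clear() blocks again until the next set(). A quick sketch:

```python
import threading

evt = threading.Event()
evt.set()
print(evt.is_set())            # True — flag latched
print(evt.wait(timeout=0.1))   # True — returns immediately while set

evt.clear()                    # re-arm: the flag is down again
print(evt.is_set())            # False
print(evt.wait(timeout=0.1))   # False — times out because nobody called set() again
```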
3) ❌ Do NOT use Event when
× The condition can be true/false multiple times (e.g. buffer empty↔full) → use Condition.
× You're passing data along with the signal → use Queue.
7. Timer — When delaying or scheduling execution (何时延迟或定时执行)
1) ✅ Use Timer when
1. You need to run a function once after a delay, without blocking the main thread.
2. The action might need to be cancelled before it fires (e.g. debouncing).
3. Implementing timeouts for external operations.
4. Session expiry, cache invalidation, or auto-logout after inactivity.
```python
# ✅ Scenario: debounce user input — only save after 500ms of inactivity
import threading, time

_save_timer = None

def do_save(content):
    print(f"[Save] writing: '{content}'")

def debounced_save(content):
    global _save_timer
    if _save_timer:
        _save_timer.cancel()  # cancel previous pending save
    _save_timer = threading.Timer(0.5, do_save, args=(content,))
    _save_timer.start()

# Rapid keystrokes — only the last one saves
debounced_save("H")
debounced_save("He")
debounced_save("Hel")
debounced_save("Hell")
time.sleep(0.1)
debounced_save("Hello")
time.sleep(0.8)
# → [Save] writing: 'Hello' (only once, after 500ms of quiet)
```

2) ❌ Do NOT use Timer when
× You need recurring execution → build a RepeatingTimer (see §7.3 in Part I) or use sched.
× You need sub-millisecond precision — Timer uses time.sleep(), which is OS-dependent.
× Complex scheduling (cron-like) → use APScheduler or Celery.
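One way to build the RepeatingTimer mentioned above is a Timer that re-arms itself after each fire — a minimal sketch, with a class name and API that are illustrative rather than standard:

```python
import threading, time

class RepeatingTimer:
    """Fire `func` roughly every `interval` seconds until cancel() — built on threading.Timer."""
    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args, self.kwargs = args, kwargs
        self._timer = None
        self._cancelled = False

    def _fire(self):
        if self._cancelled:
            return
        self.func(*self.args, **self.kwargs)
        self._arm()                      # re-arm for the next tick

    def _arm(self):
        self._timer = threading.Timer(self.interval, self._fire)
        self._timer.daemon = True        # don't keep the process alive
        self._timer.start()

    def start(self):
        self._arm()

    def cancel(self):
        self._cancelled = True
        if self._timer:
            self._timer.cancel()

# Usage: tick roughly every 0.2s, then stop
ticks = []
rt = RepeatingTimer(0.2, lambda: ticks.append(time.monotonic()))
rt.start()
time.sleep(0.7)
rt.cancel()
print(f"ticks fired: {len(ticks)}")  # ~3 — interval drift makes the count approximate
```

Note each tick drifts by the callback's own runtime; for drift-free scheduling compute the next deadline from a fixed start time instead.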
8. Barrier — When threads must synchronize at a checkpoint (何时需要检查点同步)
1) ✅ Use Barrier when
1. A computation has multiple phases and ALL threads must finish phase N before ANY starts phase N+1.
2. Parallel simulation — each timestep must complete across all worker threads before advancing.
3. Test synchronization — ensure all threads reach a certain point before asserting results.
4. Coordinated startup — all services initialized before traffic is allowed.
```python
# ✅ Scenario: parallel matrix computation with two phases
import threading, time, random

NUM_WORKERS = 4
barrier = threading.Barrier(NUM_WORKERS)
partial_results = [0] * NUM_WORKERS
final_results = [0] * NUM_WORKERS

def compute_worker(worker_id):
    # ── Phase 1: independent computation ──────────────
    time.sleep(random.uniform(0.3, 1.2))
    partial_results[worker_id] = random.randint(10, 100)
    print(f"[W{worker_id}] Phase 1 done: partial={partial_results[worker_id]}")

    barrier.wait()  # ← ALL workers must finish phase 1 before phase 2

    # ── Phase 2: needs ALL phase-1 results ────────────
    # e.g., normalize by global sum
    total = sum(partial_results)
    final_results[worker_id] = partial_results[worker_id] / total
    print(f"[W{worker_id}] Phase 2 done: final={final_results[worker_id]:.3f}")

threads = [threading.Thread(target=compute_worker, args=(i,)) for i in range(NUM_WORKERS)]
for t in threads: t.start()
for t in threads: t.join()

print(f"\nFinal results: {[f'{r:.3f}' for r in final_results]}")
print(f"Sum check: {sum(final_results):.6f}")  # → ~1.0
```

2) ❌ Do NOT use Barrier when
× Thread count is dynamic (unknown at creation time) — Barrier requires a fixed parties count.
× Only one thread needs to wait for others → use Thread.join() or Event.
× Threads have different roles (not symmetric) → use Condition or Queue.
9. threading.local — When isolating per-thread state (何时隔离线程私有状态)
1) ✅ Use threading.local when
1. Each thread needs its own copy of a connection (DB, HTTP session, file handle).
2. You're building middleware or frameworks that attach request context per thread.
3. A global-looking variable must actually be thread-specific (e.g., current user, transaction ID).
4. Avoiding lock contention by giving each thread its own cache.
```python
# ✅ Scenario: per-thread HTTP session (connection pooling per thread)
import threading
import urllib.request

_local = threading.local()

def get_session():
    """Return a thread-local opener — no lock needed, no sharing."""
    if not hasattr(_local, "opener"):
        _local.opener = urllib.request.build_opener()
        print(f"[{threading.current_thread().name}] created new HTTP session")
    return _local.opener

def fetch(url):
    session = get_session()  # each thread gets its own
    # session.open(url) ...
    print(f"[{threading.current_thread().name}] fetching {url}")

threads = [threading.Thread(target=fetch, args=(f"http://example.com/{i}",), name=f"Fetcher-{i}")
           for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()
# Each thread creates exactly one session — no contention, no sharing
```

2) ❌ Do NOT use threading.local when
× Threads need to share and pass data to each other → use Queue or shared objects with Lock.
× Using ThreadPoolExecutor — threads are reused, so old local state may persist unexpectedly.

If you do use threading.local inside a pool, always initialize the local value at the start of each task, not just on first access — otherwise task 2 on the same thread will see task 1's leftover state.

10. Queue / LifoQueue / PriorityQueue — When passing data between threads (何时在线程间传递数据)
1) ✅ Use Queue when
1. Implementing producer-consumer patterns — the queue IS the synchronization.
2. Work items need to be processed in order (FIFO).
3. You want backpressure — producers block when the buffer is full (maxsize).
4. You need work completion tracking via task_done() + join().
```python
# ✅ Scenario: image processing pipeline
# Loader threads → Queue → Processor threads → Queue → Writer threads
import threading, time, queue

raw_queue = queue.Queue(maxsize=10)
processed_queue = queue.Queue(maxsize=10)

def loader(n_images):
    for i in range(n_images):
        time.sleep(0.1)
        raw_queue.put(f"image_{i:03}.jpg")
        print(f"[Loader] queued image_{i:03}.jpg")
    raw_queue.put(None)  # sentinel (哨兵值)

def processor():
    while True:
        item = raw_queue.get()
        if item is None:
            processed_queue.put(None)
            raw_queue.task_done()
            break
        result = f"processed_{item}"
        time.sleep(0.2)  # simulate processing
        processed_queue.put(result)
        raw_queue.task_done()

def writer():
    while True:
        item = processed_queue.get()
        if item is None:
            processed_queue.task_done()
            break
        print(f"[Writer] saved {item}")
        processed_queue.task_done()

threads = [
    threading.Thread(target=loader, args=(5,)),
    threading.Thread(target=processor),
    threading.Thread(target=writer),
]
for t in threads: t.start()
for t in threads: t.join()
```

2) ✅ Use LifoQueue when
1. Most-recently-added tasks are more cache-warm or likely to be more relevant.
2. Implementing depth-first search with worker threads.
3. Worker threads processing undo stacks or rollback operations.
3) ✅ Use PriorityQueue when
1. Tasks have different urgency levels — critical tasks skip the queue.
2. Implementing a task scheduler with priority (e.g., real-time vs. batch jobs).
3. Retry logic — failed tasks re-enqueued with higher priority.
```python
# ✅ Scenario: multi-tier job scheduler
import threading, queue, time

job_queue = queue.PriorityQueue()

# Priority levels (优先级级别) — lower number = higher priority
CRITICAL = 1
HIGH = 2
NORMAL = 3
BATCH = 4

def scheduler():
    while True:
        try:
            priority, job_id, task = job_queue.get(timeout=2)
            level = ["", "CRITICAL", "HIGH", "NORMAL", "BATCH"][priority]
            print(f"[Scheduler] running [{level}] {job_id}")
            task()
            job_queue.task_done()
        except queue.Empty:
            print("[Scheduler] no more jobs")
            break

# Submit jobs in arbitrary order
job_queue.put((NORMAL, "job-001", lambda: time.sleep(0.1)))
job_queue.put((BATCH, "job-002", lambda: time.sleep(0.1)))
job_queue.put((CRITICAL, "job-003", lambda: time.sleep(0.1)))
job_queue.put((HIGH, "job-004", lambda: time.sleep(0.1)))
job_queue.put((NORMAL, "job-005", lambda: time.sleep(0.1)))

t = threading.Thread(target=scheduler)
t.start(); t.join()
# Always runs: CRITICAL → HIGH → NORMAL → NORMAL → BATCH
```

11. ThreadPoolExecutor — When managing a pool of workers (何时使用线程池)
1) ✅ Use ThreadPoolExecutor when
1. Running many short-to-medium IO-bound tasks concurrently (HTTP, DB, file IO).
2. You need return values from concurrent tasks without manual thread management.
3. Applying the same function to many inputs in parallel (executor.map).
4. You want automatic thread lifecycle management (creation, recycling, shutdown).
```python
# ✅ Scenario: fetch multiple URLs concurrently, collect all results
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, random

def fetch_url(url):
    """Simulate network fetch with random latency."""
    latency = random.uniform(0.2, 1.5)
    time.sleep(latency)
    if "broken" in url:
        raise ConnectionError(f"Failed to connect to {url}")
    return {"url": url, "status": 200, "latency": round(latency, 3)}

urls = [
    "https://api.example.com/users",
    "https://api.example.com/products",
    "https://api.example.com/broken-endpoint",
    "https://api.example.com/orders",
    "https://api.example.com/inventory",
]

print("Starting concurrent fetches...\n")
with ThreadPoolExecutor(max_workers=4) as executor:
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}

    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            result = future.result()
            print(f"✅ {result['url']:<40} latency={result['latency']}s")
        except ConnectionError as e:
            print(f"❌ {url:<40} ERROR: {e}")
```

2) submit() vs map() — When to use which
| Situation | Use |
|---|---|
| Need individual Future objects for callbacks/cancellation | submit() |
| Simple parallel map, results in submission order | map() |
| Process results as they complete (not submission order) | as_completed() |
| Mixed inputs with different argument structures | submit() |
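For the common "same function, many inputs" case, map() keeps results in submission order even when tasks finish out of order. A minimal sketch — the staggered sleeps make later inputs finish first:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(x):
    time.sleep(0.3 - x * 0.05)  # larger inputs finish FIRST
    return x * x

with ThreadPoolExecutor(max_workers=4) as ex:
    # map() yields results in the order the inputs were submitted,
    # regardless of completion order
    results = list(ex.map(slow_square, [1, 2, 3, 4]))

print(results)  # [1, 4, 9, 16] — submission order preserved
```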
3) ❌ Do NOT use ThreadPoolExecutor when
× CPU-bound tasks (image processing, ML inference) → use ProcessPoolExecutor instead.
× Tasks need complex inter-thread communication → combine with Queue.
× Thousands of very short tasks (< 1ms) → thread overhead dominates; use asyncio.
12. Master Decision Flowchart (总决策流程图)
```
START: I need concurrent execution
│
├─ CPU-bound (math, compression, ML)?
│   └─ YES → use multiprocessing.Process or ProcessPoolExecutor
│
└─ IO-bound (network, file, database)?
    │
    ├─ Simple: run N tasks, collect results
    │   └─ use ThreadPoolExecutor.submit() / map()
    │
    ├─ Complex: need fine-grained control
    │   │
    │   ├─ Tasks need to exchange data?
    │   │   └─ use Queue (FIFO) / LifoQueue / PriorityQueue
    │   │
    │   ├─ Need to protect shared state?
    │   │   ├─ One thread at a time, non-reentrant → Lock
    │   │   ├─ One thread at a time, reentrant → RLock
    │   │   └─ N threads at a time → Semaphore(N)
    │   │
    │   ├─ Need to wait for a condition?
    │   │   ├─ One-time broadcast signal → Event
    │   │   └─ Repeated condition change → Condition
    │   │
    │   ├─ Need all threads to reach a point?
    │   │   └─ Barrier(N)
    │   │
    │   ├─ Need per-thread private state?
    │   │   └─ threading.local()
    │   │
    │   ├─ Need delayed / cancellable execution?
    │   │   └─ Timer
    │   │
    │   └─ Long-lived background service?
    │       └─ Thread(daemon=True) + Event (stop signal)
    │
    └─ Very high concurrency (1000s of tasks)?
        └─ use asyncio + aiohttp (not threading)
```

13. Real-world Scenario → API Mapping (真实场景 → API 映射)
| Real-world scenario (真实场景) | API to use |
|---|---|
| Fetch 100 URLs in parallel | ThreadPoolExecutor |
| Download pipeline: fetch → parse → store | Queue (3-stage pipeline) |
| Shared counter incremented by many threads | Lock |
| Class method calls another synchronized method | RLock |
| Workers wait for DB to be populated | Condition.wait_for() |
| 5 workers start simultaneously (race simulation) | Barrier |
| Max 3 concurrent DB connections | BoundedSemaphore(3) |
| Server “ready” signal to all request handlers | Event |
| Graceful shutdown of background worker | Event (stop flag) |
| Auto-logout after 30min inactivity | Timer + cancel() on activity |
| Debounce save-to-disk on rapid edits | Timer + cancel() |
| Per-thread DB connection (no sharing) | threading.local() |
| Critical tasks skip the line | PriorityQueue |
| Undo stack processed by worker thread | LifoQueue |
| Parallel phases: all workers finish step 1 first | Barrier |
| Background heartbeat / health monitor | Thread(daemon=True) |
| LRU cache with thread-safe eviction | Lock + OrderedDict |
| Rate-limit outgoing API requests | BoundedSemaphore |
The decision rule is simple: data flows between threads → Queue; shared state needs protection → Lock/RLock; wait for a condition → Event or Condition; limit concurrency → Semaphore; just run N tasks → ThreadPoolExecutor.