欧洲世界杯_06年世界杯梅西 - hello186.com

Python 多线程:完整指南与实战示例 – Kanaries

2026-02-25 16:59:15 世界杯重播 4578

Python 多线程:完整指南与实战示例

NameSoren AtelierUpdated on 2026/2/16

你的 Python 程序需要发起 50 次 API 调用,而且必须逐个进行。每次调用需要等待 200 毫秒。简单计算一下:你的程序要浪费整整 10 秒钟盯着网络响应发呆。此时 CPU 利用率几乎为零,脚本却在处理本可以同时进行的 I/O 密集型操作时缓慢爬行。

这个问题会迅速恶化。顺序抓取数千个页面的网络爬虫、逐个读写文件的处理脚本、阻塞整个应用的数据库查询。每一秒的闲置等待,都是程序本可以用于执行有用工作的时间。

Python 的 threading 模块通过在单个进程内并发运行多个操作来解决这个问题。线程共享内存空间,启动速度快,特别擅长处理程序大部分时间都在等待的 I/O 密集型工作负载。本指南涵盖从基础线程创建到高级同步模式的所有内容,并提供可立即投入生产的代码示例。

📚

Python 中的多线程是什么?

多线程允许程序在同一个进程内并发运行多个操作。每个线程共享相同的内存空间,使得线程间通信快速且直接。

Python 的 threading 模块提供了创建和管理线程的高级接口。但有一个重要的注意事项:全局解释器锁(GIL)。

全局解释器锁(GIL)

GIL 是 CPython 中的一个互斥锁,它确保任何时候只有一个线程在执行 Python 字节码。这意味着对于 CPU 密集型操作,线程无法实现真正的并行。然而,在 I/O 操作(网络调用、文件读取、数据库查询)期间,GIL 会释放,允许一个线程等待 I/O 时其他线程运行。

import threading

import time

def cpu_bound(n):

"""CPU 密集型:GIL 阻止并行执行"""

total = 0

for i in range(n):

total += i * i

return total

def io_bound(url):

"""I/O 密集型:网络等待期间 GIL 会释放"""

import urllib.request

return urllib.request.urlopen(url).read()

# CPU 密集型:4 个线程逐个运行(无加速效果)

start = time.time()

threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]

for t in threads: t.start()

for t in threads: t.join()

print(f"CPU 密集型使用线程:{time.time() - start:.2f}s")

# I/O 密集型:4 个线程重叠等待时间(大幅加速)

这意味着多线程适用于 I/O 密集型任务,但不适用于 CPU 密集型计算。对于 CPU 密集型工作,应改用 multiprocessing 模块。

何时使用 Threading、Multiprocessing 还是 Asyncio

特性threadingmultiprocessingasyncio最适合I/O 密集型任务CPU 密集型任务高并发 I/O并行性并发(受 GIL 限制)真正的并行并发(单线程)内存共享(轻量级)每个进程独立共享(轻量级)启动开销低(约 1ms)高(约 50-100ms)极低通信方式直接内存访问管道、队列、共享内存可等待的协程可扩展性数十到数百个线程受 CPU 核心数限制数千个协程复杂度中等(需要加锁)中等(需要序列化)高(async/await 语法)使用场景网络爬虫、文件 I/O、API 调用数据处理、机器学习训练Web 服务器、聊天应用

经验法则:如果你的程序等待网络或磁盘,使用多线程;如果进行数值计算,使用多进程;如果需要数千个并发连接,使用 asyncio。

线程基础:创建与运行线程

threading.Thread 类

创建线程最简单的方式是将目标函数传递给 threading.Thread:

import threading

import time

def download_file(filename):

print(f"[{threading.current_thread().name}] 正在下载 {filename}...")

time.sleep(2) # 模拟下载

print(f"[{threading.current_thread().name}] 完成 {filename}")

# 创建线程

t1 = threading.Thread(target=download_file, args=("data.csv",))

t2 = threading.Thread(target=download_file, args=("report.pdf",))

# 启动线程

t1.start()

t2.start()

# 等待两个线程都完成

t1.join()

t2.join()

print("所有下载已完成")

两个下载任务并发运行,大约 2 秒完成,而不是 4 秒。

start() 和 join()

start() 开始执行线程。一个线程只能被启动一次。

join(timeout=None) 阻塞调用线程,直到目标线程完成。传入 timeout 参数(以秒为单位)可避免无限期等待。

import threading

import time

def slow_task():

time.sleep(10)

t = threading.Thread(target=slow_task)

t.start()

# 最多等待 3 秒

t.join(timeout=3)

if t.is_alive():

print("3 秒后线程仍在运行")

else:

print("线程已完成")

线程命名

为线程命名有助于调试:

import threading

def worker():

name = threading.current_thread().name

print(f"运行在线程中:{name}")

t = threading.Thread(target=worker, name="DataProcessor")

t.start()

t.join()

守护线程

守护线程是后台线程,当主程序退出时会自动终止。非守护线程会阻止程序退出,直到它们完成。

import threading

import time

def background_monitor():

while True:

print("监控系统健康状态...")

time.sleep(5)

# 守护线程:主程序退出时自动终止

monitor = threading.Thread(target=background_monitor, daemon=True)

monitor.start()

# 主程序执行工作

time.sleep(12)

print("主程序退出")

# monitor 线程被自动终止

将守护线程用于后台日志记录、监控或清理任务,这些任务不应阻止程序退出。

继承 Thread 类

对于更复杂的线程行为,可以继承 threading.Thread:

import threading

import time

class FileProcessor(threading.Thread):

def __init__(self, filepath):

super().__init__()

self.filepath = filepath

self.result = None

def run(self):

"""重写 run() 方法实现线程逻辑"""

print(f"正在处理 {self.filepath}")

time.sleep(1) # 模拟工作

self.result = f"已处理:{self.filepath}"

# 创建并运行

processor = FileProcessor("/data/report.csv")

processor.start()

processor.join()

print(processor.result)

向线程传递参数

使用 args 和 kwargs

使用 args(元组)传递位置参数,使用 kwargs(字典)传递关键字参数:

import threading

def fetch_data(url, timeout, retries=3, verbose=False):

print(f"正在获取 {url}(超时={timeout}秒,重试={retries},详细={verbose})")

# 位置参数作为元组

t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))

# 关键字参数作为字典

t2 = threading.Thread(

target=fetch_data,

args=("https://api.example.com",),

kwargs={"timeout": 30, "retries": 5, "verbose": True}

)

t1.start()

t2.start()

t1.join()

t2.join()

常见错误:忘记单元素元组中的尾随逗号。args=("hello",) 是元组;args=("hello") 只是带括号的字符串。

从线程收集结果

线程不会直接返回值。使用共享数据结构或列表来收集结果:

import threading

results = {}

lock = threading.Lock()

def compute(task_id, value):

result = value ** 2

with lock:

results[task_id] = result

threads = []

for i in range(5):

t = threading.Thread(target=compute, args=(i, i * 10))

threads.append(t)

t.start()

for t in threads:

t.join()

print(results) # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}

更简洁的方法是使用 ThreadPoolExecutor(接下来介绍),它可以自动处理结果收集。

ThreadPoolExecutor:现代处理方式

concurrent.futures 模块提供了 ThreadPoolExecutor,这是一个高级接口,用于管理工作者线程池。它自动处理线程创建、结果收集和异常传播。

使用 submit() 的基础用法

from concurrent.futures import ThreadPoolExecutor, as_completed

import time

def fetch_url(url):

time.sleep(1) # 模拟网络请求

return f"来自 {url} 的内容"

urls = [

"https://example.com/page1",

"https://example.com/page2",

"https://example.com/page3",

"https://example.com/page4",

"https://example.com/page5",

]

with ThreadPoolExecutor(max_workers=3) as executor:

# 提交任务并获取 Future 对象

future_to_url = {executor.submit(fetch_url, url): url for url in urls}

# 按完成顺序处理结果

for future in as_completed(future_to_url):

url = future_to_url[future]

try:

data = future.result()

print(f"{url}:{data}")

except Exception as e:

print(f"{url} 生成异常:{e}")

使用 map() 获取有序结果

executor.map() 返回与输入顺序相同的结果,类似于内置的 map():

from concurrent.futures import ThreadPoolExecutor

def process_item(item):

return item.upper()

items = ["apple", "banana", "cherry", "date"]

with ThreadPoolExecutor(max_workers=4) as executor:

results = list(executor.map(process_item, items))

print(results) # ['APPLE', 'BANANA', 'CHERRY', 'DATE']

submit() 与 map() 的对比

submit()map()返回Future 对象结果迭代器结果顺序完成顺序(配合 as_completed)输入顺序错误处理通过 future.result() 逐个处理首次失败时抛出参数单次函数调用对每项应用函数最适合异构任务、需要提前获取结果同构批处理

使用 Futures 处理异常

from concurrent.futures import ThreadPoolExecutor, as_completed

def risky_task(n):

if n == 3:

raise ValueError(f"错误输入:{n}")

return n * 10

with ThreadPoolExecutor(max_workers=4) as executor:

futures = {executor.submit(risky_task, i): i for i in range(5)}

for future in as_completed(futures):

task_id = futures[future]

try:

result = future.result(timeout=5)

print(f"任务 {task_id}:{result}")

except ValueError as e:

print(f"任务 {task_id} 失败:{e}")

except TimeoutError:

print(f"任务 {task_id} 超时")

取消任务

from concurrent.futures import ThreadPoolExecutor

import time

def long_task(n):

time.sleep(5)

return n

with ThreadPoolExecutor(max_workers=2) as executor:

futures = [executor.submit(long_task, i) for i in range(10)]

# 取消待处理任务(已在运行的任务无法取消)

for f in futures[4:]:

cancelled = f.cancel()

print(f"已取消:{cancelled}")

线程同步原语

当多个线程访问共享数据时,需要同步机制来防止竞态条件。

Lock

Lock 确保同一时间只有一个线程进入临界区:

import threading

class BankAccount:

def __init__(self, balance):

self.balance = balance

self.lock = threading.Lock()

def withdraw(self, amount):

with self.lock: # 同一时间只有一个线程

if self.balance >= amount:

self.balance -= amount

return True

return False

def deposit(self, amount):

with self.lock:

self.balance += amount

account = BankAccount(1000)

def make_transactions():

for _ in range(100):

account.deposit(10)

account.withdraw(10)

threads = [threading.Thread(target=make_transactions) for _ in range(10)]

for t in threads: t.start()

for t in threads: t.join()

print(f"最终余额:{account.balance}") # 始终为 1000

没有锁的话,并发读写会产生错误结果(竞态条件)。

RLock(可重入锁)

RLock 可以被同一线程多次获取。这可以防止当一个持有锁的函数调用另一个也需要相同锁的函数时发生死锁:

import threading

class SafeCache:

def __init__(self):

self._data = {}

self._lock = threading.RLock()

def get(self, key):

with self._lock:

return self._data.get(key)

def set(self, key, value):

with self._lock:

self._data[key] = value

def get_or_set(self, key, default):

with self._lock:

# 这里调用 get(),它也会获取 _lock

# RLock 允许这样做;普通 Lock 会导致死锁

existing = self.get(key)

if existing is None:

self.set(key, default)

return default

return existing

Semaphore

Semaphore 允许固定数量的线程同时访问资源:

import threading

import time

# 最多允许 3 个并发数据库连接

db_semaphore = threading.Semaphore(3)

def query_database(query_id):

with db_semaphore:

print(f"查询 {query_id}:已连接(活动连接数:{3 - db_semaphore._value})")

time.sleep(2) # 模拟查询

print(f"查询 {query_id}:完成")

threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]

for t in threads: t.start()

for t in threads: t.join()

Event

Event 允许一个线程向其他等待的线程发送信号:

import threading

import time

data_ready = threading.Event()

shared_data = []

def producer():

print("生产者:准备数据...")

time.sleep(3)

shared_data.extend([1, 2, 3, 4, 5])

print("生产者:数据就绪,通知消费者")

data_ready.set()

def consumer(name):

print(f"消费者 {name}:等待数据...")

data_ready.wait() # 阻塞直到事件被设置

print(f"消费者 {name}:获取数据 = {shared_data}")

threads = [

threading.Thread(target=producer),

threading.Thread(target=consumer, args=("A",)),

threading.Thread(target=consumer, args=("B",)),

]

for t in threads: t.start()

for t in threads: t.join()

Condition

Condition 将锁与等待通知的能力结合在一起。它是生产者-消费者模式的基础:

import threading

import time

import random

buffer = []

MAX_SIZE = 5

condition = threading.Condition()

def producer():

for i in range(20):

with condition:

while len(buffer) >= MAX_SIZE:

condition.wait() # 等待直到有空间

item = random.randint(1, 100)

buffer.append(item)

print(f"生产:{item}(缓冲区大小:{len(buffer)})")

condition.notify_all()

time.sleep(0.1)

def consumer(name):

for _ in range(10):

with condition:

while len(buffer) == 0:

condition.wait() # 等待直到有数据

item = buffer.pop(0)

print(f"消费者 {name} 消费:{item}(缓冲区大小:{len(buffer)})")

condition.notify_all()

time.sleep(0.15)

t1 = threading.Thread(target=producer)

t2 = threading.Thread(target=consumer, args=("X",))

t3 = threading.Thread(target=consumer, args=("Y",))

t1.start(); t2.start(); t3.start()

t1.join(); t2.join(); t3.join()

同步原语总结

原语用途何时使用Lock互斥保护共享可变状态RLock可重入互斥同一线程内的嵌套锁定Semaphore限制并发速率限制、连接池Event一次性信号初始化完成、关闭信号Condition等待/通知模式生产者-消费者、状态变化Barrier同步 N 个线程所有线程必须在某点同步后才能继续

线程安全的数据结构

queue.Queue

queue.Queue 是首选的线程安全数据结构。它内部处理了所有锁定:

import threading

import queue

import time

task_queue = queue.Queue()

results = queue.Queue()

def worker():

while True:

item = task_queue.get() # 阻塞直到有数据

if item is None:

break

result = item ** 2

results.put(result)

task_queue.task_done()

# 启动 4 个工作者

workers = []

for _ in range(4):

t = threading.Thread(target=worker, daemon=True)

t.start()

workers.append(t)

# 提交任务

for i in range(20):

task_queue.put(i)

# 等待所有任务完成

task_queue.join()

# 停止工作者

for _ in range(4):

task_queue.put(None)

for w in workers:

w.join()

# 收集结果

all_results = []

while not results.empty():

all_results.append(results.get())

print(f"结果:{sorted(all_results)}")

queue.Queue 还支持:

Queue(maxsize=10):put() 在队列满时阻塞

PriorityQueue():按优先级排序的项目

LifoQueue():后进先出(栈行为)

collections.deque

collections.deque 对于 append() 和 popleft() 操作是线程安全的(在 CPython 中 C 层面是原子的),这使其成为简单生产者-消费者模式的快速替代方案:

from collections import deque

import threading

import time

buffer = deque(maxlen=1000)

def producer():

for i in range(100):

buffer.append(i)

time.sleep(0.01)

def consumer():

consumed = 0

while consumed < 100:

if buffer:

item = buffer.popleft()

consumed += 1

else:

time.sleep(0.01)

print(f"消费了 {consumed} 个项目")

t1 = threading.Thread(target=producer)

t2 = threading.Thread(target=consumer)

t1.start(); t2.start()

t1.join(); t2.join()

注意:虽然单独的 append 和 popleft 操作是线程安全的,但先检查 len(buffer) 再弹出不是原子操作。为了完全的线程安全,请使用 queue.Queue。

常见的多线程模式

生产者-消费者模式

解耦数据生产与数据处理的经典模式:

import threading

import queue

import time

import random

def producer(q, name, num_items):

for i in range(num_items):

item = f"{name}-item-{i}"

q.put(item)

print(f"生产者 {name}:创建 {item}")

time.sleep(random.uniform(0.05, 0.15))

print(f"生产者 {name}:完成")

def consumer(q, name, stop_event):

while not stop_event.is_set() or not q.empty():

try:

item = q.get(timeout=0.5)

print(f"消费者 {name}:处理 {item}")

time.sleep(random.uniform(0.1, 0.2))

q.task_done()

except queue.Empty:

continue

print(f"消费者 {name}:关闭")

task_queue = queue.Queue(maxsize=10)

stop_event = threading.Event()

producers = [

threading.Thread(target=producer, args=(task_queue, "P1", 10)),

threading.Thread(target=producer, args=(task_queue, "P2", 10)),

]

consumers = [

threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),

threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),

threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),

]

for c in consumers: c.start()

for p in producers: p.start()

for p in producers: p.join()

task_queue.join() # 等待所有项目被处理

stop_event.set() # 通知消费者停止

for c in consumers: c.join()

工作线程池(手动实现)

当你需要比 ThreadPoolExecutor 更多的控制时:

import threading

import queue

class WorkerPool:

def __init__(self, num_workers):

self.task_queue = queue.Queue()

self.result_queue = queue.Queue()

self.workers = []

for _ in range(num_workers):

t = threading.Thread(target=self._worker, daemon=True)

t.start()

self.workers.append(t)

def _worker(self):

while True:

func, args, kwargs, future_id = self.task_queue.get()

if func is None:

break

try:

result = func(*args, **kwargs)

self.result_queue.put((future_id, result, None))

except Exception as e:

self.result_queue.put((future_id, None, e))

finally:

self.task_queue.task_done()

def submit(self, func, *args, **kwargs):

future_id = id(func) # 简单 ID

self.task_queue.put((func, args, kwargs, future_id))

return future_id

def shutdown(self):

for _ in self.workers:

self.task_queue.put((None, None, None, None))

for w in self.workers:

w.join()

# 使用

pool = WorkerPool(4)

for i in range(10):

pool.submit(lambda x: x * x, i)

pool.task_queue.join()

pool.shutdown()

速率限制线程池

控制线程发起外部请求的速度:

import threading

import time

from concurrent.futures import ThreadPoolExecutor

class RateLimiter:

def __init__(self, max_per_second):

self.interval = 1.0 / max_per_second

self.lock = threading.Lock()

self.last_call = 0

def wait(self):

with self.lock:

elapsed = time.time() - self.last_call

wait_time = self.interval - elapsed

if wait_time > 0:

time.sleep(wait_time)

self.last_call = time.time()

limiter = RateLimiter(max_per_second=5)

def rate_limited_fetch(url):

limiter.wait()

print(f"正在获取 {url},时间 {time.time():.2f}")

time.sleep(0.5) # 模拟请求

return f"来自 {url} 的数据"

urls = [f"https://api.example.com/item/{i}" for i in range(20)]

with ThreadPoolExecutor(max_workers=10) as executor:

results = list(executor.map(rate_limited_fetch, urls))

线程安全陷阱及避免方法

竞态条件

竞态条件发生在程序结果依赖于线程执行时序的情况下:

import threading

# 错误:竞态条件

counter = 0

def increment_unsafe():

global counter

for _ in range(100_000):

counter += 1 # 读、增、写:不是原子操作

threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]

for t in threads: t.start()

for t in threads: t.join()

print(f"预期:500000,实际:{counter}") # 通常小于 500000

# 正确:使用锁保护

counter = 0

lock = threading.Lock()

def increment_safe():

global counter

for _ in range(100_000):

with lock:

counter += 1

threads = [threading.Thread(target=increment_safe) for _ in range(5)]

for t in threads: t.start()

for t in threads: t.join()

print(f"预期:500000,实际:{counter}") # 始终为 500000

死锁

死锁发生在两个线程各自持有对方需要的锁时:

import threading

lock_a = threading.Lock()

lock_b = threading.Lock()

def thread_1():

with lock_a:

print("线程 1:获取 lock_a")

with lock_b: # 如果 thread_2 持有 lock_b 则永远等待

print("线程 1:获取 lock_b")

def thread_2():

with lock_b:

print("线程 2:获取 lock_b")

with lock_a: # 如果 thread_1 持有 lock_a 则永远等待

print("线程 2:获取 lock_a")

# 这将导致死锁

# t1 = threading.Thread(target=thread_1)

# t2 = threading.Thread(target=thread_2)

# t1.start(); t2.start()

预防死锁的方法:

始终以相同顺序获取锁:

def thread_1_fixed():

with lock_a: # 始终先 lock_a

with lock_b:

print("线程 1:获取两个锁")

def thread_2_fixed():

with lock_a: # 始终先 lock_a(相同顺序)

with lock_b:

print("线程 2:获取两个锁")

使用超时:

def safe_acquire():

acquired_a = lock_a.acquire(timeout=2)

if not acquired_a:

print("无法获取 lock_a,回退")

return

try:

acquired_b = lock_b.acquire(timeout=2)

if not acquired_b:

print("无法获取 lock_b,释放 lock_a")

return

try:

print("安全获取两个锁")

finally:

lock_b.release()

finally:

lock_a.release()

最小化锁的作用域:持有锁的时间尽可能短。

线程安全检查清单

使用锁保护所有共享可变状态

尽可能使用 queue.Queue 代替共享列表或字典

避免全局可变状态;通过函数参数传递数据

使用 ThreadPoolExecutor 代替手动线程管理

永远不要假设线程间的操作顺序

使用 threading.active_count() 和日志记录来检测线程泄漏

实战案例

并发网络爬虫

from concurrent.futures import ThreadPoolExecutor, as_completed

import urllib.request

import time

def fetch_page(url):

"""获取网页并返回内容长度"""

try:

with urllib.request.urlopen(url, timeout=10) as response:

content = response.read()

return url, len(content), None

except Exception as e:

return url, 0, str(e)

urls = [

"https://python.org",

"https://docs.python.org",

"https://pypi.org",

"https://realpython.com",

"https://github.com",

"https://stackoverflow.com",

"https://news.ycombinator.com",

"https://httpbin.org",

]

# 顺序执行

start = time.time()

for url in urls:

fetch_page(url)

sequential_time = time.time() - start

# 使用多线程并发

start = time.time()

with ThreadPoolExecutor(max_workers=8) as executor:

futures = {executor.submit(fetch_page, url): url for url in urls}

for future in as_completed(futures):

url, size, error = future.result()

if error:

print(f" 失败 {url}:{error}")

else:

print(f" 成功 {url}:{size:,} 字节")

threaded_time = time.time() - start

print(f"\n顺序执行:{sequential_time:.2f}s")

print(f"多线程: {threaded_time:.2f}s")

print(f"加速比: {sequential_time / threaded_time:.1f}x")

并行文件 I/O

from concurrent.futures import ThreadPoolExecutor

import os

import hashlib

def process_file(filepath):

"""读取文件并计算其 SHA-256 哈希值"""

with open(filepath, 'rb') as f:

content = f.read()

file_hash = hashlib.sha256(content).hexdigest()

size = os.path.getsize(filepath)

return filepath, file_hash, size

def hash_all_files(directory, pattern="*.py"):

"""使用多线程哈希目录中所有匹配的文件"""

import glob

files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)

results = {}

with ThreadPoolExecutor(max_workers=8) as executor:

futures = {executor.submit(process_file, f): f for f in files}

for future in futures:

try:

path, hash_val, size = future.result()

results[path] = {"hash": hash_val, "size": size}

except Exception as e:

print(f"处理 {futures[future]} 时出错:{e}")

return results

# 使用

# file_hashes = hash_all_files("/path/to/project")

带重试逻辑的并发 API 调用

from concurrent.futures import ThreadPoolExecutor, as_completed

import urllib.request

import json

import time

def fetch_api(endpoint, max_retries=3, backoff=1.0):

"""获取 API 端点,带指数退避重试"""

for attempt in range(max_retries):

try:

url = f"https://jsonplaceholder.typicode.com{endpoint}"

req = urllib.request.Request(url)

with urllib.request.urlopen(req, timeout=10) as response:

data = json.loads(response.read())

return {"endpoint": endpoint, "data": data, "error": None}

except Exception as e:

if attempt < max_retries - 1:

wait = backoff * (2 ** attempt)

time.sleep(wait)

else:

return {"endpoint": endpoint, "data": None, "error": str(e)}

endpoints = [f"/posts/{i}" for i in range(1, 21)]

start = time.time()

with ThreadPoolExecutor(max_workers=10) as executor:

futures = [executor.submit(fetch_api, ep) for ep in endpoints]

results = [f.result() for f in futures]

elapsed = time.time() - start

success = sum(1 for r in results if r["error"] is None)

print(f"在 {elapsed:.2f} 秒内获取了 {success}/{len(endpoints)} 个端点")

周期性后台任务

import threading

import time

class PeriodicTask:

"""在后台线程中以固定间隔运行函数"""

def __init__(self, interval, func, *args, **kwargs):

self.interval = interval

self.func = func

self.args = args

self.kwargs = kwargs

self._stop_event = threading.Event()

self._thread = None

def start(self):

self._thread = threading.Thread(target=self._run, daemon=True)

self._thread.start()

def _run(self):

while not self._stop_event.is_set():

self.func(*self.args, **self.kwargs)

self._stop_event.wait(self.interval)

def stop(self):

self._stop_event.set()

if self._thread:

self._thread.join()

# 使用

def check_health():

print(f"健康检查时间 {time.strftime('%H:%M:%S')}")

task = PeriodicTask(2.0, check_health)

task.start()

time.sleep(7)

task.stop()

print("已停止")

性能对比:Threading vs Multiprocessing vs Asyncio

合适的并发工具取决于工作负载。以下是常见任务的实际耗时对比:

任务顺序执行Threading (4)Multiprocessing (4)Asyncio100 次 HTTP 请求(每次 200ms)20.0s5.1s5.8s4.9s100 次文件读取(每次 10ms)1.0s0.28s0.35s0.26s100 次 CPU 任务(每次 100ms)10.0s10.2s2.7s10.0s50 次数据库查询(每次 50ms)2.5s0.68s0.85s0.62sI/O + CPU 混合任务15.0s8.2s4.1s9.5s

关键结论:

Threading 在 I/O 密集型工作负载上提供 3-5 倍加速,代码改动最小

Multiprocessing 是真正的 CPU 并行唯一选择,但增加了进程开销

Asyncio 在高并发 I/O 上略胜 Threading,但需要用 async/await 重写代码

对于混合工作负载,考虑结合使用 Threading 处理 I/O 和 Multiprocessing 处理 CPU 任务

import time

import threading

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_task():

time.sleep(0.2)

def cpu_task(n=2_000_000):

return sum(i * i for i in range(n))

# 对比 Threading 和 Multiprocessing 的基准测试

NUM_TASKS = 20

# Threading - I/O 密集型

start = time.time()

with ThreadPoolExecutor(max_workers=4) as pool:

list(pool.map(lambda _: io_task(), range(NUM_TASKS)))

print(f"Threading (I/O):{time.time() - start:.2f}s")

# Threading - CPU 密集型

start = time.time()

with ThreadPoolExecutor(max_workers=4) as pool:

list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))

print(f"Threading (CPU):{time.time() - start:.2f}s")

在 RunCell 中试验多线程

调试和分析多线程代码可能具有挑战性。当你需要测试线程同步、可视化时间重叠或交互式诊断竞态条件时,RunCell (www.runcell.dev (opens in a new tab)) 提供了专为这种工作流程设计的 AI 驱动 Jupyter 环境。

RunCell 的 AI 代理可以分析你的多线程代码,在死锁发生前识别潜在风险,根据你的工作负载建议最佳工作线程数,并帮助你理解线程异常行为的原因。当线程池间歇性产生错误结果时,RunCell 会追踪执行时间线,精确定位共享状态被破坏的确切时刻。

如果你想可视化不同多线程配置的性能特征,PyGWalker (github.com/Kanaries/pygwalker) 可以将你的基准测试 DataFrame 转换为交互式图表。运行多线程基准测试,将计时数据收集到 pandas DataFrame 中,然后通过拖放可视化探索结果,找到适合你工作负载的最佳线程数。

常见问题

Python 中 Threading 和 Multiprocessing 有什么区别?

Threading 在单个进程内运行多个线程,共享内存。全局解释器锁(GIL)阻止线程并行执行 Python 字节码,使得 Threading 仅对网络请求和文件操作等 I/O 密集型任务有效。Multiprocessing 创建独立进程,每个进程有自己的 Python 解释器和内存空间,支持 CPU 密集型任务的真正并行执行。Threading 开销更低(启动更快、内存更少),而 Multiprocessing 通过绕过 GIL 实现真正的并行。

Python 多线程是真正的并行吗?

不是,由于 GIL 的存在,Python 多线程对于 CPU 密集型代码是并发而非并行的。任何时刻只有一个线程在执行 Python 字节码。然而,在 I/O 操作(网络、磁盘、数据库)期间,GIL 会释放,因此多个线程在等待 I/O 时可以有效并行运行。对于 CPU 密集型并行,请使用 multiprocessing 模块或释放 GIL 的 C 扩展(如 NumPy)。

Python 中应该使用多少个线程?

对于 I/O 密集型任务,根据外部服务的速率限制和网络带宽,从 5-20 个线程开始。对单个服务器发起过多线程可能导致连接被拒绝或限速。对于混合工作负载,尝试在 CPU 核心数到 4 倍核心数之间的线程数。使用 ThreadPoolExecutor 并通过不同的 max_workers 值进行基准测试,找到适合你特定工作负载的最佳数量。ThreadPoolExecutor 的默认值是 min(32, os.cpu_count() + 4)。

如何从 Python 线程返回值?

线程不会直接从目标函数返回值。三种主要方法是:(1) 使用 ThreadPoolExecutor.submit(),它返回一个 Future 对象,你可以调用 future.result() 获取返回值。(2) 传入可变容器(如字典或列表)作为参数,让线程将结果写入其中,使用 Lock 保护。(3) 使用 queue.Queue,线程将结果放入队列,主线程从队列读取。对于大多数用例,ThreadPoolExecutor 是最简洁的方法。

如果 Python 线程抛出异常会发生什么?

在原始的 threading.Thread 中,未处理的异常会静默终止该线程,异常信息会丢失。主线程和其他线程继续运行而没有任何通知。使用 ThreadPoolExecutor 时,异常会被捕获并在调用 future.result() 时重新抛出,使错误处理更加可靠。始终在线程目标函数内部使用 try/except 块,或使用 ThreadPoolExecutor 确保异常被正确捕获和处理。

总结

Python 多线程是加速 I/O 密集型程序的强大工具。通过并发运行网络请求、文件操作和数据库查询,你可以将原本需要 20 秒的顺序执行脚本优化为 5 秒完成,而且代码改动极小。

需要记住的关键点:

将多线程用于 I/O 密集型工作。GIL 阻止 CPU 并行,但线程能有效重叠 I/O 等待时间。

使用 ThreadPoolExecutor 满足大多数多线程需求。它管理线程、收集结果并干净地传播异常。

使用锁保护共享状态。竞态条件是最常见的多线程 bug,而 queue.Queue 消除了大多数加锁顾虑。

避免死锁,通过以一致的顺序获取锁并使用超时。

选择合适的工具:I/O 用 Threading,CPU 用 Multiprocessing,数千个并发连接用 Asyncio。

从 ThreadPoolExecutor 和简单的 executor.map() 调用开始。测量加速效果。仅在共享可变状态需要的地方添加同步。多线程不需要完全重写代码。几行 concurrent.futures 的代码就能为任何花费时间等待的程序带来显著的性能提升。

📚