全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-19 8 分钟 ✍️ juanwangdev

Python 进程池 ProcessPoolExecutor

ProcessPoolExecutor 自动管理进程池,适合 CPU 密集型任务的并行处理。

基本使用

Python
from concurrent.futures import ProcessPoolExecutor
import time

def cpu_task(n):
    total = 0
    for i in range(n):
        total += i
    return total

# 创建进程池
with ProcessPoolExecutor(max_workers=4) as executor:
    future = executor.submit(cpu_task, 10000000)
    result = future.result()
    print(result)

submit 提交任务

Python
from concurrent.futures import ProcessPoolExecutor

def process(n):
    return n ** 2

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = []
    for n in [1, 2, 3, 4, 5]:
        future = executor.submit(process, n)
        futures.append(future)

    for future in futures:
        print(future.result())

map 批量处理

Python
from concurrent.futures import ProcessPoolExecutor

def factorial(n):
    if n < 2:
        return 1
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

numbers = [5, 10, 15, 20, 25]

with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(factorial, numbers)
    print(list(results))

并行计算示例

Python
from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

# 并行检查多个数字是否为质数
numbers = range(100000, 101000)

with ProcessPoolExecutor(max_workers=4) as executor:
    primes = [n for n, is_p in zip(numbers, executor.map(is_prime, numbers)) if is_p]
    print(f"质数数量: {len(primes)}")

Future 回调

Python
from concurrent.futures import ProcessPoolExecutor

def compute(n):
    return sum(range(n))

def done_callback(future):
    print(f"计算完成,结果: {future.result()}")

with ProcessPoolExecutor(max_workers=2) as executor:
    future = executor.submit(compute, 1000000)
    future.add_done_callback(done_callback)

as_completed 按完成顺序获取

Python
from concurrent.futures import ProcessPoolExecutor, as_completed

def task(n):
    import time
    time.sleep(n % 3)  # 不同耗时
    return n ** 2

with ProcessPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(task, i): i for i in range(10)}

    for future in as_completed(futures):
        n = futures[future]
        print(f"任务 {n} 完成: {future.result()}")

异常处理

Python
from concurrent.futures import ProcessPoolExecutor

def risky_task(n):
    if n == 3:
        raise ValueError("不允许数字3")
    return n * 2

with ProcessPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(risky_task, i) for i in range(5)]

    for future in futures:
        try:
            print(future.result())
        except Exception as e:
            print(f"异常: {e}")

性能对比

Python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time

def cpu_bound():
    return sum(range(10000000))

# 多线程(受 GIL 影响)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(cpu_bound, range(4))
print(f"线程池: {time.time() - start:.2f}s")

# 多进程(绕过 GIL)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    executor.map(cpu_bound, range(4))
print(f"进程池: {time.time() - start:.2f}s")

chunksize 参数

Python
from concurrent.futures import ProcessPoolExecutor

def process(n):
    return n * 2

# 大数据量时设置 chunksize 提升效率
data = range(100000)

with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(process, data, chunksize=1000)

进程池与线程池对比

特性ProcessPoolExecutorThreadPoolExecutor
绕过 GIL
内存占用
启动开销
适用任务CPU 密集型I/O 密集型
数据共享需 IPC直接共享

要点总结

  • ProcessPoolExecutor 绕过 GIL,实现真正并行
  • max_workers 通常设为 CPU 核心数
  • submit() 提交单个任务,返回 Future
  • map() 批量处理,返回结果迭代器
  • chunksize 大数据量时优化性能
  • 适合 CPU 密集型任务的并行计算
  • 进程间通信开销较大,注意数据量

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python 进程基础
下一篇 → Python HTTP客户端请求
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库