今天使用大模型分析RAG(Retrieval-Augmented Generation)对话质量,需要批量处理多个不同的输入参数。为了加快整体运行速度,打算计划并发执行这些任务。根据函数特性(I/O密集型或CPU密集型),选择多线程或多进程的方式进行并行处理。顺便整理了下加速 python 执行的所有方法。
如果函数主要受限于I/O操作(如网络请求、文件读写等),则建议采用多线程方式来提高效率;而如果函数更多地依赖于计算能力(即CPU密集型任务),则更适合使用多进程方法来充分利用多核处理器的优势。这样可以根据具体应用场景有效地加速数据处理过程。
下面分别总结了了 多线程(threading) 和 多进程(multiprocessing) 的完整示例,并说明如何选择。
1. 🎯 场景说明
假设函数是:
def my_func(x):
# 模拟计算:可能是 I/O(如请求)或 CPU(如数学运算)
time.sleep(1) # 或者做一些计算
return x * x你要对参数列表 [1, 2, 3, 4] 分别计算 my_func(x),希望并发执行。
2. ✅ 方案一:I/O 密集型 → 使用 多线程(concurrent.futures.ThreadPoolExecutor)
适用于:网络请求、文件读写、数据库查询等 等待型操作。
import time
from concurrent.futures import ThreadPoolExecutor
def my_func(x):
print(f"开始处理 {x}")
time.sleep(1) # 模拟 I/O 等待
result = x * x
print(f"完成 {x} → {result}")
return result
if __name__ == "__main__":
inputs = [1, 2, 3, 4]
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(my_func, inputs))
print("所有结果:", results)✅ 总耗时 ≈ 1 秒(而不是 4 秒)
✅ 线程安全,适合 I/O
3. ✅ 方案二:CPU 密集型 → 使用 多进程(concurrent.futures.ProcessPoolExecutor)
适用于:大量数学计算、图像处理、加密解密等 CPU 计算。
因为 Python 的 GIL(全局解释器锁),多线程无法真正并行执行 CPU 密集任务,必须用多进程。
import time
from concurrent.futures import ProcessPoolExecutor
def my_func(x):
print(f"开始处理 {x}")
# 模拟 CPU 密集计算(避免用 time.sleep,它不占 CPU)
total = 0
for i in range(10_000_000):
total += i * x
result = total % 1000
print(f"完成 {x} → {result}")
return result
if __name__ == "__main__":
inputs = [1, 2, 3, 4]
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(my_func, inputs))
print("所有结果:", results)✅ 真正并行,绕过 GIL
✅ 适合计算密集型任务
⚠️ 注意:if __name__ == "__main__": 是必须的,否则在 Windows/macOS 上会报错(子进程会递归导入)。
4. 🔄 补充:用 asyncio + 线程池(混合方案)
如果在异步框架(如 FastAPI、aiohttp)中,也可以这样包装:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def my_func(x):
time.sleep(1) # I/O 或阻塞操作
return x * x
async def main():
loop = asyncio.get_running_loop()
inputs = [1, 2, 3, 4]
with ThreadPoolExecutor() as pool:
tasks = [loop.run_in_executor(pool, my_func, x) for x in inputs]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())