加速你的大模型速度

simon
1
2025-10-20

今天使用大模型分析RAG(Retrieval-Augmented Generation)对话质量,需要批量处理多个不同的输入参数。为了加快整体运行速度,打算计划并发执行这些任务。根据函数特性(I/O密集型或CPU密集型),选择多线程或多进程的方式进行并行处理。顺便整理了下加速 python 执行的所有方法。
如果函数主要受限于I/O操作(如网络请求、文件读写等),则建议采用多线程方式来提高效率;而如果函数更多地依赖于计算能力(即CPU密集型任务),则更适合使用多进程方法来充分利用多核处理器的优势。这样可以根据具体应用场景有效地加速数据处理过程。

下面分别总结了了 多线程(threading)多进程(multiprocessing) 的完整示例,并说明如何选择。

1. 🎯 场景说明

假设函数是:

def my_func(x):
    # 模拟计算:可能是 I/O(如请求)或 CPU(如数学运算)
    time.sleep(1)  # 或者做一些计算
    return x * x

你要对参数列表 [1, 2, 3, 4] 分别计算 my_func(x),希望并发执行。

2. ✅ 方案一:I/O 密集型 → 使用 多线程(concurrent.futures.ThreadPoolExecutor

适用于:网络请求、文件读写、数据库查询等 等待型操作

import time
from concurrent.futures import ThreadPoolExecutor

def my_func(x):
    print(f"开始处理 {x}")
    time.sleep(1)  # 模拟 I/O 等待
    result = x * x
    print(f"完成 {x} → {result}")
    return result

if __name__ == "__main__":
    inputs = [1, 2, 3, 4]

    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(my_func, inputs))

    print("所有结果:", results)

✅ 总耗时 ≈ 1 秒(而不是 4 秒)
✅ 线程安全,适合 I/O

3. ✅ 方案二:CPU 密集型 → 使用 多进程(concurrent.futures.ProcessPoolExecutor

适用于:大量数学计算、图像处理、加密解密等 CPU 计算

因为 Python 的 GIL(全局解释器锁),多线程无法真正并行执行 CPU 密集任务,必须用多进程。

import time
from concurrent.futures import ProcessPoolExecutor

def my_func(x):
    print(f"开始处理 {x}")
    # 模拟 CPU 密集计算(避免用 time.sleep,它不占 CPU)
    total = 0
    for i in range(10_000_000):
        total += i * x
    result = total % 1000
    print(f"完成 {x} → {result}")
    return result

if __name__ == "__main__":
    inputs = [1, 2, 3, 4]

    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(my_func, inputs))

    print("所有结果:", results)

✅ 真正并行,绕过 GIL
✅ 适合计算密集型任务

⚠️ 注意:if __name__ == "__main__": 是必须的,否则在 Windows/macOS 上会报错(子进程会递归导入)。

4. 🔄 补充:用 asyncio + 线程池(混合方案)

如果在异步框架(如 FastAPI、aiohttp)中,也可以这样包装:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def my_func(x):
    time.sleep(1)  # I/O 或阻塞操作
    return x * x

async def main():
    loop = asyncio.get_running_loop()
    inputs = [1, 2, 3, 4]
    
    with ThreadPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, my_func, x) for x in inputs]
        results = await asyncio.gather(*tasks)
    
    print(results)

asyncio.run(main())

5. 📌 如何选择?

函数类型

推荐方式

网络请求、文件读写、sleep 等 I/O 等待

ThreadPoolExecutor(多线程)

数学计算、循环、加密等 CPU 计算

ProcessPoolExecutor(多进程)

在 async 程序中调用阻塞函数

loop.run_in_executor + 线程池

动物装饰