多进程

Python 的多进程编程允许程序创建和管理多个独立的进程,以充分利用多核 CPU 的计算能力,实现真正的并行执行。与多线程不同,多进程不受全局解释器锁(GIL)的限制,每个进程拥有独立的 Python 解释器和内存空间。

核心概念

  • 进程(Process):一个独立的程序执行实例,拥有独立的内存空间、文件描述符的等资源。

  • 父进程与子进程:创建新进程的进程为父进程,被创建的进程称为子进程。子进程是父进程的副本,但可以执行不同的代码。

  • 进程间通信(IPC):由于进程间内存隔离,需要通过特定机制(如管道、队列、共享内存)进行数据交换。

  • 并发与并行:多进程可以实现真正的并行计算(多个 CPU 核心同时运行),二多线程在 Python 中通常知识兵法(GIL 限制)。

核心模块与类

multiprocessing.Process创建和启动新进程的基本类。

常用方法

  • __init__(target, args, kwargs, name, daemon): 初始化进程对象。

    • target: 要在新进程中执行的函数。

    • args: 传递给 target 函数的参数元组。

    • kwargs: 传递给 target 函数的关键字参数字典。

    • name: 进程名称。

    • daemon: 是否为守护进程。守护进程会随主进程结束而终止。

  • start(): 启动进程,调用 target 函数。

  • join([timeout]): 阻塞主进程,等待该进程结束。可设置超时时间。

  • is_alive(): 检查进程是否仍在运行。

  • terminate(): 强制终止进程(不推荐,可能导致资源未释放)。

  • kill(): 向进程发送 SIGKILL 信号(Unix)或 TerminateProcess(Windows)。

  • close(): 关闭进程对象,释放资源(Python 3.7+)。

multiprocessing.Queue进程安全的队列,用于进程间通信(FIFO)

常用方法

  • put(item): 将 item 放入队列。可设置 blocktimeout

  • get(): 从队列中取出一个 item。可设置 blocktimeout

  • empty(): 检查队列是否为空。

  • full(): 检查队列是否已满。

  • qsize(): 返回队列大致大小(不精确,慎用)。

multiprocessing.Pipe:创建一对连接的管道,用于双向通信。

函数

  • Pipe(duplex=True): 创建管道,返回 (conn1, conn2) 两个连接对象。

    • duplex=True: 双向通信(两个连接都可收发)。

    • duplex=False: 单向通信(conn1 发,conn2 收)。

连接对象方法

  • send(obj): 发送对象。

  • recv(): 接收对象(阻塞)。

  • poll([timeout]): 检查是否有数据可接收。

  • close(): 关闭连接。

multiprocessing.Pool进程池,用于管理一组工作进程,简化并行任务的分配。

常用方法

  • __init__(processes, initializer, initargs): 创建进程池。

    • processes: 进程数量(默认为 CPU 核心数)。

  • apply(func, args): 阻塞式执行,立即在池中一个进程上运行 func(*args)

  • apply_async(func, args, callback, error_callback): 异步执行,立即返回 AsyncResult 对象。

  • map(func, iterable): 阻塞式,将 func 应用于 iterable 的每个元素,并返回结果列表。

  • map_async(func, iterable, callback, error_callback): 异步版本的 map

  • imap(func, iterable, chunksize): 返回迭代器,结果按顺序返回。

  • imap_unordered(func, iterable, chunksize): 返回迭代器,结果按完成顺序返回。

  • close(): 关闭池,不再接受新任务。

  • terminate(): 立即终止所有工作进程。

  • join(): 等待所有工作进程结束(必须在 close()terminate() 后调用)。

AsyncResult 对象方法

  • get([timeout]): 获取结果(阻塞)。

  • wait([timeout]): 等待结果可用。

  • ready(): 任务是否完成。

  • successful(): 任务是否成功完成(无异常)。

multiprocessing.Valuemultiprocessing.Array

用于在进程间共享简单的数值或数组数据。

  • Value(typecode_or_type, *args, lock=True): 共享单个值。

  • Array(typecode_or_type, size_or_initializer, lock=True): 共享数组。

需要使用锁(lock)来保证操作的原子性。

进程同步原语

用于协调多个进程的执行。

  • multiprocessing.Lock: 互斥锁,确保同一时间只有一个进程访问临界区。

  • multiprocessing.RLock: 可重入锁。

  • multiprocessing.Semaphore(value): 信号量,控制同时访问资源的进程数量。

  • multiprocessing.Event: 事件,用于进程间发送信号(set() / wait())。

  • multiprocessing.Condition(lock): 条件变量,结合锁使用,实现更复杂的同步。

最佳时间与注意事项

  1. if __name__ == "__main__": 必须包含此语句,防止在 Windows 或使用 spawn 启动方法时出现无限递归。

  2. 进程开销: 进程创建和销毁开销较大,适合计算密集型任务。对于 I/O 密集型任务,考虑使用 asyncio 或线程。

  3. 进程间通信成本: IPC 比内存访问慢,尽量减少不必要的数据传递。

  4. 资源清理: 使用 join() 等待子进程结束,使用 close() 关闭队列/管道,使用上下文管理器(with)。

  5. 异常处理: 在子进程中妥善处理异常,避免进程意外终止。

  6. 序列化: 通过 QueuePipe 传递的对象必须是可序列化的(pickle 模块支持)。

  7. 启动方法: 使用 multiprocessing.get_start_method()multiprocessing.set_start_method() 选择合适的启动方法(fork, spawn, forkserver)。

相关模块

  • concurrent.futures: 提供更高层的接口(ProcessPoolExecutor),简化多进程编程。

  • asyncio: 用于异步 I/O 编程,适合高并发 I/O 操作。

最后更新于