多进程

Python 的多进程编程允许程序创建和管理多个独立的进程,以充分利用多核 CPU 的计算能力,实现真正的并行执行。与多线程不同,多进程不受全局解释器锁(GIL)的限制,每个进程拥有独立的 Python 解释器和内存空间。

核心概念

  • 进程(Process):一个独立的程序执行实例,拥有独立的内存空间、文件描述符的等资源。

  • 父进程与子进程:创建新进程的进程为父进程,被创建的进程称为子进程。子进程是父进程的副本,但可以执行不同的代码。

  • 进程间通信(IPC):由于进程间内存隔离,需要通过特定机制(如管道、队列、共享内存)进行数据交换。

  • 并发与并行:多进程可以实现真正的并行计算(多个 CPU 核心同时运行),二多线程在 Python 中通常知识兵法(GIL 限制)。

核心模块与类

multiprocessing.Process创建和启动新进程的基本类。

常用方法

  • __init__(target, args, kwargs, name, daemon): 初始化进程对象。

    • target: 要在新进程中执行的函数。

    • args: 传递给 target 函数的参数元组。

    • kwargs: 传递给 target 函数的关键字参数字典。

    • name: 进程名称。

    • daemon: 是否为守护进程。守护进程会随主进程结束而终止。

  • start(): 启动进程,调用 target 函数。

  • join([timeout]): 阻塞主进程,等待该进程结束。可设置超时时间。

  • is_alive(): 检查进程是否仍在运行。

  • terminate(): 强制终止进程(不推荐,可能导致资源未释放)。

  • kill(): 向进程发送 SIGKILL 信号(Unix)或 TerminateProcess(Windows)。

  • close(): 关闭进程对象,释放资源(Python 3.7+)。

import multiprocessing
import time

def worker(name, delay):
    print(f"Worker {name} starting...")
    time.sleep(delay)
    print(f"Worker {name} finished.")

if __name__ == "__main__":
    # 创建两个进程
    p1 = multiprocessing.Process(target=worker, args=("A", 2), name="Worker-A")
    p2 = multiprocessing.Process(target=worker, args=("B", 1), name="Worker-B")

    # 启动进程
    p1.start()
    p2.start()

    # 等待进程结束
    p1.join()
    p2.join()

    print("All workers done.")

multiprocessing.Queue进程安全的队列,用于进程间通信(FIFO)

常用方法

  • put(item): 将 item 放入队列。可设置 blocktimeout

  • get(): 从队列中取出一个 item。可设置 blocktimeout

  • empty(): 检查队列是否为空。

  • full(): 检查队列是否已满。

  • qsize(): 返回队列大致大小(不精确,慎用)。

import multiprocessing

def producer(queue):
    for i in range(5):
        queue.put(f"Item-{i}")
        print(f"Produced: Item-{i}")
    queue.put("DONE")  # 发送结束信号

def consumer(queue):
    while True:
        item = queue.get()
        if item == "DONE":
            break
        print(f"Consumed: {item}")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    c = multiprocessing.Process(target=consumer, args=(q,))

    p.start()
    c.start()

    p.join()
    c.join()

multiprocessing.Pipe:创建一对连接的管道,用于双向通信。

函数

  • Pipe(duplex=True): 创建管道,返回 (conn1, conn2) 两个连接对象。

    • duplex=True: 双向通信(两个连接都可收发)。

    • duplex=False: 单向通信(conn1 发,conn2 收)。

连接对象方法

  • send(obj): 发送对象。

  • recv(): 接收对象(阻塞)。

  • poll([timeout]): 检查是否有数据可接收。

  • close(): 关闭连接。

import multiprocessing

def sender(conn):
    messages = ["Hello", "World", "from", "Sender"]
    for msg in messages:
        conn.send(msg)
    conn.close()

def receiver(conn):
    while True:
        if conn.poll(1):  # 等待1秒
            msg = conn.recv()
            print(f"Received: {msg}")
        else:
            print("No more messages.")
            break
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    parent_conn.close()
    child_conn.close()

multiprocessing.Pool进程池,用于管理一组工作进程,简化并行任务的分配。

常用方法

  • __init__(processes, initializer, initargs): 创建进程池。

    • processes: 进程数量(默认为 CPU 核心数)。

  • apply(func, args): 阻塞式执行,立即在池中一个进程上运行 func(*args)

  • apply_async(func, args, callback, error_callback): 异步执行,立即返回 AsyncResult 对象。

  • map(func, iterable): 阻塞式,将 func 应用于 iterable 的每个元素,并返回结果列表。

  • map_async(func, iterable, callback, error_callback): 异步版本的 map

  • imap(func, iterable, chunksize): 返回迭代器,结果按顺序返回。

  • imap_unordered(func, iterable, chunksize): 返回迭代器,结果按完成顺序返回。

  • close(): 关闭池,不再接受新任务。

  • terminate(): 立即终止所有工作进程。

  • join(): 等待所有工作进程结束(必须在 close()terminate() 后调用)。

AsyncResult 对象方法

  • get([timeout]): 获取结果(阻塞)。

  • wait([timeout]): 等待结果可用。

  • ready(): 任务是否完成。

  • successful(): 任务是否成功完成(无异常)。

import multiprocessing
import time

def square(x):
    time.sleep(1)
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # 同步 map
        result1 = pool.map(square, [1, 2, 3, 4, 5])
        print("map result:", result1)

        # 异步 map
        async_result = pool.map_async(square, [6, 7, 8])
        print("map_async result:", async_result.get())

        # apply_async
        result2 = pool.apply_async(square, (9,))
        print("apply_async result:", result2.get())

multiprocessing.Valuemultiprocessing.Array

用于在进程间共享简单的数值或数组数据。

  • Value(typecode_or_type, *args, lock=True): 共享单个值。

  • Array(typecode_or_type, size_or_initializer, lock=True): 共享数组。

需要使用锁(lock)来保证操作的原子性。

import multiprocessing

def increment(shared_value):
    for _ in range(100000):
        with shared_value.get_lock():  # 获取锁
            shared_value.value += 1

if __name__ == "__main__":
    # 共享整数,初始值为 0
    shared_num = multiprocessing.Value('i', 0)

    processes = []
    for _ in range(4):
        p = multiprocessing.Process(target=increment, args=(shared_value,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Final value: {shared_value.value}")  # 应为 400000

进程同步原语

用于协调多个进程的执行。

  • multiprocessing.Lock: 互斥锁,确保同一时间只有一个进程访问临界区。

  • multiprocessing.RLock: 可重入锁。

  • multiprocessing.Semaphore(value): 信号量,控制同时访问资源的进程数量。

  • multiprocessing.Event: 事件,用于进程间发送信号(set() / wait())。

  • multiprocessing.Condition(lock): 条件变量,结合锁使用,实现更复杂的同步。

最佳时间与注意事项

  1. if __name__ == "__main__": 必须包含此语句,防止在 Windows 或使用 spawn 启动方法时出现无限递归。

  2. 进程开销: 进程创建和销毁开销较大,适合计算密集型任务。对于 I/O 密集型任务,考虑使用 asyncio 或线程。

  3. 进程间通信成本: IPC 比内存访问慢,尽量减少不必要的数据传递。

  4. 资源清理: 使用 join() 等待子进程结束,使用 close() 关闭队列/管道,使用上下文管理器(with)。

  5. 异常处理: 在子进程中妥善处理异常,避免进程意外终止。

  6. 序列化: 通过 QueuePipe 传递的对象必须是可序列化的(pickle 模块支持)。

  7. 启动方法: 使用 multiprocessing.get_start_method()multiprocessing.set_start_method() 选择合适的启动方法(fork, spawn, forkserver)。

相关模块

  • concurrent.futures: 提供更高层的接口(ProcessPoolExecutor),简化多进程编程。

  • asyncio: 用于异步 I/O 编程,适合高并发 I/O 操作。

最后更新于