多进程
Python 的多进程编程允许程序创建和管理多个独立的进程,以充分利用多核 CPU 的计算能力,实现真正的并行执行。与多线程不同,多进程不受全局解释器锁(GIL)的限制,每个进程拥有独立的 Python 解释器和内存空间。
核心概念
进程(Process):一个独立的程序执行实例,拥有独立的内存空间、文件描述符的等资源。
父进程与子进程:创建新进程的进程为父进程,被创建的进程称为子进程。子进程是父进程的副本,但可以执行不同的代码。
进程间通信(IPC):由于进程间内存隔离,需要通过特定机制(如管道、队列、共享内存)进行数据交换。
并发与并行:多进程可以实现真正的并行计算(多个 CPU 核心同时运行),二多线程在 Python 中通常知识兵法(GIL 限制)。
核心模块与类
multiprocessing.Process
创建和启动新进程的基本类。
常用方法
__init__(target, args, kwargs, name, daemon)
: 初始化进程对象。target
: 要在新进程中执行的函数。args
: 传递给target
函数的参数元组。kwargs
: 传递给target
函数的关键字参数字典。name
: 进程名称。daemon
: 是否为守护进程。守护进程会随主进程结束而终止。
start()
: 启动进程,调用target
函数。join([timeout])
: 阻塞主进程,等待该进程结束。可设置超时时间。is_alive()
: 检查进程是否仍在运行。terminate()
: 强制终止进程(不推荐,可能导致资源未释放)。kill()
: 向进程发送SIGKILL
信号(Unix)或TerminateProcess
(Windows)。close()
: 关闭进程对象,释放资源(Python 3.7+)。
import multiprocessing
import time
def worker(name, delay):
print(f"Worker {name} starting...")
time.sleep(delay)
print(f"Worker {name} finished.")
if __name__ == "__main__":
# 创建两个进程
p1 = multiprocessing.Process(target=worker, args=("A", 2), name="Worker-A")
p2 = multiprocessing.Process(target=worker, args=("B", 1), name="Worker-B")
# 启动进程
p1.start()
p2.start()
# 等待进程结束
p1.join()
p2.join()
print("All workers done.")
multiprocessing.Queue
进程安全的队列,用于进程间通信(FIFO)
常用方法
put(item)
: 将item
放入队列。可设置block
和timeout
。get()
: 从队列中取出一个item
。可设置block
和timeout
。empty()
: 检查队列是否为空。full()
: 检查队列是否已满。qsize()
: 返回队列大致大小(不精确,慎用)。
import multiprocessing
def producer(queue):
for i in range(5):
queue.put(f"Item-{i}")
print(f"Produced: Item-{i}")
queue.put("DONE") # 发送结束信号
def consumer(queue):
while True:
item = queue.get()
if item == "DONE":
break
print(f"Consumed: {item}")
if __name__ == "__main__":
q = multiprocessing.Queue()
p = multiprocessing.Process(target=producer, args=(q,))
c = multiprocessing.Process(target=consumer, args=(q,))
p.start()
c.start()
p.join()
c.join()
multiprocessing.Pipe
:创建一对连接的管道,用于双向通信。
函数
Pipe(duplex=True)
: 创建管道,返回(conn1, conn2)
两个连接对象。duplex=True
: 双向通信(两个连接都可收发)。duplex=False
: 单向通信(conn1
发,conn2
收)。
连接对象方法
send(obj)
: 发送对象。recv()
: 接收对象(阻塞)。poll([timeout])
: 检查是否有数据可接收。close()
: 关闭连接。
import multiprocessing
def sender(conn):
messages = ["Hello", "World", "from", "Sender"]
for msg in messages:
conn.send(msg)
conn.close()
def receiver(conn):
while True:
if conn.poll(1): # 等待1秒
msg = conn.recv()
print(f"Received: {msg}")
else:
print("No more messages.")
break
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
parent_conn.close()
child_conn.close()
multiprocessing.Pool
进程池,用于管理一组工作进程,简化并行任务的分配。
常用方法
__init__(processes, initializer, initargs)
: 创建进程池。processes
: 进程数量(默认为 CPU 核心数)。
apply(func, args)
: 阻塞式执行,立即在池中一个进程上运行func(*args)
。apply_async(func, args, callback, error_callback)
: 异步执行,立即返回AsyncResult
对象。map(func, iterable)
: 阻塞式,将func
应用于iterable
的每个元素,并返回结果列表。map_async(func, iterable, callback, error_callback)
: 异步版本的map
。imap(func, iterable, chunksize)
: 返回迭代器,结果按顺序返回。imap_unordered(func, iterable, chunksize)
: 返回迭代器,结果按完成顺序返回。close()
: 关闭池,不再接受新任务。terminate()
: 立即终止所有工作进程。join()
: 等待所有工作进程结束(必须在close()
或terminate()
后调用)。
AsyncResult
对象方法
get([timeout])
: 获取结果(阻塞)。wait([timeout])
: 等待结果可用。ready()
: 任务是否完成。successful()
: 任务是否成功完成(无异常)。
import multiprocessing
import time
def square(x):
time.sleep(1)
return x * x
if __name__ == "__main__":
with multiprocessing.Pool(processes=4) as pool:
# 同步 map
result1 = pool.map(square, [1, 2, 3, 4, 5])
print("map result:", result1)
# 异步 map
async_result = pool.map_async(square, [6, 7, 8])
print("map_async result:", async_result.get())
# apply_async
result2 = pool.apply_async(square, (9,))
print("apply_async result:", result2.get())
multiprocessing.Value
和 multiprocessing.Array
用于在进程间共享简单的数值或数组数据。
Value(typecode_or_type, *args, lock=True)
: 共享单个值。Array(typecode_or_type, size_or_initializer, lock=True)
: 共享数组。
需要使用锁(lock
)来保证操作的原子性。
import multiprocessing
def increment(shared_value):
for _ in range(100000):
with shared_value.get_lock(): # 获取锁
shared_value.value += 1
if __name__ == "__main__":
# 共享整数,初始值为 0
shared_num = multiprocessing.Value('i', 0)
processes = []
for _ in range(4):
p = multiprocessing.Process(target=increment, args=(shared_value,))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Final value: {shared_value.value}") # 应为 400000
进程同步原语
用于协调多个进程的执行。
multiprocessing.Lock
: 互斥锁,确保同一时间只有一个进程访问临界区。multiprocessing.RLock
: 可重入锁。multiprocessing.Semaphore(value)
: 信号量,控制同时访问资源的进程数量。multiprocessing.Event
: 事件,用于进程间发送信号(set()
/wait()
)。multiprocessing.Condition(lock)
: 条件变量,结合锁使用,实现更复杂的同步。
最佳时间与注意事项
if __name__ == "__main__":
必须包含此语句,防止在 Windows 或使用spawn
启动方法时出现无限递归。进程开销: 进程创建和销毁开销较大,适合计算密集型任务。对于 I/O 密集型任务,考虑使用
asyncio
或线程。进程间通信成本: IPC 比内存访问慢,尽量减少不必要的数据传递。
资源清理: 使用
join()
等待子进程结束,使用close()
关闭队列/管道,使用上下文管理器(with
)。异常处理: 在子进程中妥善处理异常,避免进程意外终止。
序列化: 通过
Queue
或Pipe
传递的对象必须是可序列化的(pickle
模块支持)。启动方法: 使用
multiprocessing.get_start_method()
和multiprocessing.set_start_method()
选择合适的启动方法(fork
,spawn
,forkserver
)。
相关模块
concurrent.futures
: 提供更高层的接口(ProcessPoolExecutor
),简化多进程编程。asyncio
: 用于异步 I/O 编程,适合高并发 I/O 操作。
最后更新于