多线程
多线程(Multithreading)是程序设计中实现并发的一种方式。在 Python 中,一个进程可以包含多个线程,这些线程共享进程的内存空间(如全局变量、堆内存),但拥有独立的栈空间和程序计数器。
Python 的多线程主要用于处理 I/O 密集型任务(如文件读写、网络请求、数据库操作),对于 CPU 密集型任务,由于全局解释器锁(GIL)的存在,多线程并不能有效利用多核 CPU 的优势,此时应考虑使用多进程(multiprocessing模块)。
核心模块
Python 提供了两个主要的多线程模块:
_thread(旧版/低级模块)
是 Python 的低级线程模块。
接口简单,但缺乏高级功能(如线程同步、线程池等)
不推荐在生产环境中使用,主要用于学习或简单的脚本。
注意:在 Python3 中,该模块名为
_thread,Python2 中为``thread`
threading(推荐/高级模块)
是 Python 的高级线程模块,构建在
_thread之上。提供了丰富、更安全的接口,如
Thread类、锁、条件变量、信号量、事件、定时器等。是进行 Python 多线程编程的首选模块。
创建和启动线程
方法一:通过函数创建线程
使用threading.Thread类,将目标函数作为target参数传入。
def worker(name, delay):
"""线程执行的函数"""
for i in range(3):
print(f"Thread {name}: {i}")
time.sleep(delay)
print(f"Thread {name} finished")
# 创建线程对象
thread1 = threading.Thread(target=worker, args=("A", 1))
thread2 = threading.Thread(target=worker, args=("B", 2))
# 启动线程
thread1.start()
thread2.start()
# 等待线程执行完毕
thread1.join()
thread2.join()
print("All threads finished")方法二:通过继承 Thread 类
自定义一个类继承threading.Thread,并重写run()方法。
class WorkerThread(threading.Thread):
def __init__(self, name, delay):
super().__init__()
self.name = name
self.delay = delay
def run(self):
"""线程启动后自动执行此方法"""
for i in range(3):
print(f"WorkerThread {self.name}: {i}")
time.sleep(self.delay)
print(f"WorkerThread {self.name} finished.")
# 创建并启动线程
t1 = WorkerThread("X", 1)
t2 = WorkerThread("Y", 1.5)
t1.start()
t2.start()
t1.join()
t2.join()
print("All WorkerThreads finished.")线程同步
当多个线程访问共享资源时,必须进行同步,以防止数据竞争(Race Condition)和不一致。
Lock(互斥锁)
最简单的同步机制,确保同一事件只有一个线程可以被锁保护的代码块。
import threading
# 共享资源
counter = 0
lock = threading.Lock() # 创建一个锁
def increment():
global counter
for _ in range(10000):
lock.acquire() # 获取锁
try:
counter += 1
finally:
lock.release() # 释放锁
# 更推荐使用上下文管理器(with语句)
# with lock:
# counter += 1
# 创建多个线程
threads = []
for i in range(5):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Final counter value: {counter}") # 应为 500000RLock(可重入锁)
与Lock类似,但允许同一个线程多次获取同一个锁,而不会造成死锁。适用于递归调用或一个函数调用另一个需要相同锁的函数的场景。
lock = threading.RLock()
def outer_function():
with lock:
print("Outer function acquired lock")
inner_function()
def inner_function():
with lock: # 同一个线程再次获取锁,RLock 允许
print("Inner function acquired lock")
# 在一个线程中调用
t = threading.Thread(target=outer_function)
t.start()
t.join()Condition(条件变量)
允许一个或多个线程等待某个特定条件成立,而另一个线程在条件成立时通知它们。通常与 Lock 或 RLock 一起使用。
import threading
import time
# 生产者-消费者模型示例
condition = threading.Condition()
items = []
def producer():
global items
for i in range(5):
with condition:
items.append(i)
print(f"Produced item {i}")
condition.notify() # 通知一个等待的消费者
# condition.notify_all() # 通知所有等待的消费者
time.sleep(1)
def consumer():
global items
while True:
with condition:
while len(items) == 0: # 使用 while 而非 if,防止虚假唤醒
print("Consumer waiting...")
condition.wait() # 等待通知
item = items.pop(0)
print(f"Consumed item {item}")
if item >= 4: # 消费到最后一个
break
time.sleep(0.5)
# 启动生产者和消费者
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()Semaphore(信号量)
控制同时访问特定资源的线程数量。可以看作是一个计数器,acquire() 会减少计数,release() 会增加计数。当计数为 0 时,acquire() 会阻塞。
import threading
import time
# 模拟数据库连接池,最多允许 2 个连接
semaphore = threading.Semaphore(2)
def connect_to_db(thread_name):
print(f"{thread_name} is trying to connect...")
with semaphore: # 获取信号量
print(f"{thread_name} connected to DB.")
time.sleep(2) # 模拟数据库操作
print(f"{thread_name} disconnected from DB.")
# 创建 5 个线程尝试连接
threads = []
for i in range(5):
t = threading.Thread(target=connect_to_db, args=(f"Thread-{i}",))
threads.append(t)
t.start()
for t in threads:
t.join()Event(事件)
一种简单的线程间通信机制。一个线程可以设置一个事件(set()),其他线程可以等待这个事件(wait())。事件内部有一个标志,初始为 False。
import threading
import time
event = threading.Event()
def waiter():
print("Waiter is waiting for the event...")
event.wait() # 阻塞,直到事件被设置
print("Waiter received the event!")
def setter():
time.sleep(3)
print("Setter is setting the event...")
event.set() # 设置事件,唤醒所有等待者
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=waiter)
t3 = threading.Thread(target=setter)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()主要线程方法
start(): 启动线程。调用后,线程开始执行run()方法。run(): 线程的入口点。在Thread子类中应重写此方法。join([timeout]): 等待线程执行完毕。主线程会阻塞,直到该线程结束。可以设置超时时间。is_alive(): 返回线程是否还在运行。getName()/setName(name): 获取/设置线程名称。ident: 线程的标识符(ID),是一个整数。如果线程未启动或已结束,值为None。
守护线程(Daemon Threads)
守护线程是“后台”线程。
当程序中所有非守护线程(主线程和其他非守护线程)结束时,无论守护线程是否执行完毕,整个 Python 程序都会退出。
通过设置
thread.daemon = True或在创建时Thread(daemon=True)来指定。常用于执行周期性任务、监控、清理工作等。
import threading
import time
def daemon_task():
while True:
print("Daemon is working...")
time.sleep(1)
d = threading.Thread(target=daemon_task, daemon=True)
d.start()
# 主线程执行其他任务
time.sleep(3)
print("Main thread finished.")
# 程序在此处退出,因为主线程结束,且 d 是守护线程线程局部存储(threading.local)
threading.local 创建一个对象,其属性对每个线程都是独立的。每个线程都可以读写自己的副本,而不会影响其他线程。
import threading
# 创建线程局部对象
local_data = threading.local()
def process_student():
# 每个线程设置自己的 student_id
local_data.student_id = threading.current_thread().name
print(f"Student ID in {threading.current_thread().name}: {local_data.student_id}")
t1 = threading.Thread(target=process_student, name="Alice")
t2 = threading.Thread(target=process_student, name="Bob")
t1.start()
t2.start()
t1.join()
t2.join()全局解释器锁(GIL-Global Interpreter Lock)
核心概念: GIL 是 CPython 解释器的一个互斥锁,它确保同一时刻只有一个线程执行 Python 字节码。
影响:
对于 CPU 密集型任务,多线程无法实现真正的并行计算,性能提升有限,甚至可能因线程切换开销而变慢。
对于 I/O 密集型任务,当线程进行 I/O 操作(如等待网络响应)时,GIL 会被释放,允许其他线程运行,因此多线程在这种场景下非常有效。
应对策略:
I/O 密集型:使用
threading模块。CPU 密集型:使用
multiprocessing模块(利用多进程绕过 GIL)或使用 C 扩展(如 NumPy)在 C 层面释放 GIL。
最佳实践与注意事项
优先使用
threading模块,而非_thread。正确使用
join():如果需要确保线程完成后再继续,务必调用join()。避免死锁:多个线程按相同顺序获取多个锁。使用超时或更高级的同步原语。
使用上下文管理器:对于
Lock,RLock,Condition等,优先使用with语句,确保锁能被正确释放。线程安全的数据结构:尽量使用线程安全的数据结构(如
queue.Queue),或对共享数据进行同步。避免共享可变状态:设计程序时,尽量减少线程间的共享状态。
理解 GIL:明确你的任务类型,选择合适的并发模型(线程 vs 进程)。
调试困难:多线程程序的 bug(如竞态条件)通常难以复现和调试,需格外小心。
相关模块
queue: 提供了线程安全的队列(Queue,LifoQueue,PriorityQueue),是线程间通信的理想选择。concurrent.futures: 提供了更高层次的接口(ThreadPoolExecutor),用于管理线程池,简化异步任务的执行。multiprocessing: 用于实现多进程编程,可以绕过 GIL,真正利用多核 CPU。
最后更新于