多线程

多线程(Multithreading)是程序设计中实现并发的一种方式。在 Python 中,一个进程可以包含多个线程,这些线程共享进程的内存空间(如全局变量、堆内存),但拥有独立的栈空间和程序计数器。

Python 的多线程主要用于处理 I/O 密集型任务(如文件读写、网络请求、数据库操作),对于 CPU 密集型任务,由于全局解释器锁(GIL)的存在,多线程并不能有效利用多核 CPU 的优势,此时应考虑使用多进程(multiprocessing模块)。

核心模块

Python 提供了两个主要的多线程模块:

_thread(旧版/低级模块)

  • 是 Python 的低级线程模块。

  • 接口简单,但缺乏高级功能(如线程同步、线程池等)

  • 不推荐在生产环境中使用,主要用于学习或简单的脚本。

  • 注意:在 Python3 中,该模块名为_thread,Python2 中为``thread`

threading(推荐/高级模块)

  • 是 Python 的高级线程模块,构建在_thread之上。

  • 提供了丰富、更安全的接口,如Thread类、锁、条件变量、信号量、事件、定时器等。

  • 是进行 Python 多线程编程的首选模块。

创建和启动线程

方法一:通过函数创建线程

使用threading.Thread类,将目标函数作为target参数传入。

def worker(name, delay):
    """线程执行的函数"""
    for i in range(3):
        print(f"Thread {name}: {i}")
        time.sleep(delay)
    print(f"Thread {name} finished")


# 创建线程对象
thread1 = threading.Thread(target=worker, args=("A", 1))
thread2 = threading.Thread(target=worker, args=("B", 2))

# 启动线程
thread1.start()
thread2.start()

# 等待线程执行完毕
thread1.join()
thread2.join()

print("All threads finished")

方法二:通过继承 Thread 类

自定义一个类继承threading.Thread,并重写run()方法。

class WorkerThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        """线程启动后自动执行此方法"""
        for i in range(3):
            print(f"WorkerThread {self.name}: {i}")
            time.sleep(self.delay)
        print(f"WorkerThread {self.name} finished.")


# 创建并启动线程
t1 = WorkerThread("X", 1)
t2 = WorkerThread("Y", 1.5)

t1.start()
t2.start()

t1.join()
t2.join()

print("All WorkerThreads finished.")

线程同步

当多个线程访问共享资源时,必须进行同步,以防止数据竞争(Race Condition)和不一致。

Lock(互斥锁)

最简单的同步机制,确保同一事件只有一个线程可以被锁保护的代码块。

import threading

# 共享资源
counter = 0
lock = threading.Lock()  # 创建一个锁


def increment():
    global counter
    for _ in range(10000):
        lock.acquire()  # 获取锁
        try:
            counter += 1
        finally:
            lock.release()  # 释放锁
        # 更推荐使用上下文管理器(with语句)
        # with lock:
        # counter += 1


# 创建多个线程
threads = []
for i in range(5):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final counter value: {counter}")  # 应为 500000

RLock(可重入锁)

Lock类似,但允许同一个线程多次获取同一个锁,而不会造成死锁。适用于递归调用或一个函数调用另一个需要相同锁的函数的场景。

lock = threading.RLock()

def outer_function():
    with lock:
        print("Outer function acquired lock")
        inner_function()

def inner_function():
    with lock:  # 同一个线程再次获取锁,RLock 允许
        print("Inner function acquired lock")

# 在一个线程中调用
t = threading.Thread(target=outer_function)
t.start()
t.join()

Condition(条件变量)

允许一个或多个线程等待某个特定条件成立,而另一个线程在条件成立时通知它们。通常与 LockRLock 一起使用。

import threading
import time

# 生产者-消费者模型示例
condition = threading.Condition()
items = []

def producer():
    global items
    for i in range(5):
        with condition:
            items.append(i)
            print(f"Produced item {i}")
            condition.notify()  # 通知一个等待的消费者
            # condition.notify_all()  # 通知所有等待的消费者
        time.sleep(1)

def consumer():
    global items
    while True:
        with condition:
            while len(items) == 0:  # 使用 while 而非 if,防止虚假唤醒
                print("Consumer waiting...")
                condition.wait()  # 等待通知
            item = items.pop(0)
            print(f"Consumed item {item}")
            if item >= 4:  # 消费到最后一个
                break
        time.sleep(0.5)

# 启动生产者和消费者
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)

p.start()
c.start()

p.join()
c.join()

Semaphore(信号量)

控制同时访问特定资源的线程数量。可以看作是一个计数器,acquire() 会减少计数,release() 会增加计数。当计数为 0 时,acquire() 会阻塞。

import threading
import time

# 模拟数据库连接池,最多允许 2 个连接
semaphore = threading.Semaphore(2)

def connect_to_db(thread_name):
    print(f"{thread_name} is trying to connect...")
    with semaphore:  # 获取信号量
        print(f"{thread_name} connected to DB.")
        time.sleep(2)  # 模拟数据库操作
        print(f"{thread_name} disconnected from DB.")

# 创建 5 个线程尝试连接
threads = []
for i in range(5):
    t = threading.Thread(target=connect_to_db, args=(f"Thread-{i}",))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Event(事件)

一种简单的线程间通信机制。一个线程可以设置一个事件(set()),其他线程可以等待这个事件(wait())。事件内部有一个标志,初始为 False

import threading
import time

event = threading.Event()

def waiter():
    print("Waiter is waiting for the event...")
    event.wait()  # 阻塞,直到事件被设置
    print("Waiter received the event!")

def setter():
    time.sleep(3)
    print("Setter is setting the event...")
    event.set()  # 设置事件,唤醒所有等待者

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=waiter)
t3 = threading.Thread(target=setter)

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

主要线程方法

  • start(): 启动线程。调用后,线程开始执行 run() 方法。

  • run(): 线程的入口点。在 Thread 子类中应重写此方法。

  • join([timeout]): 等待线程执行完毕。主线程会阻塞,直到该线程结束。可以设置超时时间。

  • is_alive(): 返回线程是否还在运行。

  • getName() / setName(name): 获取/设置线程名称。

  • ident: 线程的标识符(ID),是一个整数。如果线程未启动或已结束,值为 None

守护线程(Daemon Threads)

  • 守护线程是“后台”线程。

  • 当程序中所有非守护线程(主线程和其他非守护线程)结束时,无论守护线程是否执行完毕,整个 Python 程序都会退出。

  • 通过设置 thread.daemon = True 或在创建时 Thread(daemon=True) 来指定。

  • 常用于执行周期性任务、监控、清理工作等。

import threading
import time

def daemon_task():
    while True:
        print("Daemon is working...")
        time.sleep(1)

d = threading.Thread(target=daemon_task, daemon=True)
d.start()

# 主线程执行其他任务
time.sleep(3)
print("Main thread finished.")

# 程序在此处退出,因为主线程结束,且 d 是守护线程

线程局部存储(threading.local)

threading.local 创建一个对象,其属性对每个线程都是独立的。每个线程都可以读写自己的副本,而不会影响其他线程。

import threading

# 创建线程局部对象
local_data = threading.local()

def process_student():
    # 每个线程设置自己的 student_id
    local_data.student_id = threading.current_thread().name
    print(f"Student ID in {threading.current_thread().name}: {local_data.student_id}")

t1 = threading.Thread(target=process_student, name="Alice")
t2 = threading.Thread(target=process_student, name="Bob")

t1.start()
t2.start()

t1.join()
t2.join()

全局解释器锁(GIL-Global Interpreter Lock)

  • 核心概念: GIL 是 CPython 解释器的一个互斥锁,它确保同一时刻只有一个线程执行 Python 字节码。

  • 影响:

    • 对于 CPU 密集型任务,多线程无法实现真正的并行计算,性能提升有限,甚至可能因线程切换开销而变慢。

    • 对于 I/O 密集型任务,当线程进行 I/O 操作(如等待网络响应)时,GIL 会被释放,允许其他线程运行,因此多线程在这种场景下非常有效。

  • 应对策略:

    • I/O 密集型:使用 threading 模块。

    • CPU 密集型:使用 multiprocessing 模块(利用多进程绕过 GIL)或使用 C 扩展(如 NumPy)在 C 层面释放 GIL。

最佳实践与注意事项

  1. 优先使用 threading 模块,而非 _thread

  2. 正确使用 join():如果需要确保线程完成后再继续,务必调用 join()

  3. 避免死锁:多个线程按相同顺序获取多个锁。使用超时或更高级的同步原语。

  4. 使用上下文管理器:对于 Lock, RLock, Condition 等,优先使用 with 语句,确保锁能被正确释放。

  5. 线程安全的数据结构:尽量使用线程安全的数据结构(如 queue.Queue),或对共享数据进行同步。

  6. 避免共享可变状态:设计程序时,尽量减少线程间的共享状态。

  7. 理解 GIL:明确你的任务类型,选择合适的并发模型(线程 vs 进程)。

  8. 调试困难:多线程程序的 bug(如竞态条件)通常难以复现和调试,需格外小心。

相关模块

  • queue: 提供了线程安全的队列(Queue, LifoQueue, PriorityQueue),是线程间通信的理想选择。

  • concurrent.futures: 提供了更高层次的接口(ThreadPoolExecutor),用于管理线程池,简化异步任务的执行。

  • multiprocessing: 用于实现多进程编程,可以绕过 GIL,真正利用多核 CPU。

最后更新于