Python 线程同步机制
多线程访问共享资源需要同步机制,防止竞争条件和数据损坏。
Lock 基础锁
Python
import threading
lock = threading.Lock()
counter = 0
def increment():
global counter
for _ in range(100000):
lock.acquire() # 获取锁
try:
counter += 1
finally:
lock.release() # 释放锁
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 1000000(正确)
不使用锁:
Python
counter = 0
def unsafe_increment():
global counter
for _ in range(100000):
counter += 1 # 竞争条件
threads = [threading.Thread(target=unsafe_increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 结果不确定(可能小于1000000)
with 语法使用锁
Python
import threading
lock = threading.Lock()
def safe_operation():
with lock: # 自动获取和释放
# 临界区代码
pass
RLock 可重入锁
同一个线程可以多次获取锁。
Python
import threading
rlock = threading.RLock()
def outer():
with rlock:
print("outer 获取锁")
inner() # 同一线程可以再次获取
def inner():
with rlock: # RLock 允许同一线程多次获取
print("inner 获取锁")
outer()
# Lock 不允许同一线程多次获取,会阻塞
Semaphore 信号量
限制同时访问资源的线程数量。
Python
import threading
import time
semaphore = threading.Semaphore(3) # 最多3个线程同时访问
def worker(n):
with semaphore:
print(f"线程 {n} 开始工作")
time.sleep(2)
print(f"线程 {n} 完成")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# 同时最多3个线程在工作
BoundedSemaphore 有界信号量
防止 release 超过初始值。
Python
import threading
sem = threading.BoundedSemaphore(2)
sem.acquire()
sem.acquire()
sem.release()
sem.release()
sem.release() # ValueError: Semaphore released too many times
Event 事件
线程间通信,等待事件触发。
Python
import threading
import time
event = threading.Event()
def waiter():
print("等待事件...")
event.wait() # 阻塞等待
print("事件触发,继续执行")
def trigger():
time.sleep(2)
print("触发事件")
event.set() # 触发事件
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=trigger)
t1.start()
t2.start()
t1.join()
t2.join()
Event 方法:
Python
event.set() # 触发事件,所有等待线程继续
event.clear() # 清除事件,后续 wait() 再次阻塞
event.wait(timeout) # 等待事件,可设置超时
event.is_set() # 检查事件是否已触发
Condition 条件变量
等待特定条件满足。
Python
import threading
condition = threading.Condition()
items = []
def consumer():
with condition:
while not items:
condition.wait() # 等待生产者通知
item = items.pop()
print(f"消费: {item}")
def producer():
with condition:
items.append('商品')
condition.notify() # 通知一个等待线程
# condition.notify_all() # 通知所有等待线程
t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
t1.join()
t2.join()
同步机制对比
| 类型 | 用途 | 特点 |
|---|---|---|
| Lock | 基础互斥 | 简单,不可重入 |
| RLock | 可重入互斥 | 同一线程可多次获取 |
| Semaphore | 资源限制 | 控制并发数量 |
| Event | 事件通知 | 一对多通信 |
| Condition | 条件等待 | 复杂同步逻辑 |
死锁预防
Python
import threading
lock_a = threading.Lock()
lock_b = threading.Lock()
def task1():
with lock_a:
print("task1 获取 lock_a")
with lock_b: # 按顺序获取
print("task1 获取 lock_b")
def task2():
with lock_a: # 同样顺序获取
print("task2 获取 lock_a")
with lock_b:
print("task2 获取 lock_b")
# 避免反向获取造成死锁
# 错误示例:
# def task2():
# with lock_b: # 反向获取
# with lock_a: # 可能死锁
要点总结
Lock基本互斥锁,确保资源独占访问RLock可重入锁,同一线程可多次获取Semaphore限制并发线程数量Event线程间事件通知通信Condition等待特定条件满足- 使用
with语法自动管理锁 - 遵循锁获取顺序,避免死锁
- 同步机制是并发编程的基础保障
📝 发现内容有误?点击此处直接编辑