全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-19 8 分钟 ✍️ juanwangdev

Python 线程安全数据结构

queue 模块提供线程安全的队列,内置锁机制,适合多线程数据交换。

Queue 基础队列

Python
import threading
import queue

q = queue.Queue(maxsize=10)  # 最大容量10

def producer():
    for i in range(5):
        q.put(i)  # 放入元素
        print(f"生产: {i}")

def consumer():
    while True:
        item = q.get()  # 取出元素
        print(f"消费: {item}")
        q.task_done()  # 标记任务完成

        if item == 4:
            break

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()

t1.join()
t2.join()

阻塞与非阻塞操作

Python
import queue

q = queue.Queue(maxsize=2)

# 阻塞操作
q.put(1)        # 阻塞直到有空位
q.put(2)        # 阻塞直到有空位
q.put(3, timeout=2)  # 等待2秒,超时抛 queue.Full

# 非阻塞操作
q.put_nowait(4)  # 无空位立即抛 queue.Full

# 获取元素
item = q.get()           # 阻塞直到有元素
item = q.get(timeout=2)  # 等待2秒,超时抛 queue.Empty
item = q.get_nowait()    # 无元素立即抛 queue.Empty

LifoQueue 后进先出

Python
import queue

q = queue.LifoQueue()

q.put(1)
q.put(2)
q.put(3)

print(q.get())  # 3(最后放入的)
print(q.get())  # 2
print(q.get())  # 1

PriorityQueue 优先级队列

Python
import queue

q = queue.PriorityQueue()

# 元素格式:(优先级, 数据)
q.put((3, '低优先级'))
q.put((1, '高优先级'))
q.put((2, '中优先级'))

print(q.get())  # (1, '高优先级')
print(q.get())  # (2, '中优先级')
print(q.get())  # (3, '低优先级')

自定义对象优先级:

Python
import queue

class Task:
    def __init__(self, priority, name):
        self.priority = priority
        self.name = name

    def __lt__(self, other):
        return self.priority < other.priority

q = queue.PriorityQueue()
q.put(Task(3, '低'))
q.put(Task(1, '高'))
q.put(Task(2, '中'))

print(q.get().name)  # 高
print(q.get().name)  # 中
print(q.get().name)  # 低

队列状态查询

Python
import queue

q = queue.Queue(maxsize=5)

for i in range(3):
    q.put(i)

print(q.qsize())    # 3(当前元素数)
print(q.empty())    # False
print(q.full())     # False

q.put(4)
q.put(5)
print(q.full())     # True

任务完成追踪

Python
import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f"处理: {item}")
        q.task_done()  # 标记完成
        if item == 'stop':
            break

t = threading.Thread(target=worker)
t.start()

for item in ['a', 'b', 'c', 'stop']:
    q.put(item)

q.join()  # 等待所有任务完成
print("所有任务已完成")

SimpleQueue 简单队列

Python
import queue

q = queue.SimpleQueue()  # 无界、无 task_done 功能

q.put(1)
q.put(2)

print(q.get())  # 1

生产者-消费者模式

Python
import threading
import queue
import time

q = queue.Queue(maxsize=5)

def producer():
    for i in range(10):
        q.put(i)
        print(f"生产: {i}")
        time.sleep(0.1)

def consumer(name):
    while True:
        item = q.get()
        print(f"{name} 消费: {item}")
        q.task_done()
        time.sleep(0.2)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=('消费者A',))
t3 = threading.Thread(target=consumer, args=('消费者B',))

for t in [t1, t2, t3]:
    t.start()

t1.join()
q.join()  # 等待队列清空

队列类型对比

队列类型特点适用场景
QueueFIFO任务队列
LifoQueueLIFO栈式处理
PriorityQueue按优先级任务调度
SimpleQueue无界简单轻量通信

要点总结

  • Queue 内置锁机制,线程安全
  • put() 放入,get() 取出,默认阻塞
  • put_nowait()get_nowait() 非阻塞版本
  • task_done() 标记任务完成,join() 等待清空
  • LifoQueue 后进先出,PriorityQueue 按优先级
  • 生产者-消费者模式是经典应用
  • 队列简化多线程数据交换,避免手动同步

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python 线程基础
下一篇 → Python 线程池 ThreadPoolExecutor
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库