全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 8 分钟 ✍️ juanwangdev

Python 线程基础

threading 模块实现多线程编程,适合 I/O 密集型任务并发执行。

创建线程

Python
import threading
import time

def worker(name):
    print(f"线程 {name} 开始")
    time.sleep(2)
    print(f"线程 {name} 结束")

# 方式一:函数创建
t = threading.Thread(target=worker, args=('A',))
t.start()

# 方式二:类继承
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"线程 {self.name} 开始")
        time.sleep(2)
        print(f"线程 {self.name} 结束")

t2 = MyThread('B')
t2.start()

等待线程完成

Python
import threading

def task(n):
    import time
    time.sleep(n)
    print(f"任务完成,耗时 {n} 秒")

threads = []
for i in [1, 2, 3]:
    t = threading.Thread(target=task, args=(i,))
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

print("所有线程已完成")

守护线程

Python
import threading
import time

def daemon_task():
    while True:
        print("守护线程运行中...")
        time.sleep(1)

def normal_task():
    time.sleep(3)
    print("正常任务完成")

# 守护线程:主线程结束时自动终止
d = threading.Thread(target=daemon_task)
d.daemon = True  # 设置为守护线程
d.start()

# 正常线程
n = threading.Thread(target=normal_task)
n.start()

# 主线程等待正常线程
n.join()
# 守护线程随之终止

线程属性

Python
import threading

def worker():
    import time
    time.sleep(5)

t = threading.Thread(target=worker)

print(t.name)      # Thread-1(默认名)
print(t.ident)     # None(启动前)
print(t.is_alive()) # False(启动前)

t.start()

print(t.ident)      # 线程ID(数字)
print(t.is_alive()) # True

# 自定义线程名
t2 = threading.Thread(target=worker, name='WorkerThread')
print(t2.name)      # WorkerThread

当前线程信息

Python
import threading

def show_info():
    current = threading.current_thread()
    print(f"当前线程: {current.name}")
    print(f"线程ID: {current.ident}")
    print(f"主线程: {threading.main_thread().name}")
    print(f"活跃线程数: {threading.active_count()}")

t = threading.Thread(target=show_info, name='Worker')
t.start()
t.join()

show_info()  # 主线程中调用

线程状态

Python
import threading
import time

def task():
    time.sleep(1)

t = threading.Thread(target=task)

# 状态检查
print(t.is_alive())  # False(未启动)

t.start()
print(t.is_alive())  # True

t.join()
print(t.is_alive())  # False(已完成)

线程传递参数

Python
import threading

def worker(a, b, c=None):
    print(f"a={a}, b={b}, c={c}")

# args:位置参数(元组)
t1 = threading.Thread(target=worker, args=(1, 2))
t1.start()

# kwargs:关键字参数(字典)
t2 = threading.Thread(target=worker, kwargs={'a': 1, 'b': 2, 'c': 3})
t2.start()

# 混合使用
t3 = threading.Thread(target=worker, args=(1,), kwargs={'b': 2, 'c': 3})
t3.start()

线程方法对比

方法说明
start()启动线程
join(timeout)等待线程完成
is_alive()检查线程是否存活
setName()设置线程名
getName()获取线程名

注意:直接调用 run() 方法不会创建新线程,只在当前线程执行。

要点总结

  • Thread(target=func, args=()) 创建线程
  • start() 启动线程,join() 等待完成
  • daemon=True 设置守护线程,主线程结束时终止
  • current_thread() 获取当前线程对象
  • active_count() 获取活跃线程数
  • 不要直接调用 run(),应使用 start()
  • 线程适合 I/O 密集型任务,CPU 密集型受 GIL 限制

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python 线程同步机制
下一篇 → Python 线程安全数据结构
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库