全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-15 9 分钟 ✍️ juanwangdev

Node.js worker_threads模块

worker_threads 提供真正的多线程支持,适合 CPU 密集型任务。

基本概念

与 child_process 对比

特性worker_threadschild_process
内存共享内存独立内存
启动开销
通信SharedArrayBufferIPC 序列化
适用CPU密集型任务隔离

创建 Worker

基本用法

JavaScript
const { Worker } = require('worker_threads');

const worker = new Worker('./worker.js');

worker.on('message', (result) => {
  console.log('结果:', result);
});

worker.on('error', (err) => {
  console.error('错误:', err);
});

worker.on('exit', (code) => {
  if (code !== 0) {
    console.log('Worker 异常退出:', code);
  }
});

Worker 文件

JavaScript
// worker.js
const { parentPort } = require('worker_threads');

// 接收消息
parentPort.on('message', (data) => {
  // 处理任务
  const result = heavyCompute(data);
  parentPort.postMessage(result);
});

function heavyCompute(n) {
  let sum = 0;
  for (let i = 0; i < n; i++) {
    sum += i;
  }
  return sum;
}

数据传递

postMessage

JavaScript
// 主线程
const worker = new Worker('./worker.js');

worker.postMessage({ data: 'hello' });

worker.on('message', (msg) => {
  console.log('收到:', msg);
});

// Worker
parentPort.postMessage({ result: 'done' });

传递 Buffer

JavaScript
// 主线程
const buffer = Buffer.from('hello');
worker.postMessage(buffer);

// Worker 收到副本(非共享)
parentPort.on('message', (buffer) => {
  console.log(buffer.toString());
});

共享内存

SharedArrayBuffer

JavaScript
const { Worker } = require('worker_threads');

// 创建共享内存
const sharedBuffer = new SharedArrayBuffer(4 * 1024);
const sharedArray = new Int32Array(sharedBuffer);

// 传递共享内存
const worker = new Worker('./worker.js', {
  workerData: { shared: sharedArray }
});

// 主线程读写
sharedArray[0] = 100;

Worker 使用共享内存

JavaScript
// worker.js
const { workerData, parentPort } = require('worker_threads');

// 获取共享数据
const sharedArray = workerData.shared;

// 直接读写共享内存
sharedArray[0] = sharedArray[0] + 1;

// 不需要 postMessage 传递
parentPort.postMessage('done');

workerData

初始数据传递

JavaScript
const { Worker } = require('worker_threads');

const worker = new Worker('./worker.js', {
  workerData: {
    config: { threshold: 100 },
    input: [1, 2, 3, 4, 5]
  }
});

// worker.js
const { workerData } = require('worker_threads');

console.log('初始数据:', workerData);
const config = workerData.config;
const input = workerData.input;

MessageChannel

双向通信通道

JavaScript
const { Worker, MessageChannel } = require('worker_threads');

const { port1, port2 } = new MessageChannel();

const worker = new Worker('./worker.js');

// 发送端口给 Worker
worker.postMessage({ port: port2 }, [port2]);

// 主线程使用 port1
port1.on('message', (msg) => {
  console.log('port1 收到:', msg);
});

port1.postMessage('来自主线程');

Worker 接收端口

JavaScript
// worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', (msg) => {
  if (msg.port) {
    const port = msg.port;

    port.on('message', (data) => {
      console.log('port 收到:', data);
      port.postMessage('来自 Worker');
    });
  }
});

线程池实现

JavaScript
const { Worker } = require('worker_threads');
const path = require('path');

class ThreadPool {
  constructor(size, workerFile) {
    this.workers = [];
    this.queue = [];

    for (let i = 0; i < size; i++) {
      const worker = new Worker(workerFile);
      worker.on('message', (result) => {
        this.resolve(result);
      });
      this.workers.push({ worker, busy: false });
    }
  }

  run(data) {
    const available = this.workers.find(w => !w.busy);
    if (available) {
      available.busy = true;
      available.worker.postMessage(data);
    } else {
      this.queue.push(data);
    }
  }

  resolve(result) {
    console.log('结果:', result);
    const worker = this.workers.find(w => w.busy);
    if (worker) {
      worker.busy = false;
      if (this.queue.length) {
        worker.busy = true;
        worker.worker.postMessage(this.queue.shift());
      }
    }
  }
}

const pool = new ThreadPool(4, './worker.js');

其他 API

isMainThread

JavaScript
const { isMainThread } = require('worker_threads');

if (isMainThread) {
  console.log('主线程');
} else {
  console.log('Worker 线程');
}

threadId

JavaScript
const { threadId } = require('worker_threads');

console.log('线程ID:', threadId);

getEnvironmentData

JavaScript
const { setEnvironmentData, getEnvironmentData } = require('worker_threads');

// 主线程设置
setEnvironmentData('key', 'value');

// Worker 获取
const value = getEnvironmentData('key');

注意事项

  • Worker 不能访问主线程变量
  • 共享内存需注意并发同步问题
  • Worker 内可创建嵌套 Worker
  • 使用 Atomics 实现原子操作避免竞争

要点总结

  • new Worker(file) 创建线程
  • parentPort.postMessage() 发送消息
  • SharedArrayBuffer 实现共享内存
  • workerData 传递初始数据
  • 线程池管理多个 Worker

📝 发现内容有误?点击此处直接编辑

← 上一篇 Node.js cluster模块
下一篇 → Node.js 事件循环与线程池
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库