Node.js 进程间通信
IPC(Inter-Process Communication)实现进程间数据交换。
IPC 机制
通信方式
| 方式 | 适用场景 | 特点 |
|---|---|---|
| 管道 IPC | child_process.fork | 内置,简单 |
| 共享内存 | worker_threads | 高效,需同步 |
| Socket | 跨机器通信 | 网络通信 |
| Redis | 共享状态 | 外部存储 |
fork 内置 IPC
JavaScript
const { fork } = require('child_process');
const child = fork('./worker.js');
// IPC 通道自动建立
child.send({ type: 'task', data: 'hello' });
child.on('message', (msg) => {
console.log('收到:', msg);
});
基本通信
父进程发送
JavaScript
const { fork } = require('child_process');
const child = fork('./worker.js');
// 发送字符串
child.send('hello');
// 发送对象
child.send({ id: 1, action: 'compute' });
// 发送 Buffer
child.send(Buffer.from('data'));
// 发送句柄(socket)
child.send(socket, { keepOpen: true });
子进程接收
JavaScript
// worker.js
process.on('message', (msg) => {
console.log('收到父进程消息:', msg);
// 处理后回复
const result = handleTask(msg);
process.send({ id: msg.id, result });
});
function handleTask(msg) {
return 'processed: ' + msg.data;
}
cluster IPC
Master 到 Worker
JavaScript
const cluster = require('cluster');
if (cluster.isMaster) {
const worker = cluster.fork();
worker.send({ type: 'config' });
worker.on('message', (msg) => {
console.log('Worker 响应:', msg);
});
}
Worker 到 Master
JavaScript
// Worker 进程中
process.on('message', (msg) => {
if (msg.type === 'task') {
const result = compute(msg.data);
process.send({ type: 'result', data: result });
}
});
广播消息
JavaScript
if (cluster.isMaster) {
// 广播到所有 Worker
const message = { type: 'broadcast', data: '通知' };
Object.values(cluster.workers).forEach((worker) => {
worker.send(message);
});
}
消息类型约定
结构化消息
JavaScript
// 定义消息协议
const MessageTypes = {
TASK: 'task',
RESULT: 'result',
ERROR: 'error',
HEARTBEAT: 'heartbeat',
SHUTDOWN: 'shutdown'
};
// 发送
child.send({
type: MessageTypes.TASK,
taskId: 1,
payload: { data: 'input' }
});
// 接收处理
process.on('message', (msg) => {
switch (msg.type) {
case MessageTypes.TASK:
handleTask(msg);
break;
case MessageTypes.SHUTDOWN:
gracefulShutdown();
break;
}
});
句柄传递
传递 Socket
JavaScript
const { fork } = require('child_process');
const net = require('net');
const child = fork('./worker.js');
const server = net.createServer();
server.listen(8000);
// 将连接传递给子进程
server.on('connection', (socket) => {
child.send('connection', socket);
});
子进程处理
JavaScript
// worker.js
process.on('message', (msg, socket) => {
if (msg === 'connection') {
socket.end(`由子进程 ${process.pid} 处理\n`);
}
});
共享状态方案
Redis 共享
JavaScript
const redis = require('redis');
const client = redis.createClient();
// 写入共享状态
await client.set('shared_key', JSON.stringify(data));
// 读取共享状态
const value = await client.get('shared_key');
数据库共享
JavaScript
// 使用数据库存储共享状态
await db.insert('shared_table', { key, value });
const result = await db.query('shared_table', { key });
心跳检测
JavaScript
// Master
const workers = {};
if (cluster.isMaster) {
const worker = cluster.fork();
workers[worker.id] = { lastHeartbeat: Date.now() };
// 定期检查心跳
setInterval(() => {
Object.entries(workers).forEach(([id, info]) => {
if (Date.now() - info.lastHeartbeat > 10000) {
cluster.workers[id].kill();
cluster.fork();
}
});
}, 5000);
}
// Worker 发送心跳
setInterval(() => {
process.send({ type: 'heartbeat' });
}, 3000);
注意事项
- IPC 数据自动 JSON 序列化
- 不能传递函数、DOM 对象
- 大数据传输效率低,考虑共享内存
- 句柄传递仅支持特定类型
要点总结
child.send()发送消息,process.on('message')接收- cluster 使用相同 IPC 机制
- 消息建议定义类型字段,结构化处理
- 句柄传递可共享 Socket
- 共享状态使用 Redis 或数据库
📝 发现内容有误?点击此处直接编辑