Nodejs 构建Cluster多线程

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

Nodejs 构建Cluster多线程

SaraiNoQ   2022-10-23 我要评论

前言

前两天我们介绍了使用 Nodejs 中的 child_process 模块创建多个子进程,同时利用进程间通信的API构建了一个集群式的Web服务器。实际上,你可以通过 cluster 模块更方便的完成这一操作。

但是,cluster 创建的进程之间无法共享内存,通信必须使用 JSON 格式,有一定的局限性和性能问题。如果你不想要进程隔离,可以使用 worker_thread 模块,它允许在一个 Node.js 实例中运行多个应用程序线程。相比创建多个进程更轻量,并且可以共享内存。

进程间通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来做到这一点,对数据格式没有太多要求。但是要注意,数据中不能包含函数。

Cluster 多进程

我们可以使用 cluster 模块提供的API重构昨天的案例:

// master.js
const cl = require("cluster");
const cpus = require("os").cpus().length;
// 修改默认的 fork() 方法配置
cl.setupPrimary({
  exec: 'worker.js'
});
for(let i = 0; i < cpus; i++) {
  cl.fork();
};
cl.on('listening', (data) => {
  console.log(`listenning on: ${data.id}--${data.process.pid}`);
});
cl.on('exit', (data, code, signal) => {
  console.log(`exited: ${data.id}--${data.process.pid}, kill code: $[code], signal: ${signal}`);
  cl.fork();
});

子进程依旧使用昨天的代码:

const http = require("http");
const server = http.createServer((req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/plain"
  });
  res.end("Hello,World!" + process.pid);
  // 抛出异常,捕获后终止进程
  throw new Error('throw exception');
}).listen(1337);
// 捕获异常后终止进程
process.on('uncaughtException', (err) => {
  // 停止接收新的连接
  server.close((data) => {
    console.log(`worker: ${process.pid} is stopping!`);
    process.exit(1);
  })
  // 避免长连接请求长时间无法终止,5s后自动终止
  setTimeout(() => {
    process.exit(1);
  }, 5000)
});

执行 node master.js,会得到与昨天利用 child_process 模块创建子进程集群相同的效果。

同样,你可以使用官方推荐的写法,利用 cluster.isPrimary 和 cluster.isWorker 来判断当前进程是否为主进程:

const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').cpus().length;
const process = require('node:process');
if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(1337);
  console.log(`Worker ${process.pid} started`);
};

实现原理

事实上,cluster 模块就是将 child_processnet 模块的API组合起来实现的。cluster启动时,进程会在内部启动TCP服务器。而在调用 cluster.fork() 复制子进程时,会将这个TCP服务器端 Socket 的句柄发送给工作进程。如果进程是通过 cluster.fork() 复制出来的,那么它的环境变量里就存在 NODE_UNIQUE_ID。如果工作进程中存在 listen() 侦听网络端口的调用,它将拿到该句柄,再通过 SO_REUSEADDR 端口重用,从而实现多个子进程共享端口。对于正常方式启动的进程,则不存在句柄共享和传递等过程。

cluster 内部隐式创建TCP服务器的方式对使用者是透明的,你不需要自己手动去实现句柄的传递,但也正是因此,它无法像使用 child_process 那样灵活。在 child_process 中你可以自行控制句柄的传送,因此可以灵活地控制工作进程,甚至控制多组工作进程。

cluster事件

  • Event: disconnect 主进程和工作进程之间IPC通道断开后会触发该事件。
  • Event: exit 有工作进程退出时触发该事件。
  • Event: fork 复制一个工作进程后触发该事件。
  • Event: listening 工作进程中调用 listen() 后,发送该消息给主进程,主进程收到后,触发该事件。
  • Event: message
  • Event: online fork好一个工作进程后,工作进程主动发送该消息给主进程,主进程收到消息后,触发该事件。
  • Event: setup .setupPrimary() 方法执行后触发

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们