前言:Node.js 以单线程方式执行 JavaScript,单个进程无法充分发挥多核处理器的性能。好在自 0.6.0 版本起,Node.js 内置了 cluster(集群)模块,使 Node.js 应用可以充分利用多核处理器的计算能力。
Cluster 测试环境 操作系统 : mac os 10.13.5 & win10 1803 64bit & ubuntu 16.04 LTS nodejs 版本 : v8.11.3
// Example: a first Cluster application.
// The master forks one worker per CPU core; each worker starts an HTTP server
// on port 8000, and the cluster module lets all workers share that port.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

/** Master process: fork the workers and log when they listen or exit. */
function startMaster() {
  console.log("master start...");

  for (let n = 0; n < numCPUs; n += 1) {
    cluster.fork();
  }

  // Emitted when a worker's server starts listening.
  cluster.on('listening', (wk, addr) => {
    console.log(`listening: worker ${wk.process.pid}, Address: ${addr.address}:${addr.port}`);
  });

  cluster.on('exit', (wk) => {
    console.log(`worker ${wk.process.pid} died`);
  });
}

/** Worker process: answer every request with a fixed body. */
function startWorker() {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}

if (cluster.isMaster) {
  startMaster();
} else {
  startWorker();
}
现在运行 nodejs,工作进程之间将会共享 8000 端口,运行结果:
1 2 3 4 5 master start... listening: worker 847 , Address: null :8000 listening: worker 844 , Address: null :8000 listening: worker 846 , Address: null :8000 listening: worker 845 , Address: null :8000
该程序共启动了 numCPUs + 1 个 Node.js 进程:1 个 master 进程和 numCPUs 个 worker 进程(numCPUs 为本机 CPU 核数;上面的输出来自一台 4 核机器,因此有 4 个 worker)。这些 worker 进程共同监听同一个端口 8000。
Node.js Cluster 的调度机制:cluster 模块支持以下两种分发连接的方式
第一种,循环法(Round-Robin,除 Windows 外所有平台的默认方式):由主进程负责监听端口,接收到新连接后再以轮询方式将连接分发给各个工作进程。分发过程中使用了一些内置技巧,以避免单个工作进程任务过载。
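第二种,共享句柄法(Windows 平台的默认方式):主进程创建监听 socket 后将其发送给感兴趣的工作进程,由工作进程直接接收连接。理论上这种方式性能最好,但实际应用中,由于操作系统调度的不确定性,经常会出现负载不均衡的情况。可以通过 cluster.schedulingPolicy 或环境变量 NODE_CLUSTER_SCHED_POLICY('rr' 或 'none')在两种方式之间切换。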
// Example: master-side cluster events ('fork', 'online', 'listening',
// 'message', 'disconnect', 'exit').
// Listeners are registered before any worker is forked, so no lifecycle
// event can slip past them.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log("master start...");

  cluster.on('fork', worker => {
    console.log(`worker id: ${worker.id} is forked!`);
  });

  cluster.on('listening', (worker, address) => {
    console.log(`A worker: ${worker.id} is now connected to ${address.address} :${address.port} `);
  });

  cluster.on('online', worker => {
    console.log(`worker : ${worker.id} is running`);
  });

  // Receives messages that any worker sends with process.send().
  cluster.on('message', (worker, message, handle) => {
    console.log(`[Master] [Message] : ${message} `);
  });

  cluster.on('disconnect', worker => {
    console.log(`The worker #${worker.id} has disconnected`);
  });

  cluster.on('exit', (worker, code, signal) => {
    // exitedAfterDisconnect is true when the worker was stopped on purpose
    // (worker.disconnect() / worker.kill()). Re-forking in that case would
    // undo a deliberate shutdown, so only unexpected deaths are replaced.
    if (worker.exitedAfterDisconnect) {
      console.log('worker %d exited on purpose (%s), not restarting', worker.process.pid, signal || code);
      return;
    }
    console.log('worker %d died (%s). restarting...', worker.process.pid, signal || code);
    cluster.fork();
  });

  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
} else if (cluster.isWorker) {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(5555);
}
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 master start... worker id : 1 is forked! worker id : 2 is forked! worker id : 3 is forked! worker id : 4 is forked! worker : 1 is running worker : 4 is running worker : 3 is running worker : 2 is running A worker: 1 is now connected to null :5555 A worker: 4 is now connected to null :5555 A worker: 3 is now connected to null :5555 A worker: 2 is now connected to null :5555
在上述事件中,fork 事件先触发,然后再触发 online 事件,最后触发 listening 事件
// Example: worker-side events and two-way messaging between master and workers.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log('[master] start master...');

  // Fork one worker per CPU and send each one a greeting right after forking.
  for (let i = 0; i < numCPUs; i++) {
    const child = cluster.fork();
    child.send(`[master] hi worker${child.id}`);
  }

  cluster.on('fork', w => {
    console.log(`[master] fork: worker${w.id}`);
  });
  cluster.on('online', w => {
    console.log(`[master] online: worker${w.id}`);
  });
  cluster.on('listening', (w, addr) => {
    console.log(`[master] listening: worker${w.id},pid:${w.process.pid}, Address:${addr.address}:${addr.port}`);
  });
  cluster.on('disconnect', w => {
    console.log(`[master] disconnect: worker${w.id}`);
  });
  cluster.on('exit', (w, code, signal) => {
    console.log(`[master] exit worker${w.id} died`);
  });

  // Cluster-wide listener: sees the messages of every worker.
  cluster.on('message', (w, message, handle) => {
    console.log(`[master] [Message] : ${message} `);
  });

  // After 3 seconds, message every worker currently in cluster.workers.
  setTimeout(() => {
    Object.keys(cluster.workers)
      .map(id => cluster.workers[id])
      .forEach(w => {
        w.send(`[master] send message to worker${w.id}`);
      });
  }, 3000);

  // Per-worker listeners: each one only sees its own worker's messages.
  for (const id of Object.keys(cluster.workers)) {
    cluster.workers[id].on('message', msg => {
      console.log(`[master] message ${msg}`);
    });
  }
} else if (cluster.isWorker) {
  const self = cluster.worker;
  console.log(`[worker] start worker ...${self.id}`);

  // Print every message from the master, then acknowledge it.
  process.on('message', msg => {
    console.log(`[worker] ${msg}`);
    process.send(`[worker] worker${self.id} received!`);
  });

  http.createServer((req, res) => {
    res.writeHead(200, { "content-type": "text/html" });
    res.end(`worker${self.id},PID:${process.pid}`);
  }).listen(5555);
}
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 [master] start master ...[master] fork : worker1 [master] fork : worker2 [master] fork : worker3 [master] fork : worker4 [master] online : worker4 [master] online : worker3 [master] online : worker1 [master] online : worker2 [worker] start worker ...4 [worker] [master] hi worker4 [worker] start worker ...3 [worker] start worker ...1 [master] [Message] : [worker] worker4 received ![master] message [worker] worker4 received ![worker] start worker ...2 [master] listening : worker4 ,pid :936 , Address :null :5555 [worker] [master] hi worker3 [master] [Message] : [worker] worker3 received ![master] message [worker] worker3 received ![worker] [master] hi worker1 [master] [Message] : [worker] worker1 received ![master] message [worker] worker1 received ![master] listening : worker3 ,pid :935 , Address :null :5555 [master] listening : worker1 ,pid :933 , Address :null :5555 [worker] [master] hi worker2 [master] [Message] : [worker] worker2 received ![master] message [worker] worker2 received ![master] listening : worker2 ,pid :934 , Address :null :5555
上述例子中:
cluster.isMaster 用于判断当前是否是主进程,cluster.isWorker 用于判断当前是否是子进程
主进程中 Worker 进程实例对象的 send 方法用于在主进程中向子进程发送信息
在主进程中,cluster 对象的 message 事件可以监听所有 Worker 进程发送给主进程的消息
在主进程中,某个 Worker 实例的 message 事件只监听该 Worker 进程发送给主进程的消息
在子进程中,使用 process.send 可以给主进程发送消息
// Example: using the 'fork' event to detect workers that never start listening.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log("master start...");
  for (let n = 0; n < numCPUs; n++) {
    cluster.fork();
  }

  // Pending 2-second startup timers, keyed by worker id.
  const startupTimers = new Map();
  const reportProblem = () => {
    console.error('Something must be wrong with the connection ...');
  };

  // Start a timer as soon as a worker is forked...
  cluster.on('fork', (worker) => {
    startupTimers.set(worker.id, setTimeout(reportProblem, 2000));
  });
  // ...cancel it once that worker is listening...
  cluster.on('listening', (worker, address) => {
    clearTimeout(startupTimers.get(worker.id));
  });
  // ...and on exit, cancel it and report right away.
  cluster.on('exit', (worker, code, signal) => {
    clearTimeout(startupTimers.get(worker.id));
    reportProblem();
  });
} else {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(8000);
}
由于 listening 和 exit 事件总是在 fork 事件之后触发,我们可以利用这一特性为 Worker 进程设置启动超时:若 Worker 进程在 2 秒内没有进入 listening 状态,就输出一条错误信息(代码只是用 console.error 打印提示,并不会抛出异常)。另外,示例中 Worker 进程一旦退出,也会立即打印同样的错误信息。
// Example: cluster.worker — inside a worker process it refers to that worker itself.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (!cluster.isMaster) {
  const { id } = cluster.worker;
  console.log(`current worker id: ${id} `);

  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(8000);
} else {
  console.log("master start...");
  for (let k = 0; k < numCPUs; k += 1) {
    cluster.fork();
  }
}
在工作进程中,cluster.worker 就代表当前 worker 工作进程本身,上面例子输出:
1 2 3 4 5 master start... current worker id : 1 current worker id : 2 current worker id : 4 current worker id : 3
// Example: cluster.workers — in the master it is the collection of all workers, keyed by id.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log("master start...");
  for (let k = 0; k < numCPUs; k += 1) {
    cluster.fork();
  }

  // Every worker forked above is already listed here.
  for (const id of Object.keys(cluster.workers)) {
    console.log(`cluster-workers-id: ${id} `);
  }
} else {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(8000);
}
Cluster workers 对象,在主进程中代表所有工作进程集合,上面输出结果为:
1 2 3 4 5 master start... cluster-workers-id: 1 cluster -workers-id: 2 cluster -workers-id: 3 cluster -workers-id: 4
// Example: listening to events on individual Worker instances, and counting
// the requests that workers report back to the master.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log("master start...");
  // Total number of 'notifyRequest' messages received from all workers.
  let count = 0;

  // Attach the lifecycle and message listeners to a single worker.
  const watchWorker = (worker) => {
    worker.on('online', () => {
      console.log(`worker id: ${worker.id} is online !`);
    });
    worker.on('disconnect', () => {
      console.log(`worker id: ${worker.id} is disconnected !`);
    });
    worker.on('exit', (code, signal) => {
      let summary;
      if (signal) {
        summary = `worker was killed by signal: ${signal} `;
      } else if (code !== 0) {
        summary = `worker exited with error code: ${code} `;
      } else {
        summary = 'worker exit success!';
      }
      console.log(summary);
    });
    worker.on('listening', (address) => {
      console.log(`当前工作进程启动:workerId: ${worker.id} , address port: ${address.port} !`);
    });
    worker.on('message', (msg, handle) => {
      if (msg && msg.cmd === 'notifyRequest') {
        count += 1;
        console.log(`当前请求量: ${count} `);
      }
    });
  };

  for (let i = 0; i < numCPUs; i++) {
    watchWorker(cluster.fork());
  }
} else if (cluster.isWorker) {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
    // Tell the master that this worker just served a request.
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}
上述例子输出结果:
1 2 3 4 5 6 7 8 9 master start.. . worker id: 1 is online ! worker id: 3 is online ! worker id: 2 is online ! 当前工作进程启动:workerId: 1, address port: 8000 ! worker id: 4 is online ! 当前工作进程启动:workerId: 3, address port: 8000 ! 当前工作进程启动:workerId: 2, address port: 8000 ! 当前工作进程启动:workerId: 4, address port: 8000 !
// Example: worker.send() from the master, and process.send() back from the worker.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length; // not used below: this demo always forks 4 workers

if (cluster.isWorker) {
  // Log each message from the master and reply to it.
  process.on('message', msg => {
    console.log(`子进程接收到主进程消息:${msg} `);
    process.send('received !');
  });

  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(8000);
} else if (cluster.isMaster) {
  console.log("master start...");

  for (let n = 0; n < 4; n++) {
    cluster.fork().send('hello world');
  }

  cluster.on('message', (worker, message, handle) => {
    console.log(`主进程接收到子进程消息:${message} `);
  });
}
// Example: other Worker members — id, isConnected() and isDead().
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log("master start...");
  for (let n = 0; n < numCPUs; n++) {
    cluster.fork();
  }
} else if (cluster.isWorker) {
  const self = cluster.worker;
  console.log(self.id);            // this worker's numeric id
  console.log(self.isConnected()); // whether the IPC channel to the master is open
  console.log(self.isDead());      // whether this worker's process has terminated

  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(8000);
}
上面代码的执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 master start... 4 true false 1 true false 2 true false 3 true false
// Example: the Worker 'disconnect' event, triggered here by a worker crashing.
const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  console.log(cluster.settings);
  console.log("master start...");

  // exitedAfterDisconnect tells whether the exit followed an explicit disconnect()/kill().
  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker id: ${worker.id}, ${worker.exitedAfterDisconnect} `);
  });

  const worker1 = cluster.fork();
  const worker2 = cluster.fork();

  worker2
    .on('listening', () => console.log('worker2 is listening'))
    .on('disconnect', () => console.log('worker2 is disconnected !'))
    .on('exit', () => {
      console.log('worker2 is exited !');
      // Once worker2 is gone, shut worker1 down as well.
      worker1.kill();
    });

  worker1
    .on('disconnect', () => console.log('worker1 is disconnected !'))
    .on('exit', () => console.log('worker1 is exited !'));
} else if (cluster.isWorker) {
  const { id } = cluster.worker;

  http.createServer((req, res) => {
    console.log(`request enter, workerId : ${id} `);
    res.writeHead(200);
    res.end("hello world");
    // Worker 2 deliberately crashes after answering, to exercise 'disconnect'/'exit'.
    if (id === 2) {
      throw new Error('test exitedAfterDisconnect');
    }
  }).listen(8000);
}
说明:我们启动了 2 个 Worker 进程,测试时访问 localhost:8000。当 id 为 2 的工作进程接收到请求时,会手动抛出一个未捕获的异常,使该工作进程崩溃,从而触发其 disconnect 事件。上述代码执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 master start... worker2 is listening request enter, workerId: 1 request enter, workerId: 1 request enter, workerId: 2 /Cluster/ 10. worker disconnect 方法.js: 43 throw new Error('test exitedAfterDisconnect' ); ^ Error: test exitedAfterDisconnect at Server.<anonymous> (/Users/ shenyujie/svn/ myBlogs/code/ Koa2/Cluster/ 10. worker disconnect 方法.js: 43 :19 ) at emitTwo (events.js: 126 :13 ) at Server.emit (events.js: 214 :7 ) at parserOnIncoming (_http_server.js: 619 :12 ) at HTTPParser.parserOnHeadersComplete (_http_common.js: 112 :17 ) worker2 is disconnected ! worker2 is exited ! worker id: 2 , false request enter, workerId: 1
上述代码中,当工作进程 2 接收到用户请求并抛出异常后,我们发现 worker2 先触发了 disconnect 事件(与主进程之间的 IPC 通道断开),然后触发 exit 事件,该工作进程退出。由于它是意外崩溃,而不是通过 disconnect()/kill() 主动退出的,所以 worker.exitedAfterDisconnect 的值为 false。
// Example: calling worker.disconnect() explicitly.
const cluster = require('cluster');
const http = require('http');

if (cluster.isWorker) {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(8000);
} else if (cluster.isMaster) {
  console.log("master start...");

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker id: ${worker.id} , ${worker.exitedAfterDisconnect} `);
  });

  const first = cluster.fork();
  cluster.fork(); // second worker: only serves traffic, never disconnected here

  first.on('listening', () => {
    console.log(`worker id: ${first.id} `);
    // Five seconds after it starts listening, disconnect worker 1.
    setTimeout(() => first.disconnect(), 5000);
  });
  first.on('disconnect', () => console.log('worker1 is disconnected !'));
  first.on('exit', () => console.log('worker1 is exited !'));
}
上述代码中,我们在工作进程 1 启动 5s 后手动执行其 disconnect 方法,执行结果如下:
1 2 3 4 5 master start... worker id : 1 worker1 is disconnected ! worker1 is exited ! worker id : 1 , true
我们发现,手动调用 disconnect 方法后,该工作进程同样会先触发 disconnect 事件(IPC 通道断开),而后触发 exit 事件退出;此时 worker.exitedAfterDisconnect 的值为 true。
// Example: calling worker.kill() explicitly.
const cluster = require('cluster');
const http = require('http');

if (cluster.isWorker) {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(8000);
} else if (cluster.isMaster) {
  console.log("master start...");

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker id: ${worker.id} , ${worker.exitedAfterDisconnect} `);
  });

  const first = cluster.fork();
  cluster.fork(); // second worker: only serves traffic, never killed here

  first.on('listening', () => {
    console.log(`worker id: ${first.id} `);
    // Five seconds after it starts listening, kill worker 1.
    setTimeout(() => first.kill(), 5000);
  });
  first.on('disconnect', () => console.log('worker1 is disconnected !'));
  first.on('exit', () => console.log('worker1 is exited !'));
}
代码执行结果:
1 2 3 4 5 master start... worker id : 1 worker1 is disconnected ! worker1 is exited ! worker id : 1 , true
从执行结果来看,kill 方法和 disconnect 方法效果相同。区别在于:disconnect 会等待工作进程中已有的连接全部关闭后才退出,如果存在长连接等情况,进程可能迟迟无法退出;这时可以调用 kill 方法,它先断开与主进程的 IPC 连接,再向进程发送信号(默认 SIGTERM),强制结束该进程。
// Example: the master and each worker have their own `process` object.
const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  console.log("master start...");
  // `var` (not let/const) is required: the declaration is hoisted to module
  // scope, so the worker branch below can reference `_process` without a
  // ReferenceError. In a worker this assignment never runs, so `_process`
  // is undefined there.
  var _process = process;
  cluster.fork(); // exactly one worker
} else if (cluster.isWorker) {
  // false: the master's assignment above never executed in this process.
  console.log(process === _process);
  // In a worker, cluster.worker.process is this process's own global `process`.
  console.log(process === cluster.worker.process);
  console.log('--------------');

  http.createServer((req, res) => {
    res.writeHead(200);
    res.end("hello world");
  }).listen(8000);
}