# 场景

Node运行在单线程下,但这并不意味着无法利用多核/多机下多进程的优势

# 创建进程

通信方式与进程产生方式有关,而Node有4种创建的方式:spawn(),exec(), execFile()和fork()

# spawn

const { spawn } = require('child_process');
const child = spawn('pwd');
//带参数的形式
//const child = spawn('find', ['.', '-type', 'f']);

spawn()返回ChildProcess实例,ChildProcess同样基于事件机制(EventEmitter API),提供了一些事件:

  • exit:子进程退出时触发,可以得知进程退出状态(code和signal)
  • disconnect:父进程调用child.disconnect()触发
  • error:子进程创建失败,或被kill时触发
  • close:子进程的stdio流(标准输入输出流)关闭时触发
  • message:子进程通过process.send()发送消息时触发,父子进程之间通过这种内置的消息机制通信

可以通过child.stdin, child.stdout和child.stderr访问子进程的stdio流,这些流被关闭时,子进程会触发close事件

P.S.close和exit的区别主要体现在多进程共享同一stdio流的场景,某个进程退出了并不意味着stdio流被关闭了

在子进程中,stdout/stderr具有Readable特性,而stdin具有Writable特性,与主进程情况正好相反

const spawn = require('child_process').spawn;
const child = spawn('pwd')
// child.stdout.on('data', data => {
//     console.log(data.toString())
// })

child.stout.on('data', (data) => {
	console.log(`child stdout: \n${data}`);
})
child.stderr.on('data', (data) => {
	console.error(`child stderr:\n${data}`)
})

利用进程stdio流的管道特性,就可以完成更复杂的事情,例如:

const { spawn } = require('child_process');
const find = spawn('find', ['.', '-type', 'f']);
const wc = spawn('wc', ['-1']);
find.stdout.pipe(wc.stdin);
wc.stdout.on('data', (data) => {
	console.log(`Number of files ${data}`);
})

作用等价于find . -type f | wc -1 ,递归统计当前目录文件数量

# IPC选项

另外,通过spawn()方法的stdio选项可以建立IPC机制:

const { spawn } = require('child_process');
const child = spawn('node', ['./ipc-child.js'], {stdio: [null, null, null, 'ipc']});
child.on('message', (m) => {
	console.log(m);
})
child.send('Here Here');
process.on('message', (m) => {
	process.send(`< ${m}`);
    process.send('> 不要回答x3')
})

# exec

spawn()方法默认不会创建shell去执行传入的命令(所以性能上稍微好一点),而exec()方法会创建一个shell。另外,exec()不是基于stream的,而是把传入命令的执行结果暂存到buffer中,在整个传递给回调函数。

exec()方法的特点就是完全支持shell语法,可以直接传入任意shell脚本

const { exec } = require('child_process');
exec('find . -type f | wc -l',(err, stdout, stderr) => {
	if (err) {
    	console.error(`exec error: ${err}`);
        return;
    }
    console.log(`Number of files ${stdout}`);
})

但exec()方法也因此存在命令注入的安全风险,在含有用户输入等动态内容的场景要特别注意。所以,exec()方法的适用场景是:希望直接使用shell语法,并且预期输出数据量不大(不存在内存压力)

那么,有没有即支持shell语法,还具有stream IO优势的方式?

有!两全其美的方式如下

const { spawn } = require('child_process');
const child = spawn('find . -type f | wc -l', {
	shell: true
});
child.stdout.pipe(process.stdout);

开启spawn()的shell选项,并通过pipe()方法把子进程的标准输出简单地接到当前进程的标准输入上,以便看到命令执行结果。实际上还有更容易的方式

const { spawn } = require('child_process');
process.stdout.on('data', (data) => {
	console.log(data);
})
const child = spawn('find . -type f | wc -l', {
	shell: true,
    stdio: 'inherit'
})

stdio:'inherit'允许子进程继承当前进程的标准输入输出(共享stdin, stdout和stderr).所以上例能够通过监听当前进程process.stdout的data事件拿到子进程的输出结果

另外,除了stdio和shell选项,spawn()还支持一些其他选项。如:

const child = spawn('find . -type f | wc -l', {
	stdio:'inherit',
    shell: true,
    //修改环境变量,默认process.env
    env: { HOME: '/tmp/xxx' },
    //改变当前工作目录
    cwd: '/tmp', 
    //作为独立进程存在
    datached: true;
})

注意,env选项除了以环境变量形式向子进程传递数据外,还可以用来实现沙箱式的环境变量隔离,默认把process.env作为子进程的环境变量集,子进程与当前进程一样能够访问所有环境变量,如果像上例中指定自定义对象作为子进程的环境变量集,子进程就无法访问其他环境变量

所以,想要增/删环境变量的话,需要这样做

var spawn_env = JSON.parse(JSON.stringify(process.env));
//remove those env vars
delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE;
delete spawn_env.ELECTRON_RUNN_AD_NODE;
var sp = spawn(command, [.], [cwd: cwd, env: spawn_env])

detached选项更有意思

const { spawn } = require('child_process');
const child = spawn('node', ['stuff.js'], {
	detaached: true, 
    stdio: 'ignore'
})
child..unref();

以这种方式创建的独立进程行为取决有操作系统,windows上detached子进程将拥有自己的console窗口,而linux上该进程会创建新的process group(这个特性可以用来管理子进程族,实现类似于tree-kill特性)

unref方法用来断绝关系,这样父进程可以独立退出(不会导致子进程跟着退出),但要注意这时子进程的stdio也应该独立于父进程,否则父进程退出后子进程仍会收到影响

# execFile

const { execFile } = require('child_process');
const child = execFile('node', ['--version'], (err, stdout, stderr) => {
    if (error) {
        throw error;
    }
    console.log(stdout);
})

与exec()方法类似,但不通过shelll来执行(所以性能更好一些),所以要求传入可执行文件。window下某些文件无法直接执行,比如.bat和.cmd,这些文件就不能用execFile()来执行,只能借助exec()或开启了shell选项的spawn()

与exec()一样也不是基于stream,同样存在数据量风险

xxxSync

spawn, exec和execFile都有对应的同步阻塞版本,一直等到子进程退出

const {
    spawnSync,
    execSync,
    execFileSync
} = require('child_process')

同步方法用来简化脚本任务,比如启动流程,其他时候应该避免使用这些方法

# fork

fork()是spawn()的变体,用来创建Node进程,最大的特点是父子进程自带通信机制(IPC管道)。

例如:

var n = child_process.fork('./child.js');
n.on('message', function(n) {
    console.log('PARENT got message:' , m);
})
n.send({hello: 'world'})
// ./child.js
process.on('message', function(m) {
    console.log('CHILD go message', m);
})
process.send({ foo: 'bar'});

因为fork()自带通信机制的优势,尤其适合用来拆分耗时逻辑,例如:

const http = require('http');
const longComputation = () => {
    let sum = 0;
    for (let i = 0; i < 1e9; i++) {
        sum += i;
    };
    return sum;
}
const server = http.createServer();
server.on('request', (req, res) => {
    if (req.url === '/compute') {
    const sum = longComputation();
    return res.end(`Sum is ${sum}`);
  } else {
    res.end('Ok')
  }
})
server.listen(3000);

这样做的致命问题是一旦有人访问 /compute ,后续请求都无法及时处理,因为事件循环还被 longComputation 阻塞着,直到耗时计算结束才能恢复服务能力

为了避免耗时操作阻塞主进程的事件循环,可以把 longComputation() 拆分到子进程中:

// compute.js
const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
  };
  return sum;
};
// 开关,收到消息才开始做
process.on('message', (msg) => {
  const sum = longComputation();
  process.send(sum);
});

主进程开启子进程执行 longComputation :

const http = require('http');
const { fork } = require('child_process');
const server = http.createServer();
server.on('request', (req, res) => {
  if (req.url === '/compute') {
    const compute = fork('compute.js');
    compute.send('start');
    compute.on('message', sum => {
      res.end(`Sum is ${sum}`);
    });
  } else {
    res.end('Ok')
  }
});
server.listen(3000);

主进程的事件循环不会再被耗时计算阻塞,但进程数量还需要进一步限制,否则资源被进程消耗殆尽时服务能力仍会受到影响

P.S.实际上, cluster 模块就是对多进程服务能力的封装, 思路与这个简单示例类似

# 通信方式

# 通过stdin/stdout传递json

stdin/stdout annd a JSON payload

最直接的通信方式,拿到子进程的handle后,可以访问其stdio流,然后约定一种message格式开始愉快的通信

const { spawn } = require('child_process');
child = spawn('node', ['./stdio-child.js']);
child.stdout.setEncoding('utf8');
// 父进程-发
child.stdin.write(JSON.stringify({
    type: 'handshake', 
    payload: '你好呀'
}))
// 父进程-收
child.stdout.on('data', function(chunk) {
    let data = chunk.toString();
    let message = JSON.parse(data);
    console.log(`${message.type} ${message.payload}`);
})

子进程与之类似

// ./stdio-child.js
// 子进程-收
process.stdin.onn('data', (chunk) => {
    let data = chunk.toString();
    let message = JSON.parse(data);
    switch (message.type) {
        case 'handshake':
            // 子进程-发
            process.stdout.write(JSON.stringify({
                type: 'message',
                payload: message.payload + ' : hoho'
            }))
            break;
        default: 
            break;
    }
})

P.S.VS Code进程间通信就采用了这种方式 具体见 access electron API from vscode extension (opens new window)

明显的限制是需要拿到子进程的handle,两个完全独立的进程之间无法通过这种方式通信(比如跨应用,甚至跨机器的场景)。

P.S.关于stream及pipe的详细信息,请查看node中的流 (opens new window)

# 原生IPC支持

如spawn()及fork()的例子,进程之间可以借助内置的IPC机制通信

父进程 - process.on('message') 收 - child.send() 发 子进程 - process.on('message') 收 - process.send() 发 限制同上,同样要有一方能够拿到另一方的handle才行

# sockes

借助网络来完成进程间通信,不仅能跨进程,还能跨机器

node-ipc (opens new window)就是采用这种方案

// server
const ipc = require('../../../node-ipc');
ipc.config.id = 'world';
ipc.config.entry = 1500;
ipc.config.maxConnections = 1;
ipc.serveNet(
    function() {
        ipc.server.on(
            'message',
            function(data, socket) {
                ipc.log('got a message: ', data);
                ipc.server.emit(
                    socket,
                    'message',
                    data + ' world!'
                );
            }
        );
        ipc.server.on(
            'socket.disconnected',
            function(data, socket) {
                console.log('DISconnected\n\n', arguments);
            }
        )
    }
);
ipc.server.on(
    'error',
    function(err) {
        ipc.log('Got an ERROR!', err);
    }
)
ipc.server.start();
// client
const ipc = require('node-ipc');
ipc.config.id ='hello';
ipc.config.retry= 1500;
ipc.connectToNet(
    'world',
    function(){
        ipc.of.world.on(
            'connect',
            function(){
                ipc.log('## connected to world ##', ipc.config.delay);
                ipc.of.world.emit(
                    'message',
                    'hello'
                );
            }
        );
        ipc.of.world.on(
            'disconnect',
            function(){
                ipc.log('disconnected from world');
            }
        );
        ipc.of.world.on(
            'message',
            function(data){
                ipc.log('got a message from world : ', data);
            }
        );
    }
);

当然,单机场景下通过网络来完成进程间通信有些浪费性能,但网络通信的 优势 在于跨环境的兼容性与更进一步的RPC场景

# message queus

父子进程都通过外部消息机制来通信,跨进程的能力取决于MQ支持

即进程间不直接通信,而是通过中间层(MQ),加一个控制层就能获得更多灵活性和优势

  • 稳定下:消息机制提供了强大的稳定性保证,比如确认传达(消息回执ACK),失败重发、防止多发等。
  • 优先级控制:允许调整消息响应式次序
  • 离线能力:消息可以被缓存
  • 事务性消息处理:把关联消息组成事务,保证其传达顺序以及完成性

P.S.不好实现?包一层能解决嘛?不行就包两层....

比较受欢迎的有 smrchy/rsmq (opens new window) ,例如:

// init 
RedisSMQ = require('rsmq');
rsmq = new RedisSMQ({host: "127.0.0.1", port: 6379, ns: "rsmq"});
// create queue
rsmq.createQueue({qname: "myqueus"}, function(err, resp) {
    if (resp === 1) {
        console.log('queue created');
    }
})
// send message
rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) {
  if (resp) {
    console.log("Message sent. ID:", resp);
  }
});
// receive message
rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) {
  if (resp.id) {
    console.log("Message received.", resp)  
  }
  else {
    console.log("No messages for me...")
  }
});

会起一个Redis server,基本原理如下:

Using a shared Redis server multiple Node.js processes can send / receive messages.

消息的收/发/缓存/持久化依靠Redis提供的能力,在次基础上实现完整的队列机制

# Redis

基本思路与message queue类似:

Use Redis as a message bus/broker.

Redis自带 Pub/Sub机制 (即发布-订阅模式),适用于简单的通信场景,比如一对一或一对多并且 不关注消息可靠性 的场景

另外,Redis有list结构,可以用作消息队列,以此提高消息可靠性。一般做法是生产者 LPUSH 消息,消费者 BRPOP 消息。适用于要求消息可靠性的简单通信场景,但缺点是消息不具状态,且没有ACK机制,无法满足复杂的通信需求

P.S.Redis的Pub/Sub示例见 What’s the most efficient node.js inter-process communication library/method?

# 总结

  • 通过stdin/stdout传递json: 最直接的方式,适用于你能够拿到子进城handle的场景,适用于关联进程之间的通信,无法跨机器
  • Node原生IPC支持:最native(地道?)的方法,比上一种正规一些,具有同样的局限性
  • 通过sockets:最通用的方式,有良好的跨环境能力,但存在网络的性能算好
  • 借助message queue:最强大的方式,既然要通信,场景还复杂,不妨扩展除一层消息中间件,漂亮的解决各种问题

# 资料

node进程管理 (opens new window)