Node事件驱动架构

Node处理异步调用原始的方法叫做回调函数(callbacks)。

回调、Promises、Async/Await

很久之前,在Javascript原生支持promises和async/await特性之前。我们看看异步回调如何与事件循环一起工作的,但是在一般定义中回调仅仅是作为参数传入到另一个函数中的函数,当然之所以可以这么做,是因为函数在javascript是一等公民。

1
2
3
4
5
6
const doSomething = (cb) => {
//...
};
doSomething((err,data) => {
//...
});

这里有一点很重要,代码中回调函数并没有表明是一个异步调用。在前面的几个文章里,我们看到了一个函数是怎样同步、异步地调用回调并且讲解了用process.nextTick怎么避免出现那样的情况。我们来看一个用回调写的经典的异步函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const fs = require('fs');
const readFileAsArray = function(file, cb) {
fs.readFile(file, function(err, data) {
if(err) return cb(err);
const lines = data.toString().trim().split('\n');
cb(null, lines);
});
};
readFileAsArray('./numbers', (err,lines) => {
if(err) throw err;
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(number => number % 2 ===1);
console.log('odd number count: ', oddNumbers.length);
});

readFileAsArray需要一个文件和一个回调,读文件,将buffer转成字符串,以回车符分离成数组穿传回调。假设我们现在有四个数字的一个文件,调用readFileAsArray时,把字符串转为数字,并且计算偶数个数。

简单的例子,让我们看到 回调中错误优先,并且我们把回调作为最后的参数传递给函数。这是写这类函数的法则,我们需要遵守。因为大家写Node都会那样写。因此不能把回调作为第一个参数传递,或者把定义中的cb(null,lines)去掉。

在现代Javascript中,我们有promise对象。Promises是替代回调的一个异步API。特点就是不需要把回调作为参数传入并且在同一个地方处理错误。一个promise对象允许我们单独处理成功和失败的场景,允许我们链式多个异步调用,而不是层级嵌套。下来用promise改写上面的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const fs = require('fs');
const readFileAsArray = function(file) {
return new Promise((resolve, reject) => {
fs.readFile(file, function(err, data) {
if(err) return reject(err);
const lines = data.toString().trim().split('\n');
resolve(lines);
});
});
};
readFileAsArray('./numbers')
.then(lines = > {
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(number => number % 2 ===1);
console.log('odd numbers count: ', oddNumbers.length);
})
.catch(console.error);

.then给我们访问lines数组的入口,我们能够在这里处理它。要处理错误,我们有一个catch调用来打印错误。函数定义部分,我们用promise包裹了我们之前的代码段,promise对象暴露了两个函数,resolve和reject,接下来用reject调用替代cb(err),用resolve替代cb(null,lines),之前的cb参数定义,也不需要了。

官方对于异步调用的解决方案是回调,如果你想既有回调,又想有promise接口,许多流行的node包已经做了这些工作。非常简单,不要替换掉回调,保留它们并且增加一个promise调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
const fs = require('fs');
const readFileAsArray = function(file,cb=() => {}) {
return new Promise((resolve, reject) => {
fs.readFile(file, function(err, data) {
if(err) {
reject(err)
return cb(err);
}
const lines = data.toString().trim().split('\n');
resolve(lines);
cb(null, lines);
});
});
};
readFileAsArray('./numbers')
.then(lines = > {
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(number => number % 2 ===1);
console.log('odd numbers count: ', oddNumbers.length);
})
.catch(console.error);
readFileAsArray('./numbers', (err,lines) => {
if(err) throw err;
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(number => number % 2 ===1);
console.log('odd number count: ', oddNumbers.length);
});

这种情况下唯一要做的事是给这个回调参数设置一个默认值,为了代码被promise接口所使用。我们可以使用一个简单默认的空函数放在参数中,那么现在这个代码就可以既运行在promise接口,也可以运行在回调参数上。

当需要在一个异步函数中做循环的时候,增加一个promise接口使你的代码变得更加容易。如果是回调,就会变得很冗余,Promises改善了一点点,generators也做了一点改进。但是解决异步问题更加优雅的替代方案是async函数,这个函数允许像同步一样写异步代码,当我们需要在循环中处理事情时,使得易读性增强。用async/await来改写上面的例子

1
2
3
4
5
6
7
8
9
10
11
12
async function countOdd() {
try {
const lines = await readFileAsArray('./numbers');
const numbers = lines.map(Number);
const oddCount = numbers.filter(number => number % 2 ===1).length;
console.log('odd numbers count: ', oddCount);
} catch(err) {
console.error(err);
}
}
countOdd();

在普通的函数前面增加async,在内部使用了await,就像同步调用readFileAsArray一样,之后继续执行后面的代码。为了处理错误,我们需要用try/catch包裹代码段,因此不要忘记这点。要执行所有的代码,需要加上--harmony-async-await标志,

1
~ node --harmony-async-await async-promise.js

EventEmitter

EventEmitter是Node中促进对象间通信的模块。EventEmitter是Node异步事件驱动架构的核心(event-driven)。许多Node内置模块都是从EventEmitter继承而来。

概念很简单,emitter对象触发命名事件,导致监听函数被调用。一个emitter对象有两个特征,触发命名事件和注册监听函数。

1
2
3
4
5
const EventEmitter = require('events');
class Logger extends EventEmitter {}
const logger = new Logger();
logger.emit('event');
logger.on('event',listenerFunc);

触发一个事件是当某些条件发生时的一个信号,这个条件一般是触发对象的状态改变。我们用on方法增加了一个监听函数,一旦触发对象触发了关联的事件之后,这些监听函数就会被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const EventEmitter = require('events');
class WithLog extends EventEmitter {
execute(tashFunc) {
console.log('Before executing');
this.emit('begin');
taskFunc();
this.emit('end');
console.log('After executing');
}
}
const withLog = new WithLog();
withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));
withLog.execute(() => console.log('*** Executing task ***'));
/**
Before executing
About to execute
*** Executing task ***
Done with execute
After executing
**/

不要假设这是异步还是同步的代码,这很重要,因为如果说taskFunc是一个异步,我们可以用一个setTimeout来模拟而不用直接的函数代码,代码如下,现在看起来这行是异步的了,后面的两行也不再精确了。为了在一个异步函数执行完触发一个事件,我们可能得需要回调和promises的帮助了

1
2
3
4
5
6
7
8
9
10
11
12
withLog.execute(() => setTimeout(
() => console.log('*** Executing task ***'),
500
)
);
/**
Before executing
About to execute
Done with execute
After executing
*** Executing task ***
**/

使用事件而不是一般的回调的好处是我们能够对同一个信号通过定义多个监听函数做出多次反应。回调要完成同样的效果内部要做很多逻辑。事件对应用程序来说是一个很棒的方式来允许多个额外的插件在应用程序基础之上扩展功能。你可以把它们理解成钩子(hook),在状态改变的时候可以自定义做一些事情。下面用一个异步函数改写上面的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const EventEmitter = require('events');
class WithTime extends EventEmitter {
execute(asyncFunc, ...args) {
console.time('execute');
this.emit('begin');
asyncFunc(...args, (err,data) => {
if(err) return this.emit('error', err);
this.emit('data', data);
console.timeEnd('execute');
this.emit('end');
});
}
}
const withTime = new WithTime();
withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));
withTime.execute(fs.readFile, __filename);
/**
About to execute
execute: 3.188ms
Done with execute
*/

参数,错误和监听顺序

在上面的例子中,有两个被触发的事件带着额外的参数,error触发时带了一个err对象,data事件触发时带了一个data对象。我们可以在命名事件之后传递我们想要的任何参数,这些参数在监听函数中都是可获得的。看下面的例子。error是个特殊的事件,就算我们不监听,遇到错误,Node就会自动退出。如果有监听函数,错误就会按照我们代码写的方式展示。Node也就不会崩溃或者异常退出。另一个处理异常的方法是为未捕获异常进程事件注册一个监听器。在未捕获异常我们有一个建议,我们应该让程序不管怎样都应该退出。

想象一下如果有多个错误事件发生,意思是uncaughtException监听器会多次触发,或许对我们在里面做的清理工作会是个问题。eventemitter模块暴露了一个once方法,这个once方法仅调用一次监听器,不是在每次发生都调用。在处理未捕获异常的时候,once最有实际作用的地方。

1
2
3
4
5
6
7
8
9
withTime.on('data', (data) => {
console.log(`Length: ${data.length}`);
});
withTime.on('error', console.log(err));
process.on('uncaughtException', (err) => {
console.log(err);
// do some cleanup
process.exit(1); // exit anyway
});

如果我们为一个同样的事件监听多个监听器,这些监听器将会按顺序被调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
withTime.on('data', (data) => {
console.log(`Length: ${data.length}`);
});
withTime.on('data', (data) => {
console.log(`Charaters: ${data.toString()}`);
});
withTime.prependListener('data', (data) => {
console.log(`Charaters: ${data.toString()}`);
});
// remove listener
withTime.removeListener('data', (data) => {
console.log(`Charaters: ${data.toString()}`);
})

如果你想让你之后的监听器首先被调用,你需要使用prependListener来实现。