Nodejs
Overview
Node.js® is an open-source, cross-platform JavaScript runtime environment.
- 浏览器因为安全问题,限制 JS 的能力(执行的 JS 是从远程的服务器拉过来的)
- Nodejs 开发 OS 程序的能力给 JS,让JavaScript成为一种通用的编程语言
JS应用领域
- 开发网页(浏览器)
- 服务器(Nodejs)
- 命令行工具(Nodejs),比如npm
基于OS平台开发的能力
- 网络
- 内存(string、集合、new)
- 文件(底层:文件描述符、上层 API buffer)
- 工具:
path
/url
/buffer
/集合/加密/协议(http/s
)/压缩/内存/string(byte array)
(其它golang/python stdlib都是类似) - etc
Dependencies核心依赖:
- V8 JavaScript engine:解析和执行 js 代码
- libuv :事件循环
程序分类:IO 密集型程序(文件、网络、数据库); CPU密集型程序
1 + 1;
: CPU密集型Consolo.log("hello world");
: IO密集型 Nodejs
JS单线程
Node.js® is an JavaScript runtime environment.
npm 命令的代码如下:
node require('../lib/cli.js')(process)
node 是一个二进制程序
理解 js 所谓的单线程:
- 浏览器是单线程吗? Google Chrome Helper(Renderer) 进程实际开了26个线程(查看电脑上的进程状态)
- nodejs是单线程吗?
- 写一个
hello world
Demo程序测试,Nodejs 实际上开了 7个线程 (查看电脑上的进程状态) - 比如
fs
文件的操作系统OS层面本身就是阻塞的,底层 OS 就不支持,Nodejs 内部是通过线程池来实现的 (File System 文档中有描述threadpool)
- 写一个
结论:JS 程序、进程本身是多线程的;只是执行到 JS 代码部分、或是说回调到 JS 代码的部分是在同一个线程。所以其它线程在为这一个线程服务,所以严格意义上讲
写 JS 代码和 C 代码在流程控制上的区别:
- c语言: 程序的执行流程自己控制
- js语言: 程序的执行流程不是自己控制:循环 -->
回调
你写的 js 代码
底下干活 26个线程、7个线程
事件与 EventEmitter
在 Note 中,发送事件的都是 EventEmitter 或其子类的实例;让我们使用 on()
方法来监听事件,使用 emit()
方法来触发事件。
emitter.addListener(eventName, listener)
:添加一个监听器;on
的别名emitter.on(eventName, listener)
:添加一个监听器emitter.emit(eventName[, ...args])
:触发事件emitter.removeListener(eventName, listener)
:移除一个监听器;off
的别名emitter.off(eventName, listener)
:移除一个监听器emitter.once(eventName, listener)
:添加一个单次监听器
var events = require('events');
var em = new events.EventEmitter();
em.on('FirstEvent', function (data) {
console.log('First subscriber: ' + data);
});
em.emit('FirstEvent', 'This is my first Node.js event emitter example.');
事件注册回调的特点:
- 顺序调用:按注册时间的顺序
- 同步调用:同步调用完上面所有的注册回调,同步执行完在返回
回调和this问题:
一般都会用剪头函数做为回调;剪头函数会使用定义它们的上下文中的this值
Buffer 缓冲区
二进制数据操作:网络、文件、其它
- 继承自 Uint8Array
- 可以和字符串相互转化
let b = Buffer.from([0x41, 0x42, 0x43]); // <Buffer 41 42 43>
b.toString(); // "ABC" 默认utf8
b.toString('hex'); // "414243" 16进制
let computer = Buffer.from('IBM3111', 'ascii'); // <Buffer 49 42 4d 33 31 31 31>
for (let i = 0; i < computer.length; i++) {
computer[i]--;
}
computer.toString('ascii'); // "HAL2000"
computer
.subarray(0, 3)
.map((x) => x + 1)
.toString(); // "IBM"
let zeros = Buffer.alloc(1024); // 1024个0
let ones = Buffer.alloc(128, 1); // 128个1
let dead = Buffer.alloc(1024, 'DEADBEEF', 'hex'); // 1024个DEADBEEF(16进制, 总共1024,重复模式)
// read
dead.readUInt32BE(0); // 0xDEADBEEF
dead.readUInt32BE(1); // 0xADBEEFDE
dead.readBigUInt64BE(6); // 0xBEEFDEADBEEFDEADn
dead.readUInt32LE(1020); // 0xEFBEADDE
// write
// ...
流Stream
场景:处理大量数据,不需要一次性全部加载到内存中、也不需要等全部读写完才能进行后续操作,而是分批次处理。基于流的算法,其本质就是把数据分割成小块,内存中不会保存全部的数据,而是保存当前处理的数据块,使得内存的利用率更高。Nodejs 的网络 API 是基于流的,Nodejs 的文件系统模块也定义了流 API 用于读写文件。
Note:对实际常用场景的抽象处理,比如:文件读写、网络通信等。
- 接收数据 -> 管道-> 处理
- 生产者 - 消费者模型
- Java: Stream
- Rxjava: Data streaming
- 函数式编程
Node 中的流分为四种类型:
- 可读流:从数据源读取数据
- 可写流:向目标写入数据
- 双工流:可读可写
- 转换流:可读可写,但输出的数据与输入的数据不一样
管道
文件通过管道pipe写入到Socket:
import { Socket } from 'net';
import fs from 'fs';
function pipeFileToSocket(filename: string, socket: Socket) {
fs.createReadStream(filename).pipe(socket);
}
下面通过管道把一个流导向另一个流,并在完成或发生错误时调用一个回调:
import fs, { ReadStream, WriteStream } from 'fs';
function pipe(readable: ReadStream, writable: WriteStream, callback: (error?: Error) => void) {
function handleError(error: Error) {
readable.close();
writable.close();
callback(error);
}
readable
.on('error', handleError) // 错误处理
.pipe(writable)
.on('error', handleError) // 错误处理
.on('finish', callback); // 结束处理
}
管道配置转换流一起使用,可以创建多个流的传输管理:
import fs from 'fs';
import zlib from 'zlib';
function gzip(filename: string, callback: (error?: Error) => void) {
let source = fs.createReadStream(filename);
let destination = fs.createWriteStream(filename + '.gz');
let gzipper = zlib.createGzip();
source
.on('error', callback)
.pipe(gzipper) // 转换流
.pipe(destination)
.on('error', callback)
.on('finnish', callback);
}
实现自已的Transform转换流
// TODO: https://nodejs.org/api/stream.html#class-streamtransform
异步迭代
Node 12及之后,可读流是异步迭代器,意味着在async函数中可以使用for/await
循环从流中读取字符串或Buffer块
async function grep(
source: NodeJS.ReadStream,
destination: NodeJS.WriteStream,
pattern: RegExp,
encoding: BufferEncoding = 'utf8'
) {
source.setEncoding(encoding);
destination.on('error', (error) => process.exit(1));
let incompleteLine: string | undefined = '';
// 使用 for/await 循环异步从输入流读取块
for await (let chunk of source) {
let lines: string[] = (incompleteLine + chunk).split('\n');
incompleteLine = lines.pop();
for (let line of lines) {
if (pattern.test(line)) {
destination.write(line + '\n', encoding);
}
}
}
if (incompleteLine && pattern.test(incompleteLine)) {
destination.write(incompleteLine + '\n', encoding);
}
}
// call
let pattern = new RegExp(process.argv[2]);
grep(process.stdin, process.stdout, pattern) // error handle
.catch((error) => {
console.error(error);
process.exit(1);
});
写入流及背压处理(两边不平衡问题)
write()
方法有个重要的返回值:
- 如果缓冲区未满,返回true
- 如果缓冲区满了,返回false,
write()
方法返回false值是一种背压 backpressure 的表现。背压是一种消息,表示你向流中写入数据的速度超过了它的处理能力。返回false之后应该停止继续调用 write()
,等到流发出 drain
事件(缓冲区数据耗尽),表示缓冲区又有空间了。
自已手动处理背压backpressure:
import { WriteStream } from 'fs';
function write(stream: WriteStream, chunk: Buffer, callback: () => void) {
let hasMoreRoom = stream.write(chunk);
// 检查 write() 方法返回值
if (hasMoreRoom) {
setImmediate(callback); // 立马调用callback
} else {
stream.once('drain', callback); // 等缓冲区 “耗尽/清空” 之后调用callback
}
}
Note:当使用pipe()
时,Node会自动的为你处理背压
如果程序里面使用async 和 await,将可读流作为异步迭代器使用,来处理背压backpressure:
import { ReadStream, WriteStream } from 'fs';
function write(stream: WriteStream, chunk: Buffer) {
let hasMoreRoom = stream.write(chunk);
if (hasMoreRoom) {
return Promise.resolve(null);
} else {
// drain事件后,返回
return new Promise((resolve) => {
stream.once('drain', resolve);
});
}
}
async function copy(source: ReadStream, destination: WriteStream) {
destination.on('error', (error) => process.exit(1));
for await (let chunk of source) {
// 写入块,并等到缓冲区有空间再继续
await write(destination, chunk);
}
}
通过事件读取流:流动模式、暂停模式
可读流有两种模式,同时有自已的读取API。如果不能使用管道和异步迭代,那就需要从这两种API中选择一种来处理流。注意,只能选一种,不能两种混用
流动模式:
从一个流读取数据、处理数据,然后再把数据写入一个可定流,可能需要处理可写流背压。
- 暂停可读流:可写流
write()
方法返回false表示写入缓冲区已满;可以调用可读流的pause()
来暂停停止data
事件 - 恢复可读流:可写流接收到
drain
事件时;调用可读流的resume()
来恢复data
事件
import fs from 'fs';
function copyFile(sourceFileName: string, destnationFilename: string, callback: (error?: Error) => void) {
let input = fs.createReadStream(sourceFileName);
let output = fs.createWriteStream(destnationFilename);
input.on('data', (chunk) => {
let hasRoom = output.write(chunk);
// 输入流满了,先暂停
if (!hasRoom) {
input.pause();
}
});
input.on('end', () => {
output.end();
});
input.on('error', (error) => {
callback(error);
process.exit(1);
});
// output
output.on('drain', () => {
// 输入流有空间了,恢复
input.resume();
});
output.on('error', (error) => {
callback(error);
process.exit(1);
});
output.on('finish', () => {
callback(undefined);
});
}
let from = process.argv[2],
to = process.argv[3];
console.log(`Copying file ${from} to ${to}`);
copyFile(from, to, (error) => {
if (error) {
console.error(error);
} else {
console.log('done.');
}
});
暂停模式:
这个模式是流开始时所处的模式。不注册 data
事件处理程序,也不调用 pipe()
方法,那么这个流就会一直处于暂停模式。
- 这个模式下需要显示的调用
read()
方法从流中拉取数据。 - 这个
read()
不是阻塞的,会立马返回;如果没有数据就返回null - 可读流在暂停模式下,会发送
readable
事件,表示有数据可以读了,显示的调用read()
把数据读完 - 当收到
readable
事件后,没有把数据读完,则不会触发下一个readable
事件
import fs from 'fs';
import crypto from 'crypto';
function sha256(filename: string, callback: (error?: Error, hash?: string) => void) {
let input = fs.createReadStream(filename);
let hasher = crypto.createHash('sha256');
input.on('readable', () => {
let chunk;
while ((chunk = input.read())) {
hasher.update(chunk);
}
});
input.on('end', () => {
let hash = hasher.digest('hex');
callback(undefined, hash);
});
input.on('error', callback);
}
sha256(process.argv[2], (error?: Error, hash?: string) => {
if (error) {
console.error(error.toString());
} else {
console.log(hash);
}
});
进程、CPU 和操作系统
常用属性和函数
process
进程模块
process.argv; // 包含命令行参数的数组
process.arch; // CPU架构:如x64
process.cwd(); // 返回当前工作目录
process.chdir(); // 设置当前工作目录
process.cpuUsage(); // 报告 CPU 使用情况
Drocess.env; // 环境变量对象
process.execPath; // Node 可执行文件的绝对路径
process.exit(); // 终止当前程序
process.exitCode; // 程序退出时报告的整数编码
process.getuid(); // 返回当前用户的 Unix 用户ID返回“高分辨率”纳秒级时间戳process.hrtime.bigint()
process.kill(); // 向另一个进程发送信号
process.memoryUsage(); // 元返回一个包含内存占用细节的对象
process.nextTick(); // 类似于 setImmediate(),立刻调用一个函数
process.pid; // 当前进程的进程 ID
process.ppid; // 父进程 ID
process.platform; // 操作系统:如 Linux、Darwin 或 Win32
process.resourceUsage(); // 返回包含资源占用细节的对象
process.setuid(); // 通过 ID 或名字设置当前用户
process.title; // 出现在 Ds 列表中的进程名
process.umask(); // 设置或返回新文件的默认权限
process.uptime(); // 返回 Node 正常运行的时间(秒)
process.version; // Node 的版本字符串
process.versions; // Node 依赖库的版本字符串
os
模块
const os = require('os');
os.arch(); // 返回CPU架构:如x64或arm
os.constants; // 有用的常量,如os.constants.signals.SIGINT
os.cpus(); // 关于系统CPU核心的数据,包括使用时间
os.endianness(); // CPU的原生字节序:BE或LE
os.EOL; // 操作系统原生的行终止符:\n或\r\n
os.freemem; // 返回自由RAM数量(字节)
os.getPriority(); // 返回操作系统调度进程的优先级
os.homedir; // 返回当前用户的主目录
os.hostname; // 返回计算机的主机名
os.loadavg(); // 返回1、5和15分钟的平均负载
os.networkInterfaces; // 返回可用网络连接的细节
os.platform(); // 返回操作系统:例如Linux、Darwin或Win32
os.release(); // 返回操作系统的版本号
os.setPriority(); // 尝试设置进程的调度优先级
os.tmpdir(); // 返回默认的临时目录
os.totalmem(); // 返回RAM的总数(字节)
os.type(); // 返回操作系统:Linux、Darwin或Windows_NT等
os.uptime(); // 返回系统正常运行的时间(秒)
os.userInfo(); // 返回当前用户的uid、username、home和shell程序
Note:可以看到
process
和os
中有些重复的内容,部分是os
模块使用了process
模块相关的属性和方法
路径、文件描述符和 FileHandle
import os from 'os';
process.cwd(); // 返回当前工作目录
__filename; // 当前模块的文件名
__dirname; // 当前模块的目录名
os.homedir(); // 返回当前用户的主目录
import path from 'path';
// 分隔符
path.sep; // / 或者 \,和操作系统相关
// path提供的简单的解析函数
let p = "src/pkg/test.js"
path.basename(p); // test.js
path.extname(p); // .js
path.dirname(p); // src/pkg
path.basename(p); // test.js
path.basename(path.dirname(p)); // pkg
path.dirname(path.dirname(p)); // src
// normallze()
path.normalize("a/b/c/../d/"); // a/b/d/
path.normalize("a/./b"); // a/b
path.normalize("//a//b//"); // /a/b/
// join()
path.join("src", "pkg", "t.js"); // src/pkg/t.js
// resolve 接收一个或多个路径片段,返回一个绝对路径
path.resolve(); // process.cwd();
path.resolve('t.js'); // path.join(process.cwd(), 't.js');
path.resolve('/tmp', 't.js'); // '/tmp/t.js'
path.resolve('/a', '/b', 't.js'); // '/b/t.js'
文件操作
特点:
- 三套 API 接口:
- 异步:
fs.copyFile()
- 同步:
sync
;比如:fs.copyFileSync()
- promises:比如:
fs.promise.copyFile()
- 异步:
- 文件和文件描述符接口:文件(底层:文件描述符、上层 API buffer)
Note:删除文件是 unlink
;OS 内核计数
读文件
三种操作方式:
- 一次性读取文件的内容
- 通过流
- 通过低级API
一次性读取文件的内容
import fs from 'fs';
// 同步读取文件
let buffer: Buffer = fs.readFileSync('test.data'); // 同步;返回Buffer缓冲区
let text: string = fs.readFileSync('data.csv', 'utf8'); // 同步;返回字符串
// 异步读取文件的字节; 基于回调
fs.readFile('test.data', (error, buffer?: Buffer) => {
if (error) {
// handle error
} else {
// handle buffer
console.log('buffer:', buffer?.toString());
}
});
// 异步读取文件的字节; 基于promises
fs.promises //
.readFile('data.csv', 'utf8')
.then(processFileText)
.catch(handleReadError);
// async/await + promises
async function processText(filename: string, encoding: BufferEncoding = 'utf8') {
let text = await fs.promises.readFile(filename, encoding);
// handle text
}
通过流:
import fs from 'fs';
async function printFile(filename: string, encoding: BufferEncoding = 'utf8') {
fs.createReadStream(filename, encoding).pipe(process.stdout);
}
通过低级API:
import fs from 'fs';
fs.open('data', (error, fd) => {
if (error) {
// handle error
return;
}
try {
fs.read(fd, Buffer.alloc(400), 0, 400, 20, (error, n, b) => {
if (error) {
// handle error
return;
}
// n: 实际读到的数据
// b: 读入字节的缓冲区
});
} catch (error) {
console.error(error);
} finally {
fs.close(fd); // 关闭文件描述符
}
});
写文件
三种操作方式:
- 基于回调
- 基于同步
- 基于promises
import fs from 'fs';
import path from 'path';
// 同步
fs.writeFileSync(path.resolve(__dirname, 'settings.json'), JSON.stringify(settings));
// 追加写:
// - fs.appendFile();
// - fs.appendFileSync();
// - fs.promises.appendFile();
// 基于回调
fs.writeFile(path.resolve(__dirname, 'settings.json'), JSON.stringify(settings), (error) => {
// handle error
});
// 基于promises
fs.promises.writeFile(path.resolve(__dirname, 'settings.json'), JSON.stringify(settings));
// 基于流
let output = fs.createWriteStream('numbers.txt');
for (let i = 0; i < 100; i++) {
output.write(`${i}\n`);
}
output.end();
low level API:
fs.write()
:写缓冲区或字符串到文件fs.writeSync()
fs.promises.write()
fs.writev()
: 写入Buffer对象数组fs.writevSync()
文件操作
- copyfile:拷贝文件
fs.copyFileSync()
fs.copyFile()
fs.promises.copyFile()
- rename:移动或重全名文件
fs.renameSync()
fs.rename()
- link:创建链接文件
fs.link()
fs.symlink()
- unlink: 删除文件
fs.unlinkSync()
fs.unlink()
fs.promises.unlink()
文件元数据
fs.stat()
、fs.statSync()
、fs.promises.stat()
函数可以获取文件或目录的元数据
import fs from 'fs';
let stats = fs.statSync('book/ch15.md');
stats.isFile(); // true;这是一个文件
stats.isDirectory(); // false;这不是一个目录
stats.size; // 文件大小
stats.atime; // 最后访问时间
stats.mtime; // 最后修改时间
stats.uid; // 所有者的用户ID
stats.gid; // 所有者的组ID
stats.mode.toString(8); // 权限位 8进制;比如:100644
操作目录
fs.mkdir()
: 创建目录fs.mkdir()
fs.mkdirSync()
fs.promises.mkdir()
fs.mkdtemp()
: 打开临时目录fs.mkdtemp()
fs.mkdtempSync()
fs.promises.mkdtemp()
fs.opendir()
: 打开目录fs.opendir()
fs.opendirSync()
fs.promises.opendir()
fs.readdir()
: 读取目录fs.readdir()
fs.readdirSync()
fs.promises.readdir()
fs.rmdir()
: 删除目录
操作子进程
这个模块的用途:编写执行其他程序的脚本
execSync()
与execFileSync()
:同步执行外部程序exec()
与execFile()
:异步执行外部程序spawn()
:异步执行外部程序,但不等待其完成fork()
:异步执行 Node 模块,但不等待其完成
execSync()
与 execFileSync()
同步执行外部程序;正常返回该命令的标准输出内容;错误则抛出异常。注意,如果命令向标准错误写入任何输出,则该输出只会传给父进程的标准错误流
const { execSync, execFileSync } = require('child_process');
let listing = child_process.execSync('ls -l');
let listing = child_process.execFileSync('ls', ['-l'], { encoding: 'utf8' );
子进程选项
- cwd: 子进程的工作目录;默认父进程的工作目录
- env: 环境变量
- Input: ;仅同步的时候用
- maxBuffer: 指定exec 函数可以收集的最大输出字节数。如果超过时长没有退出,子进程会被杀死并以错误退出(仅试用于
exec()
,但不试用于spawn()
和fork()
,它们使用的流 ) - shell: 指定命令行解释器可执行文件的路径或 true
- timeout: 指定允许子进程运行的最长时长(毫秒)。如果超过时长没有退出,子进程会被杀死并以错误退出(仅试用于
exec()
,但不试用于spawn()
和fork()
) - uid: 指定用户 ID 来运行程序
exec()
与 execFile()
异步执行外部程序(基于回调);返回 ChildProcess 对象
const { exec, execFile } = require('child_process');
exec('ls -l', (err, stdout, stderr) => {
if (err) {
console.error(err);
return;
}
console.log(stdout);
});
execFile('ls', ['-l'], (err, stdout, stderr) => {
if (err) {
console.error(err);
return;
}
console.log(stdout);
});
spawn()
和 exec
区别:
exec
用来执行简单的程序;简单:- 子进程输出不多;一次性输出(返回)、非流式
- 不用和子进程交互
spawn
用来执行稍复杂的程序;- 输出是流式
- 可以和进程交互:发送参数(命令)到子进程的标准输入;
kill()
发送信号
异步执行外部程序,但不等待其子进程执行完成
const { spawn } = require('child_process');
let child = spawn('ls', ['-l']);
child.stdout.on('data', (data) => {
console.log(data.toString());
});
fork()
spawn
用来执行外部程序,fork
用来执行 js 程序(js文件)。parent.js
文件
const child_process = require('child_process');
let child = child_process.fork(`${__dirname}/child.js`);
// 向子进程发送消息
child.send({ x: 4, y: 3 });
// 接收子进程的消息
child.on('message', (message) => {
console.log(message.hypotenuse);
child.disconnect();
});
chlid.js
文件
// 接收父进程的消息
process.on('message', (message) => {
// 发送消息给父进程
process.send({ hypotenuse: Math.hypot(message.x, message.y) });
});
总结
API中单词含义
exec
是用解释器去运行sync
表示同步file
需要指定执行文件的路径spawn
用来处理稍复杂的子进程:有交互、控制子进程fork
用来用子进程的方式运行js程序(文件)
工作线程
- Nodejs提供一套和 Web Workers 类似的API
- JavaScript线程默认不共享内存
- 消息传递通信:
postMessage()
发送消息;message
事件接收消息
应用场景,什么时候用:
- CPU密集性任务
- 保证主线程的快速响应能力,可以去用工作线程
- 工作线程可以把同步阻塞的任务转化为非阻塞的异步操作
总结:如果程序不是CPU密集型,没有响应性的问题,可能就不需要考虑工作线程
// TODO
- 创建工作线程及传递消息
- 工作张程的执行环境
- 通信信道与MessagePort
- 转移MessagePort和定型数组
- 在线程间共享定型数组
References:
- https://nodejs.org/
- JavaScript权威指南 第16章