Skip to content
Published at:

Nodejs

Overview

Node.js® is an open-source, cross-platform JavaScript runtime environment.

  • 浏览器因为安全问题,限制 JS 的能力(执行的 JS 是从远程的服务器拉过来的)
  • Nodejs 开发 OS 程序的能力给 JS,让JavaScript成为一种通用的编程语言

JS应用领域

  • 开发网页(浏览器)
  • 服务器(Nodejs)
  • 命令行工具(Nodejs),比如npm

基于OS平台开发的能力

  • 网络
  • 内存(string、集合、new)
  • 文件(底层:文件描述符、上层 API buffer)
  • 工具:path/url/buffer/集合/加密/协议(http/s)/压缩/内存/string(byte array) (其它golang/python stdlib都是类似)
  • etc

Dependencies核心依赖:

程序分类:IO 密集型程序(文件、网络、数据库); CPU密集型程序

  • 1 + 1;: CPU密集型
  • Consolo.log("hello world");: IO密集型 Nodejs

JS单线程

Node.js® is an JavaScript runtime environment.

npm 命令的代码如下:

ts
node require('../lib/cli.js')(process)

node 是一个二进制程序

理解 js 所谓的单线程:

  • 浏览器是单线程吗? Google Chrome Helper(Renderer) 进程实际开了26个线程(查看电脑上的进程状态)
  • nodejs是单线程吗?
    • 写一个hello world Demo程序测试,Nodejs 实际上开了 7个线程 (查看电脑上的进程状态)
    • 比如fs 文件的操作系统OS层面本身就是阻塞的,底层 OS 就不支持,Nodejs 内部是通过线程池来实现的 (File System 文档中有描述threadpool)

结论:JS 程序、进程本身是多线程的;只是执行到 JS 代码部分、或是说回调到 JS 代码的部分是在同一个线程。所以其它线程在为这一个线程服务,所以严格意义上讲

写 JS 代码和 C 代码在流程控制上的区别:

  • c语言: 程序的执行流程自己控制
  • js语言: 程序的执行流程不是自己控制:循环 --> 回调你写的 js 代码

底下干活 26个线程、7个线程

事件与 EventEmitter

在 Note 中,发送事件的都是 EventEmitter 或其子类的实例;让我们使用 on() 方法来监听事件,使用 emit() 方法来触发事件。

  • emitter.addListener(eventName, listener):添加一个监听器;on的别名
  • emitter.on(eventName, listener):添加一个监听器
  • emitter.emit(eventName[, ...args]):触发事件
  • emitter.removeListener(eventName, listener):移除一个监听器;off的别名
  • emitter.off(eventName, listener):移除一个监听器
  • emitter.once(eventName, listener):添加一个单次监听器
ts
var events = require('events');

var em = new events.EventEmitter();

em.on('FirstEvent', function (data) {
  console.log('First subscriber: ' + data);
});

em.emit('FirstEvent', 'This is my first Node.js event emitter example.');

事件注册回调的特点:

  • 顺序调用:按注册时间的顺序
  • 同步调用:同步调用完上面所有的注册回调,同步执行完在返回

回调和this问题

一般都会用剪头函数做为回调;剪头函数会使用定义它们的上下文中的this值

Buffer 缓冲区

二进制数据操作:网络、文件、其它

  • 继承自 Uint8Array
  • 可以和字符串相互转化
ts
let b = Buffer.from([0x41, 0x42, 0x43]); // <Buffer 41 42 43>
b.toString(); // "ABC"    默认utf8
b.toString('hex'); // "414243" 16进制

let computer = Buffer.from('IBM3111', 'ascii'); // <Buffer 49 42 4d 33 31 31 31>
for (let i = 0; i < computer.length; i++) {
  computer[i]--;
}
computer.toString('ascii'); // "HAL2000"
computer
  .subarray(0, 3)
  .map((x) => x + 1)
  .toString(); // "IBM"

let zeros = Buffer.alloc(1024); // 1024个0
let ones = Buffer.alloc(128, 1); // 128个1
let dead = Buffer.alloc(1024, 'DEADBEEF', 'hex'); // 1024个DEADBEEF(16进制, 总共1024,重复模式)

// read
dead.readUInt32BE(0); // 0xDEADBEEF
dead.readUInt32BE(1); // 0xADBEEFDE
dead.readBigUInt64BE(6); // 0xBEEFDEADBEEFDEADn
dead.readUInt32LE(1020); // 0xEFBEADDE

// write
// ...

流Stream

场景:处理大量数据,不需要一次性全部加载到内存中、也不需要等全部读写完才能进行后续操作,而是分批次处理。基于流的算法,其本质就是把数据分割成小块,内存中不会保存全部的数据,而是保存当前处理的数据块,使得内存的利用率更高。Nodejs 的网络 API 是基于流的,Nodejs 的文件系统模块也定义了流 API 用于读写文件。

Note:对实际常用场景的抽象处理,比如:文件读写、网络通信等。

  • 接收数据 -> 管道-> 处理
  • 生产者 - 消费者模型
  • Java: Stream
  • Rxjava: Data streaming
  • 函数式编程

Node 中的流分为四种类型:

  • 可读流:从数据源读取数据
  • 可写流:向目标写入数据
  • 双工流:可读可写
  • 转换流:可读可写,但输出的数据与输入的数据不一样

管道

文件通过管道pipe写入到Socket:

ts
import { Socket } from 'net';
import fs from 'fs';

function pipeFileToSocket(filename: string, socket: Socket) {
  fs.createReadStream(filename).pipe(socket);
}

下面通过管道把一个流导向另一个流,并在完成或发生错误时调用一个回调:

ts
import fs, { ReadStream, WriteStream } from 'fs';

function pipe(readable: ReadStream, writable: WriteStream, callback: (error?: Error) => void) {
  function handleError(error: Error) {
    readable.close();
    writable.close();
    callback(error);
  }

  readable
    .on('error', handleError) // 错误处理
    .pipe(writable)
    .on('error', handleError) // 错误处理
    .on('finish', callback); // 结束处理
}

管道配置转换流一起使用,可以创建多个流的传输管理:

ts
import fs from 'fs';
import zlib from 'zlib';

function gzip(filename: string, callback: (error?: Error) => void) {
  let source = fs.createReadStream(filename);
  let destination = fs.createWriteStream(filename + '.gz');
  let gzipper = zlib.createGzip();

  source
    .on('error', callback)
    .pipe(gzipper) // 转换流
    .pipe(destination)
    .on('error', callback)
    .on('finnish', callback);
}

实现自已的Transform转换流

// TODO: https://nodejs.org/api/stream.html#class-streamtransform

异步迭代

Node 12及之后,可读流是异步迭代器,意味着在async函数中可以使用for/await 循环从流中读取字符串或Buffer块

ts
async function grep(
  source: NodeJS.ReadStream,
  destination: NodeJS.WriteStream,
  pattern: RegExp,
  encoding: BufferEncoding = 'utf8'
) {
  source.setEncoding(encoding);

  destination.on('error', (error) => process.exit(1));

  let incompleteLine: string | undefined = '';

  // 使用 for/await 循环异步从输入流读取块
  for await (let chunk of source) {
    let lines: string[] = (incompleteLine + chunk).split('\n');
    incompleteLine = lines.pop();

    for (let line of lines) {
      if (pattern.test(line)) {
        destination.write(line + '\n', encoding);
      }
    }
  }

  if (incompleteLine && pattern.test(incompleteLine)) {
    destination.write(incompleteLine + '\n', encoding);
  }
}

// call
let pattern = new RegExp(process.argv[2]);
grep(process.stdin, process.stdout, pattern) // error handle
  .catch((error) => {
    console.error(error);
    process.exit(1);
  });

写入流及背压处理(两边不平衡问题)

write() 方法有个重要的返回值:

  • 如果缓冲区未满,返回true
  • 如果缓冲区满了,返回false,

write() 方法返回false值是一种背压 backpressure 的表现。背压是一种消息,表示你向流中写入数据的速度超过了它的处理能力。返回false之后应该停止继续调用 write(),等到流发出 drain 事件(缓冲区数据耗尽),表示缓冲区又有空间了。

自已手动处理背压backpressure:

ts
import { WriteStream } from 'fs';

function write(stream: WriteStream, chunk: Buffer, callback: () => void) {
  let hasMoreRoom = stream.write(chunk);

  // 检查 write() 方法返回值
  if (hasMoreRoom) {
    setImmediate(callback); // 立马调用callback
  } else {
    stream.once('drain', callback); // 等缓冲区 “耗尽/清空” 之后调用callback
  }
}

Note:当使用pipe()时,Node会自动的为你处理背压

如果程序里面使用async 和 await,将可读流作为异步迭代器使用,来处理背压backpressure:

ts
import { ReadStream, WriteStream } from 'fs';

function write(stream: WriteStream, chunk: Buffer) {
  let hasMoreRoom = stream.write(chunk);

  if (hasMoreRoom) {
    return Promise.resolve(null);
  } else {
    // drain事件后,返回
    return new Promise((resolve) => {
      stream.once('drain', resolve);
    });
  }
}

async function copy(source: ReadStream, destination: WriteStream) {
  destination.on('error', (error) => process.exit(1));

  for await (let chunk of source) {
    // 写入块,并等到缓冲区有空间再继续
    await write(destination, chunk);
  }
}

通过事件读取流:流动模式、暂停模式

可读流有两种模式,同时有自已的读取API。如果不能使用管道和异步迭代,那就需要从这两种API中选择一种来处理流。注意,只能选一种,不能两种混用

流动模式:

从一个流读取数据、处理数据,然后再把数据写入一个可定流,可能需要处理可写流背压。

  • 暂停可读流:可写流write()方法返回false表示写入缓冲区已满;可以调用可读流的pause()来暂停停止 data 事件
  • 恢复可读流:可写流接收到 drain 事件时;调用可读流的 resume() 来恢复 data 事件
ts
import fs from 'fs';
function copyFile(sourceFileName: string, destnationFilename: string, callback: (error?: Error) => void) {
  let input = fs.createReadStream(sourceFileName);
  let output = fs.createWriteStream(destnationFilename);

  input.on('data', (chunk) => {
    let hasRoom = output.write(chunk);
    // 输入流满了,先暂停
    if (!hasRoom) {
      input.pause();
    }
  });

  input.on('end', () => {
    output.end();
  });

  input.on('error', (error) => {
    callback(error);
    process.exit(1);
  });

  // output
  output.on('drain', () => {
    // 输入流有空间了,恢复
    input.resume();
  });

  output.on('error', (error) => {
    callback(error);
    process.exit(1);
  });

  output.on('finish', () => {
    callback(undefined);
  });
}

let from = process.argv[2],
  to = process.argv[3];
console.log(`Copying file ${from} to ${to}`);
copyFile(from, to, (error) => {
  if (error) {
    console.error(error);
  } else {
    console.log('done.');
  }
});

暂停模式:

这个模式是流开始时所处的模式。不注册 data事件处理程序,也不调用 pipe() 方法,那么这个流就会一直处于暂停模式。

  • 这个模式下需要显示的调用 read() 方法从流中拉取数据。
  • 这个 read() 不是阻塞的,会立马返回;如果没有数据就返回null
  • 可读流在暂停模式下,会发送 readable 事件,表示有数据可以读了,显示的调用 read() 把数据读完
  • 当收到 readable 事件后,没有把数据读完,则不会触发下一个 readable 事件
ts
import fs from 'fs';
import crypto from 'crypto';

function sha256(filename: string, callback: (error?: Error, hash?: string) => void) {
  let input = fs.createReadStream(filename);
  let hasher = crypto.createHash('sha256');

  input.on('readable', () => {
    let chunk;
    while ((chunk = input.read())) {
      hasher.update(chunk);
    }
  });

  input.on('end', () => {
    let hash = hasher.digest('hex');
    callback(undefined, hash);
  });
  input.on('error', callback);
}

sha256(process.argv[2], (error?: Error, hash?: string) => {
  if (error) {
    console.error(error.toString());
  } else {
    console.log(hash);
  }
});

进程、CPU 和操作系统

常用属性和函数

process 进程模块

ts
process.argv; // 包含命令行参数的数组
process.arch; // CPU架构:如x64
process.cwd(); // 返回当前工作目录
process.chdir(); // 设置当前工作目录
process.cpuUsage(); // 报告 CPU 使用情况
Drocess.env; // 环境变量对象
process.execPath; // Node 可执行文件的绝对路径
process.exit(); // 终止当前程序
process.exitCode; // 程序退出时报告的整数编码
process.getuid(); // 返回当前用户的 Unix 用户ID返回“高分辨率”纳秒级时间戳process.hrtime.bigint()
process.kill(); // 向另一个进程发送信号
process.memoryUsage(); // 元返回一个包含内存占用细节的对象
process.nextTick(); // 类似于 setImmediate(),立刻调用一个函数
process.pid; // 当前进程的进程 ID
process.ppid; // 父进程 ID
process.platform; // 操作系统:如 Linux、Darwin 或 Win32
process.resourceUsage(); // 返回包含资源占用细节的对象
process.setuid(); // 通过 ID 或名字设置当前用户
process.title; // 出现在 Ds 列表中的进程名
process.umask(); // 设置或返回新文件的默认权限
process.uptime(); // 返回 Node 正常运行的时间(秒)
process.version; // Node 的版本字符串
process.versions; // Node 依赖库的版本字符串

os 模块

ts
const os = require('os');

os.arch(); // 返回CPU架构:如x64或arm
os.constants; // 有用的常量,如os.constants.signals.SIGINT
os.cpus(); // 关于系统CPU核心的数据,包括使用时间
os.endianness(); // CPU的原生字节序:BE或LE
os.EOL; // 操作系统原生的行终止符:\n或\r\n
os.freemem; // 返回自由RAM数量(字节)
os.getPriority(); // 返回操作系统调度进程的优先级
os.homedir; // 返回当前用户的主目录
os.hostname; // 返回计算机的主机名
os.loadavg(); // 返回1、5和15分钟的平均负载
os.networkInterfaces; // 返回可用网络连接的细节
os.platform(); // 返回操作系统:例如Linux、Darwin或Win32
os.release(); // 返回操作系统的版本号
os.setPriority(); // 尝试设置进程的调度优先级
os.tmpdir(); // 返回默认的临时目录
os.totalmem(); // 返回RAM的总数(字节)
os.type(); // 返回操作系统:Linux、Darwin或Windows_NT等
os.uptime(); // 返回系统正常运行的时间(秒)
os.userInfo(); // 返回当前用户的uid、username、home和shell程序

Note:可以看到 processos 中有些重复的内容,部分是 os 模块使用了 process 模块相关的属性和方法

路径、文件描述符和 FileHandle

ts
import os from 'os';
process.cwd();                  // 返回当前工作目录
__filename;                     // 当前模块的文件名
__dirname;                      // 当前模块的目录名
os.homedir();                   // 返回当前用户的主目录

import path from 'path';
// 分隔符
path.sep;                       // / 或者 \,和操作系统相关

// path提供的简单的解析函数
let p = "src/pkg/test.js"
path.basename(p);               // test.js
path.extname(p);                // .js
path.dirname(p);                // src/pkg
path.basename(p);               // test.js
path.basename(path.dirname(p)); // pkg
path.dirname(path.dirname(p));  // src

//  normallze()
path.normalize("a/b/c/../d/");  // a/b/d/
path.normalize("a/./b");        // a/b
path.normalize("//a//b//");     // /a/b/

// join()
path.join("src", "pkg", "t.js");    // src/pkg/t.js

// resolve 接收一个或多个路径片段,返回一个绝对路径
path.resolve();                     // process.cwd();
path.resolve('t.js');               // path.join(process.cwd(), 't.js');
path.resolve('/tmp', 't.js');       // '/tmp/t.js'
path.resolve('/a', '/b', 't.js');   // '/b/t.js'

文件操作

特点:

  • 三套 API 接口:
    • 异步:fs.copyFile()
    • 同步:sync;比如:fs.copyFileSync()
    • promises:比如:fs.promise.copyFile()
  • 文件文件描述符接口:文件(底层:文件描述符、上层 API buffer)

Note:删除文件是 unlink ;OS 内核计数

读文件

三种操作方式:

  • 一次性读取文件的内容
  • 通过流
  • 通过低级API

一次性读取文件的内容

ts
import fs from 'fs';

// 同步读取文件
let buffer: Buffer = fs.readFileSync('test.data'); // 同步;返回Buffer缓冲区
let text: string = fs.readFileSync('data.csv', 'utf8'); // 同步;返回字符串

// 异步读取文件的字节; 基于回调
fs.readFile('test.data', (error, buffer?: Buffer) => {
  if (error) {
    // handle error
  } else {
    // handle buffer
    console.log('buffer:', buffer?.toString());
  }
});

// 异步读取文件的字节; 基于promises
fs.promises //
  .readFile('data.csv', 'utf8')
  .then(processFileText)
  .catch(handleReadError);

// async/await + promises
async function processText(filename: string, encoding: BufferEncoding = 'utf8') {
  let text = await fs.promises.readFile(filename, encoding);
  // handle text
}

通过流:

ts
import fs from 'fs';

async function printFile(filename: string, encoding: BufferEncoding = 'utf8') {
  fs.createReadStream(filename, encoding).pipe(process.stdout);
}

通过低级API:

ts
import fs from 'fs';

fs.open('data', (error, fd) => {
  if (error) {
    // handle error
    return;
  }

  try {
    fs.read(fd, Buffer.alloc(400), 0, 400, 20, (error, n, b) => {
      if (error) {
        // handle error
        return;
      }

      // n: 实际读到的数据
      // b: 读入字节的缓冲区
    });
  } catch (error) {
    console.error(error);
  } finally {
    fs.close(fd); // 关闭文件描述符
  }
});

写文件

三种操作方式:

  • 基于回调
  • 基于同步
  • 基于promises
ts
import fs from 'fs';
import path from 'path';

// 同步
fs.writeFileSync(path.resolve(__dirname, 'settings.json'), JSON.stringify(settings));
// 追加写:
// - fs.appendFile();
// - fs.appendFileSync();
// - fs.promises.appendFile();

// 基于回调
fs.writeFile(path.resolve(__dirname, 'settings.json'), JSON.stringify(settings), (error) => {
  // handle error
});

// 基于promises
fs.promises.writeFile(path.resolve(__dirname, 'settings.json'), JSON.stringify(settings));

// 基于流
let output = fs.createWriteStream('numbers.txt');
for (let i = 0; i < 100; i++) {
  output.write(`${i}\n`);
}
output.end();

low level API:

  • fs.write():写缓冲区或字符串到文件
  • fs.writeSync()
  • fs.promises.write()
  • fs.writev(): 写入Buffer对象数组
  • fs.writevSync()

文件操作

  • copyfile:拷贝文件
    • fs.copyFileSync()
    • fs.copyFile()
    • fs.promises.copyFile()
  • rename:移动或重全名文件
    • fs.renameSync()
    • fs.rename()
  • link:创建链接文件
    • fs.link()
    • fs.symlink()
  • unlink: 删除文件
    • fs.unlinkSync()
    • fs.unlink()
    • fs.promises.unlink()

文件元数据

fs.stat()fs.statSync()fs.promises.stat() 函数可以获取文件或目录的元数据

ts
import fs from 'fs';
let stats = fs.statSync('book/ch15.md');
stats.isFile(); // true;这是一个文件
stats.isDirectory(); // false;这不是一个目录
stats.size; // 文件大小
stats.atime; // 最后访问时间
stats.mtime; // 最后修改时间
stats.uid; // 所有者的用户ID
stats.gid; // 所有者的组ID
stats.mode.toString(8); // 权限位 8进制;比如:100644

操作目录

  • fs.mkdir(): 创建目录
    • fs.mkdir()
    • fs.mkdirSync()
    • fs.promises.mkdir()
  • fs.mkdtemp(): 打开临时目录
    • fs.mkdtemp()
    • fs.mkdtempSync()
    • fs.promises.mkdtemp()
  • fs.opendir(): 打开目录
    • fs.opendir()
    • fs.opendirSync()
    • fs.promises.opendir()
  • fs.readdir(): 读取目录
    • fs.readdir()
    • fs.readdirSync()
    • fs.promises.readdir()
  • fs.rmdir(): 删除目录

操作子进程

这个模块的用途:编写执行其他程序的脚本

  • execSync()execFileSync():同步执行外部程序
  • exec()execFile():异步执行外部程序
  • spawn():异步执行外部程序,但不等待其完成
  • fork():异步执行 Node 模块,但不等待其完成

execSync()execFileSync()

同步执行外部程序;正常返回该命令的标准输出内容;错误则抛出异常。注意,如果命令向标准错误写入任何输出,则该输出只会传给父进程的标准错误流

ts
const { execSync, execFileSync } = require('child_process');

let listing = child_process.execSync('ls -l');
let listing = child_process.execFileSync('ls', ['-l'], { encoding: 'utf8' );

子进程选项

  • cwd: 子进程的工作目录;默认父进程的工作目录
  • env: 环境变量
  • Input: ;仅同步的时候用
  • maxBuffer: 指定exec 函数可以收集的最大输出字节数。如果超过时长没有退出,子进程会被杀死并以错误退出(仅试用于exec(),但不试用于 spawn()fork(),它们使用的流 )
  • shell: 指定命令行解释器可执行文件的路径或 true
  • timeout: 指定允许子进程运行的最长时长(毫秒)。如果超过时长没有退出,子进程会被杀死并以错误退出(仅试用于exec(),但不试用于 spawn()fork()
  • uid: 指定用户 ID 来运行程序

exec()execFile()

异步执行外部程序(基于回调);返回 ChildProcess 对象

ts
const { exec, execFile } = require('child_process');

exec('ls -l', (err, stdout, stderr) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(stdout);
});

execFile('ls', ['-l'], (err, stdout, stderr) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(stdout);
});

spawn()

exec 区别:

  • exec 用来执行简单的程序;简单:
    • 子进程输出不多;一次性输出(返回)、非流式
    • 不用和子进程交互
  • spawn 用来执行稍复杂的程序;
    • 输出是流式
    • 可以和进程交互:发送参数(命令)到子进程的标准输入;kill() 发送信号

异步执行外部程序,但不等待其子进程执行完成

ts
const { spawn } = require('child_process');

let child = spawn('ls', ['-l']);
child.stdout.on('data', (data) => {
  console.log(data.toString());
});

fork()

spawn 用来执行外部程序,fork 用来执行 js 程序(js文件)。parent.js 文件

ts
const child_process = require('child_process');

let child = child_process.fork(`${__dirname}/child.js`);

// 向子进程发送消息
child.send({ x: 4, y: 3 });

// 接收子进程的消息
child.on('message', (message) => {
  console.log(message.hypotenuse);
  child.disconnect();
});

chlid.js文件

ts
// 接收父进程的消息
process.on('message', (message) => {
  // 发送消息给父进程
  process.send({ hypotenuse: Math.hypot(message.x, message.y) });
});

总结

API中单词含义

  • exec是用解释器去运行
  • sync表示同步
  • file需要指定执行文件的路径
  • spawn用来处理稍复杂的子进程:有交互、控制子进程
  • fork用来用子进程的方式运行js程序(文件)

工作线程

  • Nodejs提供一套和 Web Workers 类似的API
  • JavaScript线程默认不共享内存
  • 消息传递通信:postMessage() 发送消息;message 事件接收消息

应用场景,什么时候用:

  • CPU密集性任务
  • 保证主线程的快速响应能力,可以去用工作线程
  • 工作线程可以把同步阻塞的任务转化为非阻塞的异步操作

总结:如果程序不是CPU密集型,没有响应性的问题,可能就不需要考虑工作线程

// TODO

  • 创建工作线程及传递消息
  • 工作张程的执行环境
  • 通信信道与MessagePort
  • 转移MessagePort和定型数组
  • 在线程间共享定型数组

References:

Updated at: