index.ts 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125
  1. import { Worker, MessageChannel, MessagePort, receiveMessageOnPort } from 'worker_threads';
  2. import { once } from 'events';
  3. import EventEmitterAsyncResource from 'eventemitter-asyncresource';
  4. import { AsyncResource } from 'async_hooks';
  5. import { cpus } from 'os';
  6. import { fileURLToPath, URL } from 'url';
  7. import { resolve } from 'path';
  8. import { inspect, types } from 'util';
  9. import assert from 'assert';
  10. import { Histogram, build } from 'hdr-histogram-js';
  11. import { performance } from 'perf_hooks';
  12. import hdrobj from 'hdr-histogram-percentiles-obj';
  13. import {
  14. ReadyMessage,
  15. RequestMessage,
  16. ResponseMessage,
  17. StartupMessage,
  18. commonState,
  19. kResponseCountField,
  20. kRequestCountField,
  21. kFieldCount,
  22. Transferable,
  23. Task,
  24. TaskQueue,
  25. kQueueOptions,
  26. isTaskQueue,
  27. isTransferable,
  28. markMovable,
  29. isMovable,
  30. kTransferable,
  31. kValue
  32. } from './common';
  33. import { version } from '../package.json';
  // Number of logical CPUs reported by `os.cpus()`. Evaluated once at module
  // load; falls back to 1 if querying the CPU list throws.
  const cpuCount : number = (function detectCpuCount () : number {
    try {
      const cpuList = cpus();
      return cpuList.length;
    } catch {
      // Fall through to the conservative default below.
    }
    /* istanbul ignore next */
    return 1;
  })();
  // Options bag passed to `addEventListener()` on EventTarget-style signals.
  interface AbortSignalEventTargetAddOptions {
    once : boolean;
  }

  // Abort signal that follows the EventTarget API (`addEventListener()` /
  // `removeEventListener()`), optionally exposing an `aborted` flag.
  interface AbortSignalEventTarget {
    addEventListener : (
      name : 'abort',
      listener : () => void,
      options? : AbortSignalEventTargetAddOptions) => void;
    removeEventListener : (
      name : 'abort',
      listener : () => void) => void;
    aborted? : boolean;
  }

  // Abort signal that follows the EventEmitter API (`once()` / `off()`).
  interface AbortSignalEventEmitter {
    off : (name : 'abort', listener : () => void) => void;
    once : (name : 'abort', listener : () => void) => void;
  }

  // Either flavour of abort signal is accepted wherever a signal is expected.
  type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;

  // Registers `listener` to run a single time when `abortSignal` emits
  // 'abort', using whichever of the two supported APIs the signal exposes.
  function onabort (abortSignal : AbortSignalAny, listener : () => void) {
    if (!('addEventListener' in abortSignal)) {
      // EventEmitter flavour.
      abortSignal.once('abort', listener);
      return;
    }
    // EventTarget flavour.
    abortSignal.addEventListener('abort', listener, { once: true });
  }
  // Error used to reject a task that was cancelled through its abort signal.
  class AbortError extends Error {
    constructor () {
      super('The task has been aborted');
    }

    // Provided as a prototype accessor (not an own property), so every
    // instance reports the same fixed name.
    get name () {
      return 'AbortError';
    }
  }
  // The type of `Worker#resourceLimits` as declared by the installed Node.js
  // typings, or `{}` when those typings do not declare the property.
  type ResourceLimits =
    Worker extends { resourceLimits? : infer T; }
      ? T
      : {};
  // The type of the `env` option accepted by the `Worker` constructor, inferred
  // from the installed Node.js typings (`never` if it cannot be inferred).
  type EnvSpecifier =
    typeof Worker extends { new (filename : never, options?: { env: infer T }) : Worker; }
      ? T
      : never;
  // Default FIFO task queue backed by a plain array.
  class ArrayTaskQueue implements TaskQueue {
    tasks : Task[] = [];

    // Number of tasks currently waiting in the queue.
    get size () : number { return this.tasks.length; }

    // Removes and returns the oldest task, or `null` when the queue is empty.
    // (`Array#shift()` yields `undefined` on an empty array; normalise it so
    // the result matches the declared `Task | null` return type.)
    shift () : Task | null {
      return this.tasks.shift() ?? null;
    }

    // Appends a task to the end of the queue.
    push (task : Task) : void {
      this.tasks.push(task);
    }

    // Removes a specific task from the queue. The task must currently be
    // queued; otherwise the assertion below throws.
    remove (task : Task) : void {
      const index = this.tasks.indexOf(task);
      assert.notStrictEqual(index, -1);
      this.tasks.splice(index, 1);
    }
  }
  // User-facing pool options. Every field is optional; missing fields are
  // filled in from `kDefaultOptions`.
  interface Options {
    // Default worker file for tasks; `file:` URLs are converted to paths.
    filename? : string | null;
    // Default task name, used when a task does not specify one.
    name?: string;
    // The pool keeps spawning workers until it has at least this many.
    minThreads? : number;
    // New workers are only spawned on demand while below this count.
    maxThreads? : number;
    // Delay (passed to `setTimeout`) before a surplus idle worker is removed.
    idleTimeout? : number;
    // Queue capacity; 'auto' resolves to `maxThreads ** 2`.
    maxQueue? : number | 'auto';
    // Upper bound on the usage of a single worker.
    concurrentTasksPerWorker? : number;
    // When false, synchronous draining of pending responses is skipped.
    useAtomics? : boolean;
    // The following five are forwarded to the `Worker` constructor.
    resourceLimits? : ResourceLimits;
    argv? : string[];
    execArgv? : string[];
    env? : EnvSpecifier;
    workerData? : any;
    // Custom queue implementation; an `ArrayTaskQueue` is used when omitted.
    taskQueue? : TaskQueue;
    // Forwarded to each worker in its startup message.
    niceIncrement? : number;
    // Forwarded to the `Worker` constructor.
    trackUnmanagedFds? : boolean;
  }
  // `Options` after defaults have been applied: the fields listed here are
  // guaranteed to be present, and `maxQueue` is always numeric ('auto' has
  // been resolved).
  interface FilledOptions extends Options {
    filename : string | null;
    name: string;
    minThreads : number;
    maxThreads : number;
    idleTimeout : number;
    maxQueue : number;
    concurrentTasksPerWorker : number;
    useAtomics: boolean;
    taskQueue : TaskQueue;
    niceIncrement : number;
  }
  // Defaults merged underneath the user-supplied options. The thread counts
  // are derived from the detected CPU count and may be fractional.
  const kDefaultOptions : FilledOptions = {
    filename: null,
    name: 'default',
    // Half the CPUs, but never less than one.
    minThreads: Math.max(1, cpuCount / 2),
    maxThreads: 1.5 * cpuCount,
    idleTimeout: 0,
    // No queue limit unless the user asks for one.
    maxQueue: Number.POSITIVE_INFINITY,
    concurrentTasksPerWorker: 1,
    useAtomics: true,
    taskQueue: new ArrayTaskQueue(),
    niceIncrement: 0,
    trackUnmanagedFds: true
  };
  // Per-task options accepted when submitting a task.
  interface RunOptions {
    // Objects to transfer (rather than clone) when posting the task.
    transferList? : TransferList;
    // Worker file for this task; falls back to the pool-level filename.
    filename? : string | null;
    // Signal that can be used to cancel the task.
    signal? : AbortSignalAny | null;
    // Task name; falls back to the pool-level name.
    name? : string | null;
  }
  // `RunOptions` with every field present (values may still be null).
  interface FilledRunOptions extends RunOptions {
    transferList : TransferList | never;
    filename : string | null;
    signal : AbortSignalAny | null;
    name : string | null;
  }
  // Per-task defaults: nothing to transfer, no filename or name override, and
  // no abort signal.
  const kDefaultRunOptions : FilledRunOptions = {
    transferList: undefined,
    filename: null,
    signal: null,
    name: null
  };
  // Transferable wrapper for objects that are themselves transferred: the
  // wrapped object is both the value and the entry for the transfer list.
  class DirectlyTransferable implements Transferable {
    #wrapped : object;

    constructor (value : object) {
      this.#wrapped = value;
    }

    // Object to place on the transfer list.
    get [kTransferable] () : object {
      return this.#wrapped;
    }

    // Object to send as the message value.
    get [kValue] () : object {
      return this.#wrapped;
    }
  }
  // Transferable wrapper for ArrayBuffer views: the view is the value that is
  // sent, while its underlying buffer is what goes on the transfer list.
  class ArrayBufferViewTransferable implements Transferable {
    #source : ArrayBufferView;

    constructor (view : ArrayBufferView) {
      this.#source = view;
    }

    // The view's backing buffer, to be placed on the transfer list.
    get [kTransferable] () : object {
      return this.#source.buffer;
    }

    // The view itself, to be sent as the message value.
    get [kValue] () : object {
      return this.#source;
    }
  }
  // Monotonically increasing source of task ids; each new task takes the
  // current value and increments it.
  let taskIdCounter = 0;

  // Completion callback for a task. `err` is `null` when the task succeeded,
  // in which case `result` carries the task's result.
  type TaskCallback = (err : Error | null, result: any) => void;

  // Grab the type of `transferList` off `MessagePort`. At the time of writing,
  // only ArrayBuffer and MessagePort are valid, but let's avoid having to update
  // our types here every time Node.js adds support for more objects.
  type TransferList = MessagePort extends { postMessage(value : any, transferList : infer T) : any; } ? T : never;
  // Element type of `TransferList`.
  type TransferListItem = TransferList extends (infer T)[] ? T : never;
  // Converts a `file:` URL string into a filesystem path; any other string is
  // returned unchanged.
  function maybeFileURLToPath (filename : string) : string {
    if (!filename.startsWith('file:')) {
      return filename;
    }
    const asUrl = new URL(filename);
    return fileURLToPath(asUrl);
  }
  // Extend AsyncResource so that async relations between posting a task and
  // receiving its result are visible to diagnostic tools.
  //
  // A TaskInfo holds everything needed to run one task: the payload, its
  // transfer list, the target filename/name, the completion callback and an
  // optional abort signal. Each instance is used for exactly one task.
  class TaskInfo extends AsyncResource implements Task {
    callback : TaskCallback;
    task : any;
    transferList : TransferList;
    filename : string;
    name : string;
    taskId : number;
    abortSignal : AbortSignalAny | null;
    // Listener registered on `abortSignal`, kept so it can be removed again.
    abortListener : (() => void) | null = null;
    // Worker the task was posted to; null while the task is not running.
    workerInfo : WorkerInfo | null = null;
    // `performance.now()` timestamps; `started` stays 0 until it is set by
    // whoever dispatches the task.
    created : number;
    started : number;

    constructor (
      task : any,
      transferList : TransferList,
      filename : string,
      name : string,
      callback : TaskCallback,
      abortSignal : AbortSignalAny | null,
      triggerAsyncId : number) {
      // Destruction is signalled manually from done().
      super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId });
      this.callback = callback;
      this.task = task;
      this.transferList = transferList;

      // If the task is a Transferable returned by
      // Piscina.move(), then add it to the transferList
      // automatically
      if (isMovable(task)) {
        // This condition should never be hit but typescript
        // complains if we dont do the check.
        /* istanbul ignore if */
        if (this.transferList == null) {
          this.transferList = [];
        }
        this.transferList =
          this.transferList.concat(task[kTransferable]);
        this.task = task[kValue];
      }

      this.filename = filename;
      this.name = name;
      this.taskId = taskIdCounter++;
      this.abortSignal = abortSignal;
      this.created = performance.now();
      this.started = 0;
    }

    // Hands out the task payload and drops this object's reference to it.
    releaseTask () : any {
      const ret = this.task;
      this.task = null;
      return ret;
    }

    // Completes the task: invokes the callback inside this resource's async
    // scope, emits the destroy hook and detaches the abort listener.
    done (err : Error | null, result? : any) : void {
      this.runInAsyncScope(this.callback, null, err, result);
      this.emitDestroy(); // `TaskInfo`s are used only once.
      // If an abort signal was used, remove the listener from it when
      // done to make sure we do not accidentally leak.
      if (this.abortSignal && this.abortListener) {
        if ('removeEventListener' in this.abortSignal) {
          this.abortSignal.removeEventListener('abort', this.abortListener);
        } else {
          (this.abortSignal as AbortSignalEventEmitter).off(
            'abort', this.abortListener);
        }
      }
    }

    // Queue options attached to the task payload under `kQueueOptions`, or
    // null if there are none. The `in` operator throws a TypeError for
    // primitives and null (and `task` is null after releaseTask()), so only
    // object-like payloads are probed.
    get [kQueueOptions] () : object | null {
      const task = this.task;
      const probeable =
        (typeof task === 'object' && task !== null) ||
        typeof task === 'function';
      return probeable && kQueueOptions in task ? task[kQueueOptions] : null;
    }
  }
  // Base class for resources that become usable some time after construction.
  // Callbacks registered before the resource is ready are queued and run once
  // when markAsReady() is called.
  abstract class AsynchronouslyCreatedResource {
    // Queued ready-callbacks; `null` once the resource has been marked ready.
    onreadyListeners : (() => void)[] | null = [];

    // Flips the resource to "ready" and runs every queued callback in
    // registration order. Must be called at most once.
    markAsReady () : void {
      const queued = this.onreadyListeners;
      assert(queued !== null);
      // Clear the queue first so callbacks observe the resource as ready.
      this.onreadyListeners = null;
      for (let i = 0; i < queued.length; i++) {
        queued[i]();
      }
    }

    // Whether markAsReady() has been called.
    isReady () : boolean {
      return this.onreadyListeners === null;
    }

    // Runs `fn` once the resource is ready: immediately if it already is,
    // otherwise when markAsReady() is called.
    onReady (fn : () => void) {
      const queued = this.onreadyListeners;
      if (queued !== null) {
        queued.push(fn);
        return;
      }
      fn(); // Zalgo is okay here.
    }

    // Current load of the resource, in units defined by the subclass.
    abstract currentUsage() : number;
  }
  // Tracks a set of asynchronously created resources, split into those still
  // pending and those that are ready, and notifies listeners whenever a ready
  // item has spare capacity.
  class AsynchronouslyCreatedResourcePool<
    T extends AsynchronouslyCreatedResource> {
    pendingItems = new Set<T>();
    readyItems = new Set<T>();
    // Items whose usage has reached this value are considered full.
    maximumUsage : number;
    onAvailableListeners : ((item : T) => void)[];

    constructor (maximumUsage : number) {
      this.onAvailableListeners = [];
      this.maximumUsage = maximumUsage;
    }

    // Adds an item as pending and promotes it to ready once it reports
    // readiness (unless it has been deleted in the meantime).
    add (item : T) {
      this.pendingItems.add(item);
      item.onReady(() => {
        /* istanbul ignore else */
        if (this.pendingItems.delete(item)) {
          this.readyItems.add(item);
          this.maybeAvailable(item);
        }
      });
    }

    // Drops an item from the pool regardless of which set it is in.
    delete (item : T) {
      this.pendingItems.delete(item);
      this.readyItems.delete(item);
    }

    // Returns the first idle ready item if there is one, otherwise the ready
    // item with the lowest usage below `maximumUsage`, otherwise null.
    findAvailable () : T | null {
      let best : T | null = null;
      let lowestUsage = this.maximumUsage;
      for (const item of this.readyItems) {
        const usage = item.currentUsage();
        if (usage === 0) {
          return item;
        }
        if (!(usage < lowestUsage)) {
          continue;
        }
        best = item;
        lowestUsage = usage;
      }
      return best;
    }

    // Iterates pending items first, then ready items.
    * [Symbol.iterator] () {
      yield * this.pendingItems;
      yield * this.readyItems;
    }

    // Total number of items, pending and ready.
    get size () {
      const { pendingItems, readyItems } = this;
      return pendingItems.size + readyItems.size;
    }

    // Notifies all availability listeners if `item` has spare capacity.
    maybeAvailable (item : T) {
      /* istanbul ignore if */
      if (!(item.currentUsage() < this.maximumUsage)) {
        return;
      }
      for (const notify of this.onAvailableListeners) {
        notify(item);
      }
    }

    // Registers a listener that is called with items that have capacity.
    onAvailable (fn : (item : T) => void) {
      this.onAvailableListeners.push(fn);
    }
  }
  // Handler invoked with each response message received from a worker.
  type ResponseCallback =
    (response : ResponseMessage) => void;
  // Factories for the pool's standard errors. Each call creates a fresh Error
  // so that the stack trace reflects the place where it was raised.
  const Errors = {
    ThreadTermination () : Error {
      return new Error('Terminating worker thread');
    },
    FilenameNotProvided () : Error {
      return new Error('filename must be provided to run() or in options object');
    },
    TaskQueueAtLimit () : Error {
      return new Error('Task queue is at limit');
    },
    NoTaskQueueAvailable () : Error {
      return new Error('No task queue available and all Workers are busy');
    }
  };
  // Bookkeeping for a single Worker: the thread itself, the message port used
  // to exchange tasks and responses, the tasks currently posted to it, and the
  // shared counters used to signal new requests.
  class WorkerInfo extends AsynchronouslyCreatedResource {
    worker : Worker;
    // Tasks posted to this worker and not yet answered, keyed by task id.
    taskInfos : Map<number, TaskInfo>;
    idleTimeout : NodeJS.Timeout | null = null; // eslint-disable-line no-undef
    port : MessagePort;
    // Shared request/response counters (`kFieldCount` Int32 slots).
    sharedBuffer : Int32Array;
    lastSeenResponseCount : number = 0;
    onMessage : ResponseCallback;

    constructor (
      worker : Worker,
      port : MessagePort,
      onMessage : ResponseCallback) {
      super();
      this.worker = worker;
      this.port = port;
      this.port.on('message', (message : ResponseMessage) => {
        this._handleResponse(message);
      });
      this.onMessage = onMessage;
      this.taskInfos = new Map();
      const byteLength = kFieldCount * Int32Array.BYTES_PER_ELEMENT;
      this.sharedBuffer = new Int32Array(new SharedArrayBuffer(byteLength));
    }

    // Terminates the worker, closes the port and fails every task that is
    // still outstanding with a thread-termination error.
    destroy () : void {
      this.worker.terminate();
      this.port.close();
      this.clearIdleTimeout();
      this.taskInfos.forEach((pending) => {
        pending.done(Errors.ThreadTermination());
      });
      this.taskInfos.clear();
    }

    // Cancels the idle-removal timer, if one is set.
    clearIdleTimeout () : void {
      const timer = this.idleTimeout;
      if (timer === null) {
        return;
      }
      clearTimeout(timer);
      this.idleTimeout = null;
    }

    ref () : WorkerInfo {
      this.port.ref();
      return this;
    }

    unref () : WorkerInfo {
      // Note: Do not call ref()/unref() on the Worker itself since that may cause
      // a hard crash, see https://github.com/nodejs/node/pull/33394.
      this.port.unref();
      return this;
    }

    // Forwards a response to the owner's handler and unrefs the port once no
    // tasks remain.
    _handleResponse (message : ResponseMessage) : void {
      this.onMessage(message);
      if (this.taskInfos.size > 0) {
        return;
      }
      // No more tasks running on this Worker means it should not keep the
      // process running.
      this.unref();
    }

    // Sends a task to the worker. If posting fails, the task is completed
    // with the error and is not tracked.
    postTask (taskInfo : TaskInfo) {
      const { taskId, filename, name } = taskInfo;
      assert(!this.taskInfos.has(taskId));
      const message : RequestMessage = {
        task: taskInfo.releaseTask(),
        taskId,
        filename,
        name
      };

      try {
        this.port.postMessage(message, taskInfo.transferList);
      } catch (err) {
        // This would mostly happen if e.g. message contains unserializable data
        // or transferList is invalid.
        taskInfo.done(err);
        return;
      }

      taskInfo.workerInfo = this;
      this.taskInfos.set(taskId, taskInfo);
      this.ref();
      this.clearIdleTimeout();

      // Inform the worker that there are new messages posted, and wake it up
      // if it is waiting for one.
      Atomics.add(this.sharedBuffer, kRequestCountField, 1);
      Atomics.notify(this.sharedBuffer, kRequestCountField, 1);
    }

    processPendingMessages () {
      // If we *know* that there are more messages than we have received using
      // 'message' events yet, then try to load and handle them synchronously,
      // without the need to wait for more expensive events on the event loop.
      // This would usually break async tracking, but in our case, we already have
      // the extra TaskInfo/AsyncResource layer that rectifies that situation.
      const responseCount =
        Atomics.load(this.sharedBuffer, kResponseCountField);
      if (responseCount === this.lastSeenResponseCount) {
        return;
      }
      this.lastSeenResponseCount = responseCount;

      for (let entry = receiveMessageOnPort(this.port);
        entry !== undefined;
        entry = receiveMessageOnPort(this.port)) {
        this._handleResponse(entry.message);
      }
    }

    isRunningAbortableTask () : boolean {
      // If there are abortable tasks, we are running one at most per Worker.
      if (this.taskInfos.size !== 1) return false;
      for (const onlyTask of this.taskInfos.values()) {
        return onlyTask.abortSignal !== null;
      }
      return false;
    }

    // A worker running an abortable task counts as fully occupied; otherwise
    // usage is the number of outstanding tasks.
    currentUsage () : number {
      return this.isRunningAbortableTask() ? Infinity : this.taskInfos.size;
    }
  }
  // Internal implementation of the pool: owns the Workers, the task queue and
  // the timing histograms, and decides which Worker runs which task.
  class ThreadPool {
    publicInterface : Piscina;
    workers : AsynchronouslyCreatedResourcePool<WorkerInfo>;
    options : FilledOptions;
    taskQueue : TaskQueue;
    // Tasks that were taken off `taskQueue` but could not be dispatched yet.
    skipQueue : TaskInfo[] = [];
    completed : number = 0;
    runTime : Histogram;
    waitTime : Histogram;
    start : number = performance.now();
    // Re-entrancy guard for _processPendingMessages().
    inProcessPendingMessages : boolean = false;
    // True only while the constructor spawns the initial set of Workers.
    startingUp : boolean = false;
    workerFailsDuringBootstrap : boolean = false;

    constructor (publicInterface : Piscina, options : Options) {
      this.publicInterface = publicInterface;
      this.taskQueue = options.taskQueue || new ArrayTaskQueue();
      this.runTime = build({ lowestDiscernibleValue: 1 });
      this.waitTime = build({ lowestDiscernibleValue: 1 });

      const filename =
        options.filename ? maybeFileURLToPath(options.filename) : null;
      this.options = { ...kDefaultOptions, ...options, filename, maxQueue: 0 };
      // The >= and <= could be > and < but this way we get 100 % coverage 🙃
      if (options.maxThreads !== undefined &&
          this.options.minThreads >= options.maxThreads) {
        this.options.minThreads = options.maxThreads;
      }
      if (options.minThreads !== undefined &&
          this.options.maxThreads <= options.minThreads) {
        this.options.maxThreads = options.minThreads;
      }
      if (options.maxQueue === 'auto') {
        this.options.maxQueue = this.options.maxThreads ** 2;
      } else {
        this.options.maxQueue = options.maxQueue ?? kDefaultOptions.maxQueue;
      }

      this.workers = new AsynchronouslyCreatedResourcePool<WorkerInfo>(
        this.options.concurrentTasksPerWorker);
      this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w));

      this.startingUp = true;
      this._ensureMinimumWorkers();
      this.startingUp = false;
    }

    // Spawns Workers until the pool holds at least `minThreads` of them.
    _ensureMinimumWorkers () : void {
      while (this.workers.size < this.options.minThreads) {
        this._addNewWorker();
      }
    }

    // Creates one Worker, wires up its message channel and event handlers,
    // and adds it to the pool.
    _addNewWorker () : void {
      const pool = this;
      const worker = new Worker(resolve(__dirname, 'worker.js'), {
        env: this.options.env,
        argv: this.options.argv,
        execArgv: this.options.execArgv,
        resourceLimits: this.options.resourceLimits,
        workerData: this.options.workerData,
        trackUnmanagedFds: this.options.trackUnmanagedFds
      });

      const { port1, port2 } = new MessageChannel();
      const workerInfo = new WorkerInfo(worker, port1, onMessage);
      if (this.startingUp) {
        // There is no point in waiting for the initial set of Workers to indicate
        // that they are ready, we just mark them as such from the start.
        workerInfo.markAsReady();
      }

      const message : StartupMessage = {
        filename: this.options.filename,
        name: this.options.name,
        port: port2,
        sharedBuffer: workerInfo.sharedBuffer,
        useAtomics: this.options.useAtomics,
        niceIncrement: this.options.niceIncrement
      };
      worker.postMessage(message, [port2]);

      function onMessage (message : ResponseMessage) {
        const { taskId, result } = message;
        // In case of success: Call the callback that was passed to `runTask`,
        // remove the `TaskInfo` associated with the Worker, which marks it as
        // free again.
        const taskInfo = workerInfo.taskInfos.get(taskId);
        workerInfo.taskInfos.delete(taskId);

        pool.workers.maybeAvailable(workerInfo);

        /* istanbul ignore if */
        if (taskInfo === undefined) {
          const err = new Error(
            `Unexpected message from Worker: ${inspect(message)}`);
          pool.publicInterface.emit('error', err);
        } else {
          taskInfo.done(message.error, result);
        }

        pool._processPendingMessages();
      }

      worker.on('message', (message : ReadyMessage) => {
        if (message.ready === true) {
          if (workerInfo.currentUsage() === 0) {
            workerInfo.unref();
          }

          if (!workerInfo.isReady()) {
            workerInfo.markAsReady();
          }
          return;
        }

        worker.emit('error', new Error(
          `Unexpected message on Worker: ${inspect(message)}`));
      });

      worker.on('error', (err : Error) => {
        // Work around the bug in https://github.com/nodejs/node/pull/33394
        worker.ref = () => {};

        // In case of an uncaught exception: Call the callback that was passed to
        // `postTask` with the error, or emit an 'error' event if there is none.
        const taskInfos = [...workerInfo.taskInfos.values()];
        workerInfo.taskInfos.clear();

        // Remove the worker from the list and potentially start a new Worker to
        // replace the current one.
        this._removeWorker(workerInfo);

        if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
          this._ensureMinimumWorkers();
        } else {
          // Do not start new workers over and over if they already fail during
          // bootstrap, there's no point.
          this.workerFailsDuringBootstrap = true;
        }

        if (taskInfos.length > 0) {
          for (const taskInfo of taskInfos) {
            taskInfo.done(err, null);
          }
        } else {
          this.publicInterface.emit('error', err);
        }
      });

      worker.unref();
      port1.on('close', () => {
        // The port is only closed if the Worker stops for some reason, but we
        // always .unref() the Worker itself. We want to receive e.g. 'error'
        // events on it, so we ref it once we know it's going to exit anyway.
        worker.ref();
      });

      this.workers.add(workerInfo);
    }

    // Synchronously drains responses that Workers have already posted. Does
    // nothing when atomics are disabled or when called re-entrantly.
    _processPendingMessages () {
      if (this.inProcessPendingMessages || !this.options.useAtomics) {
        return;
      }

      this.inProcessPendingMessages = true;
      try {
        for (const workerInfo of this.workers) {
          workerInfo.processPendingMessages();
        }
      } finally {
        this.inProcessPendingMessages = false;
      }
    }

    // Destroys a Worker and removes it from the pool.
    _removeWorker (workerInfo : WorkerInfo) : void {
      workerInfo.destroy();

      this.workers.delete(workerInfo);
    }

    // Called whenever a Worker has spare capacity: dispatches one waiting task
    // to it, or schedules the Worker for removal if it is idle and surplus.
    _onWorkerAvailable (workerInfo : WorkerInfo) : void {
      while ((this.taskQueue.size > 0 || this.skipQueue.length > 0) &&
        workerInfo.currentUsage() < this.options.concurrentTasksPerWorker) {
        // The skipQueue will have tasks that we previously shifted off
        // the task queue but had to skip over... we have to make sure
        // we drain that before we drain the taskQueue.
        const taskInfo = this.skipQueue.shift() ||
                         this.taskQueue.shift() as TaskInfo;
        // If the task has an abortSignal and the worker has any other
        // tasks, we cannot distribute the task to it. Skip for now.
        if (taskInfo.abortSignal && workerInfo.taskInfos.size > 0) {
          this.skipQueue.push(taskInfo);
          break;
        }
        const now = performance.now();
        this.waitTime.recordValue(now - taskInfo.created);
        taskInfo.started = now;
        workerInfo.postTask(taskInfo);
        this._maybeDrain();
        return;
      }

      if (workerInfo.taskInfos.size === 0 &&
          this.workers.size > this.options.minThreads) {
        // Never leave an earlier idle timer running behind the new one; a
        // stale timer could fire while the Worker is busy again.
        workerInfo.clearIdleTimeout();
        workerInfo.idleTimeout = setTimeout(() => {
          assert.strictEqual(workerInfo.taskInfos.size, 0);
          if (this.workers.size > this.options.minThreads) {
            this._removeWorker(workerInfo);
          }
        }, this.options.idleTimeout).unref();
      }
    }

    // Gives up on a task that was never queued or dispatched: detaches its
    // abort listener (so a later abort cannot act on a task the pool does not
    // hold), emits the async-resource destroy hook, and returns a promise
    // rejected with `err`.
    _abandonTask (taskInfo : TaskInfo, err : Error) : Promise<never> {
      const { abortSignal, abortListener } = taskInfo;
      if (abortSignal !== null && abortListener !== null) {
        if ('removeEventListener' in abortSignal) {
          abortSignal.removeEventListener('abort', abortListener);
        } else {
          abortSignal.off('abort', abortListener);
        }
        taskInfo.abortListener = null;
      }
      taskInfo.emitDestroy();
      return Promise.reject(err);
    }

    // Submits a task. Resolves with the task's result, or rejects if no
    // filename is available, the task is aborted, the queue is full, or the
    // task itself fails.
    runTask (
      task : any,
      options : RunOptions) : Promise<any> {
      let {
        filename,
        name
      } = options;
      const {
        transferList = [],
        signal = null
      } = options;
      if (filename == null) {
        filename = this.options.filename;
      }
      if (name == null) {
        name = this.options.name;
      }
      if (typeof filename !== 'string') {
        return Promise.reject(Errors.FilenameNotProvided());
      }
      filename = maybeFileURLToPath(filename);

      let resolve : (result : any) => void;
      let reject : (err : Error) => void;
      // eslint-disable-next-line
      const ret = new Promise((res, rej) => { resolve = res; reject = rej; });
      const taskInfo = new TaskInfo(
        task,
        transferList,
        filename,
        name,
        (err : Error | null, result : any) => {
          this.completed++;
          if (taskInfo.started) {
            this.runTime.recordValue(performance.now() - taskInfo.started);
          }
          if (err !== null) {
            reject(err);
          } else {
            resolve(result);
          }
        },
        signal,
        this.publicInterface.asyncResource.asyncId());

      if (signal !== null) {
        // If the AbortSignal has an aborted property and it's truthy,
        // reject immediately.
        if ((signal as AbortSignalEventTarget).aborted) {
          return this._abandonTask(taskInfo, new AbortError());
        }
        taskInfo.abortListener = () => {
          // Call reject() first to make sure we always reject with the AbortError
          // if the task is aborted, not with an Error from the possible
          // thread termination below.
          reject(new AbortError());

          if (taskInfo.workerInfo !== null) {
            // Already running: We cancel the Worker this is running on.
            this._removeWorker(taskInfo.workerInfo);
            this._ensureMinimumWorkers();
            return;
          }

          // Not yet running: Remove it from whichever queue holds it. Tasks
          // with an abort signal may have been parked in the skipQueue by
          // _onWorkerAvailable().
          const skippedIndex = this.skipQueue.indexOf(taskInfo);
          if (skippedIndex !== -1) {
            this.skipQueue.splice(skippedIndex, 1);
          } else {
            this.taskQueue.remove(taskInfo);
          }
          // done() will never run for this task, so release the async
          // resource here.
          taskInfo.emitDestroy();
        };
        onabort(signal, taskInfo.abortListener);
      }

      // If there is a task queue, there's no point in looking for an available
      // Worker thread. Add this task to the queue, if possible.
      if (this.taskQueue.size > 0) {
        const totalCapacity = this.options.maxQueue + this.pendingCapacity();
        if (this.taskQueue.size >= totalCapacity) {
          if (this.options.maxQueue === 0) {
            return this._abandonTask(taskInfo, Errors.NoTaskQueueAvailable());
          } else {
            return this._abandonTask(taskInfo, Errors.TaskQueueAtLimit());
          }
        } else {
          if (this.workers.size < this.options.maxThreads) {
            this._addNewWorker();
          }
          this.taskQueue.push(taskInfo);
        }

        return ret;
      }

      // Look for a Worker with a minimum number of tasks it is currently running.
      let workerInfo : WorkerInfo | null = this.workers.findAvailable();

      // If we want the ability to abort this task, use only workers that have
      // no running tasks.
      if (workerInfo !== null && workerInfo.currentUsage() > 0 && signal) {
        workerInfo = null;
      }

      // If no Worker was found, or that Worker was handling another task in some
      // way, and we still have the ability to spawn new threads, do so.
      let waitingForNewWorker = false;
      if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
          this.workers.size < this.options.maxThreads) {
        this._addNewWorker();
        waitingForNewWorker = true;
      }

      // If no Worker is found, try to put the task into the queue.
      if (workerInfo === null) {
        if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
          return this._abandonTask(taskInfo, Errors.NoTaskQueueAvailable());
        } else {
          this.taskQueue.push(taskInfo);
        }

        return ret;
      }

      // TODO(addaleax): Clean up the waitTime/runTime recording.
      const now = performance.now();
      this.waitTime.recordValue(now - taskInfo.created);
      taskInfo.started = now;
      workerInfo.postTask(taskInfo);
      this._maybeDrain();
      return ret;
    }

    // Task capacity that will become available once the Workers that are
    // still pending have become ready.
    pendingCapacity () : number {
      return this.workers.pendingItems.size *
        this.options.concurrentTasksPerWorker;
    }

    // Emits 'drain' on the public interface when nothing is waiting.
    _maybeDrain () {
      if (this.taskQueue.size === 0 && this.skipQueue.length === 0) {
        this.publicInterface.emit('drain');
      }
    }

    // Fails every waiting task, tears down every Worker and resolves once all
    // of them have emitted 'exit'.
    async destroy () {
      while (this.skipQueue.length > 0) {
        const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
        taskInfo.done(Errors.ThreadTermination());
      }
      while (this.taskQueue.size > 0) {
        const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
        taskInfo.done(Errors.ThreadTermination());
      }

      const exitEvents : Promise<any[]>[] = [];
      while (this.workers.size > 0) {
        const [workerInfo] = this.workers;
        // Subscribe before removing so the 'exit' event cannot be missed.
        exitEvents.push(once(workerInfo.worker, 'exit'));
        this._removeWorker(workerInfo);
      }

      await Promise.all(exitEvents);
    }
  }
  /**
   * Public facade of the library: an EventEmitterAsyncResource that validates
   * its options and delegates all work to a private ThreadPool instance.
   */
  class Piscina extends EventEmitterAsyncResource {
    #pool : ThreadPool;

    /**
     * Validates the supplied options, in a fixed order, and then creates the
     * backing ThreadPool.
     *
     * @param options Pool configuration; every field is optional.
     * @throws {TypeError} When an option has the wrong type or is below its
     *   permitted minimum.
     * @throws {RangeError} When minThreads is greater than maxThreads.
     */
    constructor (options : Options = {}) {
      super({ ...options, name: 'Piscina' });

      // Predicates shared by the checks below. The numeric one is written
      // with a negated comparison on purpose: it accepts every number that
      // does not compare as smaller than `floor`.
      const isNumberNotBelow = (value : unknown, floor : number) : boolean =>
        typeof value === 'number' && !(value < floor);
      const isStringOrNullish = (value : unknown) : boolean =>
        value == null || typeof value === 'string';
      const isBooleanOrUndefined = (value : unknown) : boolean =>
        value === undefined || typeof value === 'boolean';

      if (!isStringOrNullish(options.filename)) {
        throw new TypeError('options.filename must be a string or null');
      }

      if (!isStringOrNullish(options.name)) {
        throw new TypeError('options.name must be a string or null');
      }

      if (options.minThreads !== undefined &&
          !isNumberNotBelow(options.minThreads, 0)) {
        throw new TypeError('options.minThreads must be a non-negative integer');
      }

      if (options.maxThreads !== undefined &&
          !isNumberNotBelow(options.maxThreads, 1)) {
        throw new TypeError('options.maxThreads must be a positive integer');
      }

      // The two thread limits are only compared when both were supplied.
      if (options.minThreads !== undefined && options.maxThreads !== undefined) {
        if (options.minThreads > options.maxThreads) {
          throw new RangeError('options.minThreads and options.maxThreads must not conflict');
        }
      }

      if (options.idleTimeout !== undefined &&
          !isNumberNotBelow(options.idleTimeout, 0)) {
        throw new TypeError('options.idleTimeout must be a non-negative integer');
      }

      // 'auto' is the one non-numeric value allowed for maxQueue.
      if (options.maxQueue !== undefined &&
          options.maxQueue !== 'auto' &&
          !isNumberNotBelow(options.maxQueue, 0)) {
        throw new TypeError('options.maxQueue must be a non-negative integer');
      }

      if (options.concurrentTasksPerWorker !== undefined &&
          !isNumberNotBelow(options.concurrentTasksPerWorker, 1)) {
        throw new TypeError(
          'options.concurrentTasksPerWorker must be a positive integer');
      }

      if (!isBooleanOrUndefined(options.useAtomics)) {
        throw new TypeError('options.useAtomics must be a boolean value');
      }

      if (options.resourceLimits !== undefined) {
        if (typeof options.resourceLimits !== 'object' ||
            options.resourceLimits === null) {
          throw new TypeError('options.resourceLimits must be an object');
        }
      }

      if (options.taskQueue !== undefined && !isTaskQueue(options.taskQueue)) {
        throw new TypeError('options.taskQueue must be a TaskQueue object');
      }

      if (options.niceIncrement !== undefined &&
          !isNumberNotBelow(options.niceIncrement, 0)) {
        throw new TypeError('options.niceIncrement must be a non-negative integer');
      }

      if (!isBooleanOrUndefined(options.trackUnmanagedFds)) {
        throw new TypeError('options.trackUnmanagedFds must be a boolean value');
      }

      this.#pool = new ThreadPool(this, options);
    }

    /** @deprecated Use run(task, options) instead **/
    runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise<any>;

    /** @deprecated Use run(task, options) instead **/
    runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;

    /** @deprecated Use run(task, options) instead **/
    runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;

    /** @deprecated Use run(task, options) instead **/
    runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise<any>;

    /**
     * Positional-argument variant of run(). Leading optional arguments may be
     * omitted; the remaining ones are shifted into place before validation.
     * The task is always submitted under the name 'default'.
     *
     * @deprecated Use run(task, options) instead
     * @returns The promise returned by the pool, or a promise rejected with
     *   a TypeError when an argument has the wrong type.
     **/
    runTask (task : any, transferList? : any, filename? : any, signal? : any) {
      // True for anything whose typeof is 'object' and that is not an array
      // (note that this includes null).
      const isNonArrayObject = (value : any) : boolean =>
        typeof value === 'object' && !Array.isArray(value);

      // A string or non-array object in the transferList slot means the
      // transfer list was left out: move every argument one slot to the right.
      if (typeof transferList === 'string' || isNonArrayObject(transferList)) {
        [transferList, filename, signal] = [undefined, transferList, filename];
      }

      // A non-array object in the filename slot is taken to be the signal.
      if (isNonArrayObject(filename)) {
        [filename, signal] = [undefined, filename];
      }

      if (!(transferList === undefined || Array.isArray(transferList))) {
        return Promise.reject(
          new TypeError('transferList argument must be an Array'));
      }

      if (!(filename === undefined || typeof filename === 'string')) {
        return Promise.reject(
          new TypeError('filename argument must be a string'));
      }

      if (!(signal === undefined || typeof signal === 'object')) {
        return Promise.reject(
          new TypeError('signal argument must be an object'));
      }

      const runOptions = {
        transferList,
        filename: filename || null,
        name: 'default',
        signal: signal || null
      };
      return this.#pool.runTask(task, runOptions);
    }

    /**
     * Validates the per-task options and submits the task to the pool.
     *
     * @param task Value forwarded to the pool as the task.
     * @param options Optional transferList, filename, name and signal.
     * @returns The promise returned by the pool, or a promise rejected with
     *   a TypeError when `options` or one of its fields has the wrong type.
     */
    run (task : any, options : RunOptions = kDefaultRunOptions) {
      if (typeof options !== 'object' || options === null) {
        return Promise.reject(
          new TypeError('options must be an object'));
      }

      const transferList = options.transferList;
      const filename = options.filename;
      const name = options.name;
      const signal = options.signal;

      if (!(transferList === undefined || Array.isArray(transferList))) {
        return Promise.reject(
          new TypeError('transferList argument must be an Array'));
      }

      if (!(filename == null || typeof filename === 'string')) {
        return Promise.reject(
          new TypeError('filename argument must be a string'));
      }

      if (!(name == null || typeof name === 'string')) {
        return Promise.reject(new TypeError('name argument must be a string'));
      }

      if (!(signal == null || typeof signal === 'object')) {
        return Promise.reject(
          new TypeError('signal argument must be an object'));
      }

      return this.#pool.runTask(task, { transferList, filename, name, signal });
    }

    /** Delegates to the pool's destroy() and returns its promise. */
    destroy () {
      return this.#pool.destroy();
    }

    /** The options object held by the underlying pool. */
    get options () : FilledOptions {
      return this.#pool.options;
    }

    /** A freshly built array with the Worker of every entry in the pool. */
    get threads () : Worker[] {
      return Array.from(this.#pool.workers, (entry) => entry.worker);
    }

    /**
     * Size of the task queue minus the pool's pending capacity, clamped so
     * that it never drops below zero.
     */
    get queueSize () : number {
      const backlog =
        this.#pool.taskQueue.size - this.#pool.pendingCapacity();
      return Math.max(backlog, 0);
    }

    /** The pool's `completed` counter. */
    get completed () : number {
      return this.#pool.completed;
    }

    /** The wait-time histogram as a plain object with percentiles added. */
    get waitTime () : any {
      const histogram = this.#pool.waitTime;
      return hdrobj.addPercentiles(histogram, hdrobj.histAsObj(histogram));
    }

    /** The run-time histogram as a plain object with percentiles added. */
    get runTime () : any {
      const histogram = this.#pool.runTime;
      return hdrobj.addPercentiles(histogram, hdrobj.histAsObj(histogram));
    }

    /**
     * Approximate pool utilization at this point in time: the realized
     * compute time divided by the compute-time capacity.
     */
    get utilization () : number {
      // Capacity is the absolute upper bound of compute time the pool could
      // have delivered so far: the time it has been running multiplied by
      // the maximum number of threads. The pool will never actually reach
      // it, for a variety of reasons.
      const capacity = this.duration * this.#pool.options.maxThreads;

      // Mean task run time multiplied by the number of run-time samples
      // approximates the compute time that was actually realized.
      const histogram = this.#pool.runTime;
      const realized = histogram.mean * histogram.totalCount;

      return realized / capacity;
    }

    /** Difference between performance.now() and the pool's `start` value. */
    get duration () : number {
      const startedAt = this.#pool.start;
      return performance.now() - startedAt;
    }

    /** Mirrors `commonState.isWorkerThread`. */
    static get isWorkerThread () : boolean {
      return commonState.isWorkerThread;
    }

    /** Mirrors `commonState.workerData`. */
    static get workerData () : any {
      return commonState.workerData;
    }

    /** The module-level `version` value. */
    static get version () : string {
      return version;
    }

    /** The class itself, exposed as a static property. */
    static get Piscina () {
      return Piscina;
    }

    /**
     * Marks a value as movable.
     *
     * Anything that is not a non-null object is returned untouched. An object
     * that is not already transferable (per isTransferable) is first wrapped:
     * array buffer views in an ArrayBufferViewTransferable, everything else
     * in a DirectlyTransferable. The resulting object is passed to
     * markMovable and returned.
     *
     * @param val The value to mark.
     * @returns `val` itself, or the wrapper created for it.
     */
    static move (val : Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort) {
      if (val == null || typeof val !== 'object') {
        return val;
      }

      if (!isTransferable(val)) {
        val = (types as any).isArrayBufferView(val)
          ? new ArrayBufferViewTransferable(val as ArrayBufferView)
          : new DirectlyTransferable(val);
      }

      markMovable(val);
      return val;
    }

    /** The `kTransferable` symbol. */
    static get transferableSymbol () { return kTransferable; }

    /** The `kValue` symbol. */
    static get valueSymbol () { return kValue; }

    /** The `kQueueOptions` symbol. */
    static get queueOptionsSymbol () { return kQueueOptions; }
  }
  985. export = Piscina;