| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125 |
- import { Worker, MessageChannel, MessagePort, receiveMessageOnPort } from 'worker_threads';
- import { once } from 'events';
- import EventEmitterAsyncResource from 'eventemitter-asyncresource';
- import { AsyncResource } from 'async_hooks';
- import { cpus } from 'os';
- import { fileURLToPath, URL } from 'url';
- import { resolve } from 'path';
- import { inspect, types } from 'util';
- import assert from 'assert';
- import { Histogram, build } from 'hdr-histogram-js';
- import { performance } from 'perf_hooks';
- import hdrobj from 'hdr-histogram-percentiles-obj';
- import {
- ReadyMessage,
- RequestMessage,
- ResponseMessage,
- StartupMessage,
- commonState,
- kResponseCountField,
- kRequestCountField,
- kFieldCount,
- Transferable,
- Task,
- TaskQueue,
- kQueueOptions,
- isTaskQueue,
- isTransferable,
- markMovable,
- isMovable,
- kTransferable,
- kValue
- } from './common';
- import { version } from '../package.json';
- const cpuCount : number = (() => {
- try {
- return cpus().length;
- } catch {
- /* istanbul ignore next */
- return 1;
- }
- })();
- interface AbortSignalEventTargetAddOptions {
- once : boolean;
- };
- interface AbortSignalEventTarget {
- addEventListener : (
- name : 'abort',
- listener : () => void,
- options? : AbortSignalEventTargetAddOptions) => void;
- removeEventListener : (
- name : 'abort',
- listener : () => void) => void;
- aborted? : boolean;
- }
- interface AbortSignalEventEmitter {
- off : (name : 'abort', listener : () => void) => void;
- once : (name : 'abort', listener : () => void) => void;
- }
- type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;
- function onabort (abortSignal : AbortSignalAny, listener : () => void) {
- if ('addEventListener' in abortSignal) {
- abortSignal.addEventListener('abort', listener, { once: true });
- } else {
- abortSignal.once('abort', listener);
- }
- }
- class AbortError extends Error {
- constructor () {
- super('The task has been aborted');
- }
- get name () { return 'AbortError'; }
- }
- type ResourceLimits = Worker extends {
- resourceLimits? : infer T;
- } ? T : {};
- type EnvSpecifier = typeof Worker extends {
- new (filename : never, options?: { env: infer T }) : Worker;
- } ? T : never;
- class ArrayTaskQueue implements TaskQueue {
- tasks : Task[] = [];
- get size () { return this.tasks.length; }
- shift () : Task | null {
- return this.tasks.shift() as Task;
- }
- push (task : Task) : void {
- this.tasks.push(task);
- }
- remove (task : Task) : void {
- const index = this.tasks.indexOf(task);
- assert.notStrictEqual(index, -1);
- this.tasks.splice(index, 1);
- }
- }
- interface Options {
- filename? : string | null,
- name?: string,
- minThreads? : number,
- maxThreads? : number,
- idleTimeout? : number,
- maxQueue? : number | 'auto',
- concurrentTasksPerWorker? : number,
- useAtomics? : boolean,
- resourceLimits? : ResourceLimits,
- argv? : string[],
- execArgv? : string[],
- env? : EnvSpecifier,
- workerData? : any,
- taskQueue? : TaskQueue,
- niceIncrement? : number,
- trackUnmanagedFds? : boolean,
- }
- interface FilledOptions extends Options {
- filename : string | null,
- name: string,
- minThreads : number,
- maxThreads : number,
- idleTimeout : number,
- maxQueue : number,
- concurrentTasksPerWorker : number,
- useAtomics: boolean,
- taskQueue : TaskQueue,
- niceIncrement : number
- }
- const kDefaultOptions : FilledOptions = {
- filename: null,
- name: 'default',
- minThreads: Math.max(cpuCount / 2, 1),
- maxThreads: cpuCount * 1.5,
- idleTimeout: 0,
- maxQueue: Infinity,
- concurrentTasksPerWorker: 1,
- useAtomics: true,
- taskQueue: new ArrayTaskQueue(),
- niceIncrement: 0,
- trackUnmanagedFds: true
- };
- interface RunOptions {
- transferList? : TransferList,
- filename? : string | null,
- signal? : AbortSignalAny | null,
- name? : string | null
- }
- interface FilledRunOptions extends RunOptions {
- transferList : TransferList | never,
- filename : string | null,
- signal : AbortSignalAny | null,
- name : string | null
- }
- const kDefaultRunOptions : FilledRunOptions = {
- transferList: undefined,
- filename: null,
- signal: null,
- name: null
- };
- class DirectlyTransferable implements Transferable {
- #value : object;
- constructor (value : object) {
- this.#value = value;
- }
- get [kTransferable] () : object { return this.#value; }
- get [kValue] () : object { return this.#value; }
- }
- class ArrayBufferViewTransferable implements Transferable {
- #view : ArrayBufferView;
- constructor (view : ArrayBufferView) {
- this.#view = view;
- }
- get [kTransferable] () : object { return this.#view.buffer; }
- get [kValue] () : object { return this.#view; }
- }
- let taskIdCounter = 0;
- type TaskCallback = (err : Error, result: any) => void;
- // Grab the type of `transferList` off `MessagePort`. At the time of writing,
- // only ArrayBuffer and MessagePort are valid, but let's avoid having to update
- // our types here every time Node.js adds support for more objects.
- type TransferList = MessagePort extends { postMessage(value : any, transferList : infer T) : any; } ? T : never;
- type TransferListItem = TransferList extends (infer T)[] ? T : never;
- function maybeFileURLToPath (filename : string) : string {
- return filename.startsWith('file:')
- ? fileURLToPath(new URL(filename))
- : filename;
- }
- // Extend AsyncResource so that async relations between posting a task and
- // receiving its result are visible to diagnostic tools.
- class TaskInfo extends AsyncResource implements Task {
- callback : TaskCallback;
- task : any;
- transferList : TransferList;
- filename : string;
- name : string;
- taskId : number;
- abortSignal : AbortSignalAny | null;
- abortListener : (() => void) | null = null;
- workerInfo : WorkerInfo | null = null;
- created : number;
- started : number;
- constructor (
- task : any,
- transferList : TransferList,
- filename : string,
- name : string,
- callback : TaskCallback,
- abortSignal : AbortSignalAny | null,
- triggerAsyncId : number) {
- super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId });
- this.callback = callback;
- this.task = task;
- this.transferList = transferList;
- // If the task is a Transferable returned by
- // Piscina.move(), then add it to the transferList
- // automatically
- if (isMovable(task)) {
- // This condition should never be hit but typescript
- // complains if we dont do the check.
- /* istanbul ignore if */
- if (this.transferList == null) {
- this.transferList = [];
- }
- this.transferList =
- this.transferList.concat(task[kTransferable]);
- this.task = task[kValue];
- }
- this.filename = filename;
- this.name = name;
- this.taskId = taskIdCounter++;
- this.abortSignal = abortSignal;
- this.created = performance.now();
- this.started = 0;
- }
- releaseTask () : any {
- const ret = this.task;
- this.task = null;
- return ret;
- }
- done (err : Error | null, result? : any) : void {
- this.runInAsyncScope(this.callback, null, err, result);
- this.emitDestroy(); // `TaskInfo`s are used only once.
- // If an abort signal was used, remove the listener from it when
- // done to make sure we do not accidentally leak.
- if (this.abortSignal && this.abortListener) {
- if ('removeEventListener' in this.abortSignal && this.abortListener) {
- this.abortSignal.removeEventListener('abort', this.abortListener);
- } else {
- (this.abortSignal as AbortSignalEventEmitter).off(
- 'abort', this.abortListener);
- }
- }
- }
- get [kQueueOptions] () : object | null {
- return kQueueOptions in this.task ? this.task[kQueueOptions] : null;
- }
- }
- abstract class AsynchronouslyCreatedResource {
- onreadyListeners : (() => void)[] | null = [];
- markAsReady () : void {
- const listeners = this.onreadyListeners;
- assert(listeners !== null);
- this.onreadyListeners = null;
- for (const listener of listeners) {
- listener();
- }
- }
- isReady () : boolean {
- return this.onreadyListeners === null;
- }
- onReady (fn : () => void) {
- if (this.onreadyListeners === null) {
- fn(); // Zalgo is okay here.
- return;
- }
- this.onreadyListeners.push(fn);
- }
- abstract currentUsage() : number;
- }
- class AsynchronouslyCreatedResourcePool<
- T extends AsynchronouslyCreatedResource> {
- pendingItems = new Set<T>();
- readyItems = new Set<T>();
- maximumUsage : number;
- onAvailableListeners : ((item : T) => void)[];
- constructor (maximumUsage : number) {
- this.maximumUsage = maximumUsage;
- this.onAvailableListeners = [];
- }
- add (item : T) {
- this.pendingItems.add(item);
- item.onReady(() => {
- /* istanbul ignore else */
- if (this.pendingItems.has(item)) {
- this.pendingItems.delete(item);
- this.readyItems.add(item);
- this.maybeAvailable(item);
- }
- });
- }
- delete (item : T) {
- this.pendingItems.delete(item);
- this.readyItems.delete(item);
- }
- findAvailable () : T | null {
- let minUsage = this.maximumUsage;
- let candidate = null;
- for (const item of this.readyItems) {
- const usage = item.currentUsage();
- if (usage === 0) return item;
- if (usage < minUsage) {
- candidate = item;
- minUsage = usage;
- }
- }
- return candidate;
- }
- * [Symbol.iterator] () {
- yield * this.pendingItems;
- yield * this.readyItems;
- }
- get size () {
- return this.pendingItems.size + this.readyItems.size;
- }
- maybeAvailable (item : T) {
- /* istanbul ignore else */
- if (item.currentUsage() < this.maximumUsage) {
- for (const listener of this.onAvailableListeners) {
- listener(item);
- }
- }
- }
- onAvailable (fn : (item : T) => void) {
- this.onAvailableListeners.push(fn);
- }
- }
- type ResponseCallback = (response : ResponseMessage) => void;
- const Errors = {
- ThreadTermination:
- () => new Error('Terminating worker thread'),
- FilenameNotProvided:
- () => new Error('filename must be provided to run() or in options object'),
- TaskQueueAtLimit:
- () => new Error('Task queue is at limit'),
- NoTaskQueueAvailable:
- () => new Error('No task queue available and all Workers are busy')
- };
- class WorkerInfo extends AsynchronouslyCreatedResource {
- worker : Worker;
- taskInfos : Map<number, TaskInfo>;
- idleTimeout : NodeJS.Timeout | null = null; // eslint-disable-line no-undef
- port : MessagePort;
- sharedBuffer : Int32Array;
- lastSeenResponseCount : number = 0;
- onMessage : ResponseCallback;
- constructor (
- worker : Worker,
- port : MessagePort,
- onMessage : ResponseCallback) {
- super();
- this.worker = worker;
- this.port = port;
- this.port.on('message',
- (message : ResponseMessage) => this._handleResponse(message));
- this.onMessage = onMessage;
- this.taskInfos = new Map();
- this.sharedBuffer = new Int32Array(
- new SharedArrayBuffer(kFieldCount * Int32Array.BYTES_PER_ELEMENT));
- }
- destroy () : void {
- this.worker.terminate();
- this.port.close();
- this.clearIdleTimeout();
- for (const taskInfo of this.taskInfos.values()) {
- taskInfo.done(Errors.ThreadTermination());
- }
- this.taskInfos.clear();
- }
- clearIdleTimeout () : void {
- if (this.idleTimeout !== null) {
- clearTimeout(this.idleTimeout);
- this.idleTimeout = null;
- }
- }
- ref () : WorkerInfo {
- this.port.ref();
- return this;
- }
- unref () : WorkerInfo {
- // Note: Do not call ref()/unref() on the Worker itself since that may cause
- // a hard crash, see https://github.com/nodejs/node/pull/33394.
- this.port.unref();
- return this;
- }
- _handleResponse (message : ResponseMessage) : void {
- this.onMessage(message);
- if (this.taskInfos.size === 0) {
- // No more tasks running on this Worker means it should not keep the
- // process running.
- this.unref();
- }
- }
- postTask (taskInfo : TaskInfo) {
- assert(!this.taskInfos.has(taskInfo.taskId));
- const message : RequestMessage = {
- task: taskInfo.releaseTask(),
- taskId: taskInfo.taskId,
- filename: taskInfo.filename,
- name: taskInfo.name
- };
- try {
- this.port.postMessage(message, taskInfo.transferList);
- } catch (err) {
- // This would mostly happen if e.g. message contains unserializable data
- // or transferList is invalid.
- taskInfo.done(err);
- return;
- }
- taskInfo.workerInfo = this;
- this.taskInfos.set(taskInfo.taskId, taskInfo);
- this.ref();
- this.clearIdleTimeout();
- // Inform the worker that there are new messages posted, and wake it up
- // if it is waiting for one.
- Atomics.add(this.sharedBuffer, kRequestCountField, 1);
- Atomics.notify(this.sharedBuffer, kRequestCountField, 1);
- }
- processPendingMessages () {
- // If we *know* that there are more messages than we have received using
- // 'message' events yet, then try to load and handle them synchronously,
- // without the need to wait for more expensive events on the event loop.
- // This would usually break async tracking, but in our case, we already have
- // the extra TaskInfo/AsyncResource layer that rectifies that situation.
- const actualResponseCount =
- Atomics.load(this.sharedBuffer, kResponseCountField);
- if (actualResponseCount !== this.lastSeenResponseCount) {
- this.lastSeenResponseCount = actualResponseCount;
- let entry;
- while ((entry = receiveMessageOnPort(this.port)) !== undefined) {
- this._handleResponse(entry.message);
- }
- }
- }
- isRunningAbortableTask () : boolean {
- // If there are abortable tasks, we are running one at most per Worker.
- if (this.taskInfos.size !== 1) return false;
- const [[, task]] = this.taskInfos;
- return task.abortSignal !== null;
- }
- currentUsage () : number {
- if (this.isRunningAbortableTask()) return Infinity;
- return this.taskInfos.size;
- }
- }
- class ThreadPool {
- publicInterface : Piscina;
- workers : AsynchronouslyCreatedResourcePool<WorkerInfo>;
- options : FilledOptions;
- taskQueue : TaskQueue;
- skipQueue : TaskInfo[] = [];
- completed : number = 0;
- runTime : Histogram;
- waitTime : Histogram;
- start : number = performance.now();
- inProcessPendingMessages : boolean = false;
- startingUp : boolean = false;
- workerFailsDuringBootstrap : boolean = false;
- constructor (publicInterface : Piscina, options : Options) {
- this.publicInterface = publicInterface;
- this.taskQueue = options.taskQueue || new ArrayTaskQueue();
- this.runTime = build({ lowestDiscernibleValue: 1 });
- this.waitTime = build({ lowestDiscernibleValue: 1 });
- const filename =
- options.filename ? maybeFileURLToPath(options.filename) : null;
- this.options = { ...kDefaultOptions, ...options, filename, maxQueue: 0 };
- // The >= and <= could be > and < but this way we get 100 % coverage 🙃
- if (options.maxThreads !== undefined &&
- this.options.minThreads >= options.maxThreads) {
- this.options.minThreads = options.maxThreads;
- }
- if (options.minThreads !== undefined &&
- this.options.maxThreads <= options.minThreads) {
- this.options.maxThreads = options.minThreads;
- }
- if (options.maxQueue === 'auto') {
- this.options.maxQueue = this.options.maxThreads ** 2;
- } else {
- this.options.maxQueue = options.maxQueue ?? kDefaultOptions.maxQueue;
- }
- this.workers = new AsynchronouslyCreatedResourcePool<WorkerInfo>(
- this.options.concurrentTasksPerWorker);
- this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w));
- this.startingUp = true;
- this._ensureMinimumWorkers();
- this.startingUp = false;
- }
- _ensureMinimumWorkers () : void {
- while (this.workers.size < this.options.minThreads) {
- this._addNewWorker();
- }
- }
- _addNewWorker () : void {
- const pool = this;
- const worker = new Worker(resolve(__dirname, 'worker.js'), {
- env: this.options.env,
- argv: this.options.argv,
- execArgv: this.options.execArgv,
- resourceLimits: this.options.resourceLimits,
- workerData: this.options.workerData,
- trackUnmanagedFds: this.options.trackUnmanagedFds
- });
- const { port1, port2 } = new MessageChannel();
- const workerInfo = new WorkerInfo(worker, port1, onMessage);
- if (this.startingUp) {
- // There is no point in waiting for the initial set of Workers to indicate
- // that they are ready, we just mark them as such from the start.
- workerInfo.markAsReady();
- }
- const message : StartupMessage = {
- filename: this.options.filename,
- name: this.options.name,
- port: port2,
- sharedBuffer: workerInfo.sharedBuffer,
- useAtomics: this.options.useAtomics,
- niceIncrement: this.options.niceIncrement
- };
- worker.postMessage(message, [port2]);
- function onMessage (message : ResponseMessage) {
- const { taskId, result } = message;
- // In case of success: Call the callback that was passed to `runTask`,
- // remove the `TaskInfo` associated with the Worker, which marks it as
- // free again.
- const taskInfo = workerInfo.taskInfos.get(taskId);
- workerInfo.taskInfos.delete(taskId);
- pool.workers.maybeAvailable(workerInfo);
- /* istanbul ignore if */
- if (taskInfo === undefined) {
- const err = new Error(
- `Unexpected message from Worker: ${inspect(message)}`);
- pool.publicInterface.emit('error', err);
- } else {
- taskInfo.done(message.error, result);
- }
- pool._processPendingMessages();
- }
- worker.on('message', (message : ReadyMessage) => {
- if (message.ready === true) {
- if (workerInfo.currentUsage() === 0) {
- workerInfo.unref();
- }
- if (!workerInfo.isReady()) {
- workerInfo.markAsReady();
- }
- return;
- }
- worker.emit('error', new Error(
- `Unexpected message on Worker: ${inspect(message)}`));
- });
- worker.on('error', (err : Error) => {
- // Work around the bug in https://github.com/nodejs/node/pull/33394
- worker.ref = () => {};
- // In case of an uncaught exception: Call the callback that was passed to
- // `postTask` with the error, or emit an 'error' event if there is none.
- const taskInfos = [...workerInfo.taskInfos.values()];
- workerInfo.taskInfos.clear();
- // Remove the worker from the list and potentially start a new Worker to
- // replace the current one.
- this._removeWorker(workerInfo);
- if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
- this._ensureMinimumWorkers();
- } else {
- // Do not start new workers over and over if they already fail during
- // bootstrap, there's no point.
- this.workerFailsDuringBootstrap = true;
- }
- if (taskInfos.length > 0) {
- for (const taskInfo of taskInfos) {
- taskInfo.done(err, null);
- }
- } else {
- this.publicInterface.emit('error', err);
- }
- });
- worker.unref();
- port1.on('close', () => {
- // The port is only closed if the Worker stops for some reason, but we
- // always .unref() the Worker itself. We want to receive e.g. 'error'
- // events on it, so we ref it once we know it's going to exit anyway.
- worker.ref();
- });
- this.workers.add(workerInfo);
- }
- _processPendingMessages () {
- if (this.inProcessPendingMessages || !this.options.useAtomics) {
- return;
- }
- this.inProcessPendingMessages = true;
- try {
- for (const workerInfo of this.workers) {
- workerInfo.processPendingMessages();
- }
- } finally {
- this.inProcessPendingMessages = false;
- }
- }
- _removeWorker (workerInfo : WorkerInfo) : void {
- workerInfo.destroy();
- this.workers.delete(workerInfo);
- }
- _onWorkerAvailable (workerInfo : WorkerInfo) : void {
- while ((this.taskQueue.size > 0 || this.skipQueue.length > 0) &&
- workerInfo.currentUsage() < this.options.concurrentTasksPerWorker) {
- // The skipQueue will have tasks that we previously shifted off
- // the task queue but had to skip over... we have to make sure
- // we drain that before we drain the taskQueue.
- const taskInfo = this.skipQueue.shift() ||
- this.taskQueue.shift() as TaskInfo;
- // If the task has an abortSignal and the worker has any other
- // tasks, we cannot distribute the task to it. Skip for now.
- if (taskInfo.abortSignal && workerInfo.taskInfos.size > 0) {
- this.skipQueue.push(taskInfo);
- break;
- }
- const now = performance.now();
- this.waitTime.recordValue(now - taskInfo.created);
- taskInfo.started = now;
- workerInfo.postTask(taskInfo);
- this._maybeDrain();
- return;
- }
- if (workerInfo.taskInfos.size === 0 &&
- this.workers.size > this.options.minThreads) {
- workerInfo.idleTimeout = setTimeout(() => {
- assert.strictEqual(workerInfo.taskInfos.size, 0);
- if (this.workers.size > this.options.minThreads) {
- this._removeWorker(workerInfo);
- }
- }, this.options.idleTimeout).unref();
- }
- }
- runTask (
- task : any,
- options : RunOptions) : Promise<any> {
- let {
- filename,
- name
- } = options;
- const {
- transferList = [],
- signal = null
- } = options;
- if (filename == null) {
- filename = this.options.filename;
- }
- if (name == null) {
- name = this.options.name;
- }
- if (typeof filename !== 'string') {
- return Promise.reject(Errors.FilenameNotProvided());
- }
- filename = maybeFileURLToPath(filename);
- let resolve : (result : any) => void;
- let reject : (err : Error) => void;
- // eslint-disable-next-line
- const ret = new Promise((res, rej) => { resolve = res; reject = rej; });
- const taskInfo = new TaskInfo(
- task,
- transferList,
- filename,
- name,
- (err : Error | null, result : any) => {
- this.completed++;
- if (taskInfo.started) {
- this.runTime.recordValue(performance.now() - taskInfo.started);
- }
- if (err !== null) {
- reject(err);
- } else {
- resolve(result);
- }
- },
- signal,
- this.publicInterface.asyncResource.asyncId());
- if (signal !== null) {
- // If the AbortSignal has an aborted property and it's truthy,
- // reject immediately.
- if ((signal as AbortSignalEventTarget).aborted) {
- return Promise.reject(new AbortError());
- }
- taskInfo.abortListener = () => {
- // Call reject() first to make sure we always reject with the AbortError
- // if the task is aborted, not with an Error from the possible
- // thread termination below.
- reject(new AbortError());
- if (taskInfo.workerInfo !== null) {
- // Already running: We cancel the Worker this is running on.
- this._removeWorker(taskInfo.workerInfo);
- this._ensureMinimumWorkers();
- } else {
- // Not yet running: Remove it from the queue.
- this.taskQueue.remove(taskInfo);
- }
- };
- onabort(signal, taskInfo.abortListener);
- }
- // If there is a task queue, there's no point in looking for an available
- // Worker thread. Add this task to the queue, if possible.
- if (this.taskQueue.size > 0) {
- const totalCapacity = this.options.maxQueue + this.pendingCapacity();
- if (this.taskQueue.size >= totalCapacity) {
- if (this.options.maxQueue === 0) {
- return Promise.reject(Errors.NoTaskQueueAvailable());
- } else {
- return Promise.reject(Errors.TaskQueueAtLimit());
- }
- } else {
- if (this.workers.size < this.options.maxThreads) {
- this._addNewWorker();
- }
- this.taskQueue.push(taskInfo);
- }
- return ret;
- }
- // Look for a Worker with a minimum number of tasks it is currently running.
- let workerInfo : WorkerInfo | null = this.workers.findAvailable();
- // If we want the ability to abort this task, use only workers that have
- // no running tasks.
- if (workerInfo !== null && workerInfo.currentUsage() > 0 && signal) {
- workerInfo = null;
- }
- // If no Worker was found, or that Worker was handling another task in some
- // way, and we still have the ability to spawn new threads, do so.
- let waitingForNewWorker = false;
- if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
- this.workers.size < this.options.maxThreads) {
- this._addNewWorker();
- waitingForNewWorker = true;
- }
- // If no Worker is found, try to put the task into the queue.
- if (workerInfo === null) {
- if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
- return Promise.reject(Errors.NoTaskQueueAvailable());
- } else {
- this.taskQueue.push(taskInfo);
- }
- return ret;
- }
- // TODO(addaleax): Clean up the waitTime/runTime recording.
- const now = performance.now();
- this.waitTime.recordValue(now - taskInfo.created);
- taskInfo.started = now;
- workerInfo.postTask(taskInfo);
- this._maybeDrain();
- return ret;
- }
- pendingCapacity () : number {
- return this.workers.pendingItems.size *
- this.options.concurrentTasksPerWorker;
- }
- _maybeDrain () {
- if (this.taskQueue.size === 0 && this.skipQueue.length === 0) {
- this.publicInterface.emit('drain');
- }
- }
- async destroy () {
- while (this.skipQueue.length > 0) {
- const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
- taskInfo.done(new Error('Terminating worker thread'));
- }
- while (this.taskQueue.size > 0) {
- const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
- taskInfo.done(new Error('Terminating worker thread'));
- }
- const exitEvents : Promise<any[]>[] = [];
- while (this.workers.size > 0) {
- const [workerInfo] = this.workers;
- exitEvents.push(once(workerInfo.worker, 'exit'));
- this._removeWorker(workerInfo);
- }
- await Promise.all(exitEvents);
- }
- }
- class Piscina extends EventEmitterAsyncResource {
- #pool : ThreadPool;
- constructor (options : Options = {}) {
- super({ ...options, name: 'Piscina' });
- if (typeof options.filename !== 'string' && options.filename != null) {
- throw new TypeError('options.filename must be a string or null');
- }
- if (typeof options.name !== 'string' && options.name != null) {
- throw new TypeError('options.name must be a string or null');
- }
- if (options.minThreads !== undefined &&
- (typeof options.minThreads !== 'number' || options.minThreads < 0)) {
- throw new TypeError('options.minThreads must be a non-negative integer');
- }
- if (options.maxThreads !== undefined &&
- (typeof options.maxThreads !== 'number' || options.maxThreads < 1)) {
- throw new TypeError('options.maxThreads must be a positive integer');
- }
- if (options.minThreads !== undefined && options.maxThreads !== undefined &&
- options.minThreads > options.maxThreads) {
- throw new RangeError('options.minThreads and options.maxThreads must not conflict');
- }
- if (options.idleTimeout !== undefined &&
- (typeof options.idleTimeout !== 'number' || options.idleTimeout < 0)) {
- throw new TypeError('options.idleTimeout must be a non-negative integer');
- }
- if (options.maxQueue !== undefined &&
- options.maxQueue !== 'auto' &&
- (typeof options.maxQueue !== 'number' || options.maxQueue < 0)) {
- throw new TypeError('options.maxQueue must be a non-negative integer');
- }
- if (options.concurrentTasksPerWorker !== undefined &&
- (typeof options.concurrentTasksPerWorker !== 'number' ||
- options.concurrentTasksPerWorker < 1)) {
- throw new TypeError(
- 'options.concurrentTasksPerWorker must be a positive integer');
- }
- if (options.useAtomics !== undefined &&
- typeof options.useAtomics !== 'boolean') {
- throw new TypeError('options.useAtomics must be a boolean value');
- }
- if (options.resourceLimits !== undefined &&
- (typeof options.resourceLimits !== 'object' ||
- options.resourceLimits === null)) {
- throw new TypeError('options.resourceLimits must be an object');
- }
- if (options.taskQueue !== undefined && !isTaskQueue(options.taskQueue)) {
- throw new TypeError('options.taskQueue must be a TaskQueue object');
- }
- if (options.niceIncrement !== undefined &&
- (typeof options.niceIncrement !== 'number' || options.niceIncrement < 0)) {
- throw new TypeError('options.niceIncrement must be a non-negative integer');
- }
- if (options.trackUnmanagedFds !== undefined &&
- typeof options.trackUnmanagedFds !== 'boolean') {
- throw new TypeError('options.trackUnmanagedFds must be a boolean value');
- }
- this.#pool = new ThreadPool(this, options);
- }
- /** @deprecated Use run(task, options) instead **/
- runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise<any>;
- /** @deprecated Use run(task, options) instead **/
- runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;
- /** @deprecated Use run(task, options) instead **/
- runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;
- /** @deprecated Use run(task, options) instead **/
- runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise<any>;
- /** @deprecated Use run(task, options) instead **/
- runTask (task : any, transferList? : any, filename? : any, signal? : any) {
- // If transferList is a string or AbortSignal, shift it.
- if ((typeof transferList === 'object' && !Array.isArray(transferList)) ||
- typeof transferList === 'string') {
- signal = filename as (AbortSignalAny | undefined);
- filename = transferList;
- transferList = undefined;
- }
- // If filename is an AbortSignal, shift it.
- if (typeof filename === 'object' && !Array.isArray(filename)) {
- signal = filename;
- filename = undefined;
- }
- if (transferList !== undefined && !Array.isArray(transferList)) {
- return Promise.reject(
- new TypeError('transferList argument must be an Array'));
- }
- if (filename !== undefined && typeof filename !== 'string') {
- return Promise.reject(
- new TypeError('filename argument must be a string'));
- }
- if (signal !== undefined && typeof signal !== 'object') {
- return Promise.reject(
- new TypeError('signal argument must be an object'));
- }
- return this.#pool.runTask(
- task, {
- transferList,
- filename: filename || null,
- name: 'default',
- signal: signal || null
- });
- }
- run (task : any, options : RunOptions = kDefaultRunOptions) {
- if (options === null || typeof options !== 'object') {
- return Promise.reject(
- new TypeError('options must be an object'));
- }
- const {
- transferList,
- filename,
- name,
- signal
- } = options;
- if (transferList !== undefined && !Array.isArray(transferList)) {
- return Promise.reject(
- new TypeError('transferList argument must be an Array'));
- }
- if (filename != null && typeof filename !== 'string') {
- return Promise.reject(
- new TypeError('filename argument must be a string'));
- }
- if (name != null && typeof name !== 'string') {
- return Promise.reject(new TypeError('name argument must be a string'));
- }
- if (signal != null && typeof signal !== 'object') {
- return Promise.reject(
- new TypeError('signal argument must be an object'));
- }
- return this.#pool.runTask(task, { transferList, filename, name, signal });
- }
- destroy () {
- return this.#pool.destroy();
- }
- get options () : FilledOptions {
- return this.#pool.options;
- }
- get threads () : Worker[] {
- const ret : Worker[] = [];
- for (const workerInfo of this.#pool.workers) { ret.push(workerInfo.worker); }
- return ret;
- }
- get queueSize () : number {
- const pool = this.#pool;
- return Math.max(pool.taskQueue.size - pool.pendingCapacity(), 0);
- }
- get completed () : number {
- return this.#pool.completed;
- }
- get waitTime () : any {
- const result = hdrobj.histAsObj(this.#pool.waitTime);
- return hdrobj.addPercentiles(this.#pool.waitTime, result);
- }
- get runTime () : any {
- const result = hdrobj.histAsObj(this.#pool.runTime);
- return hdrobj.addPercentiles(this.#pool.runTime, result);
- }
- get utilization () : number {
- // The capacity is the max compute time capacity of the
- // pool to this point in time as determined by the length
- // of time the pool has been running multiplied by the
- // maximum number of threads.
- const capacity = this.duration * this.#pool.options.maxThreads;
- const totalMeanRuntime = this.#pool.runTime.mean *
- this.#pool.runTime.totalCount;
- // We calculate the appoximate pool utilization by multiplying
- // the mean run time of all tasks by the number of runtime
- // samples taken and dividing that by the capacity. The
- // theory here is that capacity represents the absolute upper
- // limit of compute time this pool could ever attain (but
- // never will for a variety of reasons. Multiplying the
- // mean run time by the number of tasks sampled yields an
- // approximation of the realized compute time. The utilization
- // then becomes a point-in-time measure of how active the
- // pool is.
- return totalMeanRuntime / capacity;
- }
- get duration () : number {
- return performance.now() - this.#pool.start;
- }
- static get isWorkerThread () : boolean {
- return commonState.isWorkerThread;
- }
- static get workerData () : any {
- return commonState.workerData;
- }
- static get version () : string {
- return version;
- }
- static get Piscina () {
- return Piscina;
- }
- static move (val : Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort) {
- if (val != null && typeof val === 'object' && typeof val !== 'function') {
- if (!isTransferable(val)) {
- if ((types as any).isArrayBufferView(val)) {
- val = new ArrayBufferViewTransferable(val as ArrayBufferView);
- } else {
- val = new DirectlyTransferable(val);
- }
- }
- markMovable(val);
- }
- return val;
- }
- static get transferableSymbol () { return kTransferable; }
- static get valueSymbol () { return kValue; }
- static get queueOptionsSymbol () { return kQueueOptions; }
- }
- export = Piscina;
|