wise-workers
A worker thread pool for CPU-bound tasks. It requires no configuration and has many powerful features:
- Generator and AsyncGenerator function support: Worker functions can be generator functions or async generator functions, making it easy to stream results back to the main thread. Iteration happens eagerly, to maximize parallelism (i.e., the main thread does not pause the generator function).
- Functions (callbacks) as arguments: Functions can be passed to worker tasks. They become async functions in the worker thread, using MessagePort for communication under the hood.
- Cancellation support: Tasks can be aborted using an AbortSignal.
- Automatic thread management: Crashed threads are automatically respawned, unless they're crashing during startup (to prevent an infinite spawn loop).
- Zero-copy data transfer: Data can be efficiently moved between threads without copying it. This is beneficial to performance when passing large Buffers between threads.
Installation
npm install wise-workers
Requires Node.js v16.x.x or later.
Usage
const ThreadPool = require('wise-workers');
const pool = new ThreadPool({ filename: require.resolve('./worker') });
const result = await pool.call('add', 2, 2); // => 4
worker.js
exports.add = (a, b) => a + b;
Example: Zero-copy
const ThreadPool = require('wise-workers');
const pool = new ThreadPool({ filename: require.resolve('./worker') });
const data = Buffer.alloc(1024 * 1024);
// pool.invoke() allows you to provide more options than pool.call()
const compressedData = await pool.invoke('compress', {
    args: [data],
    transferList: [data.buffer], // Pass the ArrayBuffer in the transferList
});
worker.js
const zlib = require('zlib');
const { move } = require('wise-workers');
exports.compress = (data) => {
    const compressedData = zlib.gzipSync(data);
    // Use move() to include a transferList in the return value.
    return move(compressedData, [compressedData.buffer]);
};
Example: Generator function
When calling a generator function, you will get an async iterable object.
const ThreadPool = require('wise-workers');
const pool = new ThreadPool({ filename: require.resolve('./worker') });
const asyncIterable = await pool.call('readFile', 'data.csv');
for await (const chunk of asyncIterable) {
    console.log(`got chunk of size ${chunk.byteLength} bytes`);
}
worker.js
const fs = require('fs');
const { move } = require('wise-workers');
exports.readFile = function* (filename, chunkSize = 1024 * 16) {
    const fd = fs.openSync(filename);
    try {
        while (true) {
            const buffer = Buffer.alloc(chunkSize);
            const bytesRead = fs.readSync(fd, buffer, 0, chunkSize);
            if (bytesRead > 0) {
                const chunk = buffer.subarray(0, bytesRead);
                // You can move() yielded values too
                yield move(chunk, [chunk.buffer]);
            }
            if (bytesRead < chunkSize) {
                break;
            }
        }
    } finally {
        fs.closeSync(fd);
    }
};
Example: Callback function
You can pass callback functions to the worker, but they must be in the top-level arguments (they can't be nested within some other object). Callback functions can also be async functions.
const ThreadPool = require('wise-workers');
const pool = new ThreadPool({ filename: require.resolve('./worker') });
const allowedList = new Set(getHugeDataset());
const result = await pool.call('search', searchTerm, (value) => {
    return allowedList.has(value);
});
worker.js
exports.search = async (searchTerm, filter) => {
    const matches = [];
    for (const match of searchFor(searchTerm)) {
        if (await filter(match)) {
            matches.push(match);
        }
    }
    return matches;
};
Currently, callback functions do not support "zero-copy" data transfer in their arguments or return values. This restriction may be lifted in the future.
Example: AbortSignal (cancellation)
Calling controller.abort() will forcefully terminate the thread that's assigned to the associated task.
const ThreadPool = require('wise-workers');
const pool = new ThreadPool({ filename: require.resolve('./worker') });
const controller = new AbortController();
setTimeout(() => {
    controller.abort();
}, 1000);
await pool.invoke('infiniteLoop', {
    signal: controller.signal,
});
worker.js
exports.infiniteLoop = () => {
    while (true) {}
};
Forcefully aborting a thread is not a cheap operation, so it should only be used for exceptional/rare situations. For more common situations where performance is critical, you can use util.transferableAbortSignal() to implement your own co-operative cancellation logic, as sketched below.
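For illustration, here is a minimal sketch of one co-operative approach. It assumes Node.js v18.11+ (where util.transferableAbortSignal() was added) and that a transferable AbortSignal can be passed through args and transferList like any other transferable object; processItems, getItems(), and doWork() are hypothetical names, not part of this package.
const util = require('util');
const ThreadPool = require('wise-workers');
const pool = new ThreadPool({ filename: require.resolve('./worker') });
const controller = new AbortController();
// Mark the signal as transferable so it can be sent to the worker thread.
const signal = util.transferableAbortSignal(controller.signal);
setTimeout(() => {
    controller.abort();
}, 1000);
const results = await pool.invoke('processItems', {
    args: [signal],
    transferList: [signal],
});
worker.js
// Hypothetical worker that checks the signal between units of work.
exports.processItems = async (signal) => {
    const results = [];
    for (const item of getItems()) { // getItems() is a stand-in for your own data source
        if (signal.aborted) return results; // stop early instead of being forcefully terminated
        results.push(doWork(item)); // doWork() is a stand-in for one unit of CPU-bound work
        // Yield to the event loop so the abort notification can be delivered.
        await new Promise(setImmediate);
    }
    return results;
};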
API
new ThreadPool(options)
Creates a new thread pool. The following options are supported:
- filename (string, required): The absolute path to the worker script or module. Both CommonJS and ESM modules are supported.
- minThreads (number, optional): The minimum number of worker threads to keep in the pool. By default, this is equal to half the number of physical CPUs on the machine.
- maxThreads (number, optional): The maximum number of worker threads to keep in the pool. By default, this is equal to the number of physical CPUs on the machine.
- The following options are passed directly to new Worker() under the hood: execArgv, argv, env, workerData, resourceLimits, trackUnmanagedFds, name (see the sketch below).
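For illustration, here is a minimal sketch of a constructor call combining several of these options. The specific values and the ./worker path are arbitrary assumptions for the example, not library defaults; the passthrough options are simply forwarded to new Worker().
const ThreadPool = require('wise-workers');
const pool = new ThreadPool({
    filename: require.resolve('./worker'),
    minThreads: 2,
    maxThreads: 8,
    // The options below are forwarded to new Worker() unchanged.
    workerData: { logLevel: 'debug' },
    resourceLimits: { maxOldGenerationSizeMb: 256 },
});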
ThreadPool is an EventEmitter. It emits the following events (see the listener sketch after this list):
- error: This occurs if a worker crashes unexpectedly. If this occurs while a worker is starting up, the ThreadPool will be destroyed.
- online:n: Where n is an integer, these events occur when the number of online threads changes (i.e., when a new thread comes online or when an existing thread goes offline).
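As a sketch of how these events might be consumed (the thread count of 4 is an arbitrary example):
pool.on('error', (err) => {
    // A worker crashed unexpectedly; if it crashed during startup, the pool is destroyed.
    console.error('worker crashed:', err);
});
// Fires whenever the number of online threads changes to exactly 4.
pool.on('online:4', () => {
    console.log('4 worker threads are online');
});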
pool.call(methodName, [...args]) -> promise
Invokes a function exported by a worker thread. Even if the worker's function is synchronous, this method always returns a Promise.
The args can contain any value that is supported by the HTML structured clone algorithm. Additionally, functions may be passed within the top-level arguments (i.e., not nested within some other object), in which case they appear as async functions in the worker thread.
If the worker method is a generator or async generator function, the returned promise will be resolved with an async iterable object.
pool.invoke(methodName, [options]) -> promise
This is the same as pool.call(), except it supports more options:
- args (Array, optional): The arguments to pass to the worker function.
- transferList (Array, optional): A list of transferable objects within args that should be moved, rather than copied, to the worker thread ("zero-copy").
- signal (AbortSignal, optional): An AbortSignal that, when aborted, will forcefully stop this task. If the signal is aborted after the task completes, nothing happens.
pool.destroy([error]) -> promise
Destroys the thread pool, cancelling any pending tasks and permanently terminating all threads. After being destroyed, the thread pool is no longer usable.
If an error object is provided, all pending tasks will be rejected with it. Otherwise, a default error is used.
The returned promise resolves when all threads have finished shutting down.
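For example (the error message is arbitrary):
// Reject any still-pending tasks with a custom error, then wait for every thread to shut down.
await pool.destroy(new Error('server is shutting down'));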
ThreadPool properties
- pool.filename: The filename of the worker script being used.
- pool.threadCount: The number of threads currently spawned within the pool.
- pool.onlineThreadCount: The number of threads which are fully online, which means they have completed their initialization/startup and are capable of handling tasks.
- pool.activeThreadCount: The number of threads which are busy with a pending task.
- pool.pendingTaskCount: The number of pending tasks yet to be resolved.
- pool.destroyed: Whether or not the thread pool is destroyed (boolean).
Static properties
- ThreadPool.PHYSICAL_CORES: The number of physical CPU cores detected on the machine. This number is used to calculate defaults for minThreads and maxThreads when constructing a ThreadPool (see the sketch below).
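For example, a sketch of sizing the pool explicitly from this constant (leaving one core free for the main thread is an illustrative choice, not a recommendation from the library):
const ThreadPool = require('wise-workers');
const pool = new ThreadPool({
    filename: require.resolve('./worker'),
    // Leave one physical core free for the main thread.
    maxThreads: Math.max(1, ThreadPool.PHYSICAL_CORES - 1),
});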