Code Monkey home page Code Monkey logo

piscina's People

Contributors

0xflotus avatar addaleax avatar alan-agius4 avatar andersonjoseph avatar blackglory avatar chocobozzz avatar clayjones-at avatar clydin avatar deivu avatar dependabot[bot] avatar elyahou avatar groozin avatar hhprogram avatar jaoodxd avatar jasnell avatar marsup avatar metcoder95 avatar nicholas-l avatar prinzhorn avatar rafaelgss avatar samverschueren avatar simenb avatar skrylnikov avatar theanarkh avatar trivikr avatar zaubernerd avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

piscina's Issues

Feature: maxQueue = 'auto'

In doing a variety of performance tests, I've found that keeping maxQueue limited to a multiple of maxThreads generally has the best performance in terms of minimizing queue waits and keeping memory bounded. Right now the default maxQueue is Infinity which I think we should keep for now but I'd like to have a mode where maxQueue is set automatically based on the max thread count. maxQueue = 'auto' would do exactly that.

Renamed master -> current

Created a new current branch and made it the default. Will keep master around for a little while but the plan is to delete that branch

RangeError: Maximum call stack size exceeded

I have the below example

const path = require('path')
const csv = require('csvtojson')
const Pool = require('piscina')

const workerPool = new Pool({
    filename: path.resolve(__dirname, 'worker.js'),
})

csv()
    .fromFile('4mb.csv')
    .on('data', (data) => {
        const line = data.toString('utf8')
        workerPool.runTask(line)
    })
    .on('error', console.error)
    .on('end', () => {
        console.log('done!')
    })

worker.js: just parse json and print it. nothing fancy.

module.exports = (data) => {    
    console.log(JSON.parse(data))
}

I'm seeing a lot of errors like this

internal/worker/io.js:251
  const message = receiveMessageOnPort_(port);
                  ^

RangeError: Maximum call stack size exceeded
    at receiveMessageOnPort (<anonymous>)
    at receiveMessageOnPort (internal/worker/io.js:251:19)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:174:29)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)
    at WorkerInfo._handleResponse (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:132:14)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:175:22)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)
    at WorkerInfo._handleResponse (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:132:14)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:175:22)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)
    at WorkerInfo._handleResponse (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:132:14)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:175:22)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)
    at WorkerInfo._handleResponse (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:132:14)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:175:22)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)

Typescript examples of use

I can't manange to make it work with typescript.
The big problem is the "worker.ts" file, when compiling to "worker.js" file there is no
module.exports = ({ a, b }) => { return a + b; };
In the JS file compiled.
Using export default function() in worker.ts file is not working at all.

Api allows workers to do initialization but it is not clear how to do cleanup

Hi there,

Docs says that it is possible to do initialization inside worker. I am using that feature to create a database connection like this:

async function initialize() {
  const conn = await connectToDb();
  return (query) => conn.exec(query);
}

module.exports = initialize();

During its lifetime, pool may decide to close inactive threads. In that case I need to close database connection.
But currently I didn't found any api which allows to do that.

Currently I set minThreads and maxThreads to the same value and have the following as a workaround:

const Piscina = require("piscina")

const workerCount = 10;

const pool = new Piscina({
    filename: __dirname + "/searchMessagesWorker.cjs",
    minThreads: workerCount,
    maxThreads: workerCount,
    idleTimeout: Integer.MAX_SAFE_INTEGER
})

// this will be set to `false` each time user calls `pool.runTask`
let poolIsEmpty = true
pool.on('drain', () => {
    poolIsEmpty = true
})
    
function shutdown() {
    // drainPromise will be resolved as soon pool will be drained
    const drainPromise = new Promise((resolve) => {
        if (poolIsEmpty) {
           resolve()
        }
        else {
            pool.once('drain', resolve)
        }
    })

    return drainPromise.then(
        () => {
            // no tasks are scheduled on the pool at this moment.
            // Schedule "workerCount" amount "close" tasks in a single loop,
            // assuming Piscina will assign a single task to each thread.
            const promises = []
            for (let i = 0; i < workerCount; ++i) {
                // this tells worker to close DB connection
                promises.push(pool.runTask({
                    type: 'close'
                }))
            }
            return Promise.all(promises)
        }
    ).then(
        () => pool.destroy()
    )
}

Not only it is complicated, but also fragile (depending on how Piscina schedules tasks under the hood, setting idleTimeout to very high number). Also, this way it is not possible to have dynamic number of threads in the pool.

Interesting, but cleanup functionality is something which is missing in all worker pool implementations I can find on npm. I am wondering why?
Maybe there is a completely different way to do cleanup inside worker thread?

I'll be grateful for any explanation/reading resource on this topic.
If this is something which is in scope of Piscina package I am ready to work on PR as well.

Feature: Dynamic scaling of minimum worker count

Currently the minimum number of workers is fixed at piscina start, with the minimum maintained at any given time. In discussion the idea of a dynamically scaled minimum based on the queue pressure was suggested. Essentially, rather than scale up one new worker at a time as needed when a new task is completed, automatically start launching N new workers (up to max worker count) while tasks are being added to the queue, where the minimum is dynamically calculated as a factor of the pending queue size.

Whether this would have a measurable performance improvement or not remains to be determined.

Feature: Interface Transferable / Piscina.move()

Currently, values returned by a worker are not transferable. That is, they will always be cloned because we currently have no way of telling Piscina that the value should be transferred instead.

module.exports = () => {
  const buf = Buffer.alloc(10);
  return buf; // Copies the buffer, rather than transferring it
}

My proposal is to add a new Piscina.move() function that takes an input object capable of being transferred and wraps it in a new Transferable interface (in typescript terms) that effectively marks it as being able to be transferred.

const { move } = require('piscina');

module.exports = () => {
  const buf = Buffer.alloc(10);
  return move(buf);
}

The return value of move() is a Transferable, a wrapper object that (a) marks the object as being transferable and (b) provides a reference to the actual internal transferable object (e.g. the underlying ArrayBuffer in the example above.

This would only work with top level objects... for instance, the following would not cause the buffer to be transferred:

module.exports = () => {
  return { m: move(Buffer.alloc(10)) }
}

The move() function would work out of the box with objects known to be transferable by Node.js (e.g. MessagePorts and TypedArrays). Users would be able to create their own transferable objects by implementing the Transferable interface on their classes.

Feature Request: Seamless stream and/or async iterable support

Would be nice to be able to provide a stream and/or async iterable (of buffers) as argument and have is seamlessly (using transferable) accessible in the worker.

e.g.

const handle = await fs.open(dst)
try {
  for await (const buf of worker.runTask(fs.createReadStream(src))) {
     await handle.write(buf)
  }
} finally {
  await handle.close()
}
module.exports = async function * (source) {
  for await (const buf of source) {
    yield veryExpensiveProcessing(buf)
  }
}

Crash on startup, no error given

I'm using typescript, __dirname is not defined, so I'm resolving my path manually.

this.workerPool = new Piscina({ filename: path.resolve("test/worker", 'worker.js'), maxThreads: 2 });

Doesn't go past that.

Support explicit registration of workers

Description

Add a mechanism for worker code to register itself with piscina instead of relying on the existing ./src/worker.ts logic that attempts to resolve and load (and cache) worker code.

Use-case

When bundling server-side code, ./dist/worker.js will not be picked up in the same bundle as ./dist/index.js because there are no statically-analyzable dependencies on the worker file. As a result, at runtime, the new location of the now-bundled ./dist/index.js is no longer adjacent to ./dist/worker.js on disk.

To get around this, it is possible to add ./dist/worker.js as a separate entrypoint to make sure it gets tracked by the bundler. An alternative solution, seen in workerpool is that it exposes a worker(functions: Record<string, (...args: any[]) => any>): void API that allows a worker to import the library's coordination code. The way I see this is that it inverts the relationship so that the worker has an explicit opt-in option.

A side-benefit that I'd love to see considered in piscina is that this provides a clean facility for registering functions that are not necessarily default exports and even for registering different named functions so that a dispatching layer isn't needed.

worker graceful shutdown

how can i handle graceful shutdown (connections and something like this ... ) of worker before terminate.
in case of destroy or no more threads ?

Feature: Thread/Task scheduling

@addaleax said:

In a similar vein, it would be nice to be able to set scheduling parameters for the spawned threads, but that would require upstream support in libuv first, then in Node.js, then here.

I've been considering this a bit and a tunable scheduling strategy would be interesting but we'd need to play with it a bit to make sure it's actually worthwhile. The strategies I'd like to explore are:

  • OS-level scheduling params (but as @addaleax points out, we have to wait on libuv/node support)
  • Piscina-level queue strategies: Done!
    • fifo - The strategy we currently use for draining the queue
    • lifo - my hypothesis is that this won't give any desirable perf but I'd like to profile various scenarios to see what impact flipping the drain order would have.
    • priority - use a priority queue to determine which tasks are selected next

Piscina vs Cluster

Why and when should I use Piscina instead of Node's Cluster API?
I see there are some nice abstractions but as long as there are issues - when using third party modules - like bundle size, watch for CVEs and vulnerable dependencies etc... I wonder if there is a good reason for using it.

I briefly read the source code and could not figure out when it is a good idea.

Is it safe and advisable to use with expressjs or hapi?

Reusable worker pool

Hello, I am staring using this library. I am trying to create a pool of workers then assign them jobs dynamically so as to reuse the same pool of workers. How can i achieve this?

Feature: Multi-step tasks

This is one I'm not at all convinced about but came up in a conversation and I said I would investigate.

Essentially, it's about submitting a single task specification that goes through multiple steps before finally resolving. The output of one step feeds into the input of the next. Exactly how to define that flow is unclear.

The way we would do this now is simply:

piscina.runTask(await piscina.runTask({}))

Which begs the question about whether an API for this case would even be necessary.

Feature: Wait for drain API

Currently Piscina will emit a drain event when the queue is empty, which we can await by wrapping it in a events.on() (e.g. await on(piscina, 'drain') ...

It would be nicer to have an async function that avoids having to use the additional events.on() wrapper...

e.g.

await piscina.drain();

Warnings about experimental ESM module loader while CommonJS is used

Hi,

I was trying to use Piscina in production and I've used CommonJS for all my code, including worker.

Basically, my worker looks like this:

async function initialize() {
    return (action) => {
         // do something cpu-intensive here, based on action received
    }
}
module.exports = initialize();

Each time the worker starts, I am getting a message in the console:

(node:94710) ExperimentalWarning: The ESM module loader is experimental.

That unfortunate, because it is polluting logs and is not in a json format which is the format of my app logger (I use Fastify with Pino).

Is it possible to check is file is written in CommonJS format and use require instead of import to load worker code?
Or maybe use require when file extension is cjs (taking into account package type if any?) ?

Feature: ESM support

I know you said you're working on this @addaleax but wanted to get the issue documented.

It would be great to have built-in support for ESM such that (a) Piscina can be used as ESM and (b) Workers can be defined as ESM.

Currently, creating a Piscina instance and pointing it at an ESM module fails:

const piscina = new Piscina({
  filename: path.resolve(__dirname, 'worker.mjs')
});
(node:443) UnhandledPromiseRejectionWarning: Error [ERR_REQUIRE_ESM]: Must use import to load ES Module: /home/james/nearform/piscina/examples/simple_esm/worker.mjs
    at Module.load (internal/modules/cjs/loader.js:1038:11)
    at Function.Module._load (internal/modules/cjs/loader.js:929:14)
    at Module.require (internal/modules/cjs/loader.js:1080:19)
    at require (internal/modules/cjs/helpers.js:72:18)
    at /home/james/nearform/piscina/dist/src/worker.js:28:63
    at async getHandler (/home/james/nearform/piscina/dist/src/worker.js:28:15)
    at async /home/james/nearform/piscina/dist/src/worker.js:93:29
(Use `node --trace-warnings ...` to show where the warning was created)
(node:443) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1)
(node:443) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Feature: pool utilization metric

@addaleax @mcollina ... In addition to the run time and wait time histograms, I would like to provide a calculation about the utilization of the pool but want to verify the approach before going too far.

Specifically, the pool utilization is a ratio of the total mean task run time to the total capacity run time of the pool. It is always a point-in-time measurement based on four calculations:

  • The pool duration -- the length of time the pool has been active (determined from the moment the Piscina object was created.

  • The pool capacity -- calculated as duration multiplied by maxThreads. This is the absolute upper bound of potential compute time for any task submitted by the queue. It is a limit that practically can never be reached for many practical reasons but is based on the assumption of 100% of the workers being 100% active for the duration of the Piscina object.

  • The pool approximate total mean run time -- calculated as the mean run time of all completed tasks multiplied by the total number of completed tasks. This is an approximation of the total amount of compute time actually realized by the pool.

  • The pool utilization -- calculated by dividing the approximate total run time time by the capacity to yield.

So, for instance, if the pool has been active for 100 milliseconds, has maxThreads = 10, and has processed 5 tasks with a mean runtime of 5 milliseconds each, we get:

  • duration: 100 milliseconds
  • capacity: 1000 milliseconds
  • approximate total mean run time: 25 milliseconds
  • utilization: 0.025

Our example queue, then, has an approximate upper limit of around 200 tasks that could have been completed within that 100 millisecond duration barring all other performance considerations.

Does this calculation seem reasonable?

In terms of API, there would be two new properties off Piscina:

  • duration -- provides the duration value
  • utilization -- provides the calculated point-in-time utilization value

We could potentially sample the utilization like we do with run times as the completion of each task but that seems generally unnecessary. If a use case for that crops up we can always add it later.

Feature: Allow worker export to be a Promise<Function>

Currently, a worker must export a Function immediately on loading. It would be nice (so we can allow for any async initialization tasks to occur before the ready message is sent) to also allow a worker to export a Promise ... e.g.

module.exports = (async () => {
  // Do any async init tasks here...
  return () => console.log('hello from the handler function')
})();

Refs: #57 (comment)

@addaleax ... whatcha think?

Progress

Hi , can you please add a simple example of a worker that send progress to main file before it's close

Feature request: task result streams or AsyncIterators

Hi folks -- thanks for putting together an awesome piece of software! I'd like to bring up a feature request to get an assessment for how hard it'd be to build into piscina: ReadableStream responses moved from worker threads to the calling thread. Instead of producing one Transferrable result at the end of a task, it'd be great to produce a stream or an AsyncIterator of results that the calling thread could access before the whole task is completed.

The concrete use case is using piscina to implement a server side renderer for client side React applications. Piscina is perfect for this: React SSR is a synchronous, CPU intensive task that usually blocks the main node thread for surprisingly long, and it'd be great to offload to a worker thread to keep the other work on the main event loop unblocked.

For maximum performance and the ideal user experience, React has built a streaming server side renderer that renders out chunks of HTML that can be streamed to the browser as they are produced, instead of having to wait until the entire render is complete before sending the first byte. This means the browser can start rendering, start prefetching other resources, etc etc, and is no extra work for the people building the React apps to support. See https://reactjs.org/docs/react-dom-server.html#rendertonodestream for more information.

I think architecturally this'd be a real sweet spot for piscina. To me, this library isn't the same as something like bullmq or pg-boss where tasks are persisted in some other system and worked by some other process. Piscina seems best at ephemeral, short lived tasks where you need the result quickly and don't want to pay as little of a network or serialization cost as possible. It's closer to a function call than those other things, and it'd be awesome if it supported the same return values that other function calls do, like Streams or AsyncIterators.

How to update state of each worker instance

Lets say each worker has global configuration state, that is same across all workers.
Is it possible to send new configuration object to each Worker to re-initialise its state?

Feature: `'drain'` event

We should add a piscina.on('drain', () => {}) event that is fired once all currently pending tasks have been either completed or dispatched to workers. The use case is to provide a signal for when it may be ok to start submitting tasks to the queue again. Specifically, if I am using a stream to push tasks into the queue, but I approach the queue limit, I want to pause the inbound stream to apply backpressure... then, once the queue has reached zero, I want a signal to indicate that I was unpause the feeding stream and resume submitting tasks.

We should consider carefully how this works because the timing of the event can be a performance bottleneck. For instance, if we wait until the all tasks are completed, then our queue will go idle while we wait for the next tasks to be submitted, which could cause our workers to be terminated unnecessarily. However, if we fire the drain event too early we could add churn.

One idea would be to set a load threshold as a percentage of queue limit as a configuration option for the drain event. That is, when queue limit is not unlimited, setting a load threshold of 0.25 would mean that when the queue size is 25% of the queue limit, the 'drain' event is emitted allowing us to start filling the queue back up.

Wanted to make sure we discussed the strategy on this before working on the implementation.

Missing tags

It’s currently very hard to identify which commit in this repo corresponds to which npm release. I’ll leave this as a reminder for me to go figure out which match to which and push tags for those.

Feature: Take os.loadavg() into account

It would be nice if piscina took os.loadavg() into account when deciding how many workers to use in a given situation, to dynamically respond to the system load. This can be helpful considering that the application that uses piscina may not be the main application running on a given host, and that the up to 1.5 × number of CPUs maxThreads default may be too much in a lot of circumstances.

The idea here is that we would not post new tasks until either less than minThreads tasks are currently running or the system drops below x CPU load (where x could be e.g. 100 % by default, but configurable).

In a similar vein, it would be nice to be able to set scheduling paramters for the spawned threads, but that would require upstream support in libuv first, then in Node.js, then here.

Feature request: Worker initiated messages send to parent

Hi, Thanks for the great software!

I'm wondering if Piscina could add support to allow workers actively sending messages to parent main thread, with or without waiting for parent's acknowledgement. This feature may greatly improve the use case when we need to:

  1. Send customized status updates from worker to main thread
  2. Look up resources which only exist in main thread
  3. Delegate I/O centric logic, or connection managements, e.g. DB connection pools to main thread to avoid excessive network connections in many workers
  4. Allow call another worker from the context of current worker, and etc.

For example we could have below code runs in the context of worker threads:

const { callParent, sendMessageToParent } = require('piscina');

(async function () {
const result = await callParent({ a: 4, b: 6 }); // child send message to parent
console.log(result); // waiting for parent acknowledges worker with the result
//or without having to wait for parent's ack:
sendMessageToParent({ a: 4, b: 6 })
})();

And in the context of main thread:
const Piscina = require('piscina');

const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker.js')
});

piscina.on('childmessage', (data) => {
...
});

Differentiate different errors, export AbortError

I might be missing something, but there doesn't seem to be a way to differentiate between an error that happened with the task or an error that happened because it was aborted. Could AbortError be exported? So we can do:

try {
  await piscina.runTask(data, abortController);
} catch (err) {
  if(err instanceof AbortError) {
    console.log('The task was canceled');
  } else {
    console.log('The task failed');
  }
}

Because I don't need AbortError in my logs, they are an expected part of my usage.

Checklist to publishing first version

@mcollina @addaleax ... please update this with whatever additional items you think are needed.

Things to do before publishing version 1:

  • Completed Tests
    • 100% Code Coverage - #6 - CI is currently failing due to < 100 coverage
    • Green CI on all platforms - #6
  • Docs
    • Full API doc
    • Design explanation
    • Getting started
  • Examples
  • Benchmarks
  • Contributing guide
  • Code of conduct file - #4
  • Email to NearForm explaining project and intent
  • Blog post (for Clinic.js or NearForm blogs)
  • Introductory video (introduce project, how to use it, what it's for)
  • npm publish - I've published an empty placeholder to npm currently at version 1.0.0-pre. Once we're ready to flip the switch, any of the three of us should be able to publish the actual module.

Couple of questions:

  1. Where should piscina live? Is having it in my personal github sufficient for now or should we move it somewhere more general?

  2. @mcollina, is there a fastify integration angle here? e.g. any way that a fastify-piscina type plugin makes sense? Is there a graphql angle?

  3. Should we also export an ESM version?

Discuss: Streams in and Streams out

@mcollina @addaleax ... in looking through the possibility of using the worker pool for server side rendering, one of the difficulties is going to be the fact that existing SSR mechanisms (e.g. nextjs when using fastify-nextjs) require the ability to work directly with the request and response objects as streams (e.g. app.render(req.raw, reply.res, '/hello', req.query, {})). Within the worker thread, we do not have direct access to these streams, nor do we have access to the global nextjs application. The best we'd be able to do is create each worker such that it has it's own distinct image of the nextjs application without any streaming of the actual rendered data. That is obviously not ideal...

We should try to find a way of enabling streaming data into and out of a task. We can likely use the worker threads already wrapped process.stdin and process.stdout so long as the concurrency of each worker is exactly 1 task at a time.

Order guarantees?

What kind of ordering guarantees does piscina provide? i.e. is it FIFO or whoever finishes first?

ParentPort with multiple workers

Is there a way to have a single MessageChannel in the main process with multiple tasks like the parentPort

I tried with

(async function () {
  const channel = new MessageChannel()
  channel.port1.on('message', (message) => {
    console.log(message)
  })
  let a = 0
  while (a < 10) {
    piscina.runTask({ hello: `world${a}`, port: channel.port2 }, [channel.port2])
    a++
  }
})()

but it gets DataCloneError: MessagePort in transfer list is already detached

Worker State

Hi,
What if a worker has a state, like a db connection that should be open prior to processing,
Should I pass a JS file opens that connection upon being required, and delays processing of any message until that connection is open?

Feature: Canceling tasks

Not all tasks will be cancellable, but it would be nice to have cancellation as an option, either via API or by example to show how it is possible.

how to know if all the thread is initialized?

we can do some initial work before actually do some task:

async function initialize() {
  await someAsyncInitializationActivity();
  return ({ a, b }) => a + b;
}

module.exports = initialize();

if the minThreads is set, piscina will create threads automatically when new Piscina() .

but how to make sure all the thread is initialized before calling runTask?

use case:
lots of node process, need make sure one process's all threads is initialized before handle user request, otherwise the user request may get overtime error.

is there a 'ready' event?

Node silently exits with code 0, no error, when using a specific string for String.includes inside worker

Hey jasnell,

Node silently exits with code 0, no error, when using a specific string for String.includes inside worker. This happens during my call to await Promise.all(worker_promises) where worker_promises is an array of Piscina runTask() promises.

If you change it to a different string and run, it will exit normally producing full output.

I can give you my script to test with, but the problem is that it's designed to parse log file data from DDoS attacks (to count occurrences of requests matching certain patterns) and the dataset causing the issue is about 20GiB (1,186 gzipped log files). I can send it to you if you're okay with this.

Here are the two relevant sections of code and the output when it works:
Screen Shot 2021-02-10 at 6 08 44 PM
Screen Shot 2021-02-10 at 6 10 53 PM

Here are the two relevant sections of code and the output when it silently fails:
Screen Shot 2021-02-10 at 6 01 27 PM
Screen Shot 2021-02-10 at 6 02 02 PM

The dataset remains the same between runs and there are no other code changes.

I'm on node v14.15.5, macOS 11.2 x86

Discuss: runTasks (plural)

It would be interesting to investigate a variation on the runTask API that allows multiple jobs to be submitted that culminates with a single Promise... using All, Race, and AllSettled semantics...

e.g.

await piscina.all([ ... tasks])

await piscina.race([... tasks])

await piscina.allSettled([ ... tasks])

It would also be interesting to explore the ability to pass in an async generator that can feed tasks to piscina asynchronously ...

piscina.runTasks(my_async_generator)

Or ... a stream option

const my_task_transform = ...

stream.pipeline(source, my_task_transform, piscina.stream(), destination, () => {})

Really, just thinking about alternative ways of feeding the task queue...

Improvement: filename can be a module

/cc @addaleax

Because Piscina is using require/import to load the workers, it is actually possible for a worker to be a separate module. For example, given the following directory structure:

* node_modules
  * worker
    * index.js
    * package.json
* index.js

Where /module_modules/worker/index.js is the worker function,

From /index.js we can do the following and it just works...

'use strict';

const Piscina = require('../..');
const { resolve } = require('path');

const piscina = new Piscina({ filename: resolve('./node_modules', 'worker') });

(async function () {
  const result = await piscina.runTask({ a: 4, b: 6 });
  console.log(result); // Prints 10
})();

While it's great that this works but the resolve('./node_modules', ... is a bit unfortunate. Just specifying { filename: 'worker' } fails saying that the module worker cannot be found.

james@ubuntu:~/nearform/piscina/examples/module$ node
Welcome to Node.js v14.1.0.
Type ".help" for more information.
> require('worker')
[Function (anonymous)]
> const Piscina = require('../..')
undefined
> const p = new Piscina({ filename: 'worker' })
undefined
> p.runTask().then(console.log)
Promise { <pending> }
> (node:23776) UnhandledPromiseRejectionWarning: Error: Cannot find module 'worker'
Require stack:
- /home/james/nearform/piscina/dist/src/worker.js
    at Function.Module._resolveFilename (internal/modules/cjs/loader.js:1020:15)
    at Function.Module._load (internal/modules/cjs/loader.js:890:27)
    at Module.require (internal/modules/cjs/loader.js:1080:19)
    at require (internal/modules/cjs/helpers.js:72:18)
    at /home/james/nearform/piscina/dist/src/worker.js:45:67
    at async getHandler (/home/james/nearform/piscina/dist/src/worker.js:45:19)
    at async /home/james/nearform/piscina/dist/src/worker.js:111:29
(Use `node --trace-warnings ...` to show where the warning was created)
(node:23776) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1)
(node:23776) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Ideally new Piscina({ filename: 'worker' }) would Just Work here.

That said, filename likely isn't the best name for the property here either :-)

Feature: Task sets

This one I'm not yet convinced about but it came up in a conversation and I promised to consider it... the idea is to allow submitting multiple tasks at once and awaiting on all of them. The way we would do this now is:

await Promise.all([
  piscina.runTask(1),
  piscina.runTask(2),
  piscina.runTask(3),
]);

The suggestion would be for something like:

await piscina.runAll([1, 2, 3]);

Given that we can already do this with the current API, I'm not convinced there's anything we should do here.

Feature: task metrics

It would be helpful if metrics were built in.

  • Number of pending jobs
  • Number of active jobs
  • Number of completed jobs
  • Job duration histogram
  • Idle time histogram

(I will be investigating these)

piscina with jest

I have an issue in one of my repo where user uses jest and this issue are pop up from the log

I wonder if it's sth to do with piscina?

Jest has detected the following 3 open handles potentially keeping Jest from exiting:

  ●  Piscina

      at new EventEmitterReferencingAsyncResource (../node_modules/eventemitter-asyncresource/src/index.ts:23:5)
      at new EventEmitterAsyncResource (../node_modules/eventemitter-asyncresource/src/index.ts:46:7)
      at new Piscina (../node_modules/piscina/src/index.ts:827:5)
      at Object.<anonymous> (../node_modules/camaro/index.js:4:14)


  ●  WORKER

      at ThreadPool._addNewWorker (../node_modules/piscina/src/index.ts:520:20)
      at ThreadPool._ensureMinimumWorkers (../node_modules/piscina/src/index.ts:514:12)
      at new ThreadPool (../node_modules/piscina/src/index.ts:508:10)
      at new Piscina (../node_modules/piscina/src/index.ts:876:18)
      at Object.<anonymous> (../node_modules/camaro/index.js:4:14)


  ●  MESSAGEPORT

      at ThreadPool._addNewWorker (../node_modules/piscina/src/index.ts:528:30)
      at ThreadPool._ensureMinimumWorkers (../node_modules/piscina/src/index.ts:514:12)
      at new ThreadPool (../node_modules/piscina/src/index.ts:508:10)
      at new Piscina (../node_modules/piscina/src/index.ts:876:18)
      at Object.<anonymous> (../node_modules/camaro/index.js:4:14)

ref: tuananh/camaro#113 (comment)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.