Retrieve task index? #42

Open
FezVrasta opened this issue Feb 2, 2022 · 13 comments
@FezVrasta

I'm running the following code where my functions need to pick a web worker from a list in order to run.

const workerList = [worker1, worker2];
await PromisePool
	.withConcurrency(workerList.length)
	.for(items)
	.process(async (item) => {
		// `executorIndex` is the value I don't know how to obtain
		const worker = workerList[executorIndex];

		await runMyLogic(item, worker);
	});

Right now I can't find any way to know which executor from the pool is being used. Could you advise?

@marcuspoehls
Member

marcuspoehls commented Feb 2, 2022

@FezVrasta Hey Federico, the promise pool provides the current item index as the second argument in the process method:

const users = ['Federico', 'Marcus', 'Supercharge']

await PromisePool
  .for(users)
  .process(async (user, index, pool) => {
    // use the `index`
  })

Is that what you’re looking for?

@marcuspoehls marcuspoehls self-assigned this Feb 2, 2022
@FezVrasta
Author

FezVrasta commented Feb 2, 2022

Thanks Marcus, but that's the index in the original users array. I'm looking for the id or index of the "worker" that is processing the data.

So, let's say the concurrency limit is set to 4: I need to know in which of the 4 available "slots" the logic is being executed.

@marcuspoehls
Member

The package doesn't expose any "worker" information to you. All concurrency "slots" run in parallel. There’s no "worker". Under the hood, this package uses promises to wrap the processing. The "worker" is basically just a promise. Promises are eager and start their execution as soon as you create them. Then the promise pool retrieves the value from the promise which is either resolved or rejected.
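To illustrate the point about eager promises, here is a minimal sketch in plain TypeScript that does not use the package. The variable names (`events`, `task`, `settled`) are made up for this example. It shows that the function passed to `new Promise` runs before the constructor returns, while `.then` callbacks run later.

```typescript
// Records the order in which things happen.
const events: string[] = [];

// The executor passed to `new Promise` runs synchronously,
// so the work starts as soon as the promise is created.
const task = new Promise<string>((resolve) => {
  events.push("executor started");
  resolve("done");
});

events.push("promise created");
// At this point `events` is ["executor started", "promise created"].

// `.then` callbacks never run synchronously; they are queued as microtasks.
const settled = task.then((value) => {
  events.push(`resolved with ${value}`);
  return events;
});
```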

@FezVrasta
Author

I understand, but the whole point of the library is to limit the number of promises being evaluated at the same time, right?

I then need to know, of the allocated "threads", which one is being used by a given process execution.

I'm not sure if I'm being clear, I'm sorry.

@marcuspoehls
Member

Yep, this package limits the number of promises being handled concurrently.

Can you describe your use case? I’m not sure I understand what you’re looking for.

@FezVrasta
Author

It's what I explained in the original post: I have a pre-allocated number of web workers and I want each thread to use one of them.

Without the information I'm looking for, I need to implement separate idle/busy state management for my workers list and have each process execution pick one of the available workers. If I had this information instead, each process execution would already know which worker is allocated to it.
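For readers following along, the manual idle/busy bookkeeping described above could look roughly like the sketch below. `WorkerRoster` and its methods are hypothetical names invented for this example; they are not part of the promise pool package, and the worker type is left generic so the snippet runs without real web workers.

```typescript
// Hypothetical helper: hands out idle workers and takes them back.
class WorkerRoster<W> {
  private readonly idle: W[];

  constructor(workers: readonly W[]) {
    // Copy the list so the caller's array is never mutated.
    this.idle = [...workers];
  }

  // Takes an idle worker, or throws if every worker is busy.
  acquire(): W {
    const worker = this.idle.pop();
    if (worker === undefined) {
      throw new Error("no idle worker available");
    }
    return worker;
  }

  // Marks a worker as idle again.
  release(worker: W): void {
    this.idle.push(worker);
  }

  // Runs `fn` with an idle worker and always hands the worker back.
  async use<R>(fn: (worker: W) => Promise<R>): Promise<R> {
    const worker = this.acquire();
    try {
      return await fn(worker);
    } finally {
      this.release(worker);
    }
  }
}
```

Assuming the pool is configured with `.withConcurrency(workerList.length)`, a handler such as `async (item) => roster.use((worker) => runMyLogic(item, worker))` should always find an idle worker, because the pool never runs more handlers at once than there are workers.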

@FezVrasta
Author

FezVrasta commented Feb 2, 2022

I think the source code calls them tasks, so what I need is the task index.

https://github.com/supercharge/promise-pool/blob/main/src/promise-pool-executor.ts#L300-L314

This would become something like this:

  startProcessing (item: T, index: number): void {
-   const task: Promise<void> = this.createTaskFor(item, index)
+   const task: Promise<void> = this.createTaskFor(item, index, this.tasks().length)
      .then(result => {
        this
          .removeActive(task)
          .save(result)
      })
      .catch(async error => {
        return this
          .removeActive(task)
          .handleErrorFor(error, item)
      })

    this.tasks().push(task)
  }

With this.tasks().length I get the index at which the task will be inserted by the push call at the end of the method.

I'm not sure how concurrency-safe my change is, but I hope it gives the idea.

This change would ensure the taskIndex will always be unique across all the running tasks.

@FezVrasta FezVrasta changed the title from "Know the index, or id, of the executor?" to "Retrieve task index?" Feb 2, 2022
@marcuspoehls
Member

@FezVrasta Hey Federico, I got sidetracked yesterday. Thank you for your patience!

I feel like the promise pool may not be the right tool for you. The promise pool is like Array.map with concurrency. It runs a list of items in parallel through a processing function.

I’m not sure your proposed change is helping in your situation. The return value of this.tasks().length is the number of currently active tasks. Imagine a promise pool running at most 2 tasks concurrently. The pool started and has 2 active tasks. Let’s say the task that started first finishes processing and gets removed from the list of active tasks. The pool now has 1 active task (the second task). Starting a new task will give you a task length of 1. You received the same task length of 1 when starting the second task earlier. Do you know what I mean?
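The scenario above can be replayed with a small stand-alone simulation. This is only a sketch: `active`, `observed`, `start`, and `finish` are illustrative names standing in for the pool's internal task list, not the package's actual code.

```typescript
// Stand-in for the pool's list of currently active tasks.
const active: string[] = [];
// The "task index" each task would have received under the proposed change.
const observed: Record<string, number> = {};

function start(name: string): void {
  // The proposal passes the current list length as the index.
  observed[name] = active.length;
  active.push(name);
}

function finish(name: string): void {
  // Finished tasks are removed from the active list.
  active.splice(active.indexOf(name), 1);
}

start("first");  // receives 0
start("second"); // receives 1
finish("first"); // "second" is still running
start("third");  // receives 1 again, colliding with "second"
```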

@FezVrasta
Author

FezVrasta commented Feb 3, 2022

Yes, my example doesn't work, but you could return the indexOf the task, or just pre-allocate an array with N positions, one for each task, set a position to null when its task finishes, and allocate the next task to the first available spot.
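A rough sketch of the pre-allocated array idea, written as a stand-alone function rather than a change to the package. `processWithSlots` is a hypothetical name, and error handling is deliberately minimal: a rejected handler rejects the whole call.

```typescript
// Hypothetical helper, not part of the promise pool package.
async function processWithSlots<T, R>(
  items: readonly T[],
  concurrency: number,
  handler: (item: T, slot: number) => Promise<R>
): Promise<R[]> {
  if (concurrency < 1) {
    throw new Error("concurrency must be at least 1");
  }
  // One position per slot; null marks a free slot.
  const slots: Array<Promise<void> | null> = new Array(concurrency).fill(null);
  const results: R[] = new Array(items.length);

  for (let i = 0; i < items.length; i++) {
    // Wait until at least one slot is free.
    while (slots.indexOf(null) === -1) {
      await Promise.race(slots);
    }
    // Allocate the next task to the first free position.
    const slot = slots.indexOf(null);
    const item = items[i] as T;
    slots[slot] = handler(item, slot).then((result) => {
      results[i] = result;
      slots[slot] = null; // free the slot once the task has finished
    });
  }

  // Wait for the tasks that are still running.
  await Promise.all(slots);
  return results;
}
```

With the worker list from the original post, this could be called as `processWithSlots(items, workerList.length, (item, slot) => runMyLogic(item, workerList[slot]))`, so that two running tasks never share a worker.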

> I feel like the promise pool may not be the right tool for you.

This is definitely the right tool for me. I got a version of my logic working with your library; it's just that I need to handle the worker allocation manually, which is a bit of an annoyance considering that the library already has all the info I need but doesn't expose it.

@FezVrasta
Author

To add more context, I found this library that goes one step further: https://www.npmjs.com/package/tiny-promise-pool

Its result exposes the thread that has been allocated to run the promise, but, unfortunately, doesn't expose that information to the promise execution.

@marcuspoehls
Member

Ok, looks like I’m missing something here 😃

Let me describe what I understand:

  • you want to run an async function
  • the async function should run on a worker thread
  • you’re managing the worker thread yourself
  • to increase concurrency you’re introducing multiple worker threads
  • this promise pool package should help you distribute the items you’re processing onto worker threads

If I’m understanding that correctly, you’re missing the information about which "task" inside the promise pool finished processing. Knowing which "task" finished is important because you’re mapping tasks to individual worker threads: when you know which task finished, you know which worker thread can take the next task. Is that correct?

@FezVrasta
Author

Yes correct

@marcuspoehls
Member

Ok 👍 I’ll think this through and work out how to implement it.
