diff --git a/src/asynciterable/operators/_flatten.ts b/src/asynciterable/operators/_flatten.ts index b2513d9c..e808a466 100644 --- a/src/asynciterable/operators/_flatten.ts +++ b/src/asynciterable/operators/_flatten.ts @@ -93,10 +93,12 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera } if (active < concurrent) { pullNextOuter(value as TSource); + results[0] = outer.next(); } else { + // remove the outer iterator from the race, we're full + results[0] = NEVER_PROMISE; outerValues.push(value as TSource); } - results[0] = outer.next(); break; } case Type.INNER: { @@ -119,6 +121,10 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera } case Type.INNER: { --active; + // add the outer iterator to the race, if its been removed and we are not yet done with it + if (results[0] === NEVER_PROMISE && !outerComplete) { + results[0] = outer.next(); + } // return the current slot to the pool innerIndices.push(index); // synchronously drain the `outerValues` buffer