diff --git a/docs/asynciterable/creating.md b/docs/asynciterable/creating.md index ba34cdf1..d2a1468b 100644 --- a/docs/asynciterable/creating.md +++ b/docs/asynciterable/creating.md @@ -46,7 +46,7 @@ let value, done; ## Brief Interlude - `AsyncSink` -Very rarely will we ever need to create these async-iterables by hand, however, if you need a collection that you can add to as well as iterate, we have the `AsyncSink` class. This class serves as a basis for some of our operators such as binding to events and DOM and Node.js streams. +Very rarely will we ever need to create these async-iterables by hand, however, if you need a collection that you can add to as well as iterate, we have the `AsyncSink` class. This class serves as a basis for some of our operators such as binding to events and DOM and Node.js streams. ```typescript import { AsyncSink } from 'ix/asynciterable'; diff --git a/docs/asynciterable/transforming.md b/docs/asynciterable/transforming.md index b249cba4..d8824bdf 100644 --- a/docs/asynciterable/transforming.md +++ b/docs/asynciterable/transforming.md @@ -11,6 +11,5 @@ await subscription.pipe( .forEach(handleBatch) ``` -Using this operator makes sure that if messages slow down you'll still -handle them in a reasonable time whereas using `buffer` would leave you stuck until you get +Using this operator makes sure that if messages slow down you'll still handle them in a reasonable time whereas using `buffer` would leave you stuck until you get the right amount of messages. diff --git a/spec/asynciterable-operators/catcherror-spec.ts b/spec/asynciterable-operators/catcherror-spec.ts index c9aa3caa..c250c424 100644 --- a/spec/asynciterable-operators/catcherror-spec.ts +++ b/spec/asynciterable-operators/catcherror-spec.ts @@ -1,5 +1,14 @@ +import { jest } from '@jest/globals'; import '../asynciterablehelpers.js'; -import { of, range, sequenceEqual, single, throwError } from 'ix/asynciterable/index.js'; +import { + first, + from, + of, + range, + sequenceEqual, + single, + throwError, +} from 'ix/asynciterable/index.js'; import { catchError } from 'ix/asynciterable/operators/index.js'; test('AsyncIterable#catchError error catches', async () => { @@ -26,3 +35,14 @@ test('AsyncIterable#catchError source and handler types are composed', async () const res = xs.pipe(catchError(async (_: Error) => of('foo'))); await expect(sequenceEqual(res, xs)).resolves.toBeTruthy(); }); + +test('AsyncIterable#catchError calls return() on source iterator when stopped early', async () => { + const xs = range(0, 10)[Symbol.asyncIterator](); + const returnSpy = jest.spyOn(xs, 'return'); + + const res = from(xs).pipe(catchError((_: Error) => from([]))); + + await first(res); + + expect(returnSpy).toHaveBeenCalled(); +}); diff --git a/spec/asynciterable-operators/skip-spec.ts b/spec/asynciterable-operators/skip-spec.ts index 1423f5ee..1a97f99c 100644 --- a/spec/asynciterable-operators/skip-spec.ts +++ b/spec/asynciterable-operators/skip-spec.ts @@ -1,5 +1,6 @@ +import { jest } from '@jest/globals'; import { hasNext, noNext } from '../asynciterablehelpers.js'; -import { of, throwError } from 'ix/asynciterable/index.js'; +import { as, first, of, range, throwError } from 'ix/asynciterable/index.js'; import { skip } from 'ix/asynciterable/operators/index.js'; test('AsyncIterable#skip skips some', async () => { @@ -40,3 +41,14 @@ test('AsyncIterable#skip throws', async () => { const it = ys[Symbol.asyncIterator](); await expect(it.next()).rejects.toThrow(err); }); + +test('Iterable#skip calls return() on source iterator when stopped early', async () => { + const xs = range(0, 10)[Symbol.asyncIterator](); + const returnSpy = jest.spyOn(xs, 'return'); + + const res = as(xs).pipe(skip(2)); + + await first(res); + + expect(returnSpy).toHaveBeenCalled(); +}); diff --git a/spec/asynciterable/catcherror-spec.ts b/spec/asynciterable/catcherror-spec.ts index 3e4dbd09..d1d5ab3f 100644 --- a/spec/asynciterable/catcherror-spec.ts +++ b/spec/asynciterable/catcherror-spec.ts @@ -1,18 +1,28 @@ +import { jest } from '@jest/globals'; +import { skip } from 'ix/asynciterable/operators.js'; import { hasNext } from '../asynciterablehelpers.js'; -import { catchError, concat, range, sequenceEqual, throwError } from 'ix/asynciterable/index.js'; - -test('AsyncIterable#catch with no errors', async () => { +import { + catchError, + concat, + first, + from, + range, + sequenceEqual, + throwError, +} from 'ix/asynciterable/index.js'; + +test('AsyncIterable#catchError with no errors', async () => { const res = catchError(range(0, 5), range(5, 5)); expect(await sequenceEqual(res, range(0, 5))).toBeTruthy(); }); -test('AsyncIterable#catch with concat error', async () => { +test('AsyncIterable#catchError with concat error', async () => { const res = catchError(concat(range(0, 5), throwError(new Error())), range(5, 5)); expect(await sequenceEqual(res, range(0, 10))).toBeTruthy(); }); -test('AsyncIterable#catch still throws', async () => { +test('AsyncIterable#catchError still throws', async () => { const e1 = new Error(); const er1 = throwError(e1); @@ -31,3 +41,17 @@ test('AsyncIterable#catch still throws', async () => { await hasNext(it, 3); await expect(it.next()).rejects.toThrow(); }); + +test('AsyncIterable#catchError calls return() on source iterator when stopped early', async () => { + const e1 = new Error(); + const er1 = throwError(e1); + + const xs2 = range(2, 2)[Symbol.asyncIterator](); + const returnSpy = jest.spyOn(xs2, 'return'); + + const res = catchError(concat(range(0, 2), er1), from(xs2)).pipe(skip(2)); + + await first(res); + + expect(returnSpy).toHaveBeenCalled(); +}); diff --git a/spec/iterable-operators/catcherror-spec.ts b/spec/iterable-operators/catcherror-spec.ts index 8dd8220d..85c1a0a2 100644 --- a/spec/iterable-operators/catcherror-spec.ts +++ b/spec/iterable-operators/catcherror-spec.ts @@ -1,5 +1,6 @@ +import { jest } from '@jest/globals'; import '../iterablehelpers'; -import { of, range, sequenceEqual, single, throwError } from 'ix/iterable/index.js'; +import { first, from, of, range, sequenceEqual, single, throwError } from 'ix/iterable/index.js'; import { catchError } from 'ix/iterable/operators/index.js'; test('Iterable#catchError error catches', () => { @@ -26,3 +27,14 @@ test('Iterable#catchError source and handler types are composed', () => { const res = xs.pipe(catchError((_: Error) => of('foo'))); expect(sequenceEqual(res, xs)).toBeTruthy(); }); + +test('Iterable#catchError calls return() on source iterator when stopped early', () => { + const xs = range(0, 10)[Symbol.iterator](); + const returnSpy = jest.spyOn(xs, 'return'); + + const res = from(xs).pipe(catchError((_: Error) => from([]))); + + first(res); + + expect(returnSpy).toHaveBeenCalled(); +}); diff --git a/spec/iterable-operators/skip-spec.ts b/spec/iterable-operators/skip-spec.ts index cbd9593e..9411dc41 100644 --- a/spec/iterable-operators/skip-spec.ts +++ b/spec/iterable-operators/skip-spec.ts @@ -1,5 +1,6 @@ +import { jest } from '@jest/globals'; import { hasNext, noNext } from '../iterablehelpers.js'; -import { as, throwError } from 'ix/iterable/index.js'; +import { as, first, range, throwError } from 'ix/iterable/index.js'; import { skip } from 'ix/iterable/operators/index.js'; test('Iterable#skip skips some', () => { @@ -39,3 +40,14 @@ test('Iterable#skip throws', () => { const it = ys[Symbol.iterator](); expect(() => it.next()).toThrow(); }); + +test('Iterable#skip calls return() on source iterator when stopped early', () => { + const xs = range(0, 10)[Symbol.iterator](); + const returnSpy = jest.spyOn(xs, 'return'); + + const res = as(xs).pipe(skip(2)); + + first(res); + + expect(returnSpy).toHaveBeenCalled(); +}); diff --git a/spec/iterable/catcherror-spec.ts b/spec/iterable/catcherror-spec.ts index bd675516..658716fb 100644 --- a/spec/iterable/catcherror-spec.ts +++ b/spec/iterable/catcherror-spec.ts @@ -1,5 +1,15 @@ +import { jest } from '@jest/globals'; +import { skip } from 'ix/iterable/operators.js'; import { hasNext } from '../iterablehelpers.js'; -import { catchError, concat, range, sequenceEqual, throwError } from 'ix/iterable/index.js'; +import { + from, + catchError, + concat, + range, + sequenceEqual, + throwError, + first, +} from 'ix/iterable/index.js'; test('Iterable.catchError with no errors', () => { const res = catchError(range(0, 5), range(5, 5)); @@ -31,3 +41,17 @@ test('Iterable.catchError still throws', () => { hasNext(it, 3); expect(() => it.next()).toThrow(); }); + +test('Iterable.catchError calls return() on source iterator when stopped early', () => { + const e1 = new Error(); + const er1 = throwError(e1); + + const xs2 = range(2, 2)[Symbol.iterator](); + const returnSpy = jest.spyOn(xs2, 'return'); + + const res = catchError(concat(range(0, 2), er1), from(xs2)).pipe(skip(2)); + + first(res); + + expect(returnSpy).toHaveBeenCalled(); +}); diff --git a/src/asynciterable/catcherror.ts b/src/asynciterable/catcherror.ts index 90de1cc8..86c3dc93 100644 --- a/src/asynciterable/catcherror.ts +++ b/src/asynciterable/catcherror.ts @@ -24,24 +24,26 @@ export class CatchAllAsyncIterable extends AsyncIterableX { error = null; hasError = false; - while (1) { - let c = {}; + try { + while (1) { + let c = {}; - try { - const { done, value } = await it.next(); - if (done) { - await returnAsyncIterator(it); + try { + const { done, value } = await it.next(); + if (done) { + break; + } + c = value; + } catch (e) { + error = e; + hasError = true; break; } - c = value; - } catch (e) { - error = e; - hasError = true; - await returnAsyncIterator(it); - break; - } - yield c; + yield c; + } + } finally { + await returnAsyncIterator(it); } if (!hasError) { diff --git a/src/asynciterable/operators/catcherror.ts b/src/asynciterable/operators/catcherror.ts index 943cc83c..fb569010 100644 --- a/src/asynciterable/operators/catcherror.ts +++ b/src/asynciterable/operators/catcherror.ts @@ -30,23 +30,26 @@ export class CatchWithAsyncIterable extends AsyncIterableX>{}; - try { - c = await it.next(); - if (c.done) { - await returnAsyncIterator(it); + try { + while (1) { + let c = >{}; + + try { + c = await it.next(); + if (c.done) { + break; + } + } catch (e) { + err = await this._handler(e, signal); + hasError = true; break; } - } catch (e) { - err = await this._handler(e, signal); - hasError = true; - await returnAsyncIterator(it); - break; - } - yield c.value; + yield c.value; + } + } finally { + await returnAsyncIterator(it); } if (hasError) { diff --git a/src/asynciterable/operators/skip.ts b/src/asynciterable/operators/skip.ts index ac4f6943..89f92e6a 100644 --- a/src/asynciterable/operators/skip.ts +++ b/src/asynciterable/operators/skip.ts @@ -2,6 +2,7 @@ import { AsyncIterableX } from '../asynciterablex.js'; import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js'; import { wrapWithAbort } from './withabort.js'; import { throwIfAborted } from '../../aborterror.js'; +import { returnAsyncIterator } from '../../util/returniterator.js'; /** @ignore */ export class SkipAsyncIterable extends AsyncIterableX { @@ -20,13 +21,18 @@ export class SkipAsyncIterable extends AsyncIterableX { const it = source[Symbol.asyncIterator](); let count = this._count; let next; - while (count > 0 && !(next = await it.next()).done) { - count--; - } - if (count <= 0) { - while (!(next = await it.next()).done) { - yield next.value; + + try { + while (count > 0 && !(next = await it.next()).done) { + count--; + } + if (count <= 0) { + while (!(next = await it.next()).done) { + yield next.value; + } } + } finally { + returnAsyncIterator(it); } } } diff --git a/src/iterable/catcherror.ts b/src/iterable/catcherror.ts index 827efbba..f43d5e60 100644 --- a/src/iterable/catcherror.ts +++ b/src/iterable/catcherror.ts @@ -20,24 +20,26 @@ export class CatchIterable extends IterableX { error = null; hasError = false; - while (1) { - let c = {}; + try { + while (1) { + let c = {}; - try { - const { done, value } = it.next(); - if (done) { - returnIterator(it); + try { + const { done, value } = it.next(); + if (done) { + break; + } + c = value; + } catch (e) { + error = e; + hasError = true; break; } - c = value; - } catch (e) { - error = e; - hasError = true; - returnIterator(it); - break; - } - yield c; + yield c; + } + } finally { + returnIterator(it); } if (!hasError) { diff --git a/src/iterable/operators/catcherror.ts b/src/iterable/operators/catcherror.ts index 3c2dd565..85873c56 100644 --- a/src/iterable/operators/catcherror.ts +++ b/src/iterable/operators/catcherror.ts @@ -17,24 +17,27 @@ export class CatchWithIterable extends IterableX | undefined; let hasError = false; const it = this._source[Symbol.iterator](); - while (1) { - let done: boolean | undefined; - let value: TSource; - try { - ({ done, value } = it.next()); - if (done) { - returnIterator(it); + try { + while (1) { + let done: boolean | undefined; + let value: TSource; + + try { + ({ done, value } = it.next()); + if (done) { + break; + } + } catch (e) { + err = this._handler(e); + hasError = true; break; } - } catch (e) { - err = this._handler(e); - hasError = true; - returnIterator(it); - break; - } - yield value; + yield value; + } + } finally { + returnIterator(it); } if (hasError) { diff --git a/src/iterable/operators/skip.ts b/src/iterable/operators/skip.ts index 44b9d585..f6cf0c9f 100644 --- a/src/iterable/operators/skip.ts +++ b/src/iterable/operators/skip.ts @@ -1,5 +1,6 @@ import { IterableX } from '../iterablex.js'; import { MonoTypeOperatorFunction } from '../../interfaces.js'; +import { returnIterator } from '../../util/returniterator.js'; /** @ignore */ export class SkipIterable extends IterableX { @@ -16,13 +17,18 @@ export class SkipIterable extends IterableX { const it = this._source[Symbol.iterator](); let count = this._count; let next; - while (count > 0 && !(next = it.next()).done) { - count--; - } - if (count <= 0) { - while (!(next = it.next()).done) { - yield next.value; + + try { + while (count > 0 && !(next = it.next()).done) { + count--; + } + if (count <= 0) { + while (!(next = it.next()).done) { + yield next.value; + } } + } finally { + returnIterator(it); } } }