Skip to content

Commit

Permalink
fix: ensure catchError/skip functions always return() source iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
jeengbe committed Jul 23, 2024
1 parent b8890f1 commit c31a8df
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 80 deletions.
2 changes: 1 addition & 1 deletion docs/asynciterable/creating.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ let value, done;

## Brief Interlude - `AsyncSink`

Very rarely will we ever need to create these async-iterables by hand, however, if you need a collection that you can add to as well as iterate, we have the `AsyncSink` class. This class serves as a basis for some of our operators such as binding to events and DOM and Node.js streams.
Very rarely will we ever need to create these async-iterables by hand, however, if you need a collection that you can add to as well as iterate, we have the `AsyncSink` class. This class serves as a basis for some of our operators such as binding to events and DOM and Node.js streams.

```typescript
import { AsyncSink } from 'ix/asynciterable';
Expand Down
3 changes: 1 addition & 2 deletions docs/asynciterable/transforming.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ await subscription.pipe(
.forEach(handleBatch)
```

Using this operator makes sure that if messages slow down you'll still
handle them in a reasonable time whereas using `buffer` would leave you stuck until you get
Using this operator makes sure that if messages slow down you'll still handle them in a reasonable time whereas using `buffer` would leave you stuck until you get
the right amount of messages.
22 changes: 21 additions & 1 deletion spec/asynciterable-operators/catcherror-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { jest } from '@jest/globals';
import '../asynciterablehelpers.js';
import { of, range, sequenceEqual, single, throwError } from 'ix/asynciterable/index.js';
import {
first,
from,
of,
range,
sequenceEqual,
single,
throwError,
} from 'ix/asynciterable/index.js';
import { catchError } from 'ix/asynciterable/operators/index.js';

test('AsyncIterable#catchError error catches', async () => {
Expand All @@ -26,3 +35,14 @@ test('AsyncIterable#catchError source and handler types are composed', async ()
const res = xs.pipe(catchError(async (_: Error) => of('foo')));
await expect(sequenceEqual(res, xs)).resolves.toBeTruthy();
});

test('AsyncIterable#catchError calls return() on source iterator when stopped early', async () => {
const xs = range(0, 10)[Symbol.asyncIterator]();
const returnSpy = jest.spyOn(xs, 'return');

const res = from(xs).pipe(catchError((_: Error) => from([])));

await first(res);

expect(returnSpy).toHaveBeenCalled();
});
14 changes: 13 additions & 1 deletion spec/asynciterable-operators/skip-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { jest } from '@jest/globals';
import { hasNext, noNext } from '../asynciterablehelpers.js';
import { of, throwError } from 'ix/asynciterable/index.js';
import { as, first, of, range, throwError } from 'ix/asynciterable/index.js';
import { skip } from 'ix/asynciterable/operators/index.js';

test('AsyncIterable#skip skips some', async () => {
Expand Down Expand Up @@ -40,3 +41,14 @@ test('AsyncIterable#skip throws', async () => {
const it = ys[Symbol.asyncIterator]();
await expect(it.next()).rejects.toThrow(err);
});

test('Iterable#skip calls return() on source iterator when stopped early', async () => {
const xs = range(0, 10)[Symbol.asyncIterator]();
const returnSpy = jest.spyOn(xs, 'return');

const res = as(xs).pipe(skip(2));

await first(res);

expect(returnSpy).toHaveBeenCalled();
});
34 changes: 29 additions & 5 deletions spec/asynciterable/catcherror-spec.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
import { jest } from '@jest/globals';
import { skip } from 'ix/asynciterable/operators.js';
import { hasNext } from '../asynciterablehelpers.js';
import { catchError, concat, range, sequenceEqual, throwError } from 'ix/asynciterable/index.js';

test('AsyncIterable#catch with no errors', async () => {
import {
catchError,
concat,
first,
from,
range,
sequenceEqual,
throwError,
} from 'ix/asynciterable/index.js';

test('AsyncIterable#catchError with no errors', async () => {
const res = catchError(range(0, 5), range(5, 5));
expect(await sequenceEqual(res, range(0, 5))).toBeTruthy();
});

test('AsyncIterable#catch with concat error', async () => {
test('AsyncIterable#catchError with concat error', async () => {
const res = catchError(concat(range(0, 5), throwError(new Error())), range(5, 5));

expect(await sequenceEqual(res, range(0, 10))).toBeTruthy();
});

test('AsyncIterable#catch still throws', async () => {
test('AsyncIterable#catchError still throws', async () => {
const e1 = new Error();
const er1 = throwError(e1);

Expand All @@ -31,3 +41,17 @@ test('AsyncIterable#catch still throws', async () => {
await hasNext(it, 3);
await expect(it.next()).rejects.toThrow();
});

test('AsyncIterable#catchError calls return() on source iterator when stopped early', async () => {
const e1 = new Error();
const er1 = throwError(e1);

const xs2 = range(2, 2)[Symbol.asyncIterator]();
const returnSpy = jest.spyOn(xs2, 'return');

const res = catchError(concat(range(0, 2), er1), from(xs2)).pipe(skip(2));

await first(res);

expect(returnSpy).toHaveBeenCalled();
});
14 changes: 13 additions & 1 deletion spec/iterable-operators/catcherror-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { jest } from '@jest/globals';
import '../iterablehelpers';
import { of, range, sequenceEqual, single, throwError } from 'ix/iterable/index.js';
import { first, from, of, range, sequenceEqual, single, throwError } from 'ix/iterable/index.js';
import { catchError } from 'ix/iterable/operators/index.js';

test('Iterable#catchError error catches', () => {
Expand All @@ -26,3 +27,14 @@ test('Iterable#catchError source and handler types are composed', () => {
const res = xs.pipe(catchError((_: Error) => of('foo')));
expect(sequenceEqual(res, xs)).toBeTruthy();
});

test('Iterable#catchError calls return() on source iterator when stopped early', () => {
const xs = range(0, 10)[Symbol.iterator]();
const returnSpy = jest.spyOn(xs, 'return');

const res = from(xs).pipe(catchError((_: Error) => from([])));

first(res);

expect(returnSpy).toHaveBeenCalled();
});
14 changes: 13 additions & 1 deletion spec/iterable-operators/skip-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { jest } from '@jest/globals';
import { hasNext, noNext } from '../iterablehelpers.js';
import { as, throwError } from 'ix/iterable/index.js';
import { as, first, range, throwError } from 'ix/iterable/index.js';
import { skip } from 'ix/iterable/operators/index.js';

test('Iterable#skip skips some', () => {
Expand Down Expand Up @@ -39,3 +40,14 @@ test('Iterable#skip throws', () => {
const it = ys[Symbol.iterator]();
expect(() => it.next()).toThrow();
});

test('Iterable#skip calls return() on source iterator when stopped early', () => {
const xs = range(0, 10)[Symbol.iterator]();
const returnSpy = jest.spyOn(xs, 'return');

const res = as(xs).pipe(skip(2));

first(res);

expect(returnSpy).toHaveBeenCalled();
});
26 changes: 25 additions & 1 deletion spec/iterable/catcherror-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import { jest } from '@jest/globals';
import { skip } from 'ix/iterable/operators.js';
import { hasNext } from '../iterablehelpers.js';
import { catchError, concat, range, sequenceEqual, throwError } from 'ix/iterable/index.js';
import {
from,
catchError,
concat,
range,
sequenceEqual,
throwError,
first,
} from 'ix/iterable/index.js';

test('Iterable.catchError with no errors', () => {
const res = catchError(range(0, 5), range(5, 5));
Expand Down Expand Up @@ -31,3 +41,17 @@ test('Iterable.catchError still throws', () => {
hasNext(it, 3);
expect(() => it.next()).toThrow();
});

test('Iterable.catchError calls return() on source iterator when stopped early', () => {
const e1 = new Error();
const er1 = throwError(e1);

const xs2 = range(2, 2)[Symbol.iterator]();
const returnSpy = jest.spyOn(xs2, 'return');

const res = catchError(concat(range(0, 2), er1), from(xs2)).pipe(skip(2));

first(res);

expect(returnSpy).toHaveBeenCalled();
});
30 changes: 16 additions & 14 deletions src/asynciterable/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
error = null;
hasError = false;

while (1) {
let c = <TSource>{};
try {
while (1) {
let c = <TSource>{};

try {
const { done, value } = await it.next();
if (done) {
await returnAsyncIterator(it);
try {
const { done, value } = await it.next();
if (done) {
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
await returnAsyncIterator(it);
break;
}

yield c;
yield c;
}
} finally {
await returnAsyncIterator(it);
}

if (!hasError) {
Expand Down
29 changes: 16 additions & 13 deletions src/asynciterable/operators/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,26 @@ export class CatchWithAsyncIterable<TSource, TResult> extends AsyncIterableX<TSo
let hasError = false;
const source = wrapWithAbort(this._source, signal);
const it = source[Symbol.asyncIterator]();
while (1) {
let c = <IteratorResult<TSource>>{};

try {
c = await it.next();
if (c.done) {
await returnAsyncIterator(it);
try {
while (1) {
let c = <IteratorResult<TSource>>{};

try {
c = await it.next();
if (c.done) {
break;
}
} catch (e) {
err = await this._handler(e, signal);
hasError = true;
break;
}
} catch (e) {
err = await this._handler(e, signal);
hasError = true;
await returnAsyncIterator(it);
break;
}

yield c.value;
yield c.value;
}
} finally {
await returnAsyncIterator(it);
}

if (hasError) {
Expand Down
18 changes: 12 additions & 6 deletions src/asynciterable/operators/skip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { AsyncIterableX } from '../asynciterablex.js';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces.js';
import { wrapWithAbort } from './withabort.js';
import { throwIfAborted } from '../../aborterror.js';
import { returnAsyncIterator } from '../../util/returniterator.js';

/** @ignore */
export class SkipAsyncIterable<TSource> extends AsyncIterableX<TSource> {
Expand All @@ -20,13 +21,18 @@ export class SkipAsyncIterable<TSource> extends AsyncIterableX<TSource> {
const it = source[Symbol.asyncIterator]();
let count = this._count;
let next;
while (count > 0 && !(next = await it.next()).done) {
count--;
}
if (count <= 0) {
while (!(next = await it.next()).done) {
yield next.value;

try {
while (count > 0 && !(next = await it.next()).done) {
count--;
}
if (count <= 0) {
while (!(next = await it.next()).done) {
yield next.value;
}
}
} finally {
returnAsyncIterator(it);
}
}
}
Expand Down
30 changes: 16 additions & 14 deletions src/iterable/catcherror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@ export class CatchIterable<TSource> extends IterableX<TSource> {
error = null;
hasError = false;

while (1) {
let c = <TSource>{};
try {
while (1) {
let c = <TSource>{};

try {
const { done, value } = it.next();
if (done) {
returnIterator(it);
try {
const { done, value } = it.next();
if (done) {
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
returnIterator(it);
break;
}

yield c;
yield c;
}
} finally {
returnIterator(it);
}

if (!hasError) {
Expand Down
Loading

0 comments on commit c31a8df

Please sign in to comment.