Skip to content

Commit

Permalink
Merge branch 'master' of github.com:ReactiveX/IxJS into fea/add-docs-…
Browse files Browse the repository at this point in the history
…action
  • Loading branch information
trxcllnt committed Dec 29, 2023
2 parents 3f44748 + 2fcd440 commit 0688845
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 33 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"CHANGELOG.md"
],
"dependencies": {
"@types/node": "^13.7.4",
"@types/node": ">=13.7.4",
"tslib": "^2.3.0"
},
"devDependencies": {
Expand Down Expand Up @@ -99,7 +99,7 @@
"ts-jest": "28.0.7",
"ts-node": "8.6.2",
"typedoc": "0.25.4",
"typescript": "4.6.2",
"typescript": "5.3.3",
"validate-commit-msg": "2.14.0",
"web-stream-tools": "0.0.1",
"web-streams-polyfill": "2.1.0",
Expand Down
61 changes: 37 additions & 24 deletions src/asynciterable/asasynciterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,54 @@ import { AsyncIterableX } from './asynciterablex';
import { OperatorAsyncFunction, UnaryFunction } from '../interfaces';
import { Transform, TransformCallback, TransformOptions } from 'stream';

export interface AsyncIterableTransform<T> extends AsyncIterableX<T>, Transform {
export interface AsyncIterableTransform<T>
extends AsyncIterableX<T>,
NodeJS.ReadableStream,
NodeJS.WritableStream {
new (options?: TransformOptions): AsyncIterableTransform<T>;
pipe<R>(...operations: UnaryFunction<AsyncIterable<T>, R>[]): R;
pipe<R>(...operations: OperatorAsyncFunction<T, R>[]): AsyncIterableX<R>;
pipe<R extends NodeJS.WritableStream>(writable: R, options?: { end?: boolean }): R;
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
[Symbol.asyncIterator](): AsyncIterableIterator<any>;
}

const asyncIterableMixin = Symbol('asyncIterableMixin');

/** @ignore */
export class AsyncIterableTransform<T> extends Transform {
private static [asyncIterableMixin] = false;
constructor(options?: TransformOptions) {
super(options);
// If this is the first time AsyncIterableTransform is being constructed,
// mixin the methods from the AsyncIterableX's prototype.
if (!AsyncIterableTransform[asyncIterableMixin]) {
AsyncIterableTransform[asyncIterableMixin] = true;
Object.defineProperties(
AsyncIterableTransform.prototype,
Object.getOwnPropertyDescriptors(AsyncIterableX.prototype)
);
}
}
/** @nocollapse */
_flush(callback: TransformCallback): void {
callback();
}
/** @nocollapse */
_transform(chunk: any, _encoding: string, callback: TransformCallback): void {
callback(null, chunk);
export function AsyncIterableTransform<T>(
this: AsyncIterableTransform<T>,
options?: TransformOptions
) {
Transform.call(this as any, options);
// If this is the first time AsyncIterableTransform is being constructed,
// mixin the methods from the AsyncIterableX's prototype.
if (!AsyncIterableTransform[asyncIterableMixin]) {
AsyncIterableTransform[asyncIterableMixin] = true;
Object.defineProperties(
AsyncIterableTransform.prototype,
Object.getOwnPropertyDescriptors(AsyncIterableX.prototype)
);
}
}

AsyncIterableTransform.prototype = Object.create(Transform.prototype);

AsyncIterableTransform[asyncIterableMixin] = false;

/** @nocollapse */
AsyncIterableTransform.prototype._flush = function (callback: TransformCallback): void {
callback();
};
/** @nocollapse */
AsyncIterableTransform.prototype._transform = function (
chunk: any,
_encoding: string,
callback: TransformCallback
): void {
callback(null, chunk);
};

/** @ignore */
export function asAsyncIterable<T>(options: TransformOptions = {}) {
return new AsyncIterableTransform<T>(options);
return Reflect.construct(AsyncIterableTransform, [options]) as AsyncIterableTransform<T>;
}
8 changes: 4 additions & 4 deletions src/asynciterable/fromdomstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ async function* _consumeReader<T>(

/** @ignore */
async function* defaultReaderToAsyncIterator<T = any>(reader: ReadableStreamDefaultReader<T>) {
let r: ReadableStreamDefaultReadResult<T>;
let r: ReadableStreamReadResult<T>;
while (!(r = await reader.read()).done) {
yield r.value;
}
}

/** @ignore */
async function* byobReaderToAsyncIterator(reader: ReadableStreamReader<Uint8Array>) {
let r: ReadableStreamDefaultReadResult<Uint8Array>;
let r: ReadableStreamReadResult<Uint8Array>;
let value: number | ArrayBufferLike = yield null!;
while (!(r = await readNext(reader, value, 0)).done) {
value = yield r.value;
Expand All @@ -85,7 +85,7 @@ async function readNext(
reader: ReadableStreamReader<Uint8Array>,
bufferOrLen: ArrayBufferLike | number,
offset: number
): Promise<ReadableStreamDefaultReadResult<Uint8Array>> {
): Promise<ReadableStreamReadResult<Uint8Array>> {
let size: number;
let buffer: ArrayBufferLike;

Expand All @@ -108,7 +108,7 @@ async function readInto(
buffer: ArrayBufferLike,
offset: number,
size: number
): Promise<ReadableStreamDefaultReadResult<Uint8Array>> {
): Promise<ReadableStreamReadResult<Uint8Array>> {
let innerOffset = offset;
if (innerOffset >= size) {
return { done: false, value: new Uint8Array(buffer, 0, size) };
Expand Down
1 change: 1 addition & 0 deletions src/asynciterable/operators/withabort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class WithAbortAsyncIterable<TSource> extends AsyncIterableX<TSource> {
}

[Symbol.asyncIterator](): AsyncIterator<TSource> {
// @ts-ignore
return this._source[Symbol.asyncIterator](this._signal);
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/asynciterable/todomstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ class UnderlyingAsyncIterableDefaultSource<TSource = any> extends AbstractUnderl
super(source);
}
// eslint-disable-next-line consistent-return
async pull(controller: ReadableStreamDefaultController<TSource>) {
async pull(controller: ReadableStreamController<TSource>) {
const source = this._source;
if (source) {
const r = await source.next(controller.desiredSize);
if (!r.done) {
return controller.enqueue(r.value);
return (controller as ReadableStreamDefaultController<TSource>).enqueue(r.value);
}
}
controller.close();
Expand Down Expand Up @@ -87,7 +87,9 @@ class UnderlyingAsyncIterableByteSource<TSource extends ArrayBufferView = Uint8A
// eslint-disable-next-line consistent-return
async pull(controller: ReadableStreamController<TSource>) {
if (!(controller as any).byobRequest) {
return await this.fallbackDefaultSource.pull(controller);
return await this.fallbackDefaultSource.pull(
controller as ReadableStreamDefaultController<TSource>
);
}
if (this._source) {
const { view } = (controller as any).byobRequest;
Expand Down

0 comments on commit 0688845

Please sign in to comment.