From 6914c5e32cfef538658b7c90756931388c717f80 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Fri, 1 Nov 2024 18:38:01 -0700 Subject: [PATCH] Fixes #13816 (#13906) Co-authored-by: pfg Co-authored-by: Ryan Gonzalez Co-authored-by: Ben Grant Co-authored-by: Dave Caruso --- CONTRIBUTING.md | 2 +- cmake/Options.cmake | 2 +- src/bun.js/bindings/ErrorCode.ts | 1 + src/js/builtins/ProcessObjectInternals.ts | 5 ++ src/js/builtins/ReadableStream.ts | 17 +++- .../ReadableStreamDefaultController.ts | 5 +- .../builtins/ReadableStreamDefaultReader.ts | 5 +- src/js/builtins/ReadableStreamInternals.ts | 38 +++++--- test/js/third_party/prompts/prompts.test.ts | 6 +- test/js/web/streams/streams.test.js | 88 +++++++++++++++++++ 10 files changed, 148 insertions(+), 21 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e143d693422a5b..9feff2712177b1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -290,7 +290,7 @@ $ xcode-select --install Bun defaults to linking `libatomic` statically, as not all systems have it. If you are building on a distro that does not have a static libatomic available, you can run the following command to enable dynamic linking: ```bash -$ bun setup -DUSE_STATIC_LIBATOMIC=OFF +$ bun run build -DUSE_STATIC_LIBATOMIC=OFF ``` The built version of Bun may not work on other systems if compiled this way. diff --git a/cmake/Options.cmake b/cmake/Options.cmake index 36137c50cbb741..726a94a4b454b6 100644 --- a/cmake/Options.cmake +++ b/cmake/Options.cmake @@ -138,7 +138,7 @@ if(CMAKE_HOST_LINUX AND NOT WIN32 AND NOT APPLE) OUTPUT_STRIP_TRAILING_WHITESPACE ERROR_QUIET ) - if(LINUX_DISTRO MATCHES "NAME=\"(Arch|Manjaro|Artix) Linux\"|NAME=\"openSUSE Tumbleweed\"") + if(LINUX_DISTRO MATCHES "NAME=\"(Arch|Manjaro|Artix) Linux( ARM)?\"|NAME=\"openSUSE Tumbleweed\"") set(DEFAULT_STATIC_LIBATOMIC OFF) endif() endif() diff --git a/src/bun.js/bindings/ErrorCode.ts b/src/bun.js/bindings/ErrorCode.ts index 692ad34a3e7d7e..ed8185e191d110 100644 --- a/src/bun.js/bindings/ErrorCode.ts +++ b/src/bun.js/bindings/ErrorCode.ts @@ -45,6 +45,7 @@ export default [ ["ERR_BUFFER_OUT_OF_BOUNDS", RangeError, "RangeError"], ["ERR_UNKNOWN_SIGNAL", TypeError, "TypeError"], ["ERR_SOCKET_BAD_PORT", RangeError, "RangeError"], + ["ERR_STREAM_RELEASE_LOCK", Error, "AbortError"], // Bun-specific ["ERR_FORMDATA_PARSE_ERROR", TypeError, "TypeError"], diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index b62f3028b3332e..506596a53ffe74 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -195,6 +195,10 @@ export function getStdinStream(fd) { } } } catch (err) { + if (err?.code === "ERR_STREAM_RELEASE_LOCK") { + // Not a bug. Happens in unref(). + return; + } stream.destroy(err); } } @@ -212,6 +216,7 @@ export function getStdinStream(fd) { $debug('on("resume");'); ref(); stream._undestroy(); + stream_destroyed = false; }); stream._readableState.reading = false; diff --git a/src/js/builtins/ReadableStream.ts b/src/js/builtins/ReadableStream.ts index ed45aaab372278..6e7e2d5951d310 100644 --- a/src/js/builtins/ReadableStream.ts +++ b/src/js/builtins/ReadableStream.ts @@ -108,6 +108,7 @@ export function initializeReadableStream( $linkTimeConstant; export function readableStreamToArray(stream: ReadableStream): Promise { + if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); // this is a direct stream var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); if (underlyingSource !== undefined) { @@ -119,6 +120,7 @@ export function readableStreamToArray(stream: ReadableStream): Promise { + if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); // this is a direct stream var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); if (underlyingSource !== undefined) { @@ -137,6 +139,7 @@ export function readableStreamToText(stream: ReadableStream): Promise { $linkTimeConstant; export function readableStreamToArrayBuffer(stream: ReadableStream): Promise | ArrayBuffer { + if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); // this is a direct stream var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); if (underlyingSource !== undefined) { @@ -216,6 +219,7 @@ export function readableStreamToArrayBuffer(stream: ReadableStream) $linkTimeConstant; export function readableStreamToBytes(stream: ReadableStream): Promise | Uint8Array { + if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); // this is a direct stream var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); @@ -297,6 +301,7 @@ export function readableStreamToFormData( stream: ReadableStream, contentType: string | ArrayBuffer | ArrayBufferView, ): Promise { + if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); return Bun.readableStreamToBlob(stream).then(blob => { return FormData.from(blob, contentType); @@ -305,6 +310,7 @@ export function readableStreamToFormData( $linkTimeConstant; export function readableStreamToJSON(stream: ReadableStream): unknown { + if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); let result = $tryUseReadableStreamBufferedFastPath(stream, "json"); if (result) { @@ -326,6 +332,7 @@ export function readableStreamToJSON(stream: ReadableStream): unknown { $linkTimeConstant; export function readableStreamToBlob(stream: ReadableStream): Promise { + if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); return ( @@ -422,7 +429,15 @@ export function pipeThrough(this, streams, options) { if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked"); - $readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal); + const promise = $readableStreamPipeToWritableStream( + this, + internalWritable, + preventClose, + preventAbort, + preventCancel, + signal, + ); + $markPromiseAsHandled(promise); return readable; } diff --git a/src/js/builtins/ReadableStreamDefaultController.ts b/src/js/builtins/ReadableStreamDefaultController.ts index 7c1c9ace77d6b8..6a04addc337d4a 100644 --- a/src/js/builtins/ReadableStreamDefaultController.ts +++ b/src/js/builtins/ReadableStreamDefaultController.ts @@ -33,8 +33,9 @@ export function initializeReadableStreamDefaultController(this, stream, underlyi export function enqueue(this, chunk) { if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "enqueue"); - if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) - throw new TypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued"); + if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) { + throw $ERR_INVALID_STATE("ReadableStreamDefaultController is not in a state where chunk can be enqueued"); + } return $readableStreamDefaultControllerEnqueue(this, chunk); } diff --git a/src/js/builtins/ReadableStreamDefaultReader.ts b/src/js/builtins/ReadableStreamDefaultReader.ts index 2ff8e385f04c27..9ddb3e3f38d33d 100644 --- a/src/js/builtins/ReadableStreamDefaultReader.ts +++ b/src/js/builtins/ReadableStreamDefaultReader.ts @@ -172,10 +172,7 @@ export function releaseLock(this) { if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return; - if ($getByIdDirectPrivate(this, "readRequests")?.isNotEmpty()) - throw new TypeError("There are still pending read requests, cannot release the lock"); - - $readableStreamReaderGenericRelease(this); + $readableStreamDefaultReaderRelease(this); } $getter; diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index 7f95f39ee9b9d7..72b42a2c76dc5c 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -331,7 +331,10 @@ export function pipeToDoReadWrite(pipeState) { pipeState.pendingReadPromiseCapability.resolve.$call(undefined, canWrite); if (!canWrite) return; - pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value); + pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value).$then( + undefined, + () => {}, + ); }, e => { pipeState.pendingReadPromiseCapability.resolve.$call(undefined, false); @@ -396,7 +399,7 @@ export function pipeToClosingMustBePropagatedForward(pipeState) { action(); return; } - $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, undefined); + $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, () => {}); } export function pipeToClosingMustBePropagatedBackward(pipeState) { @@ -1367,20 +1370,18 @@ export function readableStreamError(stream, error) { if (!reader) return; + $getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error); + const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise; + $markPromiseAsHandled(promise); + if ($isReadableStreamDefaultReader(reader)) { - const requests = $getByIdDirectPrivate(reader, "readRequests"); - $putByIdDirectPrivate(reader, "readRequests", $createFIFO()); - for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); + $readableStreamDefaultReaderErrorReadRequests(reader, error); } else { $assert($isReadableStreamBYOBReader(reader)); const requests = $getByIdDirectPrivate(reader, "readIntoRequests"); $putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO()); for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); } - - $getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error); - const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise; - $markPromiseAsHandled(promise); } export function readableStreamDefaultControllerShouldCallPull(controller) { @@ -1608,6 +1609,15 @@ export function isReadableStreamDisturbed(stream) { return stream.$disturbed; } +$visibility = "Private"; +export function readableStreamDefaultReaderRelease(reader) { + $readableStreamReaderGenericRelease(reader); + $readableStreamDefaultReaderErrorReadRequests( + reader, + $ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"), + ); +} + $visibility = "Private"; export function readableStreamReaderGenericRelease(reader) { $assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream")); @@ -1616,11 +1626,11 @@ export function readableStreamReaderGenericRelease(reader) { if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable) $getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call( undefined, - $makeTypeError("releasing lock of reader whose stream is still in readable state"), + $ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"), ); else $putByIdDirectPrivate(reader, "closedPromiseCapability", { - promise: $newHandledRejectedPromise($makeTypeError("reader released lock")), + promise: $newHandledRejectedPromise($ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()")), }); const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise; @@ -1636,6 +1646,12 @@ export function readableStreamReaderGenericRelease(reader) { $putByIdDirectPrivate(reader, "ownerReadableStream", undefined); } +export function readableStreamDefaultReaderErrorReadRequests(reader, error) { + const requests = $getByIdDirectPrivate(reader, "readRequests"); + $putByIdDirectPrivate(reader, "readRequests", $createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); +} + export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { if ($getByIdDirectPrivate(controller, "closeRequested")) { return false; diff --git a/test/js/third_party/prompts/prompts.test.ts b/test/js/third_party/prompts/prompts.test.ts index 4d58ee36afb340..00765fe76dbe1e 100644 --- a/test/js/third_party/prompts/prompts.test.ts +++ b/test/js/third_party/prompts/prompts.test.ts @@ -9,7 +9,11 @@ test("works with prompts", async () => { stdin: "pipe", }); - await Bun.sleep(100); + const reader = child.stdout.getReader(); + + await reader.read(); + reader.releaseLock(); + child.stdin.write("dylan\n"); await Bun.sleep(100); child.stdin.write("999\n"); diff --git a/test/js/web/streams/streams.test.js b/test/js/web/streams/streams.test.js index 4f769a5420a9e0..1caa6eb7aecc02 100644 --- a/test/js/web/streams/streams.test.js +++ b/test/js/web/streams/streams.test.js @@ -756,6 +756,55 @@ it("ReadableStream for empty file closes immediately", async () => { expect(chunks.length).toBe(0); }); +it("ReadableStream errors the stream on pull rejection", async () => { + let stream = new ReadableStream({ + pull(controller) { + return Promise.reject("pull rejected"); + }, + }); + + let reader = stream.getReader(); + let closed = reader.closed.catch(err => `closed: ${err}`); + let read = reader.read().catch(err => `read: ${err}`); + expect(await Promise.race([closed, read])).toBe("closed: pull rejected"); + expect(await read).toBe("read: pull rejected"); +}); + +it("ReadableStream rejects pending reads when the lock is released", async () => { + let { resolve, promise } = Promise.withResolvers(); + let stream = new ReadableStream({ + async pull(controller) { + controller.enqueue("123"); + await promise; + controller.enqueue("456"); + controller.close(); + }, + }); + + let reader = stream.getReader(); + expect((await reader.read()).value).toBe("123"); + + let read = reader.read(); + reader.releaseLock(); + expect(read).rejects.toThrow( + expect.objectContaining({ + name: "AbortError", + code: "ERR_STREAM_RELEASE_LOCK", + }), + ); + expect(reader.closed).rejects.toThrow( + expect.objectContaining({ + name: "AbortError", + code: "ERR_STREAM_RELEASE_LOCK", + }), + ); + + resolve(); + + reader = stream.getReader(); + expect((await reader.read()).value).toBe("456"); +}); + it("new Response(stream).arrayBuffer() (bytes)", async () => { var queue = [Buffer.from("abdefgh")]; var stream = new ReadableStream({ @@ -1053,3 +1102,42 @@ it("fs.createReadStream(filename) should be able to break inside async loop", as expect(true).toBe(true); } }); + +it("pipeTo doesn't cause unhandled rejections on readable errors", async () => { + // https://github.com/WebKit/WebKit/blob/3a75b5d2de94aa396a99b454ac47f3be9e0dc726/LayoutTests/streams/pipeTo-unhandled-promise.html + let unhandledRejectionCaught = false; + + const catchUnhandledRejection = () => { + unhandledRejectionCaught = true; + }; + process.on("unhandledRejection", catchUnhandledRejection); + + const writable = new WritableStream(); + const readable = new ReadableStream({ start: c => c.error("error") }); + readable.pipeTo(writable).catch(() => {}); + + await Bun.sleep(15); + + process.off("unhandledRejection", catchUnhandledRejection); + + expect(unhandledRejectionCaught).toBe(false); +}); + +it("pipeThrough doesn't cause unhandled rejections on readable errors", async () => { + let unhandledRejectionCaught = false; + + const catchUnhandledRejection = () => { + unhandledRejectionCaught = true; + }; + process.on("unhandledRejection", catchUnhandledRejection); + + const readable = new ReadableStream({ start: c => c.error("error") }); + const ts = new TransformStream(); + readable.pipeThrough(ts); + + await Bun.sleep(15); + + process.off("unhandledRejection", catchUnhandledRejection); + + expect(unhandledRejectionCaught).toBe(false); +});