From b0f587f0a1139e5d84dffda22c797d700dea4905 Mon Sep 17 00:00:00 2001 From: Alisue Date: Tue, 15 Oct 2024 00:24:09 +0900 Subject: [PATCH] feat(async): add `repeatable` helper function --- .github/FUNDING.yml | 4 +- README.md | 21 ++++++++++ async/mod.ts | 1 + async/repeatable.ts | 40 ++++++++++++++++++++ async/repeatable_test.ts | 82 ++++++++++++++++++++++++++++++++++++++++ deno.jsonc | 2 + package-lock.json | 7 ++++ package.json | 1 + 8 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 async/repeatable.ts create mode 100644 async/repeatable_test.ts diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml index 7e8acac..1fdc05e 100644 --- a/.github/FUNDING.yml +++ b/.github/FUNDING.yml @@ -1,6 +1,8 @@ # These are supported funding model platforms -github: [lambdalisue] # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] +github: [ + lambdalisue, +] # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] patreon: # Replace with a single Patreon username open_collective: # Replace with a single Open Collective username ko_fi: # Replace with a single Ko-fi username diff --git a/README.md b/README.md index 11adabd..00fa748 100644 --- a/README.md +++ b/README.md @@ -1115,6 +1115,27 @@ const iter = pipe( console.log(await Array.fromAsync(iter)); // [1, 2, 3, 1, 2, 3] ``` +### repeatable + +Transform an async iterable into a repeatable async iterable. It caches the +values of the original iterable so that it can be replayed. Useful for replaying +the costly async iterable. + +```ts +import { repeatable } from "@core/iterutil/async/repeatable"; +import { assertEquals } from "@std/assert"; + +const origin = (async function* () { + yield 1; + yield 2; + yield 3; +})(); +const iter = repeatable(origin); +assertEquals(await Array.fromAsync(iter), [1, 2, 3]); +assertEquals(await Array.fromAsync(iter), [1, 2, 3]); // iter can be replayed +assertEquals(await Array.fromAsync(origin), []); // origin is already consumed +``` + ### some Returns true if at least one element in the iterable satisfies the provided diff --git a/async/mod.ts b/async/mod.ts index aac738f..a543869 100644 --- a/async/mod.ts +++ b/async/mod.ts @@ -21,6 +21,7 @@ export * from "./pairwise.ts"; export * from "./partition.ts"; export * from "./reduce.ts"; export * from "./repeat.ts"; +export * from "./repeatable.ts"; export * from "./some.ts"; export * from "./take.ts"; export * from "./take_while.ts"; diff --git a/async/repeatable.ts b/async/repeatable.ts new file mode 100644 index 0000000..c1599db --- /dev/null +++ b/async/repeatable.ts @@ -0,0 +1,40 @@ +export function repeatable(iterable: AsyncIterable): AsyncIterable { + const cache: T[] = []; + let buildingCache: Promise | undefined = undefined; + let pendingResolvers: ((value: T) => void)[] = []; + let finished = false; + + return { + [Symbol.asyncIterator]: async function* () { + yield* cache; + + if (!finished) { + if (!buildingCache) { + buildingCache = (async () => { + try { + for await (const item of iterable) { + cache.push(item); + pendingResolvers.forEach((resolve) => resolve(item)); + pendingResolvers = []; + } + } finally { + finished = true; + } + })(); + } + } + let index = cache.length; + while (!finished || index < cache.length) { + if (index < cache.length) { + yield cache[index++]; + } else { + const nextItem = await new Promise((resolve) => { + pendingResolvers.push(resolve); + }); + yield nextItem; + index++; + } + } + }, + }; +} diff --git a/async/repeatable_test.ts b/async/repeatable_test.ts new file mode 100644 index 0000000..9a65589 --- /dev/null +++ b/async/repeatable_test.ts @@ -0,0 +1,82 @@ +import { test } from "@cross/test"; +import { delay } from "@std/async/delay"; +import { assertEquals } from "@std/assert"; +import { repeatable } from "./repeatable.ts"; + +async function* delayedGenerator(sideEffect?: () => void) { + yield 1; + await delay(100); + yield 2; + await delay(100); + yield 3; + sideEffect?.(); +} + +await test("repeatable should return the same sequence on multiple iterations", async () => { + const input = delayedGenerator(); + const it = repeatable(input); + + const result1 = await Array.fromAsync(it); + const result2 = await Array.fromAsync(it); + + assertEquals(result1, [1, 2, 3], "First iteration"); + assertEquals(result2, [1, 2, 3], "First iteration"); +}); + +await test("repeatable should call internal iterator only once", async () => { + let called = 0; + const input = delayedGenerator(() => called++); + const it = repeatable(input); + + const result1 = await Array.fromAsync(it); + const result2 = await Array.fromAsync(it); + + assertEquals(result1, [1, 2, 3], "First iteration"); + assertEquals(result2, [1, 2, 3], "First iteration"); + assertEquals(called, 1, "Internal iterator called only once"); +}); + +await test("repeatable should work correctly when consumed partially and then fully", async () => { + const input = delayedGenerator(); + const it = repeatable(input); + + const result1: number[] = []; + const firstIter = it[Symbol.asyncIterator](); + + result1.push((await firstIter.next()).value); // 1 + + const result2 = await Array.fromAsync(it); + + result1.push((await firstIter.next()).value); // 2 + result1.push((await firstIter.next()).value); // 3 + + assertEquals(result1, [1, 2, 3], "First iteration"); + assertEquals(result2, [1, 2, 3], "First iteration"); +}); + +await test("repeatable should cache values and return them immediately on subsequent iterations", async () => { + const input = delayedGenerator(); + const it = repeatable(input); + + const start = performance.now(); + const result1 = await Array.fromAsync(it); + const end1 = performance.now(); + const timeTaken1 = end1 - start; + + const start2 = performance.now(); + const result2 = await Array.fromAsync(it); + const end2 = performance.now(); + const timeTaken2 = end2 - start2; + + assertEquals(result1, [1, 2, 3], "First iteration"); + assertEquals(result2, [1, 2, 3], "Second iteration"); + + console.debug("Time taken for first consume:", timeTaken1); + console.debug("Time taken for second consume (with cache):", timeTaken2); + + if (timeTaken2 > timeTaken1 / 10) { + throw new Error( + "Second consume took too long, cache might not be working.", + ); + } +}); diff --git a/deno.jsonc b/deno.jsonc index 9ebf782..04e7905 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -27,6 +27,7 @@ "./async/partition": "./async/partition.ts", "./async/reduce": "./async/reduce.ts", "./async/repeat": "./async/repeat.ts", + "./async/repeatable": "./async/repeatable.ts", "./async/some": "./async/some.ts", "./async/take": "./async/take.ts", "./async/take-while": "./async/take_while.ts", @@ -261,6 +262,7 @@ "@core/unknownutil": "jsr:@core/unknownutil@^4.0.1", "@cross/test": "jsr:@cross/test@^0.0.9", "@std/assert": "jsr:@std/assert@^1.0.2", + "@std/async": "jsr:@std/async@^1.0.6", "@std/jsonc": "jsr:@std/jsonc@^1.0.0", "@std/path": "jsr:@std/path@^1.0.2", "@std/testing": "jsr:@std/testing@^1.0.0" diff --git a/package-lock.json b/package-lock.json index ecbb697..a43f7d6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "@core/unknownutil": "npm:@jsr/core__unknownutil@^4.2.0", "@cross/test": "npm:@jsr/cross__test@^0.0.9", "@std/assert": "npm:@jsr/std__assert@^1.0.2", + "@std/async": "npm:@jsr/std__async@^1.0.2", "@std/jsonc": "npm:@jsr/std__jsonc@^1.0.0-rc.3", "@std/path": "npm:@jsr/std__path@^1.0.2", "@std/testing": "npm:@jsr/std__testing@^1.0.0-rc.5" @@ -106,6 +107,12 @@ "@jsr/std__internal": "^1.0.1" } }, + "node_modules/@std/async": { + "name": "@jsr/std__async", + "version": "1.0.6", + "resolved": "https://npm.jsr.io/~/11/@jsr/std__async/1.0.6.tgz", + "integrity": "sha512-lIglWWtdtX1jFgMznUaX1zs1vDIA7lARZC+QrfwuHRPo/4k0nCkzEUOjCBczfqxbqviZKf7VjopGxOMAgy9nXA==" + }, "node_modules/@std/jsonc": { "name": "@jsr/std__jsonc", "version": "1.0.0-rc.3", diff --git a/package.json b/package.json index c873dee..c139e88 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "@core/unknownutil": "npm:@jsr/core__unknownutil@^4.2.0", "@cross/test": "npm:@jsr/cross__test@^0.0.9", "@std/assert": "npm:@jsr/std__assert@^1.0.2", + "@std/async": "npm:@jsr/std__async@^1.0.2", "@std/jsonc": "npm:@jsr/std__jsonc@^1.0.0-rc.3", "@std/path": "npm:@jsr/std__path@^1.0.2", "@std/testing": "npm:@jsr/std__testing@^1.0.0-rc.5"