From 73f0cc918a3e649c46e53b5604744fb4be9f2ec4 Mon Sep 17 00:00:00 2001 From: Peli de Halleux Date: Wed, 25 Sep 2024 14:23:34 +0000 Subject: [PATCH] Add concurrency documentation and update promise queue implementation --- docs/genaisrc/genaiscript.d.ts | 6 +- .../docs/reference/scripts/concurrency.md | 66 +++++++++++++++++++ genaisrc/genaiscript.d.ts | 6 +- packages/auto/genaiscript.d.ts | 6 +- packages/core/src/concurrency.ts | 6 +- packages/core/src/genaisrc/genaiscript.d.ts | 6 +- packages/core/src/promptcontext.ts | 2 +- packages/core/src/types/prompt_template.d.ts | 6 +- .../sample/genaisrc/blog/genaiscript.d.ts | 6 +- packages/sample/genaisrc/genaiscript.d.ts | 6 +- .../sample/genaisrc/node/genaiscript.d.ts | 6 +- .../sample/genaisrc/python/genaiscript.d.ts | 6 +- .../sample/genaisrc/style/genaiscript.d.ts | 6 +- .../genaisrc/summarize-concurrent.genai.mjs | 2 +- packages/sample/src/aici/genaiscript.d.ts | 6 +- packages/sample/src/errors/genaiscript.d.ts | 6 +- packages/sample/src/genaiscript.d.ts | 6 +- packages/sample/src/makecode/genaiscript.d.ts | 6 +- packages/sample/src/tla/genaiscript.d.ts | 6 +- packages/sample/src/vision/genaiscript.d.ts | 6 +- packages/vscode/genaisrc/cmt.genai.mts | 18 +++-- packages/vscode/genaisrc/genaiscript.d.ts | 6 +- slides/genaisrc/genaiscript.d.ts | 6 +- 23 files changed, 99 insertions(+), 103 deletions(-) create mode 100644 docs/src/content/docs/reference/scripts/concurrency.md diff --git a/docs/genaisrc/genaiscript.d.ts b/docs/genaisrc/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/docs/genaisrc/genaiscript.d.ts +++ b/docs/genaisrc/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/docs/src/content/docs/reference/scripts/concurrency.md b/docs/src/content/docs/reference/scripts/concurrency.md new file mode 100644 index 0000000000..4e5e4f9e06 --- /dev/null +++ b/docs/src/content/docs/reference/scripts/concurrency.md @@ -0,0 +1,66 @@ +--- +title: Concurrency +description: How to run multiple prompts concurrently +sidebar: + order: 50 +--- + +When working with a GenAI, your program will likely be idling waiting for tokens to come back from the LLM. + +## await and async + +JavaScript has a wonderful support for non-blocking asynchronous APIs using [async functions](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function). + +```js +// takes a while +async function workM() { ... } + +// let other threads work while this function is running +await work() +``` + +This feature is leveraged in [inline prompts](/genaiscript/reference/scripts/inline-prompts) to wait for a LLM result or run multiple queries concurrently. + +## Serial vs concurrent execution + +In this example, we run each LLM queries 'serially' using `await`: + +```js +const poem = await prompt`write a poem` +const essay = await prompt`write an essay` +``` + +However, we can run all queries 'concurrently' to speed things up: + +```js +const [poem, essay] = await Promise.all( + prompt`write a poem`, + prompt`write an essay` +) +``` + +This works but it may become problematic if you have a lot of entries as you will create a lot of requests concurrently and probably hit some rate limiting boundaries. +Note that GenAIScript automatically limits the number of concurrent requests to a single model to prevent this scenario. + +## Promise queue + +The promise queue provides a way to run promises concurrently with a guaranteed concurrency limit, how many are allowed to run at the same time. +The difference with `Promise.all` is that you wrap each promise in a function. + +```js +const queue = host.promiseQueue(3) +const res = await queue.all( + () => prompt`write a poem` + () => prompt`write an essay` +) +``` + +Use the `mapAll` function to iterate over an array. + +```js +const queue = host.promiseQueue(3) +const summaries = await queue.mapAll( + env.files, + (file) => prompt`Summarize ${file}` +) +``` diff --git a/genaisrc/genaiscript.d.ts b/genaisrc/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/genaisrc/genaiscript.d.ts +++ b/genaisrc/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/auto/genaiscript.d.ts b/packages/auto/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/auto/genaiscript.d.ts +++ b/packages/auto/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/core/src/concurrency.ts b/packages/core/src/concurrency.ts index 8837a49fb7..fa77ba377f 100644 --- a/packages/core/src/concurrency.ts +++ b/packages/core/src/concurrency.ts @@ -20,10 +20,8 @@ export function concurrentLimit( export class PLimitPromiseQueue implements PromiseQueue { private queue: LimitFunction - constructor(private readonly options: PromiseQueueOptions) { - this.queue = pLimit( - options?.concurrency ?? PROMISE_QUEUE_CONCURRENCY_DEFAULT - ) + constructor(concurrency?: number) { + this.queue = pLimit(concurrency ?? PROMISE_QUEUE_CONCURRENCY_DEFAULT) } async mapAll( diff --git a/packages/core/src/genaisrc/genaiscript.d.ts b/packages/core/src/genaisrc/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/core/src/genaisrc/genaiscript.d.ts +++ b/packages/core/src/genaisrc/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/core/src/promptcontext.ts b/packages/core/src/promptcontext.ts index 43dc36a645..0bae4c2ed9 100644 --- a/packages/core/src/promptcontext.ts +++ b/packages/core/src/promptcontext.ts @@ -225,7 +225,7 @@ export async function createPromptContext( await runtimeHost.select(message, options), input: async (message) => await runtimeHost.input(message), confirm: async (message) => await runtimeHost.confirm(message), - promiseQueue: (options) => new PLimitPromiseQueue(options), + promiseQueue: (concurrency) => new PLimitPromiseQueue(concurrency), }) const ctx: PromptContext & RunPromptContextNode = { diff --git a/packages/core/src/types/prompt_template.d.ts b/packages/core/src/types/prompt_template.d.ts index 85b712ecf3..0a18293f3f 100644 --- a/packages/core/src/types/prompt_template.d.ts +++ b/packages/core/src/types/prompt_template.d.ts @@ -2306,10 +2306,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2320,7 +2316,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/genaisrc/blog/genaiscript.d.ts b/packages/sample/genaisrc/blog/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/genaisrc/blog/genaiscript.d.ts +++ b/packages/sample/genaisrc/blog/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/genaisrc/genaiscript.d.ts b/packages/sample/genaisrc/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/genaisrc/genaiscript.d.ts +++ b/packages/sample/genaisrc/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/genaisrc/node/genaiscript.d.ts b/packages/sample/genaisrc/node/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/genaisrc/node/genaiscript.d.ts +++ b/packages/sample/genaisrc/node/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/genaisrc/python/genaiscript.d.ts b/packages/sample/genaisrc/python/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/genaisrc/python/genaiscript.d.ts +++ b/packages/sample/genaisrc/python/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/genaisrc/style/genaiscript.d.ts b/packages/sample/genaisrc/style/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/genaisrc/style/genaiscript.d.ts +++ b/packages/sample/genaisrc/style/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/genaisrc/summarize-concurrent.genai.mjs b/packages/sample/genaisrc/summarize-concurrent.genai.mjs index 67c4832bc3..a1b143862a 100644 --- a/packages/sample/genaisrc/summarize-concurrent.genai.mjs +++ b/packages/sample/genaisrc/summarize-concurrent.genai.mjs @@ -4,7 +4,7 @@ script({ files: "src/rag/*", }) -const queue = host.promiseQueue({ concurrency: 2 }) +const queue = host.promiseQueue(2) const summaries = await queue.mapAll(env.files, (file) => runPrompt( (_) => { diff --git a/packages/sample/src/aici/genaiscript.d.ts b/packages/sample/src/aici/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/src/aici/genaiscript.d.ts +++ b/packages/sample/src/aici/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/src/errors/genaiscript.d.ts b/packages/sample/src/errors/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/src/errors/genaiscript.d.ts +++ b/packages/sample/src/errors/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/src/genaiscript.d.ts b/packages/sample/src/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/src/genaiscript.d.ts +++ b/packages/sample/src/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/src/makecode/genaiscript.d.ts b/packages/sample/src/makecode/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/src/makecode/genaiscript.d.ts +++ b/packages/sample/src/makecode/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/src/tla/genaiscript.d.ts b/packages/sample/src/tla/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/src/tla/genaiscript.d.ts +++ b/packages/sample/src/tla/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/sample/src/vision/genaiscript.d.ts b/packages/sample/src/vision/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/sample/src/vision/genaiscript.d.ts +++ b/packages/sample/src/vision/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/packages/vscode/genaisrc/cmt.genai.mts b/packages/vscode/genaisrc/cmt.genai.mts index 2100df68f1..9ad8b1faa9 100644 --- a/packages/vscode/genaisrc/cmt.genai.mts +++ b/packages/vscode/genaisrc/cmt.genai.mts @@ -21,13 +21,15 @@ const { format, build } = env.vars let files = env.files if (files.length === 0) { // If no files are provided, read all modified files + const gitStatus = await host.exec("git status --porcelain") + const rx = /^\s+[M|U]\s+/ // modified or untracked files = await Promise.all( - (await host.exec("git status --porcelain")).stdout - .split("\n") - .filter((filename) => /^ [M|U]/.test(filename)) + gitStatus.stdout + .split(/\r?\n/g) + .filter((filename) => rx.test(filename)) .map( async (filename) => - await workspace.readText(filename.replace(/^ [M|U] /, "")) + await workspace.readText(filename.replace(rx, "")) ) ) } @@ -35,16 +37,18 @@ if (files.length === 0) { // custom filter to only process code files files = files.filter( ({ filename }) => - /\.(ts|cs|py|js|java)$/.test(filename) && // known languages only + /\.(py|m?ts|m?js|cs|java|c|cpp|h|hpp)$/.test(filename) && // known languages only !/\.test/.test(filename) // ignore test files ) // Shuffle files files = files.sort(() => Math.random() - 0.5) +console.log(YAML.stringify(files.map((f) => f.filename))) + // Process each file separately to avoid context explosion -const jobs = host.pQueue({ concurrency: 5 }) -await Promise.all(files.map((file) => jobs.add(processFile, file))) +const jobs = host.promiseQueue(5) +await jobs.mapAll(files, processFile) async function processFile(file: WorkspaceFile) { console.log(`processing ${file.filename}`) diff --git a/packages/vscode/genaisrc/genaiscript.d.ts b/packages/vscode/genaisrc/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/packages/vscode/genaisrc/genaiscript.d.ts +++ b/packages/vscode/genaisrc/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost { diff --git a/slides/genaisrc/genaiscript.d.ts b/slides/genaisrc/genaiscript.d.ts index 814e085455..ef3161baf2 100644 --- a/slides/genaisrc/genaiscript.d.ts +++ b/slides/genaisrc/genaiscript.d.ts @@ -2339,10 +2339,6 @@ interface PromiseQueue { ): Promise } -interface PromiseQueueOptions { - concurrency?: number -} - interface PromptHost extends ShellHost { /** * Starts a container @@ -2353,7 +2349,7 @@ interface PromptHost extends ShellHost { /** * Create a new promise queue to run async functions with limited concurrency */ - promiseQueue(options?: PromiseQueueOptions): PromiseQueue + promiseQueue(concurrency: number): PromiseQueue } interface ContainerHost extends ShellHost {