From 73746086d617e71bb878df65b5bc211149a7d404 Mon Sep 17 00:00:00 2001 From: rjawesome Date: Wed, 7 Aug 2024 15:52:35 -0700 Subject: [PATCH] allow server integration tests to work with subquery relay --- src/controllers/threading/threadHandler.ts | 30 +++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/controllers/threading/threadHandler.ts b/src/controllers/threading/threadHandler.ts index 80c399a..c725c98 100644 --- a/src/controllers/threading/threadHandler.ts +++ b/src/controllers/threading/threadHandler.ts @@ -239,8 +239,34 @@ export async function runTask(req: Request, res: Response, route: string, useBul } if (process.env.USE_THREADING === "false") { + // Set up "inter thread messaging" + const { port1: workerSide, port2: parentSide } = new MessageChannel(); + parentSide.on("message", async (msg: ThreadMessage) => { + switch (msg.type) { + case "subqueryRequest": + const { queries, options } = msg.value as { + queries: FrozenSubquery[]; + options: QueryHandlerOptions; + }; + debug(`Main thread receives ${queries.length} subqueries from worker.`); + subqueryRelay.subscribe( + await Promise.all(queries.map(async query => await Subquery.unfreeze(query))), + options, + ({ hash, records, logs, apiUnavailable }) => { + parentSide.postMessage({ + threadId: 0, + type: "subQueryResult", + value: { hash, records, logs, apiUnavailable }, + } satisfies ThreadMessage); + }, + ); + break; + } + }); + global.workerSide = workerSide; // Threading disabled, just use the provided function in main event loop const response = (await tasks[route](taskInfo)) as TrapiResponse; + parentSide.close(); return response; } else if (!(queryQueue && useBullSync)) { // Redis unavailable or query not to sync queue such as asyncquery_status @@ -335,10 +361,8 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) { export function taskResponse(response: T, status: number = undefined): T { if (global.workerSide) { global.workerSide.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage); - return undefined; - } else { - return response; } + return response; } export function taskError(error: Error): void {