Skip to content

Commit

Permalink
allow server integration tests to work with subquery relay
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Aug 7, 2024
1 parent d65bae2 commit 7374608
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions src/controllers/threading/threadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,34 @@ export async function runTask(req: Request, res: Response, route: string, useBul
}

if (process.env.USE_THREADING === "false") {
// Set up "inter thread messaging"
const { port1: workerSide, port2: parentSide } = new MessageChannel();
parentSide.on("message", async (msg: ThreadMessage) => {
switch (msg.type) {
case "subqueryRequest":
const { queries, options } = msg.value as {
queries: FrozenSubquery[];
options: QueryHandlerOptions;
};
debug(`Main thread receives ${queries.length} subqueries from worker.`);
subqueryRelay.subscribe(
await Promise.all(queries.map(async query => await Subquery.unfreeze(query))),
options,
({ hash, records, logs, apiUnavailable }) => {
parentSide.postMessage({
threadId: 0,
type: "subQueryResult",
value: { hash, records, logs, apiUnavailable },
} satisfies ThreadMessage);
},
);
break;
}
});
global.workerSide = workerSide;
// Threading disabled, just use the provided function in main event loop
const response = (await tasks[route](taskInfo)) as TrapiResponse;
parentSide.close();
return response;
} else if (!(queryQueue && useBullSync)) {
// Redis unavailable or query not to sync queue such as asyncquery_status
Expand Down Expand Up @@ -335,10 +361,8 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) {
export function taskResponse<T>(response: T, status: number = undefined): T {
if (global.workerSide) {
global.workerSide.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage);
return undefined;
} else {
return response;
}
return response;
}

export function taskError(error: Error): void {
Expand Down

0 comments on commit 7374608

Please sign in to comment.