From 1ea4ba582c0feaf1ac3470a44da897d058d7d630 Mon Sep 17 00:00:00 2001 From: NeuralFlux <40491005+NeuralFlux@users.noreply.github.com> Date: Thu, 31 Oct 2024 13:13:47 -0400 Subject: [PATCH 1/8] chore: refactor route var names to camel case --- src/config/index.ts | 8 ++++---- src/controllers/async/asyncquery_queue.ts | 2 +- src/controllers/threading/threadHandler.ts | 4 ++-- src/routes/bullboard.ts | 4 ++-- src/routes/v1/asyncquery_v1_by_api.ts | 4 ++-- src/routes/v1/asyncquery_v1_by_team.ts | 4 ++-- src/routes/v1/query_v1_by_api.ts | 2 +- src/routes/v1/query_v1_by_team.ts | 2 +- src/types.ts | 2 +- 9 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/config/index.ts b/src/config/index.ts index 68bb76b..d83e688 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -86,14 +86,14 @@ export default class Config { }); this.app.use("/", fastLimiter); this.app.use("/v1/query", medLimiter); - this.app.use("/v1/team/:team_name/query", medLimiter); - this.app.use("/v1/team/:smartapiID/query", fastLimiter); + this.app.use("/v1/team/:teamName/query", medLimiter); + this.app.use("/v1/team/:smartAPIID/query", fastLimiter); this.app.use("/v1/meta_knowledge_graph", fastLimiter); this.app.use("/v1/team/:teamName/meta_knowledge_graph", fastLimiter); - this.app.use("/v1/smartapi/:smartapiID/meta_knowledge_graph", fastLimiter); + this.app.use("/v1/smartapi/:smartAPIID/meta_knowledge_graph", fastLimiter); this.app.use("/v1/asyncquery", fastLimiter); this.app.use("/v1/team/:teamName/asyncquery", fastLimiter); - this.app.use("/v1/smartapi/:smartapiID/asyncquery", fastLimiter); + this.app.use("/v1/smartapi/:smartAPIID/asyncquery", fastLimiter); this.app.use("/queues", fastLimiter); } diff --git a/src/controllers/async/asyncquery_queue.ts b/src/controllers/async/asyncquery_queue.ts index a536b5e..669a9ac 100644 --- a/src/controllers/async/asyncquery_queue.ts +++ b/src/controllers/async/asyncquery_queue.ts @@ -79,7 +79,7 @@ export function getQueryQueue(name: string): BullQueue { workflow: [ { id: - job.data.route.includes(":smartapi_id") || job.data.route.includes(":team_name") + job.data.route.includes(":smartAPIID") || job.data.route.includes(":teamName") ? "lookup" : "lookup_and_score", }, diff --git a/src/controllers/threading/threadHandler.ts b/src/controllers/threading/threadHandler.ts index 6a6916a..d84449a 100644 --- a/src/controllers/threading/threadHandler.ts +++ b/src/controllers/threading/threadHandler.ts @@ -226,11 +226,11 @@ export async function runTask(req: Request, res: Response, route: string, useBul route, queryGraph: (req.body as TrapiQuery)?.message?.query_graph, workflow: (req.body as TrapiQuery)?.workflow, + smartAPIID: req.params.smartAPIID, + teamName: req.params.teamName, options: { logLevel: (req.body as TrapiQuery).log_level || (req.query.log_level as string), submitter: (req.body as TrapiQuery).submitter, - smartAPIID: req.params.smartapi_id, - teamName: req.params.team_name, ...req.query, }, params: req.params, diff --git a/src/routes/bullboard.ts b/src/routes/bullboard.ts index 33b1fcf..77c7775 100644 --- a/src/routes/bullboard.ts +++ b/src/routes/bullboard.ts @@ -23,8 +23,8 @@ class BullBoardPage { } const queues = { "/v1/asynquery": getQueryQueue("bte_query_queue"), - "/v1/smartapi/{smartapi_id}/asyncquery": getQueryQueue("bte_query_queue_by_api"), - "/v1/team/{team_name}/asyncquery": getQueryQueue("bte_query_queue_by_team"), + "/v1/smartapi/{smartAPIID}/asyncquery": getQueryQueue("bte_query_queue_by_api"), + "/v1/team/{teamName}/asyncquery": getQueryQueue("bte_query_queue_by_team"), "/v1/query": getQueryQueue("bte_sync_query_queue"), }; diff --git a/src/routes/v1/asyncquery_v1_by_api.ts b/src/routes/v1/asyncquery_v1_by_api.ts index 60d06ef..a687010 100644 --- a/src/routes/v1/asyncquery_v1_by_api.ts +++ b/src/routes/v1/asyncquery_v1_by_api.ts @@ -20,12 +20,12 @@ import { BteRoute } from "../../types"; class V1AsyncQueryByAPI implements BteRoute { setRoutes(app: Express) { app - .route("/v1/smartapi/:smartapi_id/asyncquery") + .route("/v1/smartapi/:smartAPIID/asyncquery") .post(swaggerValidation.validate, async (req: Request, res: Response, next: NextFunction) => { const queueData: QueueData = { route: req.route.path, queryGraph: req.body?.message.query_graph, - smartAPIID: req.params.smartapi_id, + smartAPIID: req.params.smartAPIID, workflow: req.body?.workflow, callback_url: req.body?.callback, options: { diff --git a/src/routes/v1/asyncquery_v1_by_team.ts b/src/routes/v1/asyncquery_v1_by_team.ts index dc8c719..21af997 100644 --- a/src/routes/v1/asyncquery_v1_by_team.ts +++ b/src/routes/v1/asyncquery_v1_by_team.ts @@ -21,12 +21,12 @@ import { QueueData, TaskInfo } from "@biothings-explorer/types"; class V1AsyncQueryByTeam implements BteRoute { setRoutes(app: Express) { app - .route("/v1/team/:team_name/asyncquery") + .route("/v1/team/:teamName/asyncquery") .post(swaggerValidation.validate, (async (req: Request, res: Response, next: NextFunction) => { const queueData: QueueData = { route: req.route.path, queryGraph: req.body?.message.query_graph, - teamName: req.params.team_name, + teamName: req.params.teamName, workflow: req.body?.workflow, callback_url: req.body?.callback, options: { diff --git a/src/routes/v1/query_v1_by_api.ts b/src/routes/v1/query_v1_by_api.ts index fa7bb31..c9c1707 100644 --- a/src/routes/v1/query_v1_by_api.ts +++ b/src/routes/v1/query_v1_by_api.ts @@ -19,7 +19,7 @@ import { Express, NextFunction, Request, RequestHandler, Response } from "expres class V1QueryByAPI implements BteRoute { setRoutes(app: Express) { app - .route("/v1/smartapi/:smartapi_id/query") + .route("/v1/smartapi/:smartAPIID/query") .post(swaggerValidation.validate, (async (req: Request, res: Response, next: NextFunction) => { try { const response = await runTask(req, res, path.parse(__filename).name); diff --git a/src/routes/v1/query_v1_by_team.ts b/src/routes/v1/query_v1_by_team.ts index 7192925..129078e 100644 --- a/src/routes/v1/query_v1_by_team.ts +++ b/src/routes/v1/query_v1_by_team.ts @@ -18,7 +18,7 @@ import { TaskInfo } from "@biothings-explorer/types"; class V1QueryByTeam { setRoutes(app: Express) { app - .route("/v1/team/:team_name/query") + .route("/v1/team/:teamName/query") .post(swaggerValidation.validate, (async (req: Request, res: Response, next: NextFunction) => { try { const response = await runTask(req, res, path.parse(__filename).name); diff --git a/src/types.ts b/src/types.ts index f3f7a9e..947c0a0 100644 --- a/src/types.ts +++ b/src/types.ts @@ -38,7 +38,7 @@ export interface SmartApiOverrideConfig { } export interface SmartApiOverrideList { - [smartapi_id: string]: string; + [smartAPIID: string]: string; } export interface SmartApiOverrides { From 1e3f83d79f5b2e24226b382997d9b7aae4129ef6 Mon Sep 17 00:00:00 2001 From: NeuralFlux <40491005+NeuralFlux@users.noreply.github.com> Date: Thu, 31 Oct 2024 13:16:26 -0400 Subject: [PATCH 2/8] fix: add exclusive r/w lock on smartapi files for consistency --- package.json | 1 + src/controllers/cron/update_local_smartapi.ts | 5 +- src/controllers/meta_knowledge_graph.ts | 64 ++++++++++++++----- src/routes/v1/meta_knowledge_graph_v1.ts | 17 +++-- src/utils/common.ts | 25 ++++++++ 5 files changed, 84 insertions(+), 28 deletions(-) diff --git a/package.json b/package.json index 3d50019..bebfd9e 100644 --- a/package.json +++ b/package.json @@ -114,6 +114,7 @@ "node-cron": "^2.0.3", "npm": "^9.9.0", "piscina": "^3.2.0", + "proper-lockfile": "^4.1.2", "ps-node": "^0.1.6", "snake-case": "^3.0.4", "stream-chunker": "^1.2.8", diff --git a/src/controllers/cron/update_local_smartapi.ts b/src/controllers/cron/update_local_smartapi.ts index 502e806..7294c8f 100644 --- a/src/controllers/cron/update_local_smartapi.ts +++ b/src/controllers/cron/update_local_smartapi.ts @@ -14,6 +14,7 @@ import { SmartApiOverrides } from "../../types"; import apiList from "../../config/api_list"; import MetaKG, { SmartAPISpec } from "@biothings-explorer/smartapi-kg"; import { redisClient } from "@biothings-explorer/utils"; +import { writeFileWithLock } from "../../utils/common"; const userAgent = `BTE/${process.env.NODE_ENV === "production" ? "prod" : "dev"} Node/${process.version} ${ process.platform @@ -325,9 +326,9 @@ async function updateSmartAPISpecs() { delete obj._score; }); - await fs.writeFile(localFilePath, JSON.stringify({ hits: hits })); + await writeFileWithLock(localFilePath, JSON.stringify({ hits: hits })); const predicatesInfo = await getOpsFromPredicatesEndpoints(res.data.hits); - await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo)); + await writeFileWithLock(predicatesFilePath, JSON.stringify(predicatesInfo)); // Create a new metakg const metakg = new MetaKG(); diff --git a/src/controllers/meta_knowledge_graph.ts b/src/controllers/meta_knowledge_graph.ts index 7affdc5..e61838b 100644 --- a/src/controllers/meta_knowledge_graph.ts +++ b/src/controllers/meta_knowledge_graph.ts @@ -1,10 +1,12 @@ import meta_kg, { KGQualifiersObject } from "@biothings-explorer/smartapi-kg"; import { snakeCase } from "snake-case"; +import lockfile from "proper-lockfile"; import path from "path"; import PredicatesLoadingError from "../utils/errors/predicates_error"; const debug = require("debug")("bte:biothings-explorer-trapi:metakg"); import apiList from "../config/api_list"; import { supportedLookups } from "@biothings-explorer/query_graph_handler"; +import MetaKG from "@biothings-explorer/smartapi-kg"; interface PredicateInfo { predicate: string; @@ -31,25 +33,50 @@ export default class MetaKnowledgeGraphHandler { const smartapi_specs = path.resolve(__dirname, "../../data/smartapi_specs.json"); const predicates = path.resolve(__dirname, "../../data/predicates.json"); const kg = new meta_kg(smartapi_specs, predicates); + try { - if (smartAPIID !== undefined) { - debug(`Constructing with SmartAPI ID ${smartAPIID}`); - kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID }); - } else if (teamName !== undefined) { - debug(`Constructing with team ${teamName}`); - kg.constructMetaKGSync(false, { apiList, teamName: teamName }); - } else { - debug(`Constructing with default`); - kg.constructMetaKGSync(true, { apiList }); - } - if (kg.ops.length === 0) { - debug(`Found 0 operations`); - throw new PredicatesLoadingError("Not Found - 0 operations"); + // obtain exclusive lock to avoid cron job updating the file + // NOTE: we trade off some read parallelism for consistency here + const release = await lockfile.lock(smartapi_specs, { + retries: { + retries: 10, + factor: 2, + minTimeout: 100, + maxTimeout: 1000, + }, + stale: 5000, + }); + + try { + if (smartAPIID !== undefined) { + debug(`Constructing with SmartAPI ID ${smartAPIID}`); + kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID }); + } else if (teamName !== undefined) { + debug(`Constructing with team ${teamName}`); + kg.constructMetaKGSync(false, { apiList, teamName: teamName }); + } else { + debug(`Constructing with default`); + kg.constructMetaKGSync(true, { apiList }); + } + if (kg.ops.length === 0) { + debug(`Found 0 operations`); + throw new PredicatesLoadingError("Not Found - 0 operations"); + } + return kg; + } catch (error) { + debug(`ERROR getting graph with ID:${smartAPIID} team:${teamName} because ${error}`); + throw new PredicatesLoadingError(`Failed to Load MetaKG: ${error}`); + } finally { + await release(); } - return kg; } catch (error) { - debug(`ERROR getting graph with ID:${smartAPIID} team:${teamName} because ${error}`); - throw new PredicatesLoadingError(`Failed to Load MetaKG: ${error}`); + if (error instanceof PredicatesLoadingError) { + throw error; + } + else { + debug(`ERROR locking file because ${error}.`); + throw new PredicatesLoadingError(`Failed to Lock File: ${error}`); + } } } @@ -86,10 +113,13 @@ export default class MetaKnowledgeGraphHandler { } async getKG( + metakg: MetaKG = undefined, smartAPIID: string = this.smartAPIID, teamName: string = this.teamName, ): Promise<{ nodes: {}; edges: any[] }> { - const kg = await this._loadMetaKG(smartAPIID, teamName); + // read metakg from files if not globally defined + const kg = metakg ?? await this._loadMetaKG(smartAPIID, teamName); + let knowledge_graph = { nodes: {}, edges: [], diff --git a/src/routes/v1/meta_knowledge_graph_v1.ts b/src/routes/v1/meta_knowledge_graph_v1.ts index 947e98c..204729a 100644 --- a/src/routes/v1/meta_knowledge_graph_v1.ts +++ b/src/routes/v1/meta_knowledge_graph_v1.ts @@ -5,6 +5,8 @@ import * as utils from "../../utils/common"; import { runTask, taskResponse, taskError } from "../../controllers/threading/threadHandler"; import { Express, NextFunction, Request, Response, RequestHandler } from "express"; +import MetaKnowledgeGraph from "@biothings-explorer/smartapi-kg"; + class MetaKG { setRoutes(app: Express) { app @@ -23,15 +25,12 @@ class MetaKG { async task(taskInfo: TaskInfo) { try { - let kg = undefined; - - // read metakg from files if not globally defined - if(!taskInfo.data.options.metakg) { - const metaKGHandler = new handler(undefined); - kg = await metaKGHandler.getKG(); - } else { - kg = taskInfo.data.options.metakg; - } + const metaKGHandler = new handler(undefined); + let metakg = undefined; + // initialize MetaKG only if ops are provided because handler logic is built upon that + if (taskInfo.data.options.metakg_ops !== undefined) + metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg_ops); + const kg = await metaKGHandler.getKG(metakg); // response.logs = utils.filterForLogLevel(response.logs, options.logLevel); return taskResponse(kg); } catch (error) { diff --git a/src/utils/common.ts b/src/utils/common.ts index 9a3c500..3607bce 100644 --- a/src/utils/common.ts +++ b/src/utils/common.ts @@ -2,6 +2,7 @@ import WorkflowError from "./errors/workflow_error"; import { URL } from "url"; import yaml2json from "js-yaml"; import fs from "fs/promises"; +import * as lockfile from 'proper-lockfile'; import path from "path"; import { TrapiLog, TrapiSchema, TrapiWorkflow } from "@biothings-explorer/types"; import { NextFunction, Request, Response } from "express"; @@ -64,3 +65,27 @@ export function filterForLogLevel(logs: TrapiLog[], logLevel: string) { export function methodNotAllowed(_req: Request, res: Response, _next: NextFunction) { res.status(405).send(); } + +export async function writeFileWithLock(filePath: string, data: string) { + let release: (() => Promise) | undefined; + + try { + release = await lockfile.lock(filePath, { + retries: { + retries: 10, // number of retry attempts + factor: 2, // exponential backoff factor + minTimeout: 100, // initial retry delay in milliseconds + maxTimeout: 1000 // maximum retry delay in milliseconds + }, + stale: 5000 // lock expiration in milliseconds to prevent deadlocks + }); + + await fs.writeFile(filePath, data); + } catch (error) { + // console.error("Failed to write file:", error); + } finally { + if (release) { + await release(); + } + } +} \ No newline at end of file From 6518716b49ecf144c80bbe0f1cc7caa2e7ceac49 Mon Sep 17 00:00:00 2001 From: NeuralFlux <40491005+NeuralFlux@users.noreply.github.com> Date: Thu, 31 Oct 2024 13:19:23 -0400 Subject: [PATCH 3/8] fix: update team and api metakg endpoints to use globally loaded graph in async fashion --- src/routes/index.ts | 4 +-- .../v1/meta_knowledge_graph_v1_by_api.ts | 27 ++++++++++++++++--- .../v1/meta_knowledge_graph_v1_by_team.ts | 25 ++++++++++++++--- 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/src/routes/index.ts b/src/routes/index.ts index ffa0673..80c9e6e 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -52,9 +52,9 @@ export const tasks: TaskByRoute = { asyncquery_v1_by_team: (taskInfo: TaskInfo) => V1AsyncQueryByTeam.task(taskInfo), // load MetaKG from global meta_knowledge_graph_v1: (taskInfo: TaskInfo) => V1MetaKG.task(taskInfo), + meta_knowledge_graph_v1_by_team: (taskInfo: TaskInfo) => V1MetaKGByTeam.task(taskInfo), + meta_knowledge_graph_v1_by_api: (taskInfo: TaskInfo) => V1MetaKGByAPI.task(taskInfo), // Not threaded due to being lightweight/speed being higher priority // performance: (taskInfo: TaskInfo) => Performance.task(taskInfo), // metakg: (taskInfo: TaskInfo) => MetaKG.task(taskInfo), - // meta_knowledge_graph_v1_by_api: (taskInfo: TaskInfo) => V1MetaKGByAPI.task(taskInfo), - // meta_knowledge_graph_v1_by_team: (taskInfo: TaskInfo) => V1MetaKGByTeam.task(taskInfo), }; diff --git a/src/routes/v1/meta_knowledge_graph_v1_by_api.ts b/src/routes/v1/meta_knowledge_graph_v1_by_api.ts index ea6e165..0e25c13 100644 --- a/src/routes/v1/meta_knowledge_graph_v1_by_api.ts +++ b/src/routes/v1/meta_knowledge_graph_v1_by_api.ts @@ -1,23 +1,42 @@ import handler from "../../controllers/meta_knowledge_graph"; import * as utils from "../../utils/common"; +import path from "path"; +import { TaskInfo } from "@biothings-explorer/types"; +import { runTask, taskResponse, taskError } from "../../controllers/threading/threadHandler"; import { Express, NextFunction, Request, Response, RequestHandler } from "express"; +import MetaKnowledgeGraph from "@biothings-explorer/smartapi-kg"; + class MetaKGByAPI { setRoutes(app: Express) { app - .route("/v1/smartapi/:smartapiID/meta_knowledge_graph") + .route("/v1/smartapi/:smartAPIID/meta_knowledge_graph") .get((async (req: Request, res: Response, next: NextFunction) => { try { - const metaKGHandler = new handler(req.params.smartapiID); - const kg = await metaKGHandler.getKG(); + const response = await runTask(req, res, path.parse(__filename).name); res.setHeader("Content-Type", "application/json"); - res.end(JSON.stringify(kg)); + res.end(JSON.stringify(response)); } catch (error) { next(error); } })as RequestHandler) .all(utils.methodNotAllowed); } + + async task(taskInfo: TaskInfo) { + try { + const metaKGHandler = new handler(taskInfo.data.smartAPIID, undefined); + let metakg = undefined; + // initialize MetaKG only if ops are provided because handler logic is built upon that + if (taskInfo.data.options.metakg_ops !== undefined) + metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg_ops); + const kg = await metaKGHandler.getKG(metakg); + // response.logs = utils.filterForLogLevel(response.logs, options.logLevel); + return taskResponse(kg); + } catch (error) { + taskError(error as Error); + } + } } export default new MetaKGByAPI(); diff --git a/src/routes/v1/meta_knowledge_graph_v1_by_team.ts b/src/routes/v1/meta_knowledge_graph_v1_by_team.ts index 03e9061..3d3f90e 100644 --- a/src/routes/v1/meta_knowledge_graph_v1_by_team.ts +++ b/src/routes/v1/meta_knowledge_graph_v1_by_team.ts @@ -1,23 +1,42 @@ import handler from "../../controllers/meta_knowledge_graph"; +import path from "path"; +import { TaskInfo } from "@biothings-explorer/types"; import * as utils from "../../utils/common"; +import { runTask, taskResponse, taskError } from "../../controllers/threading/threadHandler"; import { Express, NextFunction, Request, Response, RequestHandler } from "express"; +import MetaKnowledgeGraph from "@biothings-explorer/smartapi-kg"; + class MetaKGByTeam { setRoutes(app: Express) { app .route("/v1/team/:teamName/meta_knowledge_graph") .get((async (req: Request, res: Response, next: NextFunction) => { try { - const metaKGHandler = new handler(undefined, req.params.teamName); - const kg = await metaKGHandler.getKG(); + const response = await runTask(req, res, path.parse(__filename).name); res.setHeader("Content-Type", "application/json"); - res.end(JSON.stringify(kg)); + res.end(JSON.stringify(response)); } catch (error) { next(error); } }) as RequestHandler) .all(utils.methodNotAllowed); } + + async task(taskInfo: TaskInfo) { + try { + const metaKGHandler = new handler(undefined, taskInfo.data.teamName); + let metakg = undefined; + // initialize MetaKG only if ops are provided because handler logic is built upon that + if (taskInfo.data.options.metakg_ops !== undefined) + metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg_ops); + const kg = await metaKGHandler.getKG(metakg); + // response.logs = utils.filterForLogLevel(response.logs, options.logLevel); + return taskResponse(kg); + } catch (error) { + taskError(error as Error); + } + } } export default new MetaKGByTeam(); From 88bf8ce0a3d84cafa040cb0341cd188b7520feb2 Mon Sep 17 00:00:00 2001 From: NeuralFlux <40491005+NeuralFlux@users.noreply.github.com> Date: Tue, 5 Nov 2024 12:46:51 -0500 Subject: [PATCH 4/8] chore: refactor file locking to utils package --- src/controllers/meta_knowledge_graph.ts | 56 +++++++------------------ src/utils/common.ts | 25 +---------- 2 files changed, 17 insertions(+), 64 deletions(-) diff --git a/src/controllers/meta_knowledge_graph.ts b/src/controllers/meta_knowledge_graph.ts index e61838b..b3cd593 100644 --- a/src/controllers/meta_knowledge_graph.ts +++ b/src/controllers/meta_knowledge_graph.ts @@ -35,48 +35,24 @@ export default class MetaKnowledgeGraphHandler { const kg = new meta_kg(smartapi_specs, predicates); try { - // obtain exclusive lock to avoid cron job updating the file - // NOTE: we trade off some read parallelism for consistency here - const release = await lockfile.lock(smartapi_specs, { - retries: { - retries: 10, - factor: 2, - minTimeout: 100, - maxTimeout: 1000, - }, - stale: 5000, - }); - - try { - if (smartAPIID !== undefined) { - debug(`Constructing with SmartAPI ID ${smartAPIID}`); - kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID }); - } else if (teamName !== undefined) { - debug(`Constructing with team ${teamName}`); - kg.constructMetaKGSync(false, { apiList, teamName: teamName }); - } else { - debug(`Constructing with default`); - kg.constructMetaKGSync(true, { apiList }); - } - if (kg.ops.length === 0) { - debug(`Found 0 operations`); - throw new PredicatesLoadingError("Not Found - 0 operations"); - } - return kg; - } catch (error) { - debug(`ERROR getting graph with ID:${smartAPIID} team:${teamName} because ${error}`); - throw new PredicatesLoadingError(`Failed to Load MetaKG: ${error}`); - } finally { - await release(); - } - } catch (error) { - if (error instanceof PredicatesLoadingError) { - throw error; + if (smartAPIID !== undefined) { + debug(`Constructing with SmartAPI ID ${smartAPIID}`); + kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID }); + } else if (teamName !== undefined) { + debug(`Constructing with team ${teamName}`); + kg.constructMetaKGSync(false, { apiList, teamName: teamName }); + } else { + debug(`Constructing with default`); + kg.constructMetaKGSync(true, { apiList }); } - else { - debug(`ERROR locking file because ${error}.`); - throw new PredicatesLoadingError(`Failed to Lock File: ${error}`); + if (kg.ops.length === 0) { + debug(`Found 0 operations`); + throw new PredicatesLoadingError("Not Found - 0 operations"); } + return kg; + } catch (error) { + debug(`ERROR getting graph with ID:${smartAPIID} team:${teamName} because ${error}`); + throw new PredicatesLoadingError(`Failed to Load MetaKG: ${error}`); } } diff --git a/src/utils/common.ts b/src/utils/common.ts index 3607bce..a4a24ae 100644 --- a/src/utils/common.ts +++ b/src/utils/common.ts @@ -6,6 +6,7 @@ import * as lockfile from 'proper-lockfile'; import path from "path"; import { TrapiLog, TrapiSchema, TrapiWorkflow } from "@biothings-explorer/types"; import { NextFunction, Request, Response } from "express"; +import { LOCKFILE_RETRY_CONFIG } from "@biothings-explorer/utils"; const schema: unknown[] = []; @@ -64,28 +65,4 @@ export function filterForLogLevel(logs: TrapiLog[], logLevel: string) { export function methodNotAllowed(_req: Request, res: Response, _next: NextFunction) { res.status(405).send(); -} - -export async function writeFileWithLock(filePath: string, data: string) { - let release: (() => Promise) | undefined; - - try { - release = await lockfile.lock(filePath, { - retries: { - retries: 10, // number of retry attempts - factor: 2, // exponential backoff factor - minTimeout: 100, // initial retry delay in milliseconds - maxTimeout: 1000 // maximum retry delay in milliseconds - }, - stale: 5000 // lock expiration in milliseconds to prevent deadlocks - }); - - await fs.writeFile(filePath, data); - } catch (error) { - // console.error("Failed to write file:", error); - } finally { - if (release) { - await release(); - } - } } \ No newline at end of file From a4992a3f646b42b2663eccccc261080915c2e6c3 Mon Sep 17 00:00:00 2001 From: NeuralFlux <40491005+NeuralFlux@users.noreply.github.com> Date: Tue, 5 Nov 2024 12:47:41 -0500 Subject: [PATCH 5/8] fix: add read-only syncing for processes that do not write to smartapi specs --- src/controllers/cron/update_local_smartapi.ts | 67 ++++++++++++++++--- 1 file changed, 57 insertions(+), 10 deletions(-) diff --git a/src/controllers/cron/update_local_smartapi.ts b/src/controllers/cron/update_local_smartapi.ts index 7294c8f..825c8d0 100644 --- a/src/controllers/cron/update_local_smartapi.ts +++ b/src/controllers/cron/update_local_smartapi.ts @@ -12,9 +12,9 @@ import SMARTAPI_EXCLUSIONS from "../../config/smartapi_exclusions"; import getSmartApiOverrideConfig from "../../config/smartapi_overrides"; import { SmartApiOverrides } from "../../types"; import apiList from "../../config/api_list"; -import MetaKG, { SmartAPISpec } from "@biothings-explorer/smartapi-kg"; -import { redisClient } from "@biothings-explorer/utils"; -import { writeFileWithLock } from "../../utils/common"; +import MetaKG from "@biothings-explorer/smartapi-kg"; +import { lockWithActionAsync, redisClient } from "@biothings-explorer/utils"; +import { setTimeout } from "timers/promises"; const userAgent = `BTE/${process.env.NODE_ENV === "production" ? "prod" : "dev"} Node/${process.version} ${ process.platform @@ -326,17 +326,42 @@ async function updateSmartAPISpecs() { delete obj._score; }); - await writeFileWithLock(localFilePath, JSON.stringify({ hits: hits })); + await lockWithActionAsync(localFilePath, async () => { + await fs.writeFile(localFilePath, JSON.stringify({ hits: hits })); + }, debug) + const predicatesInfo = await getOpsFromPredicatesEndpoints(res.data.hits); - await writeFileWithLock(predicatesFilePath, JSON.stringify(predicatesInfo)); + await lockWithActionAsync(predicatesFilePath, async () => { + await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo)); + }, debug); // Create a new metakg const metakg = new MetaKG(); metakg.constructMetaKGSync(true, { predicates: predicatesInfo, smartapiSpecs: { hits: hits as any }, apiList }); global.metakg = metakg; - global.smartapi = { hits }; + global.smartapi = { hits }; // hits is an array, but smartapi must be a dict }; +async function loadGlobalMetaKGReadOnly() { + await setTimeout(30000); + const localFilePath = path.resolve(__dirname, "../../../data/smartapi_specs.json"); + const predicatesFilePath = path.resolve(__dirname, "../../../data/predicates.json"); + + const metakg = new MetaKG(localFilePath, predicatesFilePath); + metakg.constructMetaKGSync(true, { apiList }); + global.metakg = metakg; + + global.smartapi = await lockWithActionAsync( + localFilePath, + async () => { + const file = await fs.readFile(localFilePath, 'utf-8'); + const hits = JSON.parse(file); + return hits; + }, + debug + ); +} + async function getAPIOverrides(data: { total?: number; hits: any }, overrides: SmartApiOverrides) { // if only_overrides is enabled, only overridden apis are used if (overrides.config.only_overrides) { @@ -423,12 +448,35 @@ export default function manageSmartApi() { process.env.INSTANCE_ID && process.env.INSTANCE_ID === "0", // Only one PM2 cluster instance should sync ].every(condition => condition); + /* + We schedule 2 cron jobs, one for non-syncing processes and one for the syncing process. + The non-syncing processes will only read from the local copy of the SmartAPI specs + after a 30 second timeout each time. + Whereas the syncing process will update the local copy of the SmartAPI specs. + We also run them once initially. + */ if (!should_sync) { - debug(`SmartAPI sync disabled, server process ${process.pid} disabling smartapi updates.`); + debug(`Server process ${process.pid} disabling smartapi updates. SmartAPI files will be read from but not written to.`); + cron.schedule("*/10 * * * *", async () => { + debug(`Reading from SmartAPI specs now at ${new Date().toUTCString()}!`); + try { + await loadGlobalMetaKGReadOnly(); + debug("Reading local copy of SmartAPI specs successful."); + } catch (err) { + debug(`Reading local copy of SmartAPI specs failed! The error message is ${err.toString()}`); + } + }); + + loadGlobalMetaKGReadOnly() + .then(() => { + debug("Reading local copy of SmartAPI specs successful."); + }) + .catch(err => { + debug(`Reading local copy of SmartAPI specs failed! The error message is ${err.toString()}`); + }); return; } - // Otherwise, schedule sync! cron.schedule("*/10 * * * *", async () => { debug(`Updating local copy of SmartAPI specs now at ${new Date().toUTCString()}!`); try { @@ -439,7 +487,6 @@ export default function manageSmartApi() { } }); - // Run at start once debug(`Running initial update of SmartAPI specs now at ${new Date().toUTCString()}`); updateSmartAPISpecs() .then(() => { @@ -448,4 +495,4 @@ export default function manageSmartApi() { .catch(err => { debug(`Updating local copy of SmartAPI specs failed! The error message is ${err.toString()}`); }); -} +} \ No newline at end of file From f2f6f54bf74cafeb50a938ae013593ad1ebba51e Mon Sep 17 00:00:00 2001 From: NeuralFlux <40491005+NeuralFlux@users.noreply.github.com> Date: Tue, 5 Nov 2024 17:32:26 -0500 Subject: [PATCH 6/8] fix: make association func async to support file locking --- .../integration/controllers/association.test.ts | 16 ++++++++-------- src/controllers/association.ts | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/__test__/integration/controllers/association.test.ts b/__test__/integration/controllers/association.test.ts index 0b57c09..df47a02 100644 --- a/__test__/integration/controllers/association.test.ts +++ b/__test__/integration/controllers/association.test.ts @@ -2,26 +2,26 @@ import assoc from "../../../src/controllers/association"; describe("Test association module", () => { test("By default, should return all associations", async () => { - const res = assoc(); + const res = await assoc(); expect(res.length).toBeGreaterThan(10); expect(res[0]).toHaveProperty("subject"); expect(res[0]).toHaveProperty("api"); }); test("If sub specified, should only return associations related to the sub", async () => { - const res = assoc("Gene"); + const res = await assoc("Gene"); const inputTypes = new Set(res.map(item => item.subject)); expect(Array.from(inputTypes)).toHaveLength(1); expect(Array.from(inputTypes)).toEqual(["Gene"]); }); test("If invalid sub specified, should only empty list", async () => { - const res = assoc("Gene1"); + const res = await assoc("Gene1"); expect(res).toEqual([]); }); test("If obj specified, should only return associations related to the obj", async () => { - const res = assoc(undefined, "SmallMolecule"); + const res = await assoc(undefined, "SmallMolecule"); const outputTypes = new Set(res.map(item => item.object)); const inputTypes = new Set(res.map(item => item.subject)); expect(inputTypes.size).toBeGreaterThan(1); @@ -30,7 +30,7 @@ describe("Test association module", () => { }); test("If pred specified, should only return associations related to the pred", async () => { - const res = assoc(undefined, undefined, "treats"); + const res = await assoc(undefined, undefined, "treats"); const preds = new Set(res.map(item => item.predicate)); const inputTypes = new Set(res.map(item => item.subject)); expect(inputTypes.size).toBeGreaterThan(1); @@ -39,7 +39,7 @@ describe("Test association module", () => { }); test("If api specified, should only return associations related to the api", async () => { - const res = assoc(undefined, undefined, undefined, undefined, "MyGene.info API"); + const res = await assoc(undefined, undefined, undefined, undefined, "MyGene.info API"); const apis = new Set(res.map(item => item.api.name)); const inputTypes = new Set(res.map(item => item.subject)); expect(inputTypes.size).toBeGreaterThan(1); @@ -48,7 +48,7 @@ describe("Test association module", () => { }); test("If source specified, should only return associations related to the source", async () => { - const res = assoc(undefined, undefined, undefined, undefined, undefined, "infores:disgenet"); + const res = await assoc(undefined, undefined, undefined, undefined, undefined, "infores:disgenet"); const sources = new Set(res.map(item => item.provided_by)); const inputTypes = new Set(res.map(item => item.subject)); expect(inputTypes.size).toBeGreaterThan(1); @@ -57,7 +57,7 @@ describe("Test association module", () => { }); test("If both sub and obj specified, should only return associations related to both sub and obj", async () => { - const res = assoc("Gene", "SmallMolecule"); + const res = await assoc("Gene", "SmallMolecule"); const outputTypes = new Set(res.map(item => item.object)); const inputTypes = new Set(res.map(item => item.subject)); expect(Array.from(inputTypes)).toHaveLength(1); diff --git a/src/controllers/association.ts b/src/controllers/association.ts index 1c206a7..0b3deff 100644 --- a/src/controllers/association.ts +++ b/src/controllers/association.ts @@ -22,21 +22,21 @@ export interface AssocResult { }; } -export default function ( +export default async function ( sub: string = undefined, obj: string = undefined, pred: string = undefined, component: string = undefined, api: string = undefined, source: string = undefined, -): AssocResult[] { +): Promise { const smartapi_specs = path.resolve(__dirname, "../../data/smartapi_specs.json"); debug(`smartapi specs loaded: ${smartapi_specs}`); const predicates = path.resolve(__dirname, "../../data/predicates.json"); debug(`predicates endpoints loaded, ${predicates}`); const kg = new meta_kg(smartapi_specs, predicates); debug("metakg initialized"); - kg.constructMetaKGSync(true, {}); + await kg.constructMetaKGWithFileLock(true, {}); debug(`metakg loaded: ${kg.ops.length} ops`); const associations: AssocResult[] = []; const filtered_res = kg.filter({ From 7745a7a3683f43845f4c274e5f3396510a4e22ac Mon Sep 17 00:00:00 2001 From: NeuralFlux <40491005+NeuralFlux@users.noreply.github.com> Date: Tue, 5 Nov 2024 17:33:34 -0500 Subject: [PATCH 7/8] fix: make sync kg construction support async file locks --- src/controllers/cron/update_local_smartapi.ts | 10 +++++----- src/controllers/meta_knowledge_graph.ts | 6 +++--- src/routes/metakg.ts | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/controllers/cron/update_local_smartapi.ts b/src/controllers/cron/update_local_smartapi.ts index 825c8d0..e65d033 100644 --- a/src/controllers/cron/update_local_smartapi.ts +++ b/src/controllers/cron/update_local_smartapi.ts @@ -326,18 +326,18 @@ async function updateSmartAPISpecs() { delete obj._score; }); - await lockWithActionAsync(localFilePath, async () => { + await lockWithActionAsync([localFilePath], async () => { await fs.writeFile(localFilePath, JSON.stringify({ hits: hits })); }, debug) const predicatesInfo = await getOpsFromPredicatesEndpoints(res.data.hits); - await lockWithActionAsync(predicatesFilePath, async () => { + await lockWithActionAsync([predicatesFilePath], async () => { await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo)); }, debug); // Create a new metakg const metakg = new MetaKG(); - metakg.constructMetaKGSync(true, { predicates: predicatesInfo, smartapiSpecs: { hits: hits as any }, apiList }); + await metakg.constructMetaKGWithFileLock(true, { predicates: predicatesInfo, smartapiSpecs: { hits: hits as any }, apiList }); global.metakg = metakg; global.smartapi = { hits }; // hits is an array, but smartapi must be a dict }; @@ -348,11 +348,11 @@ async function loadGlobalMetaKGReadOnly() { const predicatesFilePath = path.resolve(__dirname, "../../../data/predicates.json"); const metakg = new MetaKG(localFilePath, predicatesFilePath); - metakg.constructMetaKGSync(true, { apiList }); + await metakg.constructMetaKGWithFileLock(true, { apiList }); global.metakg = metakg; global.smartapi = await lockWithActionAsync( - localFilePath, + [localFilePath], async () => { const file = await fs.readFile(localFilePath, 'utf-8'); const hits = JSON.parse(file); diff --git a/src/controllers/meta_knowledge_graph.ts b/src/controllers/meta_knowledge_graph.ts index b3cd593..dd5eae4 100644 --- a/src/controllers/meta_knowledge_graph.ts +++ b/src/controllers/meta_knowledge_graph.ts @@ -37,13 +37,13 @@ export default class MetaKnowledgeGraphHandler { try { if (smartAPIID !== undefined) { debug(`Constructing with SmartAPI ID ${smartAPIID}`); - kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID }); + await kg.constructMetaKGWithFileLock(false, { apiList, smartAPIID: smartAPIID }); } else if (teamName !== undefined) { debug(`Constructing with team ${teamName}`); - kg.constructMetaKGSync(false, { apiList, teamName: teamName }); + await kg.constructMetaKGWithFileLock(false, { apiList, teamName: teamName }); } else { debug(`Constructing with default`); - kg.constructMetaKGSync(true, { apiList }); + await kg.constructMetaKGWithFileLock(true, { apiList }); } if (kg.ops.length === 0) { debug(`Found 0 operations`); diff --git a/src/routes/metakg.ts b/src/routes/metakg.ts index 5e49edd..22e06b6 100644 --- a/src/routes/metakg.ts +++ b/src/routes/metakg.ts @@ -18,7 +18,7 @@ class MetaKG { if (req.query.provided_by !== undefined) { source = utils.removeQuotesFromQuery(req.query.provided_by as string); } - const assocs = assoc( + const assocs = await assoc( req.query.subject as string, req.query.object as string, req.query.predicate as string, From d9bd79d4a1adb06b378e41cb82629914197dc3e7 Mon Sep 17 00:00:00 2001 From: NeuralFlux <40491005+NeuralFlux@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:06:43 -0500 Subject: [PATCH 8/8] chore: rename metakg_ops back to original --- src/routes/v1/meta_knowledge_graph_v1.ts | 4 ++-- src/routes/v1/meta_knowledge_graph_v1_by_api.ts | 4 ++-- src/routes/v1/meta_knowledge_graph_v1_by_team.ts | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/routes/v1/meta_knowledge_graph_v1.ts b/src/routes/v1/meta_knowledge_graph_v1.ts index 204729a..a3e3350 100644 --- a/src/routes/v1/meta_knowledge_graph_v1.ts +++ b/src/routes/v1/meta_knowledge_graph_v1.ts @@ -28,8 +28,8 @@ class MetaKG { const metaKGHandler = new handler(undefined); let metakg = undefined; // initialize MetaKG only if ops are provided because handler logic is built upon that - if (taskInfo.data.options.metakg_ops !== undefined) - metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg_ops); + if (taskInfo.data.options.metakg !== undefined) + metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg); const kg = await metaKGHandler.getKG(metakg); // response.logs = utils.filterForLogLevel(response.logs, options.logLevel); return taskResponse(kg); diff --git a/src/routes/v1/meta_knowledge_graph_v1_by_api.ts b/src/routes/v1/meta_knowledge_graph_v1_by_api.ts index 0e25c13..4f5b5e3 100644 --- a/src/routes/v1/meta_knowledge_graph_v1_by_api.ts +++ b/src/routes/v1/meta_knowledge_graph_v1_by_api.ts @@ -28,8 +28,8 @@ class MetaKGByAPI { const metaKGHandler = new handler(taskInfo.data.smartAPIID, undefined); let metakg = undefined; // initialize MetaKG only if ops are provided because handler logic is built upon that - if (taskInfo.data.options.metakg_ops !== undefined) - metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg_ops); + if (taskInfo.data.options.metakg !== undefined) + metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg); const kg = await metaKGHandler.getKG(metakg); // response.logs = utils.filterForLogLevel(response.logs, options.logLevel); return taskResponse(kg); diff --git a/src/routes/v1/meta_knowledge_graph_v1_by_team.ts b/src/routes/v1/meta_knowledge_graph_v1_by_team.ts index 3d3f90e..a800d75 100644 --- a/src/routes/v1/meta_knowledge_graph_v1_by_team.ts +++ b/src/routes/v1/meta_knowledge_graph_v1_by_team.ts @@ -28,8 +28,8 @@ class MetaKGByTeam { const metaKGHandler = new handler(undefined, taskInfo.data.teamName); let metakg = undefined; // initialize MetaKG only if ops are provided because handler logic is built upon that - if (taskInfo.data.options.metakg_ops !== undefined) - metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg_ops); + if (taskInfo.data.options.metakg !== undefined) + metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg); const kg = await metaKGHandler.getKG(metakg); // response.logs = utils.filterForLogLevel(response.logs, options.logLevel); return taskResponse(kg);