Skip to content

Commit

Permalink
Merge branch 'otel' of https://github.com/biothings/bte-server into otel
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Dec 30, 2023
2 parents 110c407 + d7ec5aa commit b45ba6b
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 76 deletions.
2 changes: 1 addition & 1 deletion data/predicates.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion data/smartapi_specs.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"@bull-board/express": "^5.9.1",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/auto-instrumentations-node": "^0.40.2",
"@opentelemetry/exporter-jaeger": "^1.19.0",
"@opentelemetry/exporter-metrics-otlp-proto": "^0.45.1",
"@opentelemetry/exporter-trace-otlp-proto": "^0.45.1",
"@opentelemetry/resources": "^1.18.1",
Expand Down
5 changes: 5 additions & 0 deletions src/config/apis.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ exports.API_LIST = {
name: "CTD API",
primarySource: true,
},
{
id: "326eb1e437303bee27d3cef29227125d",
name: "Complex Portal Web Service",
primarySource: true
},
{
id: "43af91b3d7cae43591083bff9d75c6dd",
name: "EBI Proteins API",
Expand Down
4 changes: 3 additions & 1 deletion src/config/smartapi_overrides.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"apis": {
"671b45c0301c8624abbd26ae78449ca2": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/orphanet-change/mydisease.info/smartapi.yaml",
"b772ebfbfa536bba37764d7fddb11d6f": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/orphanet-change/ncats_rare_source/smartapi.yaml",
"0212611d1c670f9107baf00b77f0889a": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/ctd-batch-query/CTD/smartapi.yaml"
"326eb1e437303bee27d3cef29227125d": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/orphanet-change/complexportal/smartapi.yaml",
"0212611d1c670f9107baf00b77f0889a": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/ctd-batch-query/CTD/smartapi.yaml",
"8f08d1446e0bb9c2b323713ce83e2bd3": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/mychem-fda-orphan-edit/mychem.info/openapi_full.yml"
}
}
4 changes: 2 additions & 2 deletions src/controllers/async/asyncquery.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ exports.getQueryResponse = async (jobID, logLevel = null) => {
);
const response = Object.fromEntries(values);
if (response.logs && logLevel) {
utils.filterForLogLevel(response, logLevel);
response.logs = utils.filterForLogLevel(response.logs, logLevel);
} else if (response.logs && originalLogLevel) {
utils.filterForLogLevel(response, originalLogLevel);
response.logs = utils.filterForLogLevel(response.logs, originalLogLevel);
}
return response ? response : undefined;
});
Expand Down
10 changes: 4 additions & 6 deletions src/controllers/opentelemetry.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
const opentelemetry = require("@opentelemetry/sdk-node");
const { getNodeAutoInstrumentations } = require("@opentelemetry/auto-instrumentations-node");
const { OTLPTraceExporter } = require("@opentelemetry/exporter-trace-otlp-proto");
const { OTLPMetricExporter } = require("@opentelemetry/exporter-metrics-otlp-proto");
const { PeriodicExportingMetricReader, ConsoleMetricExporter } = require("@opentelemetry/sdk-metrics");
const { Resource } = require("@opentelemetry/resources");
const { ConsoleSpanExporter } = require("@opentelemetry/sdk-trace-node");
const { isMainThread } = require('worker_threads');
const Debug = require("debug");
const debug = Debug("bte:biothings-explorer:otel-init");
const { JaegerExporter } = require("@opentelemetry/exporter-jaeger");

debug("Initializing Opentelemetry instrumentation...");
const sdk = new opentelemetry.NodeSDK({
traceExporter: new OTLPTraceExporter({
url: process.env.JAEGER_URL ?? "http://localhost:4318/v1/traces",
traceExporter: new JaegerExporter({
host: process.env.JAEGER_HOST ?? "jaeger-otel-agent.sri",
port: parseInt(process.env.JAEGER_PORT ?? "6832"),
}),
instrumentations: isMainThread ? [getNodeAutoInstrumentations()] : [],
resource: new Resource({
Expand Down
125 changes: 65 additions & 60 deletions src/routes/v1/asyncquery_status.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,76 +31,81 @@ class VCheckQueryStatus {
//logger.info("query /query endpoint")
try {
debug(`checking query status of job ${req.params.id}`);
let by = req.data.options.by;
let job_id = req.params.id;
let jobID = req.params.id;
let queryQueue;
if (redisClient.clientEnabled) {
if (job_id.startsWith("BT_")) {
queryQueue = getQueryQueue("bte_query_queue_by_team");
} else if (job_id.startsWith("BA_")) {
queryQueue = getQueryQueue("bte_query_queue_by_api");
} else {
queryQueue = getQueryQueue("bte_query_queue");
}
if (!redisClient.clientEnabled) {
taskResponse({ error: "Redis service is unavailable" }, 503);
}
if (queryQueue) {
let job = await queryQueue.getJobFromId(job_id);

if (job === null) {
return taskResponse(null, 404);
}
await queryQueue.isReady();
const state = await job.getState();
let logs = await queryQueue.getJobLogs(job_id);
logs = logs.logs.map(log => JSON.parse(log));
let [status, description] = {
// convert to TRAPI states
completed: ["Completed", "The query has finished executing."],
failed: ["Failed", job.failedReason],
delayed: ["Queued", "The query is queued, but has been delayed."],
active: ["Running", "The query is currently being processed."],
waiting: ["Queued", "The query is waiting in the queue."],
paused: ["Queued", "The query is queued, but the queue is temporarily paused."],
stuck: ["Failed", "The query is stuck (if you see this, raise an issue)."],
null: ["Failed", "The query status is unknown, presumed failed (if you see this, raise an issue)."],
}[state];
let progress = job._progress;
if (status === "Failed" && !req.endpoint.includes("asyncquery_response")) {
if (description.includes("Promise timed out")) {
// something might break when calculating process.env.JOB_TIMEOUT so wrap it in try catch
try {
return taskResponse({
job_id,
status,
description: `Job was stopped after exceeding time limit of ${
parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()) / 1000
}s`,
logs,
});
} catch (e) {
return taskResponse({ job_id, status, description, logs });
}
}
return taskResponse({ job_id, status, description, logs });
}
if (jobID.startsWith("BT_")) {
queryQueue = getQueryQueue("bte_query_queue_by_team");
} else if (jobID.startsWith("BA_")) {
queryQueue = getQueryQueue("bte_query_queue_by_api");
} else {
queryQueue = getQueryQueue("bte_query_queue");
}

let job = await queryQueue.getJobFromId(jobID);
if (job === null) {
return taskResponse(null, 404);
}

// If done, just give response if using the response_url
if ((state === "completed" || state === "failed") && req.endpoint.includes("asyncquery_response")) {
let returnValue;
const storedResponse = await getQueryResponse(job_id, req.data.options.logLevel);
await queryQueue.isReady();

if (!storedResponse.logs && logs) {
storedResponse.logs = logs;
const state = await job.getState();
let progress = job._progress;

let logs = await queryQueue.getJobLogs(jobID);
logs = logs.logs.map(log => JSON.parse(log));
const originalLogLevel = JSON.parse(await redisClient.client.getTimeout(`asyncQueryResult:logLevel:${jobID}`));
logs = utils.filterForLogLevel(logs, req.data.options.log_level ?? originalLogLevel);

// convert to TRAPI states
let [status, description] = {
completed: ["Completed", "The query has finished executing."],
failed: ["Failed", job.failedReason],
delayed: ["Queued", "The query is queued, but has been delayed."],
active: ["Running", "The query is currently being processed."],
waiting: ["Queued", "The query is waiting in the queue."],
paused: ["Queued", "The query is queued, but the queue is temporarily paused."],
stuck: ["Failed", "The query is stuck (if you see this, raise an issue)."],
null: ["Failed", "The query status is unknown, presumed failed (if you see this, raise an issue)."],
}[state];

if (status === "Failed" && !req.endpoint.includes("asyncquery_response")) {
if (description.includes("Promise timed out")) {
// something might break when calculating process.env.JOB_TIMEOUT so wrap it in try catch
try {
return taskResponse({
job_id: jobID,
status,
description: `Job was stopped after exceeding time limit of ${
parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()) / 1000
}s`,
logs,
});
} catch (e) {
return taskResponse({ job_id: jobID, status, description, logs });
}
}
return taskResponse({ job_id: jobID, status, description, logs });
}

returnValue = storedResponse ? storedResponse : { error: "Response expired. Responses are kept 30 days." };
return taskResponse(returnValue, returnValue.statusCode || 200);
// If done, just give response if using asyncquery_response
if ((state === "completed" || state === "failed") && req.endpoint.includes("asyncquery_response")) {
let returnValue;
const storedResponse = await getQueryResponse(jobID, req.data.options.log_level);

if (storedResponse && !storedResponse.logs && logs) {
storedResponse.logs = logs;
}

taskResponse({ job_id, status, progress, description, response_url: job.data.url, logs }, 200);
} else {
taskResponse({ error: "Redis service is unavailable" }, 503);
returnValue = storedResponse ? storedResponse : { error: "Response expired. Responses are kept 30 days." };
return taskResponse(returnValue, returnValue.statusCode || 200);
}

// Otherwise respond for asyncquery_status
taskResponse({ job_id: jobID, status, progress, description, response_url: job.data.url, logs }, 200);
} catch (error) {
taskError(error);
}
Expand Down
2 changes: 1 addition & 1 deletion src/routes/v1/query_v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class V1RouteQuery {
await handler.query();

const response = handler.getResponse();
utils.filterForLogLevel(response, options.logLevel);
response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(response);
} catch (error) {
return taskError(error);
Expand Down
2 changes: 1 addition & 1 deletion src/routes/v1/query_v1_by_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class RouteQueryV1ByAPI {
handler.setQueryGraph(queryGraph);
await handler.query();
const response = handler.getResponse();
utils.filterForLogLevel(response, options.logLevel);
response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(response);
} catch (error) {
return taskError(error);
Expand Down
2 changes: 1 addition & 1 deletion src/routes/v1/query_v1_by_team.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class RouteQueryV1ByTeam {
handler.setQueryGraph(queryGraph);
await handler.query();
const response = handler.getResponse();
utils.filterForLogLevel(response, options.logLevel);
response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(response);
} catch (error) {
return taskError(error);
Expand Down
5 changes: 3 additions & 2 deletions src/utils/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,19 @@ exports.stringIsAValidUrl = s => {
}
};

exports.filterForLogLevel = (response, logLevel) => {
exports.filterForLogLevel = (logs, logLevel) => {
const logLevels = {
ERROR: 3,
WARNING: 2,
INFO: 1,
DEBUG: 0,
};
if (logLevel && Object.keys(logLevels).includes(logLevel)) {
response.logs = response.logs.filter(log => {
logs = logs.filter(log => {
return logLevels[log.level] >= logLevels[logLevel];
});
}
return logs;
};

exports.methodNotAllowed = (req, res, next) => res.status(405).send();

0 comments on commit b45ba6b

Please sign in to comment.