Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental fetchers #91

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,14 @@ model FrontpageId {
question Question @relation(fields: [id], references: [id], onDelete: Cascade)
id String @unique
}

// One row per scheduled job for a platform URL.
// Rows are never reused: per the `url` note below, rescheduling inserts a new row,
// so `url` is intentionally not unique.
// NOTE(review): `context` is free-form Json; presumably it carries the per-job
// payload a platform attaches when scheduling — confirm against the robot code.
model Robot {
id Int @id @default(autoincrement())
platform String
url String // non-unique, rescheduling always creates a new row
context Json
created DateTime @db.Timestamp(6)
scheduled DateTime @db.Timestamp(6) // can be equal to `created` or can be in the future for rescheduling or other purposes
completed DateTime? @db.Timestamp(6) // becomes non-null when the job is done
tried Int @default(0) // used to set a limit on max attempts for badly written platforms
}
45 changes: 37 additions & 8 deletions src/backend/platforms/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Question } from "@prisma/client";

import { QuestionOption } from "../../common/types";
import { prisma } from "../database/prisma";
import { getRobot, Robot } from "../robot";

// This file includes common types and functions for working with platforms.
// The registry of all platforms is in a separate file, ./registry.ts, to avoid circular dependencies.
Expand Down Expand Up @@ -40,6 +40,10 @@ export type FetchedQuestion = Omit<
qualityindicators: Omit<QualityIndicators, "stars">; // slightly stronger type than Prisma's JsonValue
};

// Storage handle passed to v3 fetchers through their `storage` option.
// `upsert` takes a single fetched question and resolves (with no value) once it has been handled;
// the concrete implementation is supplied by whoever invokes the fetcher.
type MFStorage = {
upsert: (q: FetchedQuestion) => Promise<void>;
};

// fetcher should return null if platform failed to fetch questions for some reason
type PlatformFetcherV1 = () => Promise<FetchedQuestion[] | null>;

Expand All @@ -53,13 +57,18 @@ type PlatformFetcherV2<ArgNames extends string> = (opts: {
args?: { [k in ArgNames]: string };
}) => Promise<PlatformFetcherV2Result>;

export type PlatformFetcher<ArgNames extends string> =
| PlatformFetcherV1
| PlatformFetcherV2<ArgNames>;
// v3 fetcher signature. Unlike a v1 fetcher, which returns the fetched questions,
// a v3 fetcher resolves to void and is given a `storage` object to write through.
// - `args` is optional and keyed by the platform's ArgNames.
// - `robot` is typed by RobotContext (defaults to `unknown`).
type PlatformFetcherV3<
ArgNames extends string,
RobotContext = unknown
> = (opts: {
args?: { [k in ArgNames]: string };
robot: Robot<RobotContext>;
storage: MFStorage;
}) => Promise<void>;

// using "" as ArgNames default is technically incorrect, but shouldn't cause any real issues
// (I couldn't find a better solution for signifying an empty value, though there probably is one)
export type Platform<ArgNames extends string = ""> = {
export type Platform<ArgNames extends string = "", RobotContext = unknown> = {
name: string; // short name for ids and `platform` db column, e.g. "xrisk"
label: string; // longer name for displaying on frontend etc., e.g. "X-risk estimates"
color: string; // used on frontend
Expand All @@ -74,6 +83,11 @@ export type Platform<ArgNames extends string = ""> = {
fetcherArgs?: ArgNames[];
fetcher?: PlatformFetcherV2<ArgNames>;
}
| {
version: "v3";
fetcherArgs?: ArgNames[];
fetcher?: PlatformFetcherV3<ArgNames, RobotContext>;
}
);

// Typing notes:
Expand All @@ -92,7 +106,7 @@ type PreparedQuestion = Omit<

export const prepareQuestion = (
q: FetchedQuestion,
platform: Platform<any>
platform: Platform<any, any>
): PreparedQuestion => {
return {
extra: {},
Expand Down Expand Up @@ -120,14 +134,29 @@ export const upsertSingleQuestion = async (
// TODO - update history?
};

export const processPlatform = async <T extends string = "">(
platform: Platform<T>,
export const processPlatform = async <T extends string = "", RC = unknown>(
platform: Platform<T, RC>,
args?: { [k in T]: string }
) => {
if (!platform.fetcher) {
console.log(`Platform ${platform.name} doesn't have a fetcher, skipping`);
return;
}

// v3 fetchers write questions through `storage` themselves and return nothing,
// so this branch returns right after the fetcher resolves.
if (platform.version === "v3") {
  const robot = getRobot(platform);
  const storage: MFStorage = {
    // Each question is normalized for this platform and then upserted individually.
    async upsert(q) {
      await upsertSingleQuestion(prepareQuestion(q, platform));
    },
  };
  await platform.fetcher({
    // Forward the caller-supplied args: PlatformFetcherV3 declares an `args` option,
    // and omitting it here would silently drop whatever was passed to processPlatform.
    args,
    robot,
    storage,
  });
  return;
}

const result =
platform.version === "v1"
? { questions: await platform.fetcher(), partial: false } // this is not exactly PlatformFetcherV2Result, since `questions` can be null
Expand Down
89 changes: 43 additions & 46 deletions src/backend/platforms/metaculus/api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import Ajv, { JTDDataType, ValidateFunction } from "ajv/dist/jtd";
import axios from "axios";
import { sleep } from "../../utils/sleep";
import Ajv, { JTDDataType } from "ajv/dist/jtd";

// Type examples:
// - group: https://www.metaculus.com/api2/questions/9866/
Expand Down Expand Up @@ -186,46 +184,38 @@ const validateShallowMultipleQuestions =
shallowMultipleQuestionsSchema
);

async function fetchWithRetries<T = unknown>(url: string): Promise<T> {
try {
const response = await axios.get<T>(url);
return response.data;
} catch (error) {
console.log(`Error while fetching ${url}`);
console.log(error);
if (axios.isAxiosError(error)) {
if (error.response?.headers["retry-after"]) {
const timeout = error.response.headers["retry-after"];
console.log(`Timeout: ${timeout}`);
await sleep(Number(timeout) * 1000 + 1000);
} else {
await sleep(RETRY_SLEEP_TIME);
}
}
}
const response = await axios.get<T>(url);
return response.data;
}
// async function fetchWithRetries<T = unknown>(url: string): Promise<T> {
// try {
// const response = await axios.get<T>(url);
// return response.data;
// } catch (error) {
// console.log(`Error while fetching ${url}`);
// console.log(error);
// if (axios.isAxiosError(error)) {
// if (error.response?.headers["retry-after"]) {
// const timeout = error.response.headers["retry-after"];
// console.log(`Timeout: ${timeout}`);
// await sleep(Number(timeout) * 1000 + 1000);
// } else {
// await sleep(RETRY_SLEEP_TIME);
// }
// }
// }
// const response = await axios.get<T>(url);
// return response.data;
// }

const fetchAndValidate = async <T = unknown>(
url: string,
validator: ValidateFunction<T>
): Promise<T> => {
console.log(url);
const data = await fetchWithRetries<object>(url);
if (validator(data)) {
return data;
}
throw new Error(
`Response validation for url ${url} failed: ` +
JSON.stringify(validator.errors)
);
};

export async function fetchApiQuestions(
next: string
export async function prepareApiQuestions(
data: unknown
): Promise<ApiMultipleQuestions> {
const data = await fetchAndValidate(next, validateShallowMultipleQuestions);
if (!validateShallowMultipleQuestions(data)) {
throw new Error(
`Response validation failed: ` +
JSON.stringify(validateShallowMultipleQuestions.errors) +
"\n\n" +
JSON.stringify(data)
);
}

const isDefined = <T>(argument: T | undefined): argument is T => {
return argument !== undefined;
Expand All @@ -251,9 +241,16 @@ export async function fetchApiQuestions(
};
}

export async function fetchSingleApiQuestion(id: number): Promise<ApiQuestion> {
return await fetchAndValidate(
`https://www.metaculus.com/api2/questions/${id}/`,
validateQuestion
);
/**
 * Checks an already-fetched, untyped payload with `validateQuestion` and
 * returns it narrowed to `ApiQuestion`.
 *
 * @param data - raw response body to validate
 * @returns the same value, typed as `ApiQuestion`
 * @throws Error when validation fails; the message contains the validator
 *   errors followed by the serialized payload
 */
export async function prepareSingleApiQuestion(
  data: unknown
): Promise<ApiQuestion> {
  if (validateQuestion(data)) {
    return data;
  }

  // Serialize errors first, then the payload, to keep the message layout stable.
  const serializedErrors = JSON.stringify(validateQuestion.errors);
  const serializedPayload = JSON.stringify(data);
  throw new Error(
    `Response validation failed: ` + serializedErrors + "\n\n" + serializedPayload
  );
}
Loading