diff --git a/prisma/schema.prisma b/prisma/schema.prisma index ea31147..f6d0c41 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -93,3 +93,14 @@ model FrontpageId { question Question @relation(fields: [id], references: [id], onDelete: Cascade) id String @unique } + +model Robot { + id Int @id @default(autoincrement()) + platform String + url String // non-unique, rescheduling always creates a new row + context Json + created DateTime @db.Timestamp(6) + scheduled DateTime @db.Timestamp(6) // can be equal to `created` or can be in the future for rescheduling or other purposes + completed DateTime? @db.Timestamp(6) // becomes non-null when the job is done + tried Int @default(0) // used to set a limit on max attempts for badly written platforms +} diff --git a/src/backend/platforms/index.ts b/src/backend/platforms/index.ts index bc46403..366cdce 100644 --- a/src/backend/platforms/index.ts +++ b/src/backend/platforms/index.ts @@ -1,7 +1,7 @@ import { Question } from "@prisma/client"; - import { QuestionOption } from "../../common/types"; import { prisma } from "../database/prisma"; +import { getRobot, Robot } from "../robot"; // This file includes comon types and functions for working with platforms. // The registry of all platforms is in a separate file, ./registry.ts, to avoid circular dependencies. @@ -40,6 +40,10 @@ export type FetchedQuestion = Omit< qualityindicators: Omit; // slightly stronger type than Prisma's JsonValue }; +type MFStorage = { + upsert: (q: FetchedQuestion) => Promise; +}; + // fetcher should return null if platform failed to fetch questions for some reason type PlatformFetcherV1 = () => Promise; @@ -53,13 +57,18 @@ type PlatformFetcherV2 = (opts: { args?: { [k in ArgNames]: string }; }) => Promise; -export type PlatformFetcher = - | PlatformFetcherV1 - | PlatformFetcherV2; +type PlatformFetcherV3< + ArgNames extends string, + RobotContext = unknown +> = (opts: { + args?: { [k in ArgNames]: string }; + robot: Robot; + storage: MFStorage; +}) => Promise; // using "" as ArgNames default is technically incorrect, but shouldn't cause any real issues // (I couldn't find a better solution for signifying an empty value, though there probably is one) -export type Platform = { +export type Platform = { name: string; // short name for ids and `platform` db column, e.g. "xrisk" label: string; // longer name for displaying on frontend etc., e.g. "X-risk estimates" color: string; // used on frontend @@ -74,6 +83,11 @@ export type Platform = { fetcherArgs?: ArgNames[]; fetcher?: PlatformFetcherV2; } + | { + version: "v3"; + fetcherArgs?: ArgNames[]; + fetcher?: PlatformFetcherV3; + } ); // Typing notes: @@ -92,7 +106,7 @@ type PreparedQuestion = Omit< export const prepareQuestion = ( q: FetchedQuestion, - platform: Platform + platform: Platform ): PreparedQuestion => { return { extra: {}, @@ -120,14 +134,29 @@ export const upsertSingleQuestion = async ( // TODO - update history? }; -export const processPlatform = async ( - platform: Platform, +export const processPlatform = async ( + platform: Platform, args?: { [k in T]: string } ) => { if (!platform.fetcher) { console.log(`Platform ${platform.name} doesn't have a fetcher, skipping`); return; } + + if (platform.version === "v3") { + const robot = getRobot(platform); + const storage: MFStorage = { + async upsert(q) { + await upsertSingleQuestion(prepareQuestion(q, platform)); + }, + }; + await platform.fetcher({ + robot, + storage, + }); + return; + } + const result = platform.version === "v1" ? { questions: await platform.fetcher(), partial: false } // this is not exactly PlatformFetcherV2Result, since `questions` can be null diff --git a/src/backend/platforms/metaculus/api.ts b/src/backend/platforms/metaculus/api.ts index d8e2ad8..7597043 100644 --- a/src/backend/platforms/metaculus/api.ts +++ b/src/backend/platforms/metaculus/api.ts @@ -1,6 +1,4 @@ -import Ajv, { JTDDataType, ValidateFunction } from "ajv/dist/jtd"; -import axios from "axios"; -import { sleep } from "../../utils/sleep"; +import Ajv, { JTDDataType } from "ajv/dist/jtd"; // Type examples: // - group: https://www.metaculus.com/api2/questions/9866/ @@ -186,46 +184,38 @@ const validateShallowMultipleQuestions = shallowMultipleQuestionsSchema ); -async function fetchWithRetries(url: string): Promise { - try { - const response = await axios.get(url); - return response.data; - } catch (error) { - console.log(`Error while fetching ${url}`); - console.log(error); - if (axios.isAxiosError(error)) { - if (error.response?.headers["retry-after"]) { - const timeout = error.response.headers["retry-after"]; - console.log(`Timeout: ${timeout}`); - await sleep(Number(timeout) * 1000 + 1000); - } else { - await sleep(RETRY_SLEEP_TIME); - } - } - } - const response = await axios.get(url); - return response.data; -} +// async function fetchWithRetries(url: string): Promise { +// try { +// const response = await axios.get(url); +// return response.data; +// } catch (error) { +// console.log(`Error while fetching ${url}`); +// console.log(error); +// if (axios.isAxiosError(error)) { +// if (error.response?.headers["retry-after"]) { +// const timeout = error.response.headers["retry-after"]; +// console.log(`Timeout: ${timeout}`); +// await sleep(Number(timeout) * 1000 + 1000); +// } else { +// await sleep(RETRY_SLEEP_TIME); +// } +// } +// } +// const response = await axios.get(url); +// return response.data; +// } -const fetchAndValidate = async ( - url: string, - validator: ValidateFunction -): Promise => { - console.log(url); - const data = await fetchWithRetries(url); - if (validator(data)) { - return data; - } - throw new Error( - `Response validation for url ${url} failed: ` + - JSON.stringify(validator.errors) - ); -}; - -export async function fetchApiQuestions( - next: string +export async function prepareApiQuestions( + data: unknown ): Promise { - const data = await fetchAndValidate(next, validateShallowMultipleQuestions); + if (!validateShallowMultipleQuestions(data)) { + throw new Error( + `Response validation failed: ` + + JSON.stringify(validateShallowMultipleQuestions.errors) + + "\n\n" + + JSON.stringify(data) + ); + } const isDefined = (argument: T | undefined): argument is T => { return argument !== undefined; @@ -251,9 +241,16 @@ export async function fetchApiQuestions( }; } -export async function fetchSingleApiQuestion(id: number): Promise { - return await fetchAndValidate( - `https://www.metaculus.com/api2/questions/${id}/`, - validateQuestion - ); +export async function prepareSingleApiQuestion( + data: unknown +): Promise { + if (!validateQuestion(data)) { + throw new Error( + `Response validation failed: ` + + JSON.stringify(validateQuestion.errors) + + "\n\n" + + JSON.stringify(data) + ); + } + return data; } diff --git a/src/backend/platforms/metaculus/index.ts b/src/backend/platforms/metaculus/index.ts index 8dbfd63..c12a367 100644 --- a/src/backend/platforms/metaculus/index.ts +++ b/src/backend/platforms/metaculus/index.ts @@ -1,20 +1,37 @@ import { FetchedQuestion, Platform } from ".."; import { average } from "../../../utils"; -import { sleep } from "../../utils/sleep"; +import { Robot, RobotJob } from "../../robot"; import { ApiCommon, - ApiMultipleQuestions, ApiPredictable, ApiQuestion, - fetchApiQuestions, - fetchSingleApiQuestion, + prepareApiQuestions, + prepareSingleApiQuestion, } from "./api"; const platformName = "metaculus"; const now = new Date().toISOString(); const SLEEP_TIME = 1000; -async function apiQuestionToFetchedQuestions( +type Context = + | { + type: "apiIndex"; + } + | { + type: "apiQuestion"; + }; + +const skip = (q: ApiPredictable): boolean => { + if (q.publish_time > now || now > q.resolve_time) { + return true; + } + if (q.number_of_predictions < 10) { + return true; + } + return false; +}; + +async function processApiQuestion( apiQuestion: ApiQuestion ): Promise { // one item can expand: @@ -22,16 +39,6 @@ async function apiQuestionToFetchedQuestions( // - to 1 question if it's a simple forecast // - to multiple questions if it's a group (see https://github.com/quantified-uncertainty/metaforecast/pull/84 for details) - const skip = (q: ApiPredictable): boolean => { - if (q.publish_time > now || now > q.resolve_time) { - return true; - } - if (q.number_of_predictions < 10) { - return true; - } - return false; - }; - const buildFetchedQuestion = ( q: ApiPredictable & ApiCommon ): Omit => { @@ -72,19 +79,14 @@ async function apiQuestionToFetchedQuestions( }; if (apiQuestion.type === "group") { - await sleep(SLEEP_TIME); - const apiQuestionDetails = await fetchSingleApiQuestion(apiQuestion.id); - if (apiQuestionDetails.type !== "group") { - throw new Error("Expected `group` type"); // shouldn't happen, this is mostly for typescript - } - return (apiQuestionDetails.sub_questions || []) + return (apiQuestion.sub_questions || []) .filter((q) => !skip(q)) .map((sq) => { const tmp = buildFetchedQuestion(sq); return { ...tmp, title: `${apiQuestion.title} (${sq.title})`, - description: apiQuestionDetails.description || "", + description: apiQuestion.description || "", url: `https://www.metaculus.com${apiQuestion.page_url}?sub-question=${sq.id}`, }; }); @@ -96,81 +98,90 @@ async function apiQuestionToFetchedQuestions( return []; } - await sleep(SLEEP_TIME); - const apiQuestionDetails = await fetchSingleApiQuestion(apiQuestion.id); const tmp = buildFetchedQuestion(apiQuestion); return [ { ...tmp, title: apiQuestion.title, - description: apiQuestionDetails.description || "", + description: apiQuestion.description || "", url: "https://www.metaculus.com" + apiQuestion.page_url, }, ]; } else { - if (apiQuestion.type !== "claim") { - // should never happen, since `discriminator` in JTD schema causes a strict runtime check - console.log( - `Unknown metaculus question type: ${ - (apiQuestion as any).type - }, skipping` - ); - } + console.log( + `Unknown metaculus question type: ${apiQuestion.type}, skipping` + ); return []; } } -export const metaculus: Platform<"id" | "debug"> = { +async function processApiIndexQuestion( + apiQuestion: ApiQuestion, + robot: Robot +): Promise { + if (apiQuestion.type === "group" || apiQuestion.type === "forecast") { + if (apiQuestion.type === "forecast" && skip(apiQuestion)) { + return; + } + await robot.schedule({ + url: `https://www.metaculus.com/api2/questions/${apiQuestion.id}/`, + context: { + type: "apiQuestion", + }, + }); + } +} + +export const metaculus: Platform<"id" | "debug", Context> = { name: platformName, label: "Metaculus", color: "#006669", - version: "v2", + version: "v3", fetcherArgs: ["id", "debug"], - async fetcher(opts) { - let allQuestions: FetchedQuestion[] = []; + async fetcher({ robot, storage }) { + await robot.schedule({ + url: "https://www.metaculus.com/api2/questions/", + context: { + type: "apiIndex", + }, + }); - if (opts.args?.id) { - const id = Number(opts.args.id); - const apiQuestion = await fetchSingleApiQuestion(id); - const questions = await apiQuestionToFetchedQuestions(apiQuestion); - console.log(questions); - return { - questions, - partial: true, - }; - } + for ( + let job: RobotJob | undefined; + (job = await robot.nextJob()); - let next: string | null = "https://www.metaculus.com/api2/questions/"; - let i = 1; - while (next) { - console.log(`\nQuery #${i} - ${next}`); + ) { + const data = await job.fetch(); - await sleep(SLEEP_TIME); - const apiQuestions: ApiMultipleQuestions = await fetchApiQuestions(next); - const results = apiQuestions.results; - - let j = false; - - for (const result of results) { - const questions = await apiQuestionToFetchedQuestions(result); - for (const question of questions) { - console.log(`- ${question.title}`); - if ((!j && i % 20 === 0) || opts.args?.debug) { - console.log(question); - j = true; - } - allQuestions.push(question); + if (job.context.type === "apiIndex") { + const apiIndex = await prepareApiQuestions(data); + if (apiIndex.next) { + await robot.schedule({ + url: apiIndex.next, + context: { + type: "apiIndex", + }, + }); } + + for (const apiQuestion of apiIndex.results) { + await processApiIndexQuestion(apiQuestion, robot); + // for (const question of questions) { + // console.log(`- ${question.title}`); + // allQuestions.push(question); + // } + } + } else if (job.context.type === "apiQuestion") { + const apiQuestion = await prepareSingleApiQuestion(data); + const fetchedQuestions = await processApiQuestion(apiQuestion); + for (const q of fetchedQuestions) { + await storage.upsert(q); + } + } else { + console.warn(`Unknown context type ${(job.context as any).type}`); } - - next = apiQuestions.next; - i += 1; + await job.done(); } - - return { - questions: allQuestions, - partial: false, - }; }, calculateStars(data) { diff --git a/src/backend/platforms/registry.ts b/src/backend/platforms/registry.ts index ce022a6..3ec3234 100644 --- a/src/backend/platforms/registry.ts +++ b/src/backend/platforms/registry.ts @@ -18,7 +18,7 @@ import { wildeford } from "./wildeford"; import { xrisk } from "./xrisk"; // function instead of const array, this helps to fight circular dependencies -export const getPlatforms = (): Platform[] => { +export const getPlatforms = (): Platform[] => { return [ betfair, fantasyscotus, diff --git a/src/backend/robot/index.ts b/src/backend/robot/index.ts new file mode 100644 index 0000000..89369aa --- /dev/null +++ b/src/backend/robot/index.ts @@ -0,0 +1,82 @@ +import axios from "axios"; +import { prisma } from "../database/prisma"; +import { Platform } from "../platforms"; + +// type Context = Prisma.JsonObject; // untyped for now, might become a generic in the future + +export type RobotJob = { + context: Context; + fetch: () => Promise; + done: () => Promise; +}; + +export type Robot = { + nextJob: () => Promise | undefined>; + schedule: (args: { url: string; context?: Context }) => Promise; +}; + +export const getRobot = ( + platform: Platform +): Robot => { + return { + async nextJob() { + const jobData = await prisma.robot.findFirst({ + where: { + platform: platform.name, + completed: { + equals: null, + }, + scheduled: { + lte: new Date(), + }, + }, + orderBy: { + created: "asc", + }, + }); + if (!jobData) { + return; + } + await prisma.robot.update({ + where: { + id: jobData?.id, + }, + data: { + tried: jobData.tried + 1, + }, + }); + + const job: RobotJob = { + context: jobData.context as Context, + async fetch() { + const data = await axios.get(jobData.url); + return data.data; + }, + async done() { + await prisma.robot.update({ + where: { + id: jobData.id, + }, + data: { + completed: new Date(), + }, + }); + }, + }; + return job; + }, + + async schedule({ url, context = {} }) { + const now = new Date(); + await prisma.robot.create({ + data: { + url, + platform: platform.name, + created: now, + scheduled: now, + context, + }, + }); + }, + }; +};