feat: fetchers-v3 (WIP)

This commit is contained in:
Vyacheslav Matyukhin 2022-06-03 16:52:19 +03:00
parent 00615c1e63
commit 571d968aab
No known key found for this signature in database
GPG Key ID: 3D2A774C5489F96C
6 changed files with 259 additions and 129 deletions

View File

@ -93,3 +93,14 @@ model FrontpageId {
question Question @relation(fields: [id], references: [id], onDelete: Cascade) question Question @relation(fields: [id], references: [id], onDelete: Cascade)
id String @unique id String @unique
} }
model Robot {
id Int @id @default(autoincrement())
platform String
url String // non-unique, rescheduling always creates a new row
context Json
created DateTime @db.Timestamp(6)
scheduled DateTime @db.Timestamp(6) // can be equal to `created` or can be in the future for rescheduling or other purposes
completed DateTime? @db.Timestamp(6) // becomes non-null when the job is done
tried Int @default(0) // used to set a limit on max attempts for badly written platforms
}

View File

@ -1,7 +1,7 @@
import { Question } from "@prisma/client"; import { Question } from "@prisma/client";
import { QuestionOption } from "../../common/types"; import { QuestionOption } from "../../common/types";
import { prisma } from "../database/prisma"; import { prisma } from "../database/prisma";
import { getRobot, Robot } from "../robot";
// This file includes comon types and functions for working with platforms. // This file includes comon types and functions for working with platforms.
// The registry of all platforms is in a separate file, ./registry.ts, to avoid circular dependencies. // The registry of all platforms is in a separate file, ./registry.ts, to avoid circular dependencies.
@ -40,6 +40,10 @@ export type FetchedQuestion = Omit<
qualityindicators: Omit<QualityIndicators, "stars">; // slightly stronger type than Prisma's JsonValue qualityindicators: Omit<QualityIndicators, "stars">; // slightly stronger type than Prisma's JsonValue
}; };
type MFStorage = {
upsert: (q: FetchedQuestion) => Promise<void>;
};
// fetcher should return null if platform failed to fetch questions for some reason // fetcher should return null if platform failed to fetch questions for some reason
type PlatformFetcherV1 = () => Promise<FetchedQuestion[] | null>; type PlatformFetcherV1 = () => Promise<FetchedQuestion[] | null>;
@ -53,13 +57,18 @@ type PlatformFetcherV2<ArgNames extends string> = (opts: {
args?: { [k in ArgNames]: string }; args?: { [k in ArgNames]: string };
}) => Promise<PlatformFetcherV2Result>; }) => Promise<PlatformFetcherV2Result>;
export type PlatformFetcher<ArgNames extends string> = type PlatformFetcherV3<
| PlatformFetcherV1 ArgNames extends string,
| PlatformFetcherV2<ArgNames>; RobotContext = unknown
> = (opts: {
args?: { [k in ArgNames]: string };
robot: Robot<RobotContext>;
storage: MFStorage;
}) => Promise<void>;
// using "" as ArgNames default is technically incorrect, but shouldn't cause any real issues // using "" as ArgNames default is technically incorrect, but shouldn't cause any real issues
// (I couldn't find a better solution for signifying an empty value, though there probably is one) // (I couldn't find a better solution for signifying an empty value, though there probably is one)
export type Platform<ArgNames extends string = ""> = { export type Platform<ArgNames extends string = "", RobotContext = unknown> = {
name: string; // short name for ids and `platform` db column, e.g. "xrisk" name: string; // short name for ids and `platform` db column, e.g. "xrisk"
label: string; // longer name for displaying on frontend etc., e.g. "X-risk estimates" label: string; // longer name for displaying on frontend etc., e.g. "X-risk estimates"
color: string; // used on frontend color: string; // used on frontend
@ -74,6 +83,11 @@ export type Platform<ArgNames extends string = ""> = {
fetcherArgs?: ArgNames[]; fetcherArgs?: ArgNames[];
fetcher?: PlatformFetcherV2<ArgNames>; fetcher?: PlatformFetcherV2<ArgNames>;
} }
| {
version: "v3";
fetcherArgs?: ArgNames[];
fetcher?: PlatformFetcherV3<ArgNames, RobotContext>;
}
); );
// Typing notes: // Typing notes:
@ -92,7 +106,7 @@ type PreparedQuestion = Omit<
export const prepareQuestion = ( export const prepareQuestion = (
q: FetchedQuestion, q: FetchedQuestion,
platform: Platform<any> platform: Platform<any, any>
): PreparedQuestion => { ): PreparedQuestion => {
return { return {
extra: {}, extra: {},
@ -120,14 +134,29 @@ export const upsertSingleQuestion = async (
// TODO - update history? // TODO - update history?
}; };
export const processPlatform = async <T extends string = "">( export const processPlatform = async <T extends string = "", RC = unknown>(
platform: Platform<T>, platform: Platform<T, RC>,
args?: { [k in T]: string } args?: { [k in T]: string }
) => { ) => {
if (!platform.fetcher) { if (!platform.fetcher) {
console.log(`Platform ${platform.name} doesn't have a fetcher, skipping`); console.log(`Platform ${platform.name} doesn't have a fetcher, skipping`);
return; return;
} }
if (platform.version === "v3") {
const robot = getRobot(platform);
const storage: MFStorage = {
async upsert(q) {
await upsertSingleQuestion(prepareQuestion(q, platform));
},
};
await platform.fetcher({
robot,
storage,
});
return;
}
const result = const result =
platform.version === "v1" platform.version === "v1"
? { questions: await platform.fetcher(), partial: false } // this is not exactly PlatformFetcherV2Result, since `questions` can be null ? { questions: await platform.fetcher(), partial: false } // this is not exactly PlatformFetcherV2Result, since `questions` can be null

View File

@ -1,6 +1,4 @@
import Ajv, { JTDDataType, ValidateFunction } from "ajv/dist/jtd"; import Ajv, { JTDDataType } from "ajv/dist/jtd";
import axios from "axios";
import { sleep } from "../../utils/sleep";
// Type examples: // Type examples:
// - group: https://www.metaculus.com/api2/questions/9866/ // - group: https://www.metaculus.com/api2/questions/9866/
@ -186,46 +184,38 @@ const validateShallowMultipleQuestions =
shallowMultipleQuestionsSchema shallowMultipleQuestionsSchema
); );
async function fetchWithRetries<T = unknown>(url: string): Promise<T> { // async function fetchWithRetries<T = unknown>(url: string): Promise<T> {
try { // try {
const response = await axios.get<T>(url); // const response = await axios.get<T>(url);
return response.data; // return response.data;
} catch (error) { // } catch (error) {
console.log(`Error while fetching ${url}`); // console.log(`Error while fetching ${url}`);
console.log(error); // console.log(error);
if (axios.isAxiosError(error)) { // if (axios.isAxiosError(error)) {
if (error.response?.headers["retry-after"]) { // if (error.response?.headers["retry-after"]) {
const timeout = error.response.headers["retry-after"]; // const timeout = error.response.headers["retry-after"];
console.log(`Timeout: ${timeout}`); // console.log(`Timeout: ${timeout}`);
await sleep(Number(timeout) * 1000 + 1000); // await sleep(Number(timeout) * 1000 + 1000);
} else { // } else {
await sleep(RETRY_SLEEP_TIME); // await sleep(RETRY_SLEEP_TIME);
} // }
} // }
} // }
const response = await axios.get<T>(url); // const response = await axios.get<T>(url);
return response.data; // return response.data;
} // }
const fetchAndValidate = async <T = unknown>( export async function prepareApiQuestions(
url: string, data: unknown
validator: ValidateFunction<T>
): Promise<T> => {
console.log(url);
const data = await fetchWithRetries<object>(url);
if (validator(data)) {
return data;
}
throw new Error(
`Response validation for url ${url} failed: ` +
JSON.stringify(validator.errors)
);
};
export async function fetchApiQuestions(
next: string
): Promise<ApiMultipleQuestions> { ): Promise<ApiMultipleQuestions> {
const data = await fetchAndValidate(next, validateShallowMultipleQuestions); if (!validateShallowMultipleQuestions(data)) {
throw new Error(
`Response validation failed: ` +
JSON.stringify(validateShallowMultipleQuestions.errors) +
"\n\n" +
JSON.stringify(data)
);
}
const isDefined = <T>(argument: T | undefined): argument is T => { const isDefined = <T>(argument: T | undefined): argument is T => {
return argument !== undefined; return argument !== undefined;
@ -251,9 +241,16 @@ export async function fetchApiQuestions(
}; };
} }
export async function fetchSingleApiQuestion(id: number): Promise<ApiQuestion> { export async function prepareSingleApiQuestion(
return await fetchAndValidate( data: unknown
`https://www.metaculus.com/api2/questions/${id}/`, ): Promise<ApiQuestion> {
validateQuestion if (!validateQuestion(data)) {
); throw new Error(
`Response validation failed: ` +
JSON.stringify(validateQuestion.errors) +
"\n\n" +
JSON.stringify(data)
);
}
return data;
} }

View File

@ -1,20 +1,37 @@
import { FetchedQuestion, Platform } from ".."; import { FetchedQuestion, Platform } from "..";
import { average } from "../../../utils"; import { average } from "../../../utils";
import { sleep } from "../../utils/sleep"; import { Robot, RobotJob } from "../../robot";
import { import {
ApiCommon, ApiCommon,
ApiMultipleQuestions,
ApiPredictable, ApiPredictable,
ApiQuestion, ApiQuestion,
fetchApiQuestions, prepareApiQuestions,
fetchSingleApiQuestion, prepareSingleApiQuestion,
} from "./api"; } from "./api";
const platformName = "metaculus"; const platformName = "metaculus";
const now = new Date().toISOString(); const now = new Date().toISOString();
const SLEEP_TIME = 1000; const SLEEP_TIME = 1000;
async function apiQuestionToFetchedQuestions( type Context =
| {
type: "apiIndex";
}
| {
type: "apiQuestion";
};
const skip = (q: ApiPredictable): boolean => {
if (q.publish_time > now || now > q.resolve_time) {
return true;
}
if (q.number_of_predictions < 10) {
return true;
}
return false;
};
async function processApiQuestion(
apiQuestion: ApiQuestion apiQuestion: ApiQuestion
): Promise<FetchedQuestion[]> { ): Promise<FetchedQuestion[]> {
// one item can expand: // one item can expand:
@ -22,16 +39,6 @@ async function apiQuestionToFetchedQuestions(
// - to 1 question if it's a simple forecast // - to 1 question if it's a simple forecast
// - to multiple questions if it's a group (see https://github.com/quantified-uncertainty/metaforecast/pull/84 for details) // - to multiple questions if it's a group (see https://github.com/quantified-uncertainty/metaforecast/pull/84 for details)
const skip = (q: ApiPredictable): boolean => {
if (q.publish_time > now || now > q.resolve_time) {
return true;
}
if (q.number_of_predictions < 10) {
return true;
}
return false;
};
const buildFetchedQuestion = ( const buildFetchedQuestion = (
q: ApiPredictable & ApiCommon q: ApiPredictable & ApiCommon
): Omit<FetchedQuestion, "url" | "description" | "title"> => { ): Omit<FetchedQuestion, "url" | "description" | "title"> => {
@ -72,19 +79,14 @@ async function apiQuestionToFetchedQuestions(
}; };
if (apiQuestion.type === "group") { if (apiQuestion.type === "group") {
await sleep(SLEEP_TIME); return (apiQuestion.sub_questions || [])
const apiQuestionDetails = await fetchSingleApiQuestion(apiQuestion.id);
if (apiQuestionDetails.type !== "group") {
throw new Error("Expected `group` type"); // shouldn't happen, this is mostly for typescript
}
return (apiQuestionDetails.sub_questions || [])
.filter((q) => !skip(q)) .filter((q) => !skip(q))
.map((sq) => { .map((sq) => {
const tmp = buildFetchedQuestion(sq); const tmp = buildFetchedQuestion(sq);
return { return {
...tmp, ...tmp,
title: `${apiQuestion.title} (${sq.title})`, title: `${apiQuestion.title} (${sq.title})`,
description: apiQuestionDetails.description || "", description: apiQuestion.description || "",
url: `https://www.metaculus.com${apiQuestion.page_url}?sub-question=${sq.id}`, url: `https://www.metaculus.com${apiQuestion.page_url}?sub-question=${sq.id}`,
}; };
}); });
@ -96,81 +98,90 @@ async function apiQuestionToFetchedQuestions(
return []; return [];
} }
await sleep(SLEEP_TIME);
const apiQuestionDetails = await fetchSingleApiQuestion(apiQuestion.id);
const tmp = buildFetchedQuestion(apiQuestion); const tmp = buildFetchedQuestion(apiQuestion);
return [ return [
{ {
...tmp, ...tmp,
title: apiQuestion.title, title: apiQuestion.title,
description: apiQuestionDetails.description || "", description: apiQuestion.description || "",
url: "https://www.metaculus.com" + apiQuestion.page_url, url: "https://www.metaculus.com" + apiQuestion.page_url,
}, },
]; ];
} else { } else {
if (apiQuestion.type !== "claim") { console.log(
// should never happen, since `discriminator` in JTD schema causes a strict runtime check `Unknown metaculus question type: ${apiQuestion.type}, skipping`
console.log( );
`Unknown metaculus question type: ${
(apiQuestion as any).type
}, skipping`
);
}
return []; return [];
} }
} }
export const metaculus: Platform<"id" | "debug"> = { async function processApiIndexQuestion(
apiQuestion: ApiQuestion,
robot: Robot<Context>
): Promise<void> {
if (apiQuestion.type === "group" || apiQuestion.type === "forecast") {
if (apiQuestion.type === "forecast" && skip(apiQuestion)) {
return;
}
await robot.schedule({
url: `https://www.metaculus.com/api2/questions/${apiQuestion.id}/`,
context: {
type: "apiQuestion",
},
});
}
}
export const metaculus: Platform<"id" | "debug", Context> = {
name: platformName, name: platformName,
label: "Metaculus", label: "Metaculus",
color: "#006669", color: "#006669",
version: "v2", version: "v3",
fetcherArgs: ["id", "debug"], fetcherArgs: ["id", "debug"],
async fetcher(opts) { async fetcher({ robot, storage }) {
let allQuestions: FetchedQuestion[] = []; await robot.schedule({
url: "https://www.metaculus.com/api2/questions/",
context: {
type: "apiIndex",
},
});
if (opts.args?.id) { for (
const id = Number(opts.args.id); let job: RobotJob<Context> | undefined;
const apiQuestion = await fetchSingleApiQuestion(id); (job = await robot.nextJob());
const questions = await apiQuestionToFetchedQuestions(apiQuestion);
console.log(questions);
return {
questions,
partial: true,
};
}
let next: string | null = "https://www.metaculus.com/api2/questions/"; ) {
let i = 1; const data = await job.fetch();
while (next) {
console.log(`\nQuery #${i} - ${next}`);
await sleep(SLEEP_TIME); if (job.context.type === "apiIndex") {
const apiQuestions: ApiMultipleQuestions = await fetchApiQuestions(next); const apiIndex = await prepareApiQuestions(data);
const results = apiQuestions.results; if (apiIndex.next) {
await robot.schedule({
let j = false; url: apiIndex.next,
context: {
for (const result of results) { type: "apiIndex",
const questions = await apiQuestionToFetchedQuestions(result); },
for (const question of questions) { });
console.log(`- ${question.title}`);
if ((!j && i % 20 === 0) || opts.args?.debug) {
console.log(question);
j = true;
}
allQuestions.push(question);
} }
for (const apiQuestion of apiIndex.results) {
await processApiIndexQuestion(apiQuestion, robot);
// for (const question of questions) {
// console.log(`- ${question.title}`);
// allQuestions.push(question);
// }
}
} else if (job.context.type === "apiQuestion") {
const apiQuestion = await prepareSingleApiQuestion(data);
const fetchedQuestions = await processApiQuestion(apiQuestion);
for (const q of fetchedQuestions) {
await storage.upsert(q);
}
} else {
console.warn(`Unknown context type ${(job.context as any).type}`);
} }
await job.done();
next = apiQuestions.next;
i += 1;
} }
return {
questions: allQuestions,
partial: false,
};
}, },
calculateStars(data) { calculateStars(data) {

View File

@ -18,7 +18,7 @@ import { wildeford } from "./wildeford";
import { xrisk } from "./xrisk"; import { xrisk } from "./xrisk";
// function instead of const array, this helps to fight circular dependencies // function instead of const array, this helps to fight circular dependencies
export const getPlatforms = (): Platform<string>[] => { export const getPlatforms = (): Platform<string, any>[] => {
return [ return [
betfair, betfair,
fantasyscotus, fantasyscotus,

View File

@ -0,0 +1,82 @@
import axios from "axios";
import { prisma } from "../database/prisma";
import { Platform } from "../platforms";
// type Context = Prisma.JsonObject; // untyped for now, might become a generic in the future
export type RobotJob<Context> = {
context: Context;
fetch: () => Promise<unknown>;
done: () => Promise<void>;
};
export type Robot<Context> = {
nextJob: () => Promise<RobotJob<Context> | undefined>;
schedule: (args: { url: string; context?: Context }) => Promise<void>;
};
export const getRobot = <Context>(
platform: Platform<any, Context>
): Robot<Context> => {
return {
async nextJob() {
const jobData = await prisma.robot.findFirst({
where: {
platform: platform.name,
completed: {
equals: null,
},
scheduled: {
lte: new Date(),
},
},
orderBy: {
created: "asc",
},
});
if (!jobData) {
return;
}
await prisma.robot.update({
where: {
id: jobData?.id,
},
data: {
tried: jobData.tried + 1,
},
});
const job: RobotJob<Context> = {
context: jobData.context as Context,
async fetch() {
const data = await axios.get(jobData.url);
return data.data;
},
async done() {
await prisma.robot.update({
where: {
id: jobData.id,
},
data: {
completed: new Date(),
},
});
},
};
return job;
},
async schedule({ url, context = {} }) {
const now = new Date();
await prisma.robot.create({
data: {
url,
platform: platform.name,
created: now,
scheduled: now,
context,
},
});
},
};
};