Compare commits

...

2 Commits

Author SHA1 Message Date
Vyacheslav Matyukhin
1ec6f34908
fetchers-v3 WIP 2022-06-03 20:05:02 +03:00
Vyacheslav Matyukhin
571d968aab
feat: fetchers-v3 (WIP) 2022-06-03 20:05:00 +03:00
6 changed files with 283 additions and 129 deletions

View File

@ -93,3 +93,14 @@ model FrontpageId {
question Question @relation(fields: [id], references: [id], onDelete: Cascade)
id String @unique
}
// One queued fetch job for a platform's robot (see src/backend/robot/index.ts).
model Robot {
id Int @id @default(autoincrement())
platform String // platform name; the robot only looks at rows matching its own platform
url String // non-unique. NOTE(review): getRobot().schedule() reuses a not-yet-completed row with the same platform+url and only creates a new row otherwise
context Json // platform-defined JSON stored with the job when it is scheduled
created DateTime @db.Timestamp(6) // the robot hands out pending jobs in ascending `created` order
scheduled DateTime @db.Timestamp(6) // can be equal to `created` or can be in the future for rescheduling or other purposes
completed DateTime? @db.Timestamp(6) // becomes non-null when the job is done
tried Int @default(0) // incremented every time the job is handed out; intended for limiting max attempts for badly written platforms (NOTE(review): no limit is enforced in the robot code visible here)
}

View File

@ -1,7 +1,7 @@
import { Question } from "@prisma/client";
import { QuestionOption } from "../../common/types";
import { prisma } from "../database/prisma";
import { getRobot, Robot } from "../robot";
// This file includes common types and functions for working with platforms.
// The registry of all platforms is in a separate file, ./registry.ts, to avoid circular dependencies.
@ -40,6 +40,10 @@ export type FetchedQuestion = Omit<
qualityindicators: Omit<QualityIndicators, "stars">; // slightly stronger type than Prisma's JsonValue
};
// Storage handle given to v3 fetchers so they can persist questions one at a time.
// processPlatform implements `upsert` by preparing the question for the platform and upserting it.
type MFStorage = {
upsert: (q: FetchedQuestion) => Promise<void>;
};
// V1 fetcher: takes no arguments and resolves to the fetched questions
// (processPlatform treats a v1 result as non-partial).
// The fetcher should return null if the platform failed to fetch questions for some reason.
type PlatformFetcherV1 = () => Promise<FetchedQuestion[] | null>;
@ -53,13 +57,18 @@ type PlatformFetcherV2<ArgNames extends string> = (opts: {
args?: { [k in ArgNames]: string };
}) => Promise<PlatformFetcherV2Result>;
// A fetcher in either the v1 form (no arguments) or the v2 form (options object with optional `args`).
export type PlatformFetcher<ArgNames extends string> =
| PlatformFetcherV1
| PlatformFetcherV2<ArgNames>;
// V3 fetcher: instead of returning questions it receives a `robot` (per-platform job queue)
// and a `storage` (question sink), and resolves with no value.
// `RobotContext` is the type of the context the platform attaches to each robot job.
// NOTE(review): processPlatform currently calls v3 fetchers with `robot` and `storage` only; `args` is not forwarded.
type PlatformFetcherV3<
ArgNames extends string,
RobotContext = unknown
> = (opts: {
args?: { [k in ArgNames]: string };
robot: Robot<RobotContext>;
storage: MFStorage;
}) => Promise<void>;
// using "" as ArgNames default is technically incorrect, but shouldn't cause any real issues
// (I couldn't find a better solution for signifying an empty value, though there probably is one)
export type Platform<ArgNames extends string = ""> = {
export type Platform<ArgNames extends string = "", RobotContext = unknown> = {
name: string; // short name for ids and `platform` db column, e.g. "xrisk"
label: string; // longer name for displaying on frontend etc., e.g. "X-risk estimates"
color: string; // used on frontend
@ -74,6 +83,11 @@ export type Platform<ArgNames extends string = ""> = {
fetcherArgs?: ArgNames[];
fetcher?: PlatformFetcherV2<ArgNames>;
}
| {
version: "v3";
fetcherArgs?: ArgNames[];
fetcher?: PlatformFetcherV3<ArgNames, RobotContext>;
}
);
// Typing notes:
@ -92,7 +106,7 @@ type PreparedQuestion = Omit<
export const prepareQuestion = (
q: FetchedQuestion,
platform: Platform<any>
platform: Platform<any, any>
): PreparedQuestion => {
return {
extra: {},
@ -120,14 +134,29 @@ export const upsertSingleQuestion = async (
// TODO - update history?
};
export const processPlatform = async <T extends string = "">(
platform: Platform<T>,
export const processPlatform = async <T extends string = "", RC = unknown>(
platform: Platform<T, RC>,
args?: { [k in T]: string }
) => {
if (!platform.fetcher) {
console.log(`Platform ${platform.name} doesn't have a fetcher, skipping`);
return;
}
if (platform.version === "v3") {
const robot = getRobot(platform);
const storage: MFStorage = {
async upsert(q) {
await upsertSingleQuestion(prepareQuestion(q, platform));
},
};
await platform.fetcher({
robot,
storage,
});
return;
}
const result =
platform.version === "v1"
? { questions: await platform.fetcher(), partial: false } // this is not exactly PlatformFetcherV2Result, since `questions` can be null

View File

@ -1,6 +1,4 @@
import Ajv, { JTDDataType, ValidateFunction } from "ajv/dist/jtd";
import axios from "axios";
import { sleep } from "../../utils/sleep";
import Ajv, { JTDDataType } from "ajv/dist/jtd";
// Type examples:
// - group: https://www.metaculus.com/api2/questions/9866/
@ -186,46 +184,38 @@ const validateShallowMultipleQuestions =
shallowMultipleQuestionsSchema
);
/**
 * GETs `url` and returns the response body, retrying exactly once on failure.
 *
 * If the first attempt fails with an axios error, the function waits before
 * retrying: it honors the `Retry-After` response header when present and
 * otherwise sleeps for RETRY_SLEEP_TIME (defined elsewhere in this file;
 * presumably milliseconds, since it is passed straight to `sleep`).
 * An error from the second attempt is not caught and propagates to the caller.
 *
 * @param url - address to fetch
 * @returns the response body typed as `T` (not validated here)
 */
async function fetchWithRetries<T = unknown>(url: string): Promise<T> {
  try {
    const response = await axios.get<T>(url);
    return response.data;
  } catch (error) {
    console.log(`Error while fetching ${url}`);
    console.log(error);
    if (axios.isAxiosError(error)) {
      const timeout = error.response?.headers["retry-after"];
      if (timeout) {
        console.log(`Timeout: ${timeout}`);
        // Retry-After is either a number of seconds or an HTTP-date.
        // Number() yields NaN for the date form, which previously turned into
        // sleep(NaN), i.e. an (almost) immediate retry.
        const seconds = Number(timeout);
        const delayMs = Number.isFinite(seconds)
          ? seconds * 1000
          : Date.parse(String(timeout)) - Date.now();
        if (Number.isFinite(delayMs) && delayMs >= 0) {
          // one extra second of slack on top of what the server asked for
          await sleep(delayMs + 1000);
        } else {
          // unparsable header or a date in the past: use the default pause
          await sleep(RETRY_SLEEP_TIME);
        }
      } else {
        await sleep(RETRY_SLEEP_TIME);
      }
    }
  }
  // second and final attempt
  const response = await axios.get<T>(url);
  return response.data;
}
// async function fetchWithRetries<T = unknown>(url: string): Promise<T> {
// try {
// const response = await axios.get<T>(url);
// return response.data;
// } catch (error) {
// console.log(`Error while fetching ${url}`);
// console.log(error);
// if (axios.isAxiosError(error)) {
// if (error.response?.headers["retry-after"]) {
// const timeout = error.response.headers["retry-after"];
// console.log(`Timeout: ${timeout}`);
// await sleep(Number(timeout) * 1000 + 1000);
// } else {
// await sleep(RETRY_SLEEP_TIME);
// }
// }
// }
// const response = await axios.get<T>(url);
// return response.data;
// }
// Downloads `url` via fetchWithRetries and checks the payload with `validator`.
// Resolves to the payload (narrowed to T) when it is valid; otherwise throws an
// Error whose message contains the url and the validator's error list.
const fetchAndValidate = async <T = unknown>(
  url: string,
  validator: ValidateFunction<T>
): Promise<T> => {
  console.log(url);
  const payload = await fetchWithRetries<object>(url);
  if (!validator(payload)) {
    const details = JSON.stringify(validator.errors);
    throw new Error(`Response validation for url ${url} failed: ` + details);
  }
  return payload;
};
export async function fetchApiQuestions(
next: string
export async function prepareApiQuestions(
data: unknown
): Promise<ApiMultipleQuestions> {
const data = await fetchAndValidate(next, validateShallowMultipleQuestions);
if (!validateShallowMultipleQuestions(data)) {
throw new Error(
`Response validation failed: ` +
JSON.stringify(validateShallowMultipleQuestions.errors) +
"\n\n" +
JSON.stringify(data)
);
}
const isDefined = <T>(argument: T | undefined): argument is T => {
return argument !== undefined;
@ -251,9 +241,16 @@ export async function fetchApiQuestions(
};
}
export async function fetchSingleApiQuestion(id: number): Promise<ApiQuestion> {
return await fetchAndValidate(
`https://www.metaculus.com/api2/questions/${id}/`,
validateQuestion
);
// Validates an already-fetched payload as a single API question.
// Resolves to the same value typed as ApiQuestion, or rejects with an Error
// that includes both the validation errors and the offending payload.
export async function prepareSingleApiQuestion(
  data: unknown
): Promise<ApiQuestion> {
  if (validateQuestion(data)) {
    return data;
  }
  const errorsJson = JSON.stringify(validateQuestion.errors);
  const dataJson = JSON.stringify(data);
  throw new Error(
    `Response validation failed: ` + errorsJson + "\n\n" + dataJson
  );
}

View File

@ -1,20 +1,37 @@
import { FetchedQuestion, Platform } from "..";
import { average } from "../../../utils";
import { sleep } from "../../utils/sleep";
import { Robot, RobotJob } from "../../robot";
import {
ApiCommon,
ApiMultipleQuestions,
ApiPredictable,
ApiQuestion,
fetchApiQuestions,
fetchSingleApiQuestion,
prepareApiQuestions,
prepareSingleApiQuestion,
} from "./api";
const platformName = "metaculus";
const now = new Date().toISOString();
const SLEEP_TIME = 1000;
async function apiQuestionToFetchedQuestions(
// Context stored with each metaculus robot job; `type` tells the fetcher how to treat the fetched data.
type Context =
| {
// a page of the questions list API: yields a follow-up page job and one job per eligible question
type: "apiIndex";
}
| {
// a single question's API endpoint: converted to fetched questions and upserted
type: "apiQuestion";
};
// Tells whether a predictable should be left out:
// - `now` falls outside its [publish_time, resolve_time] window, or
// - it has fewer than 10 predictions.
const skip = (q: ApiPredictable): boolean => {
  const outsideWindow = q.publish_time > now || now > q.resolve_time;
  return outsideWindow || q.number_of_predictions < 10;
};
async function processApiQuestion(
apiQuestion: ApiQuestion
): Promise<FetchedQuestion[]> {
// one item can expand:
@ -22,16 +39,6 @@ async function apiQuestionToFetchedQuestions(
// - to 1 question if it's a simple forecast
// - to multiple questions if it's a group (see https://github.com/quantified-uncertainty/metaforecast/pull/84 for details)
const skip = (q: ApiPredictable): boolean => {
if (q.publish_time > now || now > q.resolve_time) {
return true;
}
if (q.number_of_predictions < 10) {
return true;
}
return false;
};
const buildFetchedQuestion = (
q: ApiPredictable & ApiCommon
): Omit<FetchedQuestion, "url" | "description" | "title"> => {
@ -72,19 +79,14 @@ async function apiQuestionToFetchedQuestions(
};
if (apiQuestion.type === "group") {
await sleep(SLEEP_TIME);
const apiQuestionDetails = await fetchSingleApiQuestion(apiQuestion.id);
if (apiQuestionDetails.type !== "group") {
throw new Error("Expected `group` type"); // shouldn't happen, this is mostly for typescript
}
return (apiQuestionDetails.sub_questions || [])
return (apiQuestion.sub_questions || [])
.filter((q) => !skip(q))
.map((sq) => {
const tmp = buildFetchedQuestion(sq);
return {
...tmp,
title: `${apiQuestion.title} (${sq.title})`,
description: apiQuestionDetails.description || "",
description: apiQuestion.description || "",
url: `https://www.metaculus.com${apiQuestion.page_url}?sub-question=${sq.id}`,
};
});
@ -96,81 +98,90 @@ async function apiQuestionToFetchedQuestions(
return [];
}
await sleep(SLEEP_TIME);
const apiQuestionDetails = await fetchSingleApiQuestion(apiQuestion.id);
const tmp = buildFetchedQuestion(apiQuestion);
return [
{
...tmp,
title: apiQuestion.title,
description: apiQuestionDetails.description || "",
description: apiQuestion.description || "",
url: "https://www.metaculus.com" + apiQuestion.page_url,
},
];
} else {
if (apiQuestion.type !== "claim") {
// should never happen, since `discriminator` in JTD schema causes a strict runtime check
console.log(
`Unknown metaculus question type: ${
(apiQuestion as any).type
}, skipping`
);
}
console.log(
`Unknown metaculus question type: ${apiQuestion.type}, skipping`
);
return [];
}
}
export const metaculus: Platform<"id" | "debug"> = {
// Handles one entry of an API index page: schedules a robot job for the
// question's own API endpoint.
// Only `group` and `forecast` entries are scheduled; a `forecast` that `skip`
// rejects is dropped right away (groups are not checked with `skip` here).
async function processApiIndexQuestion(
  apiQuestion: ApiQuestion,
  robot: Robot<Context>
): Promise<void> {
  if (apiQuestion.type !== "group" && apiQuestion.type !== "forecast") {
    return;
  }
  if (apiQuestion.type === "forecast" && skip(apiQuestion)) {
    return;
  }

  const questionUrl = `https://www.metaculus.com/api2/questions/${apiQuestion.id}/`;
  await robot.schedule({
    url: questionUrl,
    context: { type: "apiQuestion" },
  });
}
export const metaculus: Platform<"id" | "debug", Context> = {
name: platformName,
label: "Metaculus",
color: "#006669",
version: "v2",
version: "v3",
fetcherArgs: ["id", "debug"],
async fetcher(opts) {
let allQuestions: FetchedQuestion[] = [];
async fetcher({ robot, storage }) {
await robot.schedule({
url: "https://www.metaculus.com/api2/questions/",
context: {
type: "apiIndex",
},
});
if (opts.args?.id) {
const id = Number(opts.args.id);
const apiQuestion = await fetchSingleApiQuestion(id);
const questions = await apiQuestionToFetchedQuestions(apiQuestion);
console.log(questions);
return {
questions,
partial: true,
};
}
for (
let job: RobotJob<Context> | undefined;
(job = await robot.nextJob());
let next: string | null = "https://www.metaculus.com/api2/questions/";
let i = 1;
while (next) {
console.log(`\nQuery #${i} - ${next}`);
) {
const data = await job.fetch();
await sleep(SLEEP_TIME);
const apiQuestions: ApiMultipleQuestions = await fetchApiQuestions(next);
const results = apiQuestions.results;
let j = false;
for (const result of results) {
const questions = await apiQuestionToFetchedQuestions(result);
for (const question of questions) {
console.log(`- ${question.title}`);
if ((!j && i % 20 === 0) || opts.args?.debug) {
console.log(question);
j = true;
}
allQuestions.push(question);
if (job.context.type === "apiIndex") {
const apiIndex = await prepareApiQuestions(data);
if (apiIndex.next) {
await robot.schedule({
url: apiIndex.next,
context: {
type: "apiIndex",
},
});
}
for (const apiQuestion of apiIndex.results) {
await processApiIndexQuestion(apiQuestion, robot);
// for (const question of questions) {
// console.log(`- ${question.title}`);
// allQuestions.push(question);
// }
}
} else if (job.context.type === "apiQuestion") {
const apiQuestion = await prepareSingleApiQuestion(data);
const fetchedQuestions = await processApiQuestion(apiQuestion);
for (const q of fetchedQuestions) {
await storage.upsert(q);
}
} else {
console.warn(`Unknown context type ${(job.context as any).type}`);
}
next = apiQuestions.next;
i += 1;
await job.done();
}
return {
questions: allQuestions,
partial: false,
};
},
calculateStars(data) {

View File

@ -18,7 +18,7 @@ import { wildeford } from "./wildeford";
import { xrisk } from "./xrisk";
// function instead of const array, this helps to fight circular dependencies
export const getPlatforms = (): Platform<string>[] => {
export const getPlatforms = (): Platform<string, any>[] => {
return [
betfair,
fantasyscotus,

106
src/backend/robot/index.ts Normal file
View File

@ -0,0 +1,106 @@
import axios from "axios";
import { prisma } from "../database/prisma";
import { Platform } from "../platforms";
// type Context = Prisma.JsonObject; // untyped for now, might become a generic in the future
// A pending job handed out by Robot.nextJob().
export type RobotJob<Context> = {
// platform-defined context read from the job's database row
context: Context;
// performs a GET request to the job's url and resolves to the response body (not validated)
fetch: () => Promise<unknown>;
// marks the job as completed by setting its `completed` timestamp
done: () => Promise<void>;
};
// Per-platform job queue stored in the `robot` database table.
export type Robot<Context> = {
// resolves to the oldest (by `created`) uncompleted job whose `scheduled` time has passed, or undefined if there is none
nextJob: () => Promise<RobotJob<Context> | undefined>;
// queues `url` for fetching, optionally with a context that is handed back on the job
schedule: (args: { url: string; context?: Context }) => Promise<void>;
};
/**
 * Builds the database-backed job queue ("robot") for a platform.
 *
 * All queries are scoped to rows whose `platform` column equals `platform.name`.
 *
 * @param platform - platform the robot works for; only its `name` is used here
 * @returns an object with `nextJob` (take the next due job) and `schedule` (queue a url)
 */
export const getRobot = <Context>(
  platform: Platform<any, Context>
): Robot<Context> => {
  return {
    /**
     * Picks the oldest (by `created`) job that is not completed and whose
     * `scheduled` time is not in the future, counts the attempt, and returns
     * a handle for it. Resolves to `undefined` when no such job exists.
     */
    async nextJob() {
      const jobData = await prisma.robot.findFirst({
        where: {
          platform: platform.name,
          completed: {
            equals: null,
          },
          scheduled: {
            lte: new Date(),
          },
        },
        orderBy: {
          created: "asc",
        },
      });
      if (!jobData) {
        return;
      }

      // Count the attempt with an atomic increment, so the counter doesn't
      // depend on the possibly stale `tried` value read above.
      await prisma.robot.update({
        where: {
          id: jobData.id,
        },
        data: {
          tried: {
            increment: 1,
          },
        },
      });

      const job: RobotJob<Context> = {
        // stored as untyped JSON; trusted to match the platform's Context type
        context: jobData.context as Context,
        async fetch() {
          const response = await axios.get(jobData.url);
          return response.data;
        },
        async done() {
          await prisma.robot.update({
            where: {
              id: jobData.id,
            },
            data: {
              completed: new Date(),
            },
          });
        },
      };
      return job;
    },

    /**
     * Queues `url` for this platform. If an uncompleted job with the same url
     * already exists, it is refreshed (new `created`/`scheduled` timestamps
     * and context) instead of adding a duplicate row; otherwise a new row is
     * created. `context` defaults to an empty object.
     */
    async schedule({ url, context = {} }) {
      const now = new Date();

      const oldJob = await prisma.robot.findFirst({
        where: {
          platform: platform.name,
          url,
          completed: {
            equals: null,
          },
        },
      });

      if (oldJob) {
        await prisma.robot.update({
          where: {
            id: oldJob.id,
          },
          data: {
            created: now,
            scheduled: now,
            context,
          },
        });
      } else {
        await prisma.robot.create({
          data: {
            url,
            platform: platform.name,
            created: now,
            scheduled: now,
            context,
          },
        });
      }
    },
  };
};