diff --git a/package-lock.json b/package-lock.json
index 5d0695b..9adc78a 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -64,6 +64,7 @@
       "devDependencies": {
         "@netlify/plugin-nextjs": "^4.2.4",
         "@svgr/cli": "^6.2.1",
+        "@types/pg": "^8.6.5",
         "netlify-cli": "^9.13.6"
       }
     },
@@ -1691,6 +1692,17 @@
       "integrity": "sha512-//oorEZjL6sbPcKUaCdIGlIUeH26mgzimjBB77G6XRgnDl/L5wOnpyBGRe/Mmf5CVW3PwEBE1NjiMZ/ssFh4wA==",
       "license": "MIT"
     },
+    "node_modules/@types/pg": {
+      "version": "8.6.5",
+      "resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.6.5.tgz",
+      "integrity": "sha512-tOkGtAqRVkHa/PVZicq67zuujI4Oorfglsr2IbKofDwBSysnaqSx7W1mDqFqdkGE6Fbgh+PZAl0r/BWON/mozw==",
+      "dev": true,
+      "dependencies": {
+        "@types/node": "*",
+        "pg-protocol": "*",
+        "pg-types": "^2.2.0"
+      }
+    },
     "node_modules/@types/prop-types": {
       "version": "15.7.4",
       "resolved": "https://registry.npmjs.org/@types%2fprop-types/-/prop-types-15.7.4.tgz",
@@ -36019,6 +36031,17 @@
       "resolved": "https://registry.npmjs.org/@types%2fparse-json/-/parse-json-4.0.0.tgz",
       "integrity": "sha512-//oorEZjL6sbPcKUaCdIGlIUeH26mgzimjBB77G6XRgnDl/L5wOnpyBGRe/Mmf5CVW3PwEBE1NjiMZ/ssFh4wA=="
     },
+    "@types/pg": {
+      "version": "8.6.5",
+      "resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.6.5.tgz",
+      "integrity": "sha512-tOkGtAqRVkHa/PVZicq67zuujI4Oorfglsr2IbKofDwBSysnaqSx7W1mDqFqdkGE6Fbgh+PZAl0r/BWON/mozw==",
+      "dev": true,
+      "requires": {
+        "@types/node": "*",
+        "pg-protocol": "*",
+        "pg-types": "^2.2.0"
+      }
+    },
     "@types/prop-types": {
       "version": "15.7.4",
       "resolved": "https://registry.npmjs.org/@types%2fprop-types/-/prop-types-15.7.4.tgz",
diff --git a/package.json b/package.json
index 29338e8..c3186cc 100644
--- a/package.json
+++ b/package.json
@@ -82,6 +82,7 @@
   "devDependencies": {
     "@netlify/plugin-nextjs": "^4.2.4",
     "@svgr/cli": "^6.2.1",
-    "netlify-cli": "^9.13.6"
+    "netlify-cli": "^9.13.6",
+    "@types/pg": "^8.6.5"
   }
 }
diff --git a/src/backend/database/pg-wrapper.ts b/src/backend/database/pg-wrapper.ts
index 09d2aae..0b21c9b 100644
--- a/src/backend/database/pg-wrapper.ts
+++ b/src/backend/database/pg-wrapper.ts
@@ -1,11 +1,10 @@
-import pkg from "pg";
+import { Pool, PoolClient } from "pg";
 
-import { platforms } from "../platforms";
+import { Forecast, platforms } from "../platforms";
 import { hash } from "../utils/hash";
+import { measureTime } from "../utils/measureTime";
 import { roughSizeOfObject } from "../utils/roughSize";
 
-const { Pool } = pkg;
-
 // Definitions
 const schemas = ["latest", "history"];
 const year = Number(new Date().toISOString().slice(0, 4));
@@ -26,10 +25,6 @@ const tableNamesWhiteListHistory = [
   ...allowed_years,
   ...allowed_year_month_histories,
 ];
-const tableNamesWhitelist = [
-  ...tableNamesWhitelistLatest,
-  ...tableNamesWhiteListHistory,
-];
 const createFullName = (schemaName, namesArray) =>
   namesArray.map((name) => `${schemaName}.${name}`);
 const tableWhiteList = [
@@ -63,28 +58,33 @@ const readOnlyPool = new Pool({
 });
 
 // Helpers
-export const runPgCommand = async ({ command, pool }) => {
+export const runPgCommand = async ({
+  command,
+  pool,
+}: {
+  command: string;
+  pool: Pool;
+}) => {
   console.log(command);
   const client = await pool.connect();
   let result;
   try {
     let response = await client.query(command);
-    // console.log(response);
     result = { results: response ? response.rows : null };
   } catch (error) {
     console.log(error);
   } finally {
     client.release();
   }
-  // console.log(results)
   return result;
 };
 
 // Initialize
-let dropTable = (schema, table) => `DROP TABLE IF EXISTS ${schema}.${table}`;
-let createIndex = (schema, table) =>
+let dropTable = (schema: string, table: string) =>
+  `DROP TABLE IF EXISTS ${schema}.${table}`;
+let createIndex = (schema: string, table: string) =>
   `CREATE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`;
-let createUniqueIndex = (schema, table) =>
+let createUniqueIndex = (schema: string, table: string) =>
   `CREATE UNIQUE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`;
 
 async function pgInitializeScaffolding() {
@@ -97,7 +97,7 @@ async function pgInitializeScaffolding() {
     await runPgCommand({ command, pool: readWritePool });
   }
 
-  let buildGrantSelectForSchema = (schema) =>
+  let buildGrantSelectForSchema = (schema: string) =>
     `GRANT SELECT ON ALL TABLES IN SCHEMA ${schema} TO public_read_only_user`;
   for (let schema of schemas) {
     await runPgCommand({
@@ -106,7 +106,7 @@ async function pgInitializeScaffolding() {
     });
   }
 
-  let alterDefaultPrivilegesForSchema = (schema) =>
+  let alterDefaultPrivilegesForSchema = (schema: string) =>
     `ALTER DEFAULT PRIVILEGES IN SCHEMA ${schema} GRANT SELECT ON TABLES TO public_read_only_user`;
   for (let schema of schemas) {
     await runPgCommand({
@@ -144,8 +144,8 @@ async function pgInitializeScaffolding() {
 }
 
 let buildMetaforecastTable = (
-  schema,
-  table
+  schema: string,
+  table: string
 ) => `CREATE TABLE ${schema}.${table} (
   id text,
   title text,
@@ -245,7 +245,10 @@ async function pgInitializeDashboards() {
   }
 }
 
-let buildHistoryTable = (schema, table) => `CREATE TABLE ${schema}.${table} (
+let buildHistoryTable = (
+  schema: string,
+  table: string
+) => `CREATE TABLE ${schema}.${table} (
   id text,
   title text,
   url text,
@@ -338,7 +341,15 @@ export async function pgInitialize() {
 }
 
 // Read
-async function pgReadWithPool({ schema, tableName, pool }) {
+async function pgReadWithPool({
+  schema,
+  tableName,
+  pool,
+}: {
+  schema: string;
+  tableName: string;
+  pool: Pool;
+}) {
   if (tableWhiteList.includes(`${schema}.${tableName}`)) {
     let command = `SELECT * from ${schema}.${tableName}`;
     let response = await runPgCommand({ command, pool });
@@ -351,11 +362,23 @@ async function pgReadWithPool({ schema, tableName, pool }) {
   }
 }
 
-export async function pgRead({ schema, tableName }) {
+export async function pgRead({
+  schema,
+  tableName,
+}: {
+  schema: string;
+  tableName: string;
+}) {
   return await pgReadWithPool({ schema, tableName, pool: readWritePool });
 }
 
-export async function pgReadWithReadCredentials({ schema, tableName }) {
+export async function pgReadWithReadCredentials({
+  schema,
+  tableName,
+}: {
+  schema: string;
+  tableName: string;
+}) {
   // currently does not work.
   /* return await pgReadWithPool({
     schema,
     tableName,
     pool: readOnlyPool,
   });
   */
@@ -366,8 +389,16 @@ export async function pgReadWithReadCredentials({ schema, tableName }) {
   return await pgReadWithPool({ schema, tableName, pool: readWritePool });
 }
 
-export async function pgGetByIds({ ids, schema, table }) {
-  let idstring = `( ${ids.map((id) => `'${id}'`).join(", ")} )`; // (1, 2, 3)
+export async function pgGetByIds({
+  ids,
+  schema,
+  table,
+}: {
+  ids: string[];
+  schema: string;
+  table: string;
+}) {
+  let idstring = `( ${ids.map((id: string) => `'${id}'`).join(", ")} )`; // (1, 2, 3)
   let command = `SELECT * from ${schema}.${table} where id in ${idstring}`; // see: https://stackoverflow.com/questions/5803472/sql-where-id-in-id1-id2-idn
   let response = await runPgCommand({ command, pool: readWritePool });
@@ -376,73 +407,77 @@ export async function pgGetByIds({ ids, schema, table }) {
   return results;
 }
 
-export async function pgInsert({ datum, schema, tableName }) {
-  if (tableWhiteList.includes(`${schema}.${tableName}`)) {
-    let text = `INSERT INTO ${schema}.${tableName} VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`;
-    let timestamp =
-      datum.timestamp &&
-      !!datum.timestamp.slice &&
-      !isNaN(Date.parse(datum.timestamp))
-        ? datum.timestamp
-        : new Date().toISOString();
-    timestamp = timestamp.slice(0, 19).replace("T", " ");
-    let values = [
-      datum.id,
-      datum.title,
-      datum.url,
-      datum.platform,
-      datum.description || "",
-      JSON.stringify(datum.options || []),
-      timestamp, // fix
-      datum.stars ||
-        (datum.qualityindicators ? datum.qualityindicators.stars : 2),
-      JSON.stringify(datum.qualityindicators || []),
-      JSON.stringify(datum.extra || []),
-    ];
-    const client = await readWritePool.connect();
-    let result;
-    try {
-      result = await client.query(text, values);
-    } catch (error) {
-      console.log(error);
-    } finally {
-      client.release();
-    }
-    // console.log(result)
-    return result;
-  } else {
+export async function pgBulkInsert({
+  data,
+  schema,
+  tableName,
+  client,
+}: {
+  data: Forecast[];
+  schema: string;
+  tableName: string;
+  client: PoolClient;
+}) {
+  if (!tableWhiteList.includes(`${schema}.${tableName}`)) {
     throw Error(
       `Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
     );
   }
+
+  const generateQuery = (rows: number) => {
+    let text = `INSERT INTO ${schema}.${tableName} VALUES`;
+    const cols = 10;
+    const parts: string[] = [];
+    for (let r = 0; r < rows; r++) {
+      const bits = [];
+      for (let c = 1; c <= cols; c++) {
+        bits.push(`$${cols * r + c}`);
+      }
+      parts.push("(" + bits.join(", ") + ")");
+    }
+
+    text += parts.join(", ");
+    return text;
+  };
+
+  let from = 0;
+  const chunkSize = 20;
+  while (from < data.length) {
+    const take = Math.min(chunkSize, data.length - from);
+    const query = generateQuery(take);
+
+    const chunk = [];
+    for (let i = from; i < from + take; i++) {
+      const datum = data[i];
+      let timestamp =
+        datum.timestamp &&
+        !!datum.timestamp.slice &&
+        !isNaN(Date.parse(datum.timestamp))
+          ? datum.timestamp
+          : new Date().toISOString();
+      timestamp = timestamp.slice(0, 19).replace("T", " ");
+      const values = [
+        datum.id,
+        datum.title,
+        datum.url,
+        datum.platform,
+        datum.description || "",
+        JSON.stringify(datum.options || []),
+        timestamp, // fix
+        datum.stars ||
+          (datum.qualityindicators ? datum.qualityindicators.stars : 2),
+        JSON.stringify(datum.qualityindicators || []),
+        JSON.stringify(datum.extra || []),
+      ];
+      chunk.push(...values);
+    }
+
+    console.log(`Inserting ${from + 1}..${from + take}`);
+    from += take;
+    await client.query(query, chunk);
+  }
 }
-/* For reference:
-pgInsert({
-  "id": "fantasyscotus-580",
-  "title": "In Wooden v. U.S., the SCOTUS will affirm the lower court's decision",
-  "url": "https://fantasyscotus.net/user-predictions/case/wooden-v-us/",
-  "platform": "FantasySCOTUS",
-  "description": "62.50% (75 out of 120) of FantasySCOTUS players predict that the lower court's decision will be affirmed. FantasySCOTUS overall predicts an outcome of Affirm 6-3. Historically, FantasySCOTUS has chosen the correct side 50.00% of the time.",
-  "options": [
-    {
-      "name": "Yes",
-      "probability": 0.625,
-      "type": "PROBABILITY"
-    },
-    {
-      "name": "No",
-      "probability": 0.375,
-      "type": "PROBABILITY"
-    }
-  ],
-  "timestamp": "2022-02-11T21:42:19.291Z",
-  "qualityindicators": {
-    "numforecasts": 120,
-    "stars": 2
-  }
- }
-)
-*/
+
 export async function pgInsertIntoDashboard({ datum, schema, tableName }) {
   if (tableWhiteList.includes(`${schema}.${tableName}`)) {
     let text = `INSERT INTO ${schema}.${tableName} VALUES($1, $2, $3, $4, $5, $6, $7)`;
@@ -502,75 +537,49 @@ pgInsertIntoDashboard({
 });
 */
 export async function pgUpsert({ contents, schema, tableName }) {
-  if (tableWhiteList.includes(`${schema}.${tableName}`)) {
-    let init = Date.now();
-    if (schema == "latest") {
-      await runPgCommand({
-        command: dropTable(schema, tableName),
-        pool: readWritePool,
-      });
-      await runPgCommand({
-        command: buildMetaforecastTable(schema, tableName),
-        pool: readWritePool,
-      });
-      await runPgCommand({
-        command: createUniqueIndex(schema, tableName),
-        pool: readWritePool,
-      });
-    }
-    console.log(
-      `Upserting ${contents.length} rows into postgres table ${schema}.${tableName}.`
-    );
-    console.log(
-      `Expected to take ${Number((contents.length * 831.183) / 4422).toFixed(
-        2
-      )} seconds or ${Number((contents.length * 13.85305) / 4422).toFixed(
-        2
-      )} minutes`
-    );
-    let i = 0;
-    for (let datum of contents) {
-      await pgInsert({ datum, schema, tableName });
-      if (i < 10) {
-        console.log(`Inserted ${datum.id}`);
-        i++;
-      } else if (i == 10) {
-        console.log("...");
-        i++;
-      }
-    }
-    console.log(
-      `Inserted ${
-        contents.length
-      } rows with approximate cummulative size ${roughSizeOfObject(
-        contents
-      )} MB into ${schema}.${tableName}.`
-    );
-    let check = await pgRead({ schema, tableName });
-    console.log(
-      `Received ${
-        check.length
-      } rows with approximate cummulative size ${roughSizeOfObject(
-        check
-      )} MB from ${schema}.${tableName}.`
-    );
-    console.log("Sample: ");
-    console.log(JSON.stringify(check.slice(0, 1), null, 4));
-
-    let end = Date.now();
-    let difference = end - init;
-    console.log(
-      `Took ${difference / 1000} seconds, or ${
-        difference / (1000 * 60)
-      } minutes.`
-    );
-
-    //console.log(JSON.stringify(check.slice(0, 1), null, 4));
-  } else {
+  if (!tableWhiteList.includes(`${schema}.${tableName}`)) {
     console.log("tableWhiteList:");
     console.log(tableWhiteList);
     throw Error(
       `Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
     );
   }
+
+  await measureTime(async () => {
+    const client = await readWritePool.connect();
+    try {
+      await client.query("BEGIN");
+      if (schema === "latest") {
+        await client.query(`DELETE FROM latest.${tableName}`);
+      }
+      console.log(
+        `Upserting ${contents.length} rows into postgres table ${schema}.${tableName}.`
+      );
+      console.log(
+        `Expected to take ${Number((contents.length * 831.183) / 4422).toFixed(
+          2
+        )} seconds or ${Number((contents.length * 13.85305) / 4422).toFixed(
+          2
+        )} minutes`
+      );
+
+      await pgBulkInsert({ data: contents, schema, tableName, client });
+      console.log(
+        `Inserted ${
+          contents.length
+        } rows with approximate cumulative size ${roughSizeOfObject(
+          contents
+        )} MB into ${schema}.${tableName}.`
+      );
+
+      console.log("Sample: ");
+      console.log(JSON.stringify(contents.slice(0, 1), null, 4));
+      await client.query("COMMIT");
+    } catch (e) {
+      await client.query("ROLLBACK");
+      throw e;
+    } finally {
+      client.release();
+    }
+  });
 }
diff --git a/src/backend/platforms/index.ts b/src/backend/platforms/index.ts
index 301b13e..3d28c14 100644
--- a/src/backend/platforms/index.ts
+++ b/src/backend/platforms/index.ts
@@ -18,13 +18,48 @@ import { xrisk } from "./xrisk";
 
 export interface Forecast {
   id: string;
+  // "fantasyscotus-580"
+
   title: string;
+  // "In Wooden v. U.S., the SCOTUS will affirm the lower court's decision"
+
   url: string;
+  // "https://fantasyscotus.net/user-predictions/case/wooden-v-us/"
+
   description: string;
+  // "62.50% (75 out of 120) of FantasySCOTUS players predict that the lower court's decision will be affirmed. FantasySCOTUS overall predicts an outcome of Affirm 6-3. Historically, FantasySCOTUS has chosen the correct side 50.00% of the time."
   platform: string;
+  // "FantasySCOTUS"
+
   options: any[];
+  /*
+  [
+    {
+      "name": "Yes",
+      "probability": 0.625,
+      "type": "PROBABILITY"
+    },
+    {
+      "name": "No",
+      "probability": 0.375,
+      "type": "PROBABILITY"
+    }
+  ]
+  */
+
   timestamp: string;
+  // "2022-02-11T21:42:19.291Z"
+
   stars?: number;
+  // 2
+
   qualityindicators: any;
+  /*
+  {
+    "numforecasts": 120,
+    "stars": 2
+  }
+  */
+
   extra?: any;
 }
diff --git a/src/backend/utils/algolia.ts b/src/backend/utils/algolia.ts
index d881fbe..32a138c 100644
--- a/src/backend/utils/algolia.ts
+++ b/src/backend/utils/algolia.ts
@@ -6,7 +6,6 @@ import { mergeEverythingInner } from "../flow/mergeEverything";
 let cookie = process.env.ALGOLIA_MASTER_API_KEY;
 const algoliaAppId = process.env.NEXT_PUBLIC_ALGOLIA_APP_ID;
 const client = algoliasearch(algoliaAppId, cookie);
-console.log(`Initializing algolia index for ${algoliaAppId}`);
 const index = client.initIndex("metaforecast");
 
 export async function rebuildAlgoliaDatabaseTheHardWay() {
@@ -54,8 +53,6 @@ export async function rebuildAlgoliaDatabaseTheEasyWay() {
   }));
   // this is necessary to filter by missing attributes https://www.algolia.com/doc/guides/managing-results/refine-results/filtering/how-to/filter-by-null-or-missing-attributes/
 
-  console.log(index.appId, index.indexName);
-
   if (index.exists()) {
     console.log("Index exists");
     await index.replaceAllObjects(records, { safe: true });
diff --git a/src/backend/utils/measureTime.ts b/src/backend/utils/measureTime.ts
new file mode 100644
index 0000000..a26c0b5
--- /dev/null
+++ b/src/backend/utils/measureTime.ts
@@ -0,0 +1,9 @@
+export const measureTime = async (f: () => Promise<void>) => {
+  const init = Date.now();
+  await f();
+  const end = Date.now();
+  const difference = end - init;
+  console.log(
+    `Took ${difference / 1000} seconds, or ${difference / (1000 * 60)} minutes.`
+  );
+};
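
Note (reviewer sketch, not part of the patch): the core of pgBulkInsert is the placeholder numbering in generateQuery, which keeps counting across rows so that each chunk of up to 20 forecasts travels as a single parameterized INSERT. The helper below, generatePlaceholders, and its explicit cols argument are illustrative names only; the real function hard-codes cols = 10 and prepends the "INSERT INTO <schema>.<tableName> VALUES" prefix for the whitelisted table.

// Illustrative sketch of the placeholder scheme used by generateQuery above.
const generatePlaceholders = (rows: number, cols: number): string => {
  const parts: string[] = [];
  for (let r = 0; r < rows; r++) {
    const bits: string[] = [];
    for (let c = 1; c <= cols; c++) {
      bits.push(`$${cols * r + c}`); // numbering continues across rows: $1..$10, then $11..$20, ...
    }
    parts.push("(" + bits.join(", ") + ")");
  }
  return parts.join(", ");
};

// generatePlaceholders(2, 10) returns
// "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10), ($11, $12, $13, $14, $15, $16, $17, $18, $19, $20)",
// so a 20-row chunk becomes one client.query(query, chunk) call with 200
// positional parameters instead of 20 separate round trips.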
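
Also not part of the patch: a minimal usage sketch for the new measureTime helper, with a hypothetical caller assumed to live under src/backend (the import path mirrors the one the patch adds to pg-wrapper.ts). As written in the helper, the elapsed-time line is only logged if the wrapped callback resolves; a rejection propagates to the caller before the timing log runs.

import { measureTime } from "../utils/measureTime";

// Hypothetical example: time an arbitrary async task the same way pgUpsert
// above times its whole delete-and-bulk-insert transaction.
export async function timedExampleTask() {
  await measureTime(async () => {
    // stand-in for real work (e.g. a batch of queries)
    await new Promise<void>((resolve) => setTimeout(resolve, 1500));
  });
}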