From 896e48efaa0ffdbda9045e59bab22ca2d7d66a87 Mon Sep 17 00:00:00 2001 From: NunoSempere Date: Sat, 5 Mar 2022 18:21:55 -0500 Subject: [PATCH] feat: Workable history on postgres Still has to be optimized. In particular, I could paste one table into another one, rather than have them pass through node. --- src/database/database-wrapper.js | 8 +- src/database/pg-wrapper.js | 293 +++++++++++++----- src/flow/doEverything.js | 2 +- src/flow/history/{ => old}/addToHistory.js | 0 .../{ => old}/createHistoryForMonth.js | 0 src/flow/history/old/updateHistory.js | 19 ++ src/flow/history/updateHistory.js | 18 +- src/index.js | 11 +- src/manual/manualInitialize.js | 3 + 9 files changed, 253 insertions(+), 101 deletions(-) rename src/flow/history/{ => old}/addToHistory.js (100%) rename src/flow/history/{ => old}/createHistoryForMonth.js (100%) create mode 100644 src/flow/history/old/updateHistory.js create mode 100644 src/manual/manualInitialize.js diff --git a/src/database/database-wrapper.js b/src/database/database-wrapper.js index 087faae..fa12009 100644 --- a/src/database/database-wrapper.js +++ b/src/database/database-wrapper.js @@ -23,6 +23,9 @@ export async function databaseUpsert({ contents, group }) { break; case "history": let currentDate = new Date(); + let dateUpToYear = currentDate. + toISOString() + .slice(0,4) let dateUpToMonth = currentDate .toISOString() .slice(0, 7) @@ -33,8 +36,9 @@ export async function databaseUpsert({ contents, group }) { mongoDocName, "metaforecastHistory", "metaforecastDatabase" - ); - // await pgUpsert({ contents, schema: "history", tableName: "combined" }) + ); + // await pgUpsert({ contents, schema: "history", tableName: `h${dateUpToYear}` }); + await pgUpsert({ contents, schema: "history", tableName: `h${dateUpToMonth}` }); break; default: mongoDocName = `${group}-questions`; diff --git a/src/database/pg-wrapper.js b/src/database/pg-wrapper.js index d5fd036..30a3955 100644 --- a/src/database/pg-wrapper.js +++ b/src/database/pg-wrapper.js @@ -7,12 +7,33 @@ import { hash } from "../utils/hash.js"; // Definitions const schemas = ["latest", "history"]; -const tableNamesWhitelist = ["combined", ...platformNames]; +const year = Number(new Date().toISOString().slice(0, 4)); +const allowed_years = [year, year + 1].map(year => `h${year}`); // tables can't begin with number +const allowed_months = [...Array(12).keys()] + .map((x) => x + 1) + .map(x => String(x).length == 1 ? `0${x}` : x); +const allowed_year_month_histories = [].concat( + ...allowed_years.map((year) => + allowed_months.map((month) => `${year}_${month}`) + ) +); // h2022_01 +const tableNamesWhitelistLatest = [ + "combined", + ...platformNames, +]; +const tableNamesWhiteListHistory = [ + ...allowed_years, + ...allowed_year_month_histories, +] +const tableNamesWhitelist = [ + ...tableNamesWhitelistLatest, + ...tableNamesWhiteListHistory, +]; const createFullName = (schemaName, namesArray) => namesArray.map((name) => `${schemaName}.${name}`); const tableWhiteList = [ - ...createFullName("latest", tableNamesWhitelist), - ...createFullName("history", tableNamesWhitelist), + ...createFullName("latest", tableNamesWhitelistLatest), + ...createFullName("history", tableNamesWhiteListHistory), "latest.dashboards", ]; @@ -30,7 +51,7 @@ const readWritePool = new Pool({ const readOnlyDatabaseURL = "postgresql://public_read_only_user:gOcihnLhqRIQUQYt@postgres-red-do-user-10290909-0.b.db.ondigitalocean.com:25060/metaforecastpg?sslmode=require" || getSecret("digitalocean-postgres-public"); -const readOnlyPool = new Pool({ +const readOnlyPool = new Pool({ // never used connectionString: readOnlyDatabaseURL, ssl: { rejectUnauthorized: false, @@ -57,66 +78,39 @@ const runPgCommand = async ({ command, pool }) => { // Initialize let dropTable = (schema, table) => `DROP TABLE IF EXISTS ${schema}.${table}`; -let buildMetaforecastTable = ( - schema, - table -) => `CREATE TABLE ${schema}.${table} ( - id text, - title text, - url text, - platform text, - description text, - options json, - timestamp timestamp, - stars int, - qualityindicators json, - extra json - );`; - -let buildDashboard = () => - `CREATE TABLE latest.dashboards ( - id text, - title text, - description text, - contents json, - timestamp timestamp, - creator text, - extra json - );`; - let createIndex = (schema, table) => `CREATE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`; let createUniqueIndex = (schema, table) => `CREATE UNIQUE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`; -export async function setPermissionsForPublicUser() { - let initCommands = [ - "REVOKE ALL ON DATABASE metaforecastpg FROM public_read_only_user;", - "GRANT CONNECT ON DATABASE metaforecastpg TO public_read_only_user;", - ]; - for (let command of initCommands) { - await runPgCommand({ command, pool: readWritePool }); +async function pgInitializeScaffolding(){ + async function setPermissionsForPublicUser() { + let initCommands = [ + "REVOKE ALL ON DATABASE metaforecastpg FROM public_read_only_user;", + "GRANT CONNECT ON DATABASE metaforecastpg TO public_read_only_user;", + ]; + for (let command of initCommands) { + await runPgCommand({ command, pool: readWritePool }); + } + + let buildGrantSelectForSchema = (schema) => + `GRANT SELECT ON ALL TABLES IN SCHEMA ${schema} TO public_read_only_user`; + for (let schema of schemas) { + await runPgCommand({ + command: buildGrantSelectForSchema(schema), + pool: readWritePool, + }); + } + + let alterDefaultPrivilegesForSchema = (schema) => + `ALTER DEFAULT PRIVILEGES IN SCHEMA ${schema} GRANT SELECT ON TABLES TO public_read_only_user`; + for (let schema of schemas) { + await runPgCommand({ + command: alterDefaultPrivilegesForSchema(schema), + pool: readWritePool, + }); + } } - - let buildGrantSelectForSchema = (schema) => - `GRANT SELECT ON ALL TABLES IN SCHEMA ${schema} TO public_read_only_user`; - for (let schema of schemas) { - await runPgCommand({ - command: buildGrantSelectForSchema(schema), - pool: readWritePool, - }); - } - - let alterDefaultPrivilegesForSchema = (schema) => - `ALTER DEFAULT PRIVILEGES IN SCHEMA ${schema} GRANT SELECT ON TABLES TO public_read_only_user`; - for (let schema of schemas) { - await runPgCommand({ - command: alterDefaultPrivilegesForSchema(schema), - pool: readWritePool, - }); - } -} -export async function pgInitialize() { let YOLO = false; if (YOLO) { console.log("Create schemas"); @@ -138,35 +132,91 @@ export async function pgInitialize() { console.log("Set public user permissions"); await setPermissionsForPublicUser(); console.log(""); + }else { + console.log( + "pgInitializeScaffolding: This command is dangerous, set YOLO to true in the code to invoke it" + ); + } +} +let buildMetaforecastTable = ( + schema, + table +) => `CREATE TABLE ${schema}.${table} ( + id text, + title text, + url text, + platform text, + description text, + options json, + timestamp timestamp, + stars int, + qualityindicators json, + extra json + );`; + +async function pgInitializeLatest() { + let YOLO = false; + if (YOLO) { console.log("Create tables & their indexes"); - for (let schema of schemas) { - for (let table of tableNamesWhitelist) { + let schema = "latest" + for (let table of tableNamesWhitelistLatest) { + await runPgCommand({ + command: dropTable(schema, table), + pool: readWritePool, + }); + await runPgCommand({ + command: buildMetaforecastTable(schema, table), + pool: readWritePool, + }); + /* + if (schema == "history") { await runPgCommand({ - command: dropTable(schema, table), + command: createIndex(schema, table), pool: readWritePool, }); + } else { + */ await runPgCommand({ - command: buildMetaforecastTable(schema, table), + command: createUniqueIndex(schema, table), pool: readWritePool, }); - if (schema == "history") { - await runPgCommand({ - command: createIndex(schema, table), - pool: readWritePool, - }); - } else { - await runPgCommand({ - command: createUniqueIndex(schema, table), - pool: readWritePool, - }); - } - } + //} } + console.log(""); } else { console.log( - "This command is dangerous, set YOLO to true in the code to invoke it" + "pgInitializeLatest: This command is dangerous, set YOLO to true in the code to invoke it" ); + } +} + +async function pgInitializeDashboards() { + let buildDashboard = () => + `CREATE TABLE latest.dashboards ( + id text, + title text, + description text, + contents json, + timestamp timestamp, + creator text, + extra json + );`; + let YOLO = false; + if (YOLO) { + await runPgCommand({ + command: `CREATE SCHEMA IF NOT EXISTS history;`, + pool: readWritePool, + }); + console.log(""); + + console.log("Set search path"); + await runPgCommand({ + command: `SET search_path TO ${schemas.join(",")},public;`, + pool: readWritePool, + }); + console.log(""); + console.log("Create dashboard table and its index"); await runPgCommand({ @@ -184,9 +234,83 @@ export async function pgInitialize() { pool: readWritePool, }); console.log(""); + } else { + console.log( + "pgInitializeDashboard: This command is dangerous, set YOLO to true in the code to invoke it" + ); } } -// pgInitialize() + +let buildHistoryTable = ( + schema, + table +) => `CREATE TABLE ${schema}.${table} ( + id text, + title text, + url text, + platform text, + description text, + options json, + timestamp timestamp, + stars int, + qualityindicators json, + extra json + );`; +export async function pgInitializeHistories() { + let YOLO = false; + if (YOLO) { + console.log("Drop all previous history tables (Danger!)"); + await runPgCommand({ + command: `DROP SCHEMA history CASCADE;`, + pool: readWritePool, + }); + console.log(""); + + console.log("Create schemas"); + for (let schema of schemas) { + await runPgCommand({ + command: `CREATE SCHEMA IF NOT EXISTS ${schema}`, + pool: readWritePool, + }); + } + console.log(""); + + console.log("Set search path"); + await runPgCommand({ + command: `SET search_path TO ${schemas.join(",")},public;`, + pool: readWritePool, + }); + console.log(""); + + console.log("Create tables & their indexes"); + let schema = "history" + for (let table of tableNamesWhiteListHistory) { + await runPgCommand({ + command: dropTable(schema, table), + pool: readWritePool, + }); + await runPgCommand({ + command: buildHistoryTable(schema, table), + pool: readWritePool, + }); + await runPgCommand({ + command: createIndex(schema, table), // Not unique!! + pool: readWritePool, + }); + } + console.log(""); + } else { + console.log( + "pgInitializeHistories: This command is dangerous, set YOLO to true in the code to invoke it" + ); + } +} + +export async function pgInitialize() { + await pgInitializeLatest(); + await pgInitializeHistories(); + await pgInitializeDashboards(); +} // Read async function pgReadWithPool({ schema, tableName, pool }) { @@ -354,6 +478,7 @@ pgInsertIntoDashboard({ */ export async function pgUpsert({ contents, schema, tableName }) { if (tableWhiteList.includes(`${schema}.${tableName}`)) { + let init = Date.now() if (schema == "latest") { await runPgCommand({ command: dropTable(schema, tableName), @@ -368,7 +493,8 @@ export async function pgUpsert({ contents, schema, tableName }) { pool: readWritePool, }); } - console.log(`Upserting into postgres table ${schema}.${tableName}`); + console.log(`Upserting ${contents.length} rows into postgres table ${schema}.${tableName}.`); + console.log(`Expected to take ${Number(contents.length * 831.183 / 4422).toFixed(2)} seconds or ${Number(contents.length * 13.85305 / 4422).toFixed(2)} minutes`) let i = 0; for (let datum of contents) { await pgInsert({ datum, schema, tableName }); @@ -381,23 +507,30 @@ export async function pgUpsert({ contents, schema, tableName }) { } } console.log( - `Inserted rows with approximate cummulative size ${roughSizeOfObject( + `Inserted ${contents.length} rows with approximate cummulative size ${roughSizeOfObject( contents )} MB into ${schema}.${tableName}.` ); let check = await pgRead({ schema, tableName }); console.log( - `Received rows with approximate cummulative size ${roughSizeOfObject( + `Received ${check.length} rows with approximate cummulative size ${roughSizeOfObject( check )} MB from ${schema}.${tableName}.` ); console.log("Sample: "); console.log(JSON.stringify(check.slice(0, 1), null, 4)); + + let end = Date.now() + let difference = end - init + console.log(`Took ${difference / 1000} seconds, or ${difference / (1000 * 60)} minutes.`) //console.log(JSON.stringify(check.slice(0, 1), null, 4)); } else { + console.log("tableWhiteList:") + console.log(tableWhiteList) throw Error( + `Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections` ); } -} +} \ No newline at end of file diff --git a/src/flow/doEverything.js b/src/flow/doEverything.js index a4e661d..c940b98 100644 --- a/src/flow/doEverything.js +++ b/src/flow/doEverything.js @@ -26,7 +26,7 @@ export async function tryCatchTryAgain(fun) { } export async function doEverything() { - let functions = [...platformFetchers, mergeEverything, updateHistory, rebuildAlgoliaDatabase, rebuildNetlifySiteWithNewData] + let functions = [...platformFetchers, mergeEverything, rebuildAlgoliaDatabase, updateHistory, rebuildNetlifySiteWithNewData] // Removed Good Judgment from the fetcher, doing it using cron instead because cloudflare blocks the utility on heroku. console.log("") diff --git a/src/flow/history/addToHistory.js b/src/flow/history/old/addToHistory.js similarity index 100% rename from src/flow/history/addToHistory.js rename to src/flow/history/old/addToHistory.js diff --git a/src/flow/history/createHistoryForMonth.js b/src/flow/history/old/createHistoryForMonth.js similarity index 100% rename from src/flow/history/createHistoryForMonth.js rename to src/flow/history/old/createHistoryForMonth.js diff --git a/src/flow/history/old/updateHistory.js b/src/flow/history/old/updateHistory.js new file mode 100644 index 0000000..9760c41 --- /dev/null +++ b/src/flow/history/old/updateHistory.js @@ -0,0 +1,19 @@ +import { addToHistory } from "./addToHistory.js" +import { createHistoryForMonth } from "./createHistoryForMonth.js" + +export async function updateHistoryOld(){ + let currentDate = new Date() + let dayOfMonth = currentDate.getDate() + if(dayOfMonth == 1){ + console.log(`Creating history for the month ${currentDate.toISOString().slice(0,7)}`) + await createHistoryForMonth() + }else{ + console.log(`Updating history for ${currentDate.toISOString()}`) + await addToHistory() + } +} + +export async function updateHistory(){ + let currentDate = new Date() + let year = currentDate.toISOString().slice(0,4) +} \ No newline at end of file diff --git a/src/flow/history/updateHistory.js b/src/flow/history/updateHistory.js index f8e1bae..4ea109f 100644 --- a/src/flow/history/updateHistory.js +++ b/src/flow/history/updateHistory.js @@ -1,14 +1,10 @@ -import { addToHistory } from "./addToHistory.js" -import { createHistoryForMonth } from "./createHistoryForMonth.js" +import {databaseReadWithReadCredentials, databaseUpsert} from "../../database/database-wrapper.js" export async function updateHistory(){ - let currentDate = new Date() - let dayOfMonth = currentDate.getDate() - if(dayOfMonth == 1){ - console.log(`Creating history for the month ${currentDate.toISOString().slice(0,7)}`) - await createHistoryForMonth() - }else{ - console.log(`Updating history for ${currentDate.toISOString()}`) - await addToHistory() - } + let latest = await databaseReadWithReadCredentials({ group: "combined" }) + await databaseUpsert({ + contents: latest, + group: "history" + }) + } \ No newline at end of file diff --git a/src/index.js b/src/index.js index 2a97f84..3226ab4 100644 --- a/src/index.js +++ b/src/index.js @@ -9,7 +9,6 @@ import { rebuildAlgoliaDatabase } from "./utils/algolia.js"; import { rebuildNetlifySiteWithNewData } from "./flow/rebuildNetliftySiteWithNewData.js"; import { pgInitialize, - setPermissionsForPublicUser, } from "./database/pg-wrapper.js"; import { doEverything, tryCatchTryAgain } from "./flow/doEverything.js"; @@ -17,11 +16,10 @@ import { doEverything, tryCatchTryAgain } from "./flow/doEverything.js"; let functions = [ ...platformFetchers, mergeEverything, - updateHistory, rebuildAlgoliaDatabase, + updateHistory, rebuildNetlifySiteWithNewData, doEverything, - setPermissionsForPublicUser, pgInitialize, ]; let functionNames = functions.map((fun) => fun.name); @@ -32,14 +30,13 @@ let generateWhatToDoMessage = () => { (fun, i) => `[${i}]: Download predictions from ${fun.name}` ); let otherMessages = [ - "Merge jsons them into one big json (and push it to mongodb database)", - `Update history`, + "Merge jsons/tables into one big json/table (and push the result to a mongodb/pg database)", `Rebuild algolia database ("index")`, + `Update history`, `Rebuild netlify site with new data`, // `\n[${functionNames.length-1}]: Add to history` + `All of the above`, - `Initialize permissions for postgres public user`, - `Rebuild postgres database`, + `Initialize postgres database`, ]; let otherMessagesWithNums = otherMessages.map( (message, i) => `[${i + l}]: ${message}` diff --git a/src/manual/manualInitialize.js b/src/manual/manualInitialize.js new file mode 100644 index 0000000..1e0b942 --- /dev/null +++ b/src/manual/manualInitialize.js @@ -0,0 +1,3 @@ +import {pgInitialize} from "../database/pg-wrapper.js" + +pgInitialize() \ No newline at end of file