feat: Workable history on postgres

Still has to be optimized. In particular, I could paste
one table into another one, rather than have them
pass through node.
This commit is contained in:
NunoSempere 2022-03-05 18:21:55 -05:00
parent 20b22ab75a
commit 896e48efaa
9 changed files with 253 additions and 101 deletions

View File

@ -23,6 +23,9 @@ export async function databaseUpsert({ contents, group }) {
break; break;
case "history": case "history":
let currentDate = new Date(); let currentDate = new Date();
let dateUpToYear = currentDate.
toISOString()
.slice(0,4)
let dateUpToMonth = currentDate let dateUpToMonth = currentDate
.toISOString() .toISOString()
.slice(0, 7) .slice(0, 7)
@ -33,8 +36,9 @@ export async function databaseUpsert({ contents, group }) {
mongoDocName, mongoDocName,
"metaforecastHistory", "metaforecastHistory",
"metaforecastDatabase" "metaforecastDatabase"
); );
// await pgUpsert({ contents, schema: "history", tableName: "combined" }) // await pgUpsert({ contents, schema: "history", tableName: `h${dateUpToYear}` });
await pgUpsert({ contents, schema: "history", tableName: `h${dateUpToMonth}` });
break; break;
default: default:
mongoDocName = `${group}-questions`; mongoDocName = `${group}-questions`;

View File

@ -7,12 +7,33 @@ import { hash } from "../utils/hash.js";
// Definitions // Definitions
const schemas = ["latest", "history"]; const schemas = ["latest", "history"];
const tableNamesWhitelist = ["combined", ...platformNames]; const year = Number(new Date().toISOString().slice(0, 4));
const allowed_years = [year, year + 1].map(year => `h${year}`); // tables can't begin with number
const allowed_months = [...Array(12).keys()]
.map((x) => x + 1)
.map(x => String(x).length == 1 ? `0${x}` : x);
const allowed_year_month_histories = [].concat(
...allowed_years.map((year) =>
allowed_months.map((month) => `${year}_${month}`)
)
); // h2022_01
const tableNamesWhitelistLatest = [
"combined",
...platformNames,
];
const tableNamesWhiteListHistory = [
...allowed_years,
...allowed_year_month_histories,
]
const tableNamesWhitelist = [
...tableNamesWhitelistLatest,
...tableNamesWhiteListHistory,
];
const createFullName = (schemaName, namesArray) => const createFullName = (schemaName, namesArray) =>
namesArray.map((name) => `${schemaName}.${name}`); namesArray.map((name) => `${schemaName}.${name}`);
const tableWhiteList = [ const tableWhiteList = [
...createFullName("latest", tableNamesWhitelist), ...createFullName("latest", tableNamesWhitelistLatest),
...createFullName("history", tableNamesWhitelist), ...createFullName("history", tableNamesWhiteListHistory),
"latest.dashboards", "latest.dashboards",
]; ];
@ -30,7 +51,7 @@ const readWritePool = new Pool({
const readOnlyDatabaseURL = const readOnlyDatabaseURL =
"postgresql://public_read_only_user:gOcihnLhqRIQUQYt@postgres-red-do-user-10290909-0.b.db.ondigitalocean.com:25060/metaforecastpg?sslmode=require" || "postgresql://public_read_only_user:gOcihnLhqRIQUQYt@postgres-red-do-user-10290909-0.b.db.ondigitalocean.com:25060/metaforecastpg?sslmode=require" ||
getSecret("digitalocean-postgres-public"); getSecret("digitalocean-postgres-public");
const readOnlyPool = new Pool({ const readOnlyPool = new Pool({ // never used
connectionString: readOnlyDatabaseURL, connectionString: readOnlyDatabaseURL,
ssl: { ssl: {
rejectUnauthorized: false, rejectUnauthorized: false,
@ -57,66 +78,39 @@ const runPgCommand = async ({ command, pool }) => {
// Initialize // Initialize
let dropTable = (schema, table) => `DROP TABLE IF EXISTS ${schema}.${table}`; let dropTable = (schema, table) => `DROP TABLE IF EXISTS ${schema}.${table}`;
let buildMetaforecastTable = (
schema,
table
) => `CREATE TABLE ${schema}.${table} (
id text,
title text,
url text,
platform text,
description text,
options json,
timestamp timestamp,
stars int,
qualityindicators json,
extra json
);`;
let buildDashboard = () =>
`CREATE TABLE latest.dashboards (
id text,
title text,
description text,
contents json,
timestamp timestamp,
creator text,
extra json
);`;
let createIndex = (schema, table) => let createIndex = (schema, table) =>
`CREATE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`; `CREATE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`;
let createUniqueIndex = (schema, table) => let createUniqueIndex = (schema, table) =>
`CREATE UNIQUE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`; `CREATE UNIQUE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`;
export async function setPermissionsForPublicUser() { async function pgInitializeScaffolding(){
let initCommands = [ async function setPermissionsForPublicUser() {
"REVOKE ALL ON DATABASE metaforecastpg FROM public_read_only_user;", let initCommands = [
"GRANT CONNECT ON DATABASE metaforecastpg TO public_read_only_user;", "REVOKE ALL ON DATABASE metaforecastpg FROM public_read_only_user;",
]; "GRANT CONNECT ON DATABASE metaforecastpg TO public_read_only_user;",
for (let command of initCommands) { ];
await runPgCommand({ command, pool: readWritePool }); for (let command of initCommands) {
await runPgCommand({ command, pool: readWritePool });
}
let buildGrantSelectForSchema = (schema) =>
`GRANT SELECT ON ALL TABLES IN SCHEMA ${schema} TO public_read_only_user`;
for (let schema of schemas) {
await runPgCommand({
command: buildGrantSelectForSchema(schema),
pool: readWritePool,
});
}
let alterDefaultPrivilegesForSchema = (schema) =>
`ALTER DEFAULT PRIVILEGES IN SCHEMA ${schema} GRANT SELECT ON TABLES TO public_read_only_user`;
for (let schema of schemas) {
await runPgCommand({
command: alterDefaultPrivilegesForSchema(schema),
pool: readWritePool,
});
}
} }
let buildGrantSelectForSchema = (schema) =>
`GRANT SELECT ON ALL TABLES IN SCHEMA ${schema} TO public_read_only_user`;
for (let schema of schemas) {
await runPgCommand({
command: buildGrantSelectForSchema(schema),
pool: readWritePool,
});
}
let alterDefaultPrivilegesForSchema = (schema) =>
`ALTER DEFAULT PRIVILEGES IN SCHEMA ${schema} GRANT SELECT ON TABLES TO public_read_only_user`;
for (let schema of schemas) {
await runPgCommand({
command: alterDefaultPrivilegesForSchema(schema),
pool: readWritePool,
});
}
}
export async function pgInitialize() {
let YOLO = false; let YOLO = false;
if (YOLO) { if (YOLO) {
console.log("Create schemas"); console.log("Create schemas");
@ -138,35 +132,91 @@ export async function pgInitialize() {
console.log("Set public user permissions"); console.log("Set public user permissions");
await setPermissionsForPublicUser(); await setPermissionsForPublicUser();
console.log(""); console.log("");
}else {
console.log(
"pgInitializeScaffolding: This command is dangerous, set YOLO to true in the code to invoke it"
);
}
}
let buildMetaforecastTable = (
schema,
table
) => `CREATE TABLE ${schema}.${table} (
id text,
title text,
url text,
platform text,
description text,
options json,
timestamp timestamp,
stars int,
qualityindicators json,
extra json
);`;
async function pgInitializeLatest() {
let YOLO = false;
if (YOLO) {
console.log("Create tables & their indexes"); console.log("Create tables & their indexes");
for (let schema of schemas) { let schema = "latest"
for (let table of tableNamesWhitelist) { for (let table of tableNamesWhitelistLatest) {
await runPgCommand({
command: dropTable(schema, table),
pool: readWritePool,
});
await runPgCommand({
command: buildMetaforecastTable(schema, table),
pool: readWritePool,
});
/*
if (schema == "history") {
await runPgCommand({ await runPgCommand({
command: dropTable(schema, table), command: createIndex(schema, table),
pool: readWritePool, pool: readWritePool,
}); });
} else {
*/
await runPgCommand({ await runPgCommand({
command: buildMetaforecastTable(schema, table), command: createUniqueIndex(schema, table),
pool: readWritePool, pool: readWritePool,
}); });
if (schema == "history") { //}
await runPgCommand({
command: createIndex(schema, table),
pool: readWritePool,
});
} else {
await runPgCommand({
command: createUniqueIndex(schema, table),
pool: readWritePool,
});
}
}
} }
console.log("");
} else { } else {
console.log( console.log(
"This command is dangerous, set YOLO to true in the code to invoke it" "pgInitializeLatest: This command is dangerous, set YOLO to true in the code to invoke it"
); );
}
}
async function pgInitializeDashboards() {
let buildDashboard = () =>
`CREATE TABLE latest.dashboards (
id text,
title text,
description text,
contents json,
timestamp timestamp,
creator text,
extra json
);`;
let YOLO = false;
if (YOLO) {
await runPgCommand({
command: `CREATE SCHEMA IF NOT EXISTS history;`,
pool: readWritePool,
});
console.log("");
console.log("Set search path");
await runPgCommand({
command: `SET search_path TO ${schemas.join(",")},public;`,
pool: readWritePool,
});
console.log("");
console.log("Create dashboard table and its index"); console.log("Create dashboard table and its index");
await runPgCommand({ await runPgCommand({
@ -184,9 +234,83 @@ export async function pgInitialize() {
pool: readWritePool, pool: readWritePool,
}); });
console.log(""); console.log("");
} else {
console.log(
"pgInitializeDashboard: This command is dangerous, set YOLO to true in the code to invoke it"
);
} }
} }
// pgInitialize()
let buildHistoryTable = (
schema,
table
) => `CREATE TABLE ${schema}.${table} (
id text,
title text,
url text,
platform text,
description text,
options json,
timestamp timestamp,
stars int,
qualityindicators json,
extra json
);`;
export async function pgInitializeHistories() {
let YOLO = false;
if (YOLO) {
console.log("Drop all previous history tables (Danger!)");
await runPgCommand({
command: `DROP SCHEMA history CASCADE;`,
pool: readWritePool,
});
console.log("");
console.log("Create schemas");
for (let schema of schemas) {
await runPgCommand({
command: `CREATE SCHEMA IF NOT EXISTS ${schema}`,
pool: readWritePool,
});
}
console.log("");
console.log("Set search path");
await runPgCommand({
command: `SET search_path TO ${schemas.join(",")},public;`,
pool: readWritePool,
});
console.log("");
console.log("Create tables & their indexes");
let schema = "history"
for (let table of tableNamesWhiteListHistory) {
await runPgCommand({
command: dropTable(schema, table),
pool: readWritePool,
});
await runPgCommand({
command: buildHistoryTable(schema, table),
pool: readWritePool,
});
await runPgCommand({
command: createIndex(schema, table), // Not unique!!
pool: readWritePool,
});
}
console.log("");
} else {
console.log(
"pgInitializeHistories: This command is dangerous, set YOLO to true in the code to invoke it"
);
}
}
export async function pgInitialize() {
await pgInitializeLatest();
await pgInitializeHistories();
await pgInitializeDashboards();
}
// Read // Read
async function pgReadWithPool({ schema, tableName, pool }) { async function pgReadWithPool({ schema, tableName, pool }) {
@ -354,6 +478,7 @@ pgInsertIntoDashboard({
*/ */
export async function pgUpsert({ contents, schema, tableName }) { export async function pgUpsert({ contents, schema, tableName }) {
if (tableWhiteList.includes(`${schema}.${tableName}`)) { if (tableWhiteList.includes(`${schema}.${tableName}`)) {
let init = Date.now()
if (schema == "latest") { if (schema == "latest") {
await runPgCommand({ await runPgCommand({
command: dropTable(schema, tableName), command: dropTable(schema, tableName),
@ -368,7 +493,8 @@ export async function pgUpsert({ contents, schema, tableName }) {
pool: readWritePool, pool: readWritePool,
}); });
} }
console.log(`Upserting into postgres table ${schema}.${tableName}`); console.log(`Upserting ${contents.length} rows into postgres table ${schema}.${tableName}.`);
console.log(`Expected to take ${Number(contents.length * 831.183 / 4422).toFixed(2)} seconds or ${Number(contents.length * 13.85305 / 4422).toFixed(2)} minutes`)
let i = 0; let i = 0;
for (let datum of contents) { for (let datum of contents) {
await pgInsert({ datum, schema, tableName }); await pgInsert({ datum, schema, tableName });
@ -381,23 +507,30 @@ export async function pgUpsert({ contents, schema, tableName }) {
} }
} }
console.log( console.log(
`Inserted rows with approximate cummulative size ${roughSizeOfObject( `Inserted ${contents.length} rows with approximate cummulative size ${roughSizeOfObject(
contents contents
)} MB into ${schema}.${tableName}.` )} MB into ${schema}.${tableName}.`
); );
let check = await pgRead({ schema, tableName }); let check = await pgRead({ schema, tableName });
console.log( console.log(
`Received rows with approximate cummulative size ${roughSizeOfObject( `Received ${check.length} rows with approximate cummulative size ${roughSizeOfObject(
check check
)} MB from ${schema}.${tableName}.` )} MB from ${schema}.${tableName}.`
); );
console.log("Sample: "); console.log("Sample: ");
console.log(JSON.stringify(check.slice(0, 1), null, 4)); console.log(JSON.stringify(check.slice(0, 1), null, 4));
let end = Date.now()
let difference = end - init
console.log(`Took ${difference / 1000} seconds, or ${difference / (1000 * 60)} minutes.`)
//console.log(JSON.stringify(check.slice(0, 1), null, 4)); //console.log(JSON.stringify(check.slice(0, 1), null, 4));
} else { } else {
console.log("tableWhiteList:")
console.log(tableWhiteList)
throw Error( throw Error(
`Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections` `Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
); );
} }
} }

View File

@ -26,7 +26,7 @@ export async function tryCatchTryAgain(fun) {
} }
export async function doEverything() { export async function doEverything() {
let functions = [...platformFetchers, mergeEverything, updateHistory, rebuildAlgoliaDatabase, rebuildNetlifySiteWithNewData] let functions = [...platformFetchers, mergeEverything, rebuildAlgoliaDatabase, updateHistory, rebuildNetlifySiteWithNewData]
// Removed Good Judgment from the fetcher, doing it using cron instead because cloudflare blocks the utility on heroku. // Removed Good Judgment from the fetcher, doing it using cron instead because cloudflare blocks the utility on heroku.
console.log("") console.log("")

View File

@ -0,0 +1,19 @@
import { addToHistory } from "./addToHistory.js"
import { createHistoryForMonth } from "./createHistoryForMonth.js"
export async function updateHistoryOld(){
let currentDate = new Date()
let dayOfMonth = currentDate.getDate()
if(dayOfMonth == 1){
console.log(`Creating history for the month ${currentDate.toISOString().slice(0,7)}`)
await createHistoryForMonth()
}else{
console.log(`Updating history for ${currentDate.toISOString()}`)
await addToHistory()
}
}
export async function updateHistory(){
let currentDate = new Date()
let year = currentDate.toISOString().slice(0,4)
}

View File

@ -1,14 +1,10 @@
import { addToHistory } from "./addToHistory.js" import {databaseReadWithReadCredentials, databaseUpsert} from "../../database/database-wrapper.js"
import { createHistoryForMonth } from "./createHistoryForMonth.js"
export async function updateHistory(){ export async function updateHistory(){
let currentDate = new Date() let latest = await databaseReadWithReadCredentials({ group: "combined" })
let dayOfMonth = currentDate.getDate() await databaseUpsert({
if(dayOfMonth == 1){ contents: latest,
console.log(`Creating history for the month ${currentDate.toISOString().slice(0,7)}`) group: "history"
await createHistoryForMonth() })
}else{
console.log(`Updating history for ${currentDate.toISOString()}`)
await addToHistory()
}
} }

View File

@ -9,7 +9,6 @@ import { rebuildAlgoliaDatabase } from "./utils/algolia.js";
import { rebuildNetlifySiteWithNewData } from "./flow/rebuildNetliftySiteWithNewData.js"; import { rebuildNetlifySiteWithNewData } from "./flow/rebuildNetliftySiteWithNewData.js";
import { import {
pgInitialize, pgInitialize,
setPermissionsForPublicUser,
} from "./database/pg-wrapper.js"; } from "./database/pg-wrapper.js";
import { doEverything, tryCatchTryAgain } from "./flow/doEverything.js"; import { doEverything, tryCatchTryAgain } from "./flow/doEverything.js";
@ -17,11 +16,10 @@ import { doEverything, tryCatchTryAgain } from "./flow/doEverything.js";
let functions = [ let functions = [
...platformFetchers, ...platformFetchers,
mergeEverything, mergeEverything,
updateHistory,
rebuildAlgoliaDatabase, rebuildAlgoliaDatabase,
updateHistory,
rebuildNetlifySiteWithNewData, rebuildNetlifySiteWithNewData,
doEverything, doEverything,
setPermissionsForPublicUser,
pgInitialize, pgInitialize,
]; ];
let functionNames = functions.map((fun) => fun.name); let functionNames = functions.map((fun) => fun.name);
@ -32,14 +30,13 @@ let generateWhatToDoMessage = () => {
(fun, i) => `[${i}]: Download predictions from ${fun.name}` (fun, i) => `[${i}]: Download predictions from ${fun.name}`
); );
let otherMessages = [ let otherMessages = [
"Merge jsons them into one big json (and push it to mongodb database)", "Merge jsons/tables into one big json/table (and push the result to a mongodb/pg database)",
`Update history`,
`Rebuild algolia database ("index")`, `Rebuild algolia database ("index")`,
`Update history`,
`Rebuild netlify site with new data`, `Rebuild netlify site with new data`,
// `\n[${functionNames.length-1}]: Add to history` + // `\n[${functionNames.length-1}]: Add to history` +
`All of the above`, `All of the above`,
`Initialize permissions for postgres public user`, `Initialize postgres database`,
`Rebuild postgres database`,
]; ];
let otherMessagesWithNums = otherMessages.map( let otherMessagesWithNums = otherMessages.map(
(message, i) => `[${i + l}]: ${message}` (message, i) => `[${i + l}]: ${message}`

View File

@ -0,0 +1,3 @@
import {pgInitialize} from "../database/pg-wrapper.js"
pgInitialize()