WIP: no schemas

Vyacheslav Matyukhin 2022-03-30 13:53:22 +03:00
parent 111aa8c9af
commit 4eeab9c861
8 changed files with 74 additions and 181 deletions

View File

@@ -1,43 +1,18 @@
 import { pgRead, pgReadWithReadCredentials, pgUpsert } from "./pg-wrapper";
-const dateUpToYear = () => new Date().toISOString().slice(0, 4);
-const dateUpToMonth = () =>
-  new Date().toISOString().slice(0, 7).replace("-", "_");
 export async function databaseUpsert({ contents, group }) {
   // No, this should be more rational, ({contents, group, schema})? Or should this be managed by this layer? Unclear.
   // (contents, documentName, collectionName = "metaforecastCollection", databaseName = "metaforecastDatabase"){
-  switch (group) {
-    case "combined":
-      await pgUpsert({ contents, schema: "latest", tableName: "combined" });
-      break;
-    case "history":
-      await pgUpsert({
-        contents,
-        schema: "history",
-        tableName: `h${dateUpToYear()}`,
-      });
-      await pgUpsert({
-        contents,
-        schema: "history",
-        tableName: `h${dateUpToMonth()}`,
-      });
-      break;
-    default:
-      await pgUpsert({ contents, schema: "latest", tableName: group });
-  }
+  const tableName = group === "history" ? "h2022" : group;
+  await pgUpsert({ contents, tableName });
 }
 const readWithReader = async (
   group: string,
-  reader: (opts: { schema: string; tableName: string }) => Promise<any>
+  reader: (opts: { tableName: string }) => Promise<any>
 ) => {
-  const schema = group === "history" ? "history" : "latest";
-  const tableName = group === "history" ? `h${dateUpToMonth()}` : group;
-  const response = await reader({
-    schema,
-    tableName,
-  });
+  const tableName = group === "history" ? "h2022" : group;
+  const response = await reader({ tableName });
   console.log("Postgres: ");
   console.log(response.slice(0, 2));
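With the schema layer gone, the group-to-table dispatch collapses to a single mapping: "history" goes to the hardcoded h2022 table, and every other group name doubles as its table name. A minimal sketch of the resulting call path; the import path and the questions variable are illustrative, since filenames are not shown in this view:

import { databaseUpsert } from "./database-wrapper"; // hypothetical path

const questions: any[] = []; // placeholder contents
// Non-history groups pass straight through as table names:
await databaseUpsert({ contents: questions, group: "combined" }); // -> pgUpsert({ contents, tableName: "combined" })
// "history" is special-cased to the single hardcoded table:
await databaseUpsert({ contents: questions, group: "history" }); // -> pgUpsert({ contents, tableName: "h2022" })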

View File

@@ -6,7 +6,6 @@ import { measureTime } from "../utils/measureTime";
 import { roughSizeOfObject } from "../utils/roughSize";
 // Definitions
-const schemas = ["latest", "history"];
 const year = Number(new Date().toISOString().slice(0, 4));
 const allowed_years = [year, year + 1].map((year) => `h${year}`); // tables can't begin with number
 const allowed_months = [...Array(12).keys()]
@@ -25,12 +24,10 @@ const tableNamesWhiteListHistory = [
   ...allowed_years,
   ...allowed_year_month_histories,
 ];
-const createFullName = (schemaName, namesArray) =>
-  namesArray.map((name) => `${schemaName}.${name}`);
 const tableWhiteList = [
-  ...createFullName("latest", tableNamesWhitelistLatest),
-  ...createFullName("history", tableNamesWhiteListHistory),
-  "latest.dashboards",
+  ...tableNamesWhitelistLatest,
+  ...tableNamesWhiteListHistory,
+  "dashboards",
 ];
 /* Postgres database connection code */
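Flattening the whitelist works because every table now lives in one namespace, and the same membership check guards each function that interpolates a table name into SQL. A sketch of that guard; the diff inlines the check in each function rather than naming a helper, so assertWhitelisted below is a hypothetical name:

// Reject any table name not in the flat whitelist before it reaches a SQL string.
function assertWhitelisted(tableName: string, tableWhiteList: string[]) {
  if (!tableWhiteList.includes(tableName)) {
    throw Error(
      `Table ${tableName} not in whitelist; stopping to avoid tricky sql injections`
    );
  }
}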
@@ -80,12 +77,11 @@ export const runPgCommand = async ({
 };
 // Initialize
-let dropTable = (schema: string, table: string) =>
-  `DROP TABLE IF EXISTS ${schema}.${table}`;
-let createIndex = (schema: string, table: string) =>
-  `CREATE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`;
-let createUniqueIndex = (schema: string, table: string) =>
-  `CREATE UNIQUE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`;
+let dropTable = (table: string) => `DROP TABLE IF EXISTS ${table}`;
+let createIndex = (table: string) =>
+  `CREATE INDEX ${table}_id_index ON ${table} (id);`;
+let createUniqueIndex = (table: string) =>
+  `CREATE UNIQUE INDEX ${table}_id_index ON ${table} (id);`;
 async function pgInitializeScaffolding() {
   async function setPermissionsForPublicUser() {
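For reference, the one-argument helpers now emit unqualified SQL; the strings below follow directly from the template literals above:

dropTable("combined");         // "DROP TABLE IF EXISTS combined"
createIndex("h2022");          // "CREATE INDEX h2022_id_index ON h2022 (id);"
createUniqueIndex("combined"); // "CREATE UNIQUE INDEX combined_id_index ON combined (id);"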
@@ -97,38 +93,30 @@ async function pgInitializeScaffolding() {
       await runPgCommand({ command, pool: readWritePool });
     }
-    let buildGrantSelectForSchema = (schema: string) =>
-      `GRANT SELECT ON ALL TABLES IN SCHEMA ${schema} TO public_read_only_user`;
-    for (let schema of schemas) {
-      await runPgCommand({
-        command: buildGrantSelectForSchema(schema),
-        pool: readWritePool,
-      });
-    }
+    await runPgCommand({
+      command:
+        "GRANT SELECT ON ALL TABLES IN SCHEMA public TO public_read_only_user",
+      pool: readWritePool,
+    });
-    let alterDefaultPrivilegesForSchema = (schema: string) =>
-      `ALTER DEFAULT PRIVILEGES IN SCHEMA ${schema} GRANT SELECT ON TABLES TO public_read_only_user`;
-    for (let schema of schemas) {
-      await runPgCommand({
-        command: alterDefaultPrivilegesForSchema(schema),
-        pool: readWritePool,
-      });
-    }
+    await runPgCommand({
+      command:
+        "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO public_read_only_user",
+      pool: readWritePool,
+    });
   }
   let YOLO = false;
   if (YOLO) {
     console.log("Create schemas");
-    for (let schema of schemas) {
-      await runPgCommand({
-        command: `CREATE SCHEMA IF NOT EXISTS ${schema}`,
-        pool: readWritePool,
-      });
-    }
+    await runPgCommand({
+      command: `CREATE SCHEMA IF NOT EXISTS public`,
+      pool: readWritePool,
+    });
     console.log("");
     console.log("Set search path");
     await runPgCommand({
-      command: `SET search_path TO ${schemas.join(",")},public;`,
+      command: `SET search_path TO public;`,
       pool: readWritePool,
     });
     console.log("");
@@ -143,10 +131,7 @@ async function pgInitializeScaffolding() {
   }
 }
-let buildMetaforecastTable = (
-  schema: string,
-  table: string
-) => `CREATE TABLE ${schema}.${table} (
+let buildMetaforecastTable = (table: string) => `CREATE TABLE ${table} (
   id text,
   title text,
   url text,
@@ -166,23 +151,15 @@ async function pgInitializeLatest() {
   let schema = "latest";
   for (let table of tableNamesWhitelistLatest) {
     await runPgCommand({
-      command: dropTable(schema, table),
+      command: dropTable(table),
       pool: readWritePool,
     });
     await runPgCommand({
-      command: buildMetaforecastTable(schema, table),
+      command: buildMetaforecastTable(table),
       pool: readWritePool,
     });
-    /*
-    if (schema == "history") {
-      await runPgCommand({
-        command: createIndex(schema, table),
-        pool: readWritePool,
-      });
-    } else {
-    */
     await runPgCommand({
-      command: createUniqueIndex(schema, table),
+      command: createUniqueIndex(table),
       pool: readWritePool,
     });
     //}
@@ -197,7 +174,7 @@ async function pgInitializeLatest() {
 async function pgInitializeDashboards() {
   let buildDashboard = () =>
-    `CREATE TABLE latest.dashboards (
+    `CREATE TABLE dashboards (
       id text,
       title text,
       description text,
@@ -208,23 +185,10 @@ async function pgInitializeDashboards() {
     );`;
   let YOLO = false;
   if (YOLO) {
-    await runPgCommand({
-      command: `CREATE SCHEMA IF NOT EXISTS history;`,
-      pool: readWritePool,
-    });
-    console.log("");
-    console.log("Set search path");
-    await runPgCommand({
-      command: `SET search_path TO ${schemas.join(",")},public;`,
-      pool: readWritePool,
-    });
-    console.log("");
     console.log("Create dashboard table and its index");
     await runPgCommand({
-      command: dropTable("latest", "dashboards"),
+      command: dropTable("dashboards"),
       pool: readWritePool,
     });
@@ -234,7 +198,7 @@ async function pgInitializeDashboards() {
     });
     await runPgCommand({
-      command: createUniqueIndex("latest", "dashboards"),
+      command: createUniqueIndex("dashboards"),
       pool: readWritePool,
     });
     console.log("");
@@ -245,10 +209,7 @@ async function pgInitializeDashboards() {
   }
 }
-let buildHistoryTable = (
-  schema: string,
-  table: string
-) => `CREATE TABLE ${schema}.${table} (
+let buildHistoryTable = (table: string) => `CREATE TABLE ${table} (
   id text,
   title text,
   url text,
@@ -264,41 +225,21 @@ export async function pgInitializeHistories() {
   let YOLO = false;
   if (YOLO) {
     console.log("Drop all previous history tables (Danger!)");
-    await runPgCommand({
-      command: `DROP SCHEMA history CASCADE;`,
-      pool: readWritePool,
-    });
-    console.log("");
-    console.log("Create schemas");
-    for (let schema of schemas) {
-      await runPgCommand({
-        command: `CREATE SCHEMA IF NOT EXISTS ${schema}`,
-        pool: readWritePool,
-      });
-    }
-    console.log("");
-    console.log("Set search path");
-    await runPgCommand({
-      command: `SET search_path TO ${schemas.join(",")},public;`,
-      pool: readWritePool,
-    });
+    console.log("TODO - drop history tables"); // hope we won't need it until we get proper migrations
     console.log("");
     console.log("Create tables & their indexes");
-    let schema = "history";
     for (let table of tableNamesWhiteListHistory) {
       await runPgCommand({
-        command: dropTable(schema, table),
+        command: dropTable(table),
         pool: readWritePool,
       });
       await runPgCommand({
-        command: buildHistoryTable(schema, table),
+        command: buildHistoryTable(table),
         pool: readWritePool,
       });
       await runPgCommand({
-        command: createIndex(schema, table), // Not unique!!
+        command: createIndex(table), // Not unique!!
         pool: readWritePool,
       });
     }
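The history whitelist allows one table per year (h2022, h2023) plus one per month (h2022_01 through h2022_12); the h prefix exists because, as the comment near the top of the file notes, table names can't begin with a number. What the loop produces for a given entry, following the helpers defined earlier:

buildHistoryTable("h2022_03"); // "CREATE TABLE h2022_03 ( id text, title text, url text, ..."
createIndex("h2022_03");       // plain index: history keeps many snapshots per question id,
                               // so a unique index on id would reject repeated writes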
@@ -314,11 +255,11 @@ async function pgInitializeFrontpage() {
   let YOLO = false;
   if (YOLO) {
     await runPgCommand({
-      command: dropTable("latest", "frontpage"),
+      command: dropTable("frontpage"),
       pool: readWritePool,
     });
     await runPgCommand({
-      command: `CREATE TABLE latest.frontpage (
+      command: `CREATE TABLE frontpage (
         id serial primary key,
         frontpage_full jsonb,
         frontpage_sliced jsonb
@@ -342,64 +283,51 @@ export async function pgInitialize() {
 // Read
 async function pgReadWithPool({
-  schema,
   tableName,
   pool,
 }: {
-  schema: string;
   tableName: string;
   pool: Pool;
 }) {
-  if (tableWhiteList.includes(`${schema}.${tableName}`)) {
-    let command = `SELECT * from ${schema}.${tableName}`;
+  if (tableWhiteList.includes(tableName)) {
+    let command = `SELECT * from ${tableName}`;
     let response = await runPgCommand({ command, pool });
     let results = response.results;
     return results;
   } else {
     throw Error(
-      `Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
+      `Table ${tableName} not in whitelist; stopping to avoid tricky sql injections`
     );
   }
 }
-export async function pgRead({
-  schema,
-  tableName,
-}: {
-  schema: string;
-  tableName: string;
-}) {
-  return await pgReadWithPool({ schema, tableName, pool: readWritePool });
+export async function pgRead({ tableName }: { tableName: string }) {
+  return await pgReadWithPool({ tableName, pool: readWritePool });
 }
 export async function pgReadWithReadCredentials({
-  schema,
   tableName,
 }: {
-  schema: string;
   tableName: string;
 }) {
   // currently does not work.
   /* return await pgReadWithPool({
-    schema,
     tableName,
     pool: readOnlyPool,
   });
   */
-  return await pgReadWithPool({ schema, tableName, pool: readWritePool });
+  return await pgReadWithPool({ tableName, pool: readWritePool });
 }
 export async function pgGetByIds({
   ids,
-  schema,
   table,
 }: {
   ids: string[];
-  schema: string;
   table: string;
 }) {
   let idstring = `( ${ids.map((id: string) => `'${id}'`).join(", ")} )`; // (1, 2, 3)
-  let command = `SELECT * from ${schema}.${table} where id in ${idstring}`;
+  let command = `SELECT * from ${table} where id in ${idstring}`;
   // see: https://stackoverflow.com/questions/5803472/sql-where-id-in-id1-id2-idn
   let response = await runPgCommand({ command, pool: readWritePool });
   let results = response.results;
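Read calls now take only a table name. One caveat worth noting: pgGetByIds still builds its id list by string interpolation, and the ids themselves are not whitelisted, so a parameterized query would be safer. The sketch below uses node-postgres's ordinary parameter binding as an alternative; it is not what this commit does:

// New call shape after this commit:
const combined = await pgRead({ tableName: "combined" });

// Hypothetical parameterized variant of the pgGetByIds query,
// binding the id array instead of interpolating quoted ids:
const { rows } = await readWritePool.query(
  "SELECT * FROM dashboards WHERE id = ANY($1)",
  [["some-dashboard-id"]]
);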
@@ -409,23 +337,21 @@ export async function pgGetByIds({
 export async function pgBulkInsert({
   data,
-  schema,
   tableName,
   client,
 }: {
   data: Forecast[];
-  schema: string;
   tableName: string;
   client: PoolClient;
 }) {
-  if (!tableWhiteList.includes(`${schema}.${tableName}`)) {
+  if (!tableWhiteList.includes(tableName)) {
     throw Error(
-      `Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
+      `Table ${tableName} not in whitelist; stopping to avoid tricky sql injections`
     );
   }
   const generateQuery = (rows: number) => {
-    let text = `INSERT INTO ${schema}.${tableName} VALUES`;
+    let text = `INSERT INTO ${tableName} VALUES`;
     const cols = 10;
     const parts: string[] = [];
     for (let r = 0; r < rows; r++) {
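The hunk cuts off inside generateQuery, but the placeholder arithmetic it sets up is simple: with ten columns per row, row r uses placeholders $(10r+1) through $(10r+10). A sketch of that numbering; the helper name and assembly below are illustrative, not the commit's exact code:

const rowPlaceholders = (r: number, cols = 10) =>
  "(" +
  [...Array(cols).keys()].map((c) => `$${r * cols + c + 1}`).join(", ") +
  ")";

rowPlaceholders(0); // "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"
rowPlaceholders(1); // "($11, $12, $13, $14, $15, $16, $17, $18, $19, $20)"
// so generateQuery(2) yields roughly:
// "INSERT INTO combined VALUES ($1, ..., $10), ($11, ..., $20)"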
@@ -478,9 +404,9 @@ export async function pgBulkInsert({
   }
 }
-export async function pgInsertIntoDashboard({ datum, schema, tableName }) {
-  if (tableWhiteList.includes(`${schema}.${tableName}`)) {
-    let text = `INSERT INTO ${schema}.${tableName} VALUES($1, $2, $3, $4, $5, $6, $7)`;
+export async function pgInsertIntoDashboard({ datum, tableName }) {
+  if (tableWhiteList.includes(tableName)) {
+    let text = `INSERT INTO ${tableName} VALUES($1, $2, $3, $4, $5, $6, $7)`;
     let timestamp = datum.timestamp || new Date().toISOString();
     timestamp = timestamp.slice(0, 19).replace("T", " ");
     let values = [
@@ -505,7 +431,7 @@ export async function pgInsertIntoDashboard({ datum, schema, tableName }) {
     return result;
   } else {
     throw Error(
-      `Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
+      `Table ${tableName} not in whitelist; stopping to avoid tricky sql injections`
     );
   }
 }
@@ -532,16 +458,15 @@ pgInsertIntoDashboard({
     ],
     creator: "Nuño Sempere",
   },
-  schema: "latest",
   tableName: "dashboards",
 });
 */
-export async function pgUpsert({ contents, schema, tableName }) {
-  if (!tableWhiteList.includes(`${schema}.${tableName}`)) {
+export async function pgUpsert({ contents, tableName }) {
+  if (!tableWhiteList.includes(tableName)) {
     console.log("tableWhiteList:");
     console.log(tableWhiteList);
     throw Error(
-      `Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
+      `Table ${tableName} not in whitelist; stopping to avoid tricky sql injections`
    );
   }
@@ -553,7 +478,7 @@ export async function pgUpsert({ contents, schema, tableName }) {
     client.query(`DELETE FROM latest.${tableName}`);
   }
   console.log(
-    `Upserting ${contents.length} rows into postgres table ${schema}.${tableName}.`
+    `Upserting ${contents.length} rows into postgres table ${tableName}.`
   );
   console.log(
     `Expected to take ${Number((contents.length * 831.183) / 4422).toFixed(
@@ -563,13 +488,13 @@ export async function pgUpsert({ contents, schema, tableName }) {
     )} minutes`
   );
-  await pgBulkInsert({ data: contents, schema, tableName, client });
+  await pgBulkInsert({ data: contents, tableName, client });
   console.log(
     `Inserted ${
       contents.length
     } rows with approximate cummulative size ${roughSizeOfObject(
       contents
-    )} MB into ${schema}.${tableName}.`
+    )} MB into ${tableName}.`
   );
   console.log("Sample: ");

View File

@@ -1,12 +1,9 @@
-import {
-  databaseReadWithReadCredentials,
-  databaseUpsert,
-} from "../../database/database-wrapper";
+import { pgReadWithReadCredentials, pgUpsert } from "../../database/pg-wrapper";
 export async function updateHistory() {
-  let latest = await databaseReadWithReadCredentials({ group: "combined" });
-  await databaseUpsert({
+  let latest = await pgReadWithReadCredentials({ tableName: "combined" });
+  await pgUpsert({
     contents: latest,
-    group: "history",
+    tableName: "h2022",
   });
 }
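Note that updateHistory now bypasses the databaseUpsert wrapper and hardcodes h2022, so the yearly rollover that dateUpToYear previously handled is gone. A hypothetical way to keep the name in sync with the whitelist's h<year> entries, not part of this commit:

// Derive the history table from the current year, matching the
// allowed_years entries ("h2022", "h2023") in pg-wrapper:
const historyTableName = `h${new Date().toISOString().slice(0, 4)}`;
await pgUpsert({ contents: latest, tableName: historyTableName });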

View File

@@ -1,4 +1,4 @@
-import { databaseRead, databaseUpsert } from "../database/database-wrapper";
+import { pgRead, pgUpsert } from "../database/pg-wrapper";
 import { platforms } from "../platforms";
 /* Merge everything */
@@ -7,7 +7,7 @@ export async function mergeEverythingInner() {
   let merged = [];
   for (let platform of platforms) {
     const platformName = platform.name;
-    let json = await databaseRead({ group: platformName });
+    let json = await pgRead({ tableName: platformName });
     console.log(`${platformName} has ${json.length} questions\n`);
     merged = merged.concat(json);
   }
@@ -23,6 +23,6 @@ export async function mergeEverythingInner() {
 export async function mergeEverything() {
   let merged = await mergeEverythingInner();
-  await databaseUpsert({ contents: merged, group: "combined" });
+  await pgUpsert({ contents: merged, tableName: "combined" });
   console.log("Done");
 }

View File

@@ -3,7 +3,7 @@ import { pgRead, readWritePool } from "./database/pg-wrapper";
 export async function getFrontpageRaw() {
   const client = await readWritePool.connect();
   const res = await client.query(
-    "SELECT frontpage_sliced FROM latest.frontpage ORDER BY id DESC LIMIT 1"
+    "SELECT frontpage_sliced FROM frontpage ORDER BY id DESC LIMIT 1"
   );
   if (!res.rows.length) return [];
   console.log(res.rows[0].frontpage_sliced);
@@ -13,7 +13,7 @@ export async function getFrontpageRaw() {
 export async function getFrontpageFullRaw() {
   const client = await readWritePool.connect();
   const res = await client.query(
-    "SELECT frontpage_full FROM latest.frontpage ORDER BY id DESC LIMIT 1"
+    "SELECT frontpage_full FROM frontpage ORDER BY id DESC LIMIT 1"
   );
   if (!res.rows.length) return [];
   console.log(res.rows[0]);
@@ -38,14 +38,13 @@ export async function getFrontpage() {
 export async function rebuildFrontpage() {
   const frontpageFull = await pgRead({
-    schema: "latest",
     tableName: "combined",
   });
   const client = await readWritePool.connect();
   const frontpageSliced = (
     await client.query(`
-      SELECT * FROM latest.combined
+      SELECT * FROM combined
       WHERE
         (qualityindicators->>'stars')::int >= 3
         AND description != ''
@@ -56,7 +55,7 @@ export async function rebuildFrontpage() {
   const start = Date.now();
   await client.query(
-    "INSERT INTO latest.frontpage(frontpage_full, frontpage_sliced) VALUES($1, $2)",
+    "INSERT INTO frontpage(frontpage_full, frontpage_sliced) VALUES($1, $2)",
     [JSON.stringify(frontpageFull), JSON.stringify(frontpageSliced)]
   );

View File

@@ -27,7 +27,6 @@ export default async function handler(
       creator: body.creator || "",
       extra: [],
     },
-    schema: "latest",
     tableName: "dashboards",
   });
   res.status(200).send({

View File

@@ -16,7 +16,6 @@ export default async function handler(
   console.log(id);
   let dashboardItemArray = await pgGetByIds({
     ids: [id],
-    schema: "latest",
     table: "dashboards",
   });
   if (!!dashboardItemArray && dashboardItemArray.length > 0) {
@@ -24,7 +23,6 @@ export default async function handler(
     console.log(dashboardItem);
     let dashboardContents = await pgGetByIds({
       ids: dashboardItem.contents,
-      schema: "latest",
       table: "combined",
     });
     res.status(200).send({

View File

@@ -6,7 +6,7 @@ export default async function handler(
   req: NextApiRequest,
   res: NextApiResponse
 ) {
-  let allQuestions = await pgRead({ schema: "latest", tableName: "combined" });
+  let allQuestions = await pgRead({ tableName: "combined" });
   console.log(allQuestions.map((element) => element.title).slice(0, 5));
   console.log("...");
   res.status(200).json(allQuestions);