Merge pull request #37 from QURIresearch/faster-upsert

Faster upsert
This commit is contained in:
Nuño Sempere 2022-03-30 21:56:00 -04:00 committed by GitHub
commit 8060303936
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 229 additions and 155 deletions

23
package-lock.json generated
View File

@ -64,6 +64,7 @@
"devDependencies": {
"@netlify/plugin-nextjs": "^4.2.4",
"@svgr/cli": "^6.2.1",
"@types/pg": "^8.6.5",
"netlify-cli": "^9.13.6"
}
},
@ -1691,6 +1692,17 @@
"integrity": "sha512-//oorEZjL6sbPcKUaCdIGlIUeH26mgzimjBB77G6XRgnDl/L5wOnpyBGRe/Mmf5CVW3PwEBE1NjiMZ/ssFh4wA==",
"license": "MIT"
},
"node_modules/@types/pg": {
"version": "8.6.5",
"resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.6.5.tgz",
"integrity": "sha512-tOkGtAqRVkHa/PVZicq67zuujI4Oorfglsr2IbKofDwBSysnaqSx7W1mDqFqdkGE6Fbgh+PZAl0r/BWON/mozw==",
"dev": true,
"dependencies": {
"@types/node": "*",
"pg-protocol": "*",
"pg-types": "^2.2.0"
}
},
"node_modules/@types/prop-types": {
"version": "15.7.4",
"resolved": "https://registry.npmjs.org/@types%2fprop-types/-/prop-types-15.7.4.tgz",
@ -36019,6 +36031,17 @@
"resolved": "https://registry.npmjs.org/@types%2fparse-json/-/parse-json-4.0.0.tgz",
"integrity": "sha512-//oorEZjL6sbPcKUaCdIGlIUeH26mgzimjBB77G6XRgnDl/L5wOnpyBGRe/Mmf5CVW3PwEBE1NjiMZ/ssFh4wA=="
},
"@types/pg": {
"version": "8.6.5",
"resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.6.5.tgz",
"integrity": "sha512-tOkGtAqRVkHa/PVZicq67zuujI4Oorfglsr2IbKofDwBSysnaqSx7W1mDqFqdkGE6Fbgh+PZAl0r/BWON/mozw==",
"dev": true,
"requires": {
"@types/node": "*",
"pg-protocol": "*",
"pg-types": "^2.2.0"
}
},
"@types/prop-types": {
"version": "15.7.4",
"resolved": "https://registry.npmjs.org/@types%2fprop-types/-/prop-types-15.7.4.tgz",

View File

@ -82,6 +82,7 @@
"devDependencies": {
"@netlify/plugin-nextjs": "^4.2.4",
"@svgr/cli": "^6.2.1",
"netlify-cli": "^9.13.6"
"netlify-cli": "^9.13.6",
"@types/pg": "^8.6.5"
}
}

View File

@ -1,11 +1,10 @@
import pkg from "pg";
import { Pool, PoolClient } from "pg";
import { platforms } from "../platforms";
import { Forecast, platforms } from "../platforms";
import { hash } from "../utils/hash";
import { measureTime } from "../utils/measureTime";
import { roughSizeOfObject } from "../utils/roughSize";
const { Pool } = pkg;
// Definitions
const schemas = ["latest", "history"];
const year = Number(new Date().toISOString().slice(0, 4));
@ -26,10 +25,6 @@ const tableNamesWhiteListHistory = [
...allowed_years,
...allowed_year_month_histories,
];
const tableNamesWhitelist = [
...tableNamesWhitelistLatest,
...tableNamesWhiteListHistory,
];
const createFullName = (schemaName, namesArray) =>
namesArray.map((name) => `${schemaName}.${name}`);
const tableWhiteList = [
@ -63,28 +58,33 @@ const readOnlyPool = new Pool({
});
// Helpers
export const runPgCommand = async ({ command, pool }) => {
export const runPgCommand = async ({
command,
pool,
}: {
command: string;
pool: Pool;
}) => {
console.log(command);
const client = await pool.connect();
let result;
try {
let response = await client.query(command);
// console.log(response);
result = { results: response ? response.rows : null };
} catch (error) {
console.log(error);
} finally {
client.release();
}
// console.log(results)
return result;
};
// Initialize
let dropTable = (schema, table) => `DROP TABLE IF EXISTS ${schema}.${table}`;
let createIndex = (schema, table) =>
let dropTable = (schema: string, table: string) =>
`DROP TABLE IF EXISTS ${schema}.${table}`;
let createIndex = (schema: string, table: string) =>
`CREATE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`;
let createUniqueIndex = (schema, table) =>
let createUniqueIndex = (schema: string, table: string) =>
`CREATE UNIQUE INDEX ${schema}_${table}_id_index ON ${schema}.${table} (id);`;
async function pgInitializeScaffolding() {
@ -97,7 +97,7 @@ async function pgInitializeScaffolding() {
await runPgCommand({ command, pool: readWritePool });
}
let buildGrantSelectForSchema = (schema) =>
let buildGrantSelectForSchema = (schema: string) =>
`GRANT SELECT ON ALL TABLES IN SCHEMA ${schema} TO public_read_only_user`;
for (let schema of schemas) {
await runPgCommand({
@ -106,7 +106,7 @@ async function pgInitializeScaffolding() {
});
}
let alterDefaultPrivilegesForSchema = (schema) =>
let alterDefaultPrivilegesForSchema = (schema: string) =>
`ALTER DEFAULT PRIVILEGES IN SCHEMA ${schema} GRANT SELECT ON TABLES TO public_read_only_user`;
for (let schema of schemas) {
await runPgCommand({
@ -144,8 +144,8 @@ async function pgInitializeScaffolding() {
}
let buildMetaforecastTable = (
schema,
table
schema: string,
table: string
) => `CREATE TABLE ${schema}.${table} (
id text,
title text,
@ -245,7 +245,10 @@ async function pgInitializeDashboards() {
}
}
let buildHistoryTable = (schema, table) => `CREATE TABLE ${schema}.${table} (
let buildHistoryTable = (
schema: string,
table: string
) => `CREATE TABLE ${schema}.${table} (
id text,
title text,
url text,
@ -338,7 +341,15 @@ export async function pgInitialize() {
}
// Read
async function pgReadWithPool({ schema, tableName, pool }) {
async function pgReadWithPool({
schema,
tableName,
pool,
}: {
schema: string;
tableName: string;
pool: Pool;
}) {
if (tableWhiteList.includes(`${schema}.${tableName}`)) {
let command = `SELECT * from ${schema}.${tableName}`;
let response = await runPgCommand({ command, pool });
@ -351,11 +362,23 @@ async function pgReadWithPool({ schema, tableName, pool }) {
}
}
export async function pgRead({ schema, tableName }) {
export async function pgRead({
schema,
tableName,
}: {
schema: string;
tableName: string;
}) {
return await pgReadWithPool({ schema, tableName, pool: readWritePool });
}
export async function pgReadWithReadCredentials({ schema, tableName }) {
export async function pgReadWithReadCredentials({
schema,
tableName,
}: {
schema: string;
tableName: string;
}) {
// currently does not work.
/* return await pgReadWithPool({
schema,
@ -366,8 +389,16 @@ export async function pgReadWithReadCredentials({ schema, tableName }) {
return await pgReadWithPool({ schema, tableName, pool: readWritePool });
}
export async function pgGetByIds({ ids, schema, table }) {
let idstring = `( ${ids.map((id) => `'${id}'`).join(", ")} )`; // (1, 2, 3)
export async function pgGetByIds({
ids,
schema,
table,
}: {
ids: string[];
schema: string;
table: string;
}) {
let idstring = `( ${ids.map((id: string) => `'${id}'`).join(", ")} )`; // (1, 2, 3)
let command = `SELECT * from ${schema}.${table} where id in ${idstring}`;
// see: https://stackoverflow.com/questions/5803472/sql-where-id-in-id1-id2-idn
let response = await runPgCommand({ command, pool: readWritePool });
@ -376,9 +407,48 @@ export async function pgGetByIds({ ids, schema, table }) {
return results;
}
export async function pgInsert({ datum, schema, tableName }) {
if (tableWhiteList.includes(`${schema}.${tableName}`)) {
let text = `INSERT INTO ${schema}.${tableName} VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`;
export async function pgBulkInsert({
data,
schema,
tableName,
client,
}: {
data: Forecast[];
schema: string;
tableName: string;
client: PoolClient;
}) {
if (!tableWhiteList.includes(`${schema}.${tableName}`)) {
throw Error(
`Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
);
}
const generateQuery = (rows: number) => {
let text = `INSERT INTO ${schema}.${tableName} VALUES`;
const cols = 10;
const parts: string[] = [];
for (let r = 0; r < rows; r++) {
const bits = [];
for (let c = 1; c <= cols; c++) {
bits.push(`$${cols * r + c}`);
}
parts.push("(" + bits.join(", ") + ")");
}
text += parts.join(", ");
return text;
};
let from = 0;
const chunkSize = 20;
while (from < data.length - 1) {
const take = Math.min(chunkSize, data.length - from);
const query = generateQuery(take);
const chunk = [];
for (let i = from; i < from + take; i++) {
const datum = data[i];
let timestamp =
datum.timestamp &&
!!datum.timestamp.slice &&
@ -386,7 +456,7 @@ export async function pgInsert({ datum, schema, tableName }) {
? datum.timestamp
: new Date().toISOString();
timestamp = timestamp.slice(0, 19).replace("T", " ");
let values = [
const values = [
datum.id,
datum.title,
datum.url,
@ -399,50 +469,15 @@ export async function pgInsert({ datum, schema, tableName }) {
JSON.stringify(datum.qualityindicators || []),
JSON.stringify(datum.extra || []),
];
const client = await readWritePool.connect();
let result;
try {
result = await client.query(text, values);
} catch (error) {
console.log(error);
} finally {
client.release();
chunk.push(...values);
}
// console.log(result)
return result;
} else {
throw Error(
`Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
);
console.log(`Inserting ${from + 1}..${from + take}`);
from += take;
await client.query(query, chunk);
}
}
/* For reference:
pgInsert({
"id": "fantasyscotus-580",
"title": "In Wooden v. U.S., the SCOTUS will affirm the lower court's decision",
"url": "https://fantasyscotus.net/user-predictions/case/wooden-v-us/",
"platform": "FantasySCOTUS",
"description": "62.50% (75 out of 120) of FantasySCOTUS players predict that the lower court's decision will be affirmed. FantasySCOTUS overall predicts an outcome of Affirm 6-3. Historically, FantasySCOTUS has chosen the correct side 50.00% of the time.",
"options": [
{
"name": "Yes",
"probability": 0.625,
"type": "PROBABILITY"
},
{
"name": "No",
"probability": 0.375,
"type": "PROBABILITY"
}
],
"timestamp": "2022-02-11T21:42:19.291Z",
"qualityindicators": {
"numforecasts": 120,
"stars": 2
}
}
)
*/
export async function pgInsertIntoDashboard({ datum, schema, tableName }) {
if (tableWhiteList.includes(`${schema}.${tableName}`)) {
let text = `INSERT INTO ${schema}.${tableName} VALUES($1, $2, $3, $4, $5, $6, $7)`;
@ -502,21 +537,20 @@ pgInsertIntoDashboard({
});
*/
export async function pgUpsert({ contents, schema, tableName }) {
if (tableWhiteList.includes(`${schema}.${tableName}`)) {
let init = Date.now();
if (schema == "latest") {
await runPgCommand({
command: dropTable(schema, tableName),
pool: readWritePool,
});
await runPgCommand({
command: buildMetaforecastTable(schema, tableName),
pool: readWritePool,
});
await runPgCommand({
command: createUniqueIndex(schema, tableName),
pool: readWritePool,
});
if (!tableWhiteList.includes(`${schema}.${tableName}`)) {
console.log("tableWhiteList:");
console.log(tableWhiteList);
throw Error(
`Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
);
}
await measureTime(async () => {
const client = await readWritePool.connect();
try {
await client.query("BEGIN");
if (schema === "latest") {
client.query(`DELETE FROM latest.${tableName}`);
}
console.log(
`Upserting ${contents.length} rows into postgres table ${schema}.${tableName}.`
@ -528,17 +562,8 @@ export async function pgUpsert({ contents, schema, tableName }) {
2
)} minutes`
);
let i = 0;
for (let datum of contents) {
await pgInsert({ datum, schema, tableName });
if (i < 10) {
console.log(`Inserted ${datum.id}`);
i++;
} else if (i == 10) {
console.log("...");
i++;
}
}
await pgBulkInsert({ data: contents, schema, tableName, client });
console.log(
`Inserted ${
contents.length
@ -546,31 +571,15 @@ export async function pgUpsert({ contents, schema, tableName }) {
contents
)} MB into ${schema}.${tableName}.`
);
let check = await pgRead({ schema, tableName });
console.log(
`Received ${
check.length
} rows with approximate cummulative size ${roughSizeOfObject(
check
)} MB from ${schema}.${tableName}.`
);
console.log("Sample: ");
console.log(JSON.stringify(check.slice(0, 1), null, 4));
let end = Date.now();
let difference = end - init;
console.log(
`Took ${difference / 1000} seconds, or ${
difference / (1000 * 60)
} minutes.`
);
//console.log(JSON.stringify(check.slice(0, 1), null, 4));
} else {
console.log("tableWhiteList:");
console.log(tableWhiteList);
throw Error(
`Table ${schema}.${tableName} not in whitelist; stopping to avoid tricky sql injections`
);
console.log(JSON.stringify(contents.slice(0, 1), null, 4));
await client.query("COMMIT");
} catch (e) {
await client.query("ROLLBACK");
throw e;
} finally {
client.release();
}
});
}

View File

@ -18,13 +18,48 @@ import { xrisk } from "./xrisk";
export interface Forecast {
id: string;
// "fantasyscotus-580"
title: string;
// "In Wooden v. U.S., the SCOTUS will affirm the lower court's decision"
url: string;
// "https://fantasyscotus.net/user-predictions/case/wooden-v-us/"
description: string;
// "62.50% (75 out of 120) of FantasySCOTUS players predict that the lower court's decision will be affirmed. FantasySCOTUS overall predicts an outcome of Affirm 6-3. Historically, FantasySCOTUS has chosen the correct side 50.00% of the time."
platform: string;
// "FantasySCOTUS"
options: any[];
/*
[
{
"name": "Yes",
"probability": 0.625,
"type": "PROBABILITY"
},
{
"name": "No",
"probability": 0.375,
"type": "PROBABILITY"
}
]
*/
timestamp: string;
// "2022-02-11T21:42:19.291Z"
stars?: number;
// 2
qualityindicators: any;
/*
{
"numforecasts": 120,
"stars": 2
}
*/
extra?: any;
}

View File

@ -6,7 +6,6 @@ import { mergeEverythingInner } from "../flow/mergeEverything";
let cookie = process.env.ALGOLIA_MASTER_API_KEY;
const algoliaAppId = process.env.NEXT_PUBLIC_ALGOLIA_APP_ID;
const client = algoliasearch(algoliaAppId, cookie);
console.log(`Initializing algolia index for ${algoliaAppId}`);
const index = client.initIndex("metaforecast");
export async function rebuildAlgoliaDatabaseTheHardWay() {
@ -54,8 +53,6 @@ export async function rebuildAlgoliaDatabaseTheEasyWay() {
}));
// this is necessary to filter by missing attributes https://www.algolia.com/doc/guides/managing-results/refine-results/filtering/how-to/filter-by-null-or-missing-attributes/
console.log(index.appId, index.indexName);
if (index.exists()) {
console.log("Index exists");
await index.replaceAllObjects(records, { safe: true });

View File

@ -0,0 +1,9 @@
export const measureTime = async (f: () => Promise<void>) => {
const init = Date.now();
await f();
const end = Date.now();
const difference = end - init;
console.log(
`Took ${difference / 1000} seconds, or ${difference / (1000 * 60)} minutes.`
);
};