diff --git a/functions/src/scripts/update-metrics.ts b/functions/src/scripts/update-metrics.ts new file mode 100644 index 00000000..f3095442 --- /dev/null +++ b/functions/src/scripts/update-metrics.ts @@ -0,0 +1,18 @@ +import { initAdmin } from './script-init' +initAdmin() + +import { log, logMemory } from '../utils' +import { updateContractMetricsCore } from '../update-contract-metrics' +import { updateUserMetricsCore } from '../update-user-metrics' + +async function updateMetrics() { + logMemory() + log('Updating contract metrics...') + await updateContractMetricsCore() + log('Updating user metrics...') + await updateUserMetricsCore() +} + +if (require.main === module) { + updateMetrics().then(() => process.exit()) +} diff --git a/functions/src/update-contract-metrics.ts b/functions/src/update-contract-metrics.ts index 97b2f0a6..3b7f8eef 100644 --- a/functions/src/update-contract-metrics.ts +++ b/functions/src/update-contract-metrics.ts @@ -2,33 +2,13 @@ import * as functions from 'firebase-functions' import * as admin from 'firebase-admin' import { max, sumBy } from 'lodash' -import { getValues } from './utils' +import { getValues, log, logMemory, mapAsync } from './utils' import { Bet } from '../../common/bet' -import { batchedWaitAll } from '../../common/util/promise' const firestore = admin.firestore() const oneDay = 1000 * 60 * 60 * 24 -export const updateContractMetrics = functions - .runWith({ memory: '1GB' }) - .pubsub.schedule('every 15 minutes') - .onRun(async () => { - const contractDocs = await firestore.collection('contracts').listDocuments() - await batchedWaitAll( - contractDocs.map((doc) => async () => { - const [volume24Hours, volume7Days] = await computeVolumes(doc.id, [ - oneDay, - oneDay * 7, - ]) - return doc.update({ - volume24Hours, - volume7Days, - }) - }) - ) - }) - const computeVolumes = async (contractId: string, durationsMs: number[]) => { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const longestDurationMs = max(durationsMs)! @@ -43,3 +23,25 @@ const computeVolumes = async (contractId: string, durationsMs: number[]) => { return sumBy(bets, (bet) => (bet.isRedemption ? 0 : Math.abs(bet.amount))) }) } + +export const updateContractMetricsCore = async () => { + const contractDocs = await firestore.collection('contracts').listDocuments() + log(`Loaded ${contractDocs.length} contract IDs.`) + logMemory() + await mapAsync(contractDocs, async (doc) => { + const [volume24Hours, volume7Days] = await computeVolumes(doc.id, [ + oneDay, + oneDay * 7, + ]) + return await doc.update({ + volume24Hours, + volume7Days, + }) + }) + log(`Updated metrics for ${contractDocs.length} contracts.`) +} + +export const updateContractMetrics = functions + .runWith({ memory: '1GB' }) + .pubsub.schedule('every 15 minutes') + .onRun(updateContractMetricsCore) diff --git a/functions/src/update-user-metrics.ts b/functions/src/update-user-metrics.ts index 66d6c153..42471981 100644 --- a/functions/src/update-user-metrics.ts +++ b/functions/src/update-user-metrics.ts @@ -2,52 +2,14 @@ import * as functions from 'firebase-functions' import * as admin from 'firebase-admin' import { groupBy, sum, sumBy } from 'lodash' -import { getValues } from './utils' +import { getValues, log, logMemory, mapAsync } from './utils' import { Contract } from '../../common/contract' import { Bet } from '../../common/bet' import { User } from '../../common/user' -import { batchedWaitAll } from '../../common/util/promise' import { calculatePayout } from '../../common/calculate' const firestore = admin.firestore() -export const updateUserMetrics = functions - .runWith({ memory: '1GB' }) - .pubsub.schedule('every 15 minutes') - .onRun(async () => { - const [users, contracts, bets] = await Promise.all([ - getValues(firestore.collection('users')), - getValues(firestore.collection('contracts')), - firestore.collectionGroup('bets').get(), - ]) - - const contractsDict = Object.fromEntries( - contracts.map((contract) => [contract.id, contract]) - ) - - const betsByUser = groupBy( - bets.docs.map((doc) => doc.data() as Bet), - (bet) => bet.userId - ) - - await batchedWaitAll( - users.map((user) => async () => { - const investmentValue = computeInvestmentValue( - betsByUser[user.id] ?? [], - contractsDict - ) - const creatorVolume = computeTotalPool(user, contractsDict) - const totalValue = user.balance + investmentValue - const totalPnL = totalValue - user.totalDeposits - - await firestore.collection('users').doc(user.id).update({ - totalPnLCached: totalPnL, - creatorVolumeCached: creatorVolume, - }) - }) - ) - }) - const computeInvestmentValue = ( bets: Bet[], contractsDict: { [k: string]: Contract } @@ -75,9 +37,43 @@ const computeTotalPool = ( return sum(pools) } -// const computeVolume = async (contract: Contract) => { -// const bets = await getValues( -// firestore.collection(`contracts/${contract.id}/bets`) -// ) -// return sumBy(bets, (bet) => Math.abs(bet.amount)) -// } +export const updateUserMetricsCore = async () => { + const [users, contracts, bets] = await Promise.all([ + getValues(firestore.collection('users')), + getValues(firestore.collection('contracts')), + firestore.collectionGroup('bets').get(), + ]) + log( + `Loaded ${users.length} users, ${contracts.length} contracts, and ${bets.docs.length} bets.` + ) + logMemory() + + const contractsDict = Object.fromEntries( + contracts.map((contract) => [contract.id, contract]) + ) + + const betsByUser = groupBy( + bets.docs.map((doc) => doc.data() as Bet), + (bet) => bet.userId + ) + + await mapAsync(users, async (user) => { + const investmentValue = computeInvestmentValue( + betsByUser[user.id] ?? [], + contractsDict + ) + const creatorVolume = computeTotalPool(user, contractsDict) + const totalValue = user.balance + investmentValue + const totalPnL = totalValue - user.totalDeposits + return await firestore.collection('users').doc(user.id).update({ + totalPnLCached: totalPnL, + creatorVolumeCached: creatorVolume, + }) + }) + log(`Updated metrics for ${users.length} users.`) +} + +export const updateUserMetrics = functions + .runWith({ memory: '1GB' }) + .pubsub.schedule('every 15 minutes') + .onRun(updateUserMetricsCore) diff --git a/functions/src/utils.ts b/functions/src/utils.ts index c0f92f94..7f69584b 100644 --- a/functions/src/utils.ts +++ b/functions/src/utils.ts @@ -1,8 +1,34 @@ import * as admin from 'firebase-admin' +import { chunk } from 'lodash' import { Contract } from '../../common/contract' import { PrivateUser, User } from '../../common/user' +export const log = (...args: unknown[]) => { + console.log(`[${new Date().toISOString()}]`, ...args) +} + +export const logMemory = () => { + const used = process.memoryUsage() + for (const [k, v] of Object.entries(used)) { + log(`${k} ${Math.round((v / 1024 / 1024) * 100) / 100} MB`) + } +} + +export const mapAsync = async ( + xs: T[], + fn: (x: T) => Promise, + concurrency = 100 +) => { + const results = [] + const chunks = chunk(xs, concurrency) + for (let i = 0; i < chunks.length; i++) { + log(`${i * concurrency}/${xs.length} processed...`) + results.push(...(await Promise.all(chunks[i].map(fn)))) + } + return results +} + export const isProd = admin.instanceId().app.options.projectId === 'mantic-markets'