diff --git a/functions/src/index.ts b/functions/src/index.ts index 2e2ee54c..b21ae6fd 100644 --- a/functions/src/index.ts +++ b/functions/src/index.ts @@ -16,8 +16,7 @@ export * from './on-fold-follow' export * from './on-fold-delete' export * from './on-view' export * from './unsubscribe' -export * from './update-contract-metrics' -export * from './update-user-metrics' +export * from './update-metrics' export * from './update-recommendations' export * from './backup-db' export * from './change-user-info' diff --git a/functions/src/scripts/update-metrics.ts b/functions/src/scripts/update-metrics.ts index f3095442..e34f83d8 100644 --- a/functions/src/scripts/update-metrics.ts +++ b/functions/src/scripts/update-metrics.ts @@ -2,15 +2,12 @@ import { initAdmin } from './script-init' initAdmin() import { log, logMemory } from '../utils' -import { updateContractMetricsCore } from '../update-contract-metrics' -import { updateUserMetricsCore } from '../update-user-metrics' +import { updateMetricsCore } from '../update-metrics' async function updateMetrics() { logMemory() - log('Updating contract metrics...') - await updateContractMetricsCore() - log('Updating user metrics...') - await updateUserMetricsCore() + log('Updating metrics...') + await updateMetricsCore() } if (require.main === module) { diff --git a/functions/src/update-contract-metrics.ts b/functions/src/update-contract-metrics.ts deleted file mode 100644 index 3b7f8eef..00000000 --- a/functions/src/update-contract-metrics.ts +++ /dev/null @@ -1,47 +0,0 @@ -import * as functions from 'firebase-functions' -import * as admin from 'firebase-admin' -import { max, sumBy } from 'lodash' - -import { getValues, log, logMemory, mapAsync } from './utils' -import { Bet } from '../../common/bet' - -const firestore = admin.firestore() - -const oneDay = 1000 * 60 * 60 * 24 - -const computeVolumes = async (contractId: string, durationsMs: number[]) => { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const longestDurationMs = max(durationsMs)! - const allBets = await getValues( - firestore - .collection(`contracts/${contractId}/bets`) - .where('createdTime', '>', Date.now() - longestDurationMs) - ) - return durationsMs.map((duration) => { - const cutoff = Date.now() - duration - const bets = allBets.filter((b) => b.createdTime > cutoff) - return sumBy(bets, (bet) => (bet.isRedemption ? 0 : Math.abs(bet.amount))) - }) -} - -export const updateContractMetricsCore = async () => { - const contractDocs = await firestore.collection('contracts').listDocuments() - log(`Loaded ${contractDocs.length} contract IDs.`) - logMemory() - await mapAsync(contractDocs, async (doc) => { - const [volume24Hours, volume7Days] = await computeVolumes(doc.id, [ - oneDay, - oneDay * 7, - ]) - return await doc.update({ - volume24Hours, - volume7Days, - }) - }) - log(`Updated metrics for ${contractDocs.length} contracts.`) -} - -export const updateContractMetrics = functions - .runWith({ memory: '1GB' }) - .pubsub.schedule('every 15 minutes') - .onRun(updateContractMetricsCore) diff --git a/functions/src/update-metrics.ts b/functions/src/update-metrics.ts new file mode 100644 index 00000000..0a51e13c --- /dev/null +++ b/functions/src/update-metrics.ts @@ -0,0 +1,94 @@ +import * as functions from 'firebase-functions' +import * as admin from 'firebase-admin' +import { groupBy, sum, sumBy } from 'lodash' + +import { getValues, log, logMemory, writeUpdatesAsync } from './utils' +import { Bet } from '../../common/bet' +import { Contract } from '../../common/contract' +import { User } from '../../common/user' +import { calculatePayout } from '../../common/calculate' + +const firestore = admin.firestore() + +const oneDay = 1000 * 60 * 60 * 24 + +const computeInvestmentValue = ( + bets: Bet[], + contractsDict: { [k: string]: Contract } +) => { + return sumBy(bets, (bet) => { + const contract = contractsDict[bet.contractId] + if (!contract || contract.isResolved) return 0 + if (bet.sale || bet.isSold) return 0 + + const payout = calculatePayout(contract, bet, 'MKT') + return payout - (bet.loanAmount ?? 0) + }) +} + +const computeTotalPool = (contracts: Contract[]) => { + return sum(contracts.map((contract) => sum(Object.values(contract.pool)))) +} + +export const updateMetricsCore = async () => { + const [users, contracts, bets] = await Promise.all([ + getValues(firestore.collection('users')), + getValues(firestore.collection('contracts')), + getValues(firestore.collectionGroup('bets')), + ]) + log( + `Loaded ${users.length} users, ${contracts.length} contracts, and ${bets.length} bets.` + ) + logMemory() + + const now = Date.now() + const betsByContract = groupBy(bets, (bet) => bet.contractId) + const contractUpdates = contracts.map((contract) => { + const contractBets = betsByContract[contract.id] ?? [] + return { + doc: firestore.collection('contracts').doc(contract.id), + fields: { + volume24Hours: computeVolume(contractBets, now - oneDay), + volume7Days: computeVolume(contractBets, now - oneDay * 7), + }, + } + }) + await writeUpdatesAsync(firestore, contractUpdates) + log(`Updated metrics for ${contracts.length} contracts.`) + + const contractsById = Object.fromEntries( + contracts.map((contract) => [contract.id, contract]) + ) + const contractsByUser = groupBy(contracts, (contract) => contract.creatorId) + const betsByUser = groupBy(bets, (bet) => bet.userId) + const userUpdates = users.map((user) => { + const investmentValue = computeInvestmentValue( + betsByUser[user.id] ?? [], + contractsById + ) + const creatorContracts = contractsByUser[user.id] ?? [] + const creatorVolume = computeTotalPool(creatorContracts) + const totalValue = user.balance + investmentValue + const totalPnL = totalValue - user.totalDeposits + return { + doc: firestore.collection('users').doc(user.id), + fields: { + totalPnLCached: totalPnL, + creatorVolumeCached: creatorVolume, + }, + } + }) + await writeUpdatesAsync(firestore, userUpdates) + log(`Updated metrics for ${users.length} users.`) +} + +const computeVolume = (contractBets: Bet[], since: number) => { + return sumBy(contractBets, (b) => + b.createdTime > since && !b.isRedemption ? Math.abs(b.amount) : 0 + ) +} + +export const updateMetrics = functions + .runWith({ memory: '1GB' }) + .pubsub.schedule('every 15 minutes') + .onRun(updateMetricsCore) diff --git a/functions/src/update-user-metrics.ts b/functions/src/update-user-metrics.ts deleted file mode 100644 index 42471981..00000000 --- a/functions/src/update-user-metrics.ts +++ /dev/null @@ -1,79 +0,0 @@ -import * as functions from 'firebase-functions' -import * as admin from 'firebase-admin' -import { groupBy, sum, sumBy } from 'lodash' - -import { getValues, log, logMemory, mapAsync } from './utils' -import { Contract } from '../../common/contract' -import { Bet } from '../../common/bet' -import { User } from '../../common/user' -import { calculatePayout } from '../../common/calculate' - -const firestore = admin.firestore() - -const computeInvestmentValue = ( - bets: Bet[], - contractsDict: { [k: string]: Contract } -) => { - return sumBy(bets, (bet) => { - const contract = contractsDict[bet.contractId] - if (!contract || contract.isResolved) return 0 - if (bet.sale || bet.isSold) return 0 - - const payout = calculatePayout(contract, bet, 'MKT') - return payout - (bet.loanAmount ?? 0) - }) -} - -const computeTotalPool = ( - user: User, - contractsDict: { [k: string]: Contract } -) => { - const creatorContracts = Object.values(contractsDict).filter( - (contract) => contract.creatorId === user.id - ) - const pools = creatorContracts.map((contract) => - sum(Object.values(contract.pool)) - ) - return sum(pools) -} - -export const updateUserMetricsCore = async () => { - const [users, contracts, bets] = await Promise.all([ - getValues(firestore.collection('users')), - getValues(firestore.collection('contracts')), - firestore.collectionGroup('bets').get(), - ]) - log( - `Loaded ${users.length} users, ${contracts.length} contracts, and ${bets.docs.length} bets.` - ) - logMemory() - - const contractsDict = Object.fromEntries( - contracts.map((contract) => [contract.id, contract]) - ) - - const betsByUser = groupBy( - bets.docs.map((doc) => doc.data() as Bet), - (bet) => bet.userId - ) - - await mapAsync(users, async (user) => { - const investmentValue = computeInvestmentValue( - betsByUser[user.id] ?? [], - contractsDict - ) - const creatorVolume = computeTotalPool(user, contractsDict) - const totalValue = user.balance + investmentValue - const totalPnL = totalValue - user.totalDeposits - return await firestore.collection('users').doc(user.id).update({ - totalPnLCached: totalPnL, - creatorVolumeCached: creatorVolume, - }) - }) - log(`Updated metrics for ${users.length} users.`) -} - -export const updateUserMetrics = functions - .runWith({ memory: '1GB' }) - .pubsub.schedule('every 15 minutes') - .onRun(updateUserMetricsCore) diff --git a/functions/src/utils.ts b/functions/src/utils.ts index 7f69584b..9cac3409 100644 --- a/functions/src/utils.ts +++ b/functions/src/utils.ts @@ -15,18 +15,25 @@ export const logMemory = () => { } } -export const mapAsync = async ( - xs: T[], - fn: (x: T) => Promise, - concurrency = 100 +type UpdateSpec = { + doc: admin.firestore.DocumentReference + fields: { [k: string]: unknown } +} + +export const writeUpdatesAsync = async ( + db: admin.firestore.Firestore, + updates: UpdateSpec[], + batchSize = 500 // 500 = Firestore batch limit ) => { - const results = [] - const chunks = chunk(xs, concurrency) + const chunks = chunk(updates, batchSize) for (let i = 0; i < chunks.length; i++) { - log(`${i * concurrency}/${xs.length} processed...`) - results.push(...(await Promise.all(chunks[i].map(fn)))) + log(`${i * batchSize}/${updates.length} updates written...`) + const batch = db.batch() + for (const { doc, fields } of chunks[i]) { + batch.update(doc, fields) + } + await batch.commit() } - return results } export const isProd =