Finish optimizing updateFooMetrics
functions (#489)
* Consolidate metrics updates into one batch job * Try batching updates of metrics * Don't look up all bets again for all contracts * Tidying up * Make computeTotalPool less needlessly inefficient looking
This commit is contained in:
parent
cb70ab3675
commit
4f96b9ef63
|
@ -16,8 +16,7 @@ export * from './on-fold-follow'
|
|||
export * from './on-fold-delete'
|
||||
export * from './on-view'
|
||||
export * from './unsubscribe'
|
||||
export * from './update-contract-metrics'
|
||||
export * from './update-user-metrics'
|
||||
export * from './update-metrics'
|
||||
export * from './update-recommendations'
|
||||
export * from './backup-db'
|
||||
export * from './change-user-info'
|
||||
|
|
|
@ -2,15 +2,12 @@ import { initAdmin } from './script-init'
|
|||
initAdmin()
|
||||
|
||||
import { log, logMemory } from '../utils'
|
||||
import { updateContractMetricsCore } from '../update-contract-metrics'
|
||||
import { updateUserMetricsCore } from '../update-user-metrics'
|
||||
import { updateMetricsCore } from '../update-metrics'
|
||||
|
||||
async function updateMetrics() {
|
||||
logMemory()
|
||||
log('Updating contract metrics...')
|
||||
await updateContractMetricsCore()
|
||||
log('Updating user metrics...')
|
||||
await updateUserMetricsCore()
|
||||
log('Updating metrics...')
|
||||
await updateMetricsCore()
|
||||
}
|
||||
|
||||
if (require.main === module) {
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
import * as functions from 'firebase-functions'
|
||||
import * as admin from 'firebase-admin'
|
||||
import { max, sumBy } from 'lodash'
|
||||
|
||||
import { getValues, log, logMemory, mapAsync } from './utils'
|
||||
import { Bet } from '../../common/bet'
|
||||
|
||||
const firestore = admin.firestore()
|
||||
|
||||
const oneDay = 1000 * 60 * 60 * 24
|
||||
|
||||
const computeVolumes = async (contractId: string, durationsMs: number[]) => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
const longestDurationMs = max(durationsMs)!
|
||||
const allBets = await getValues<Bet>(
|
||||
firestore
|
||||
.collection(`contracts/${contractId}/bets`)
|
||||
.where('createdTime', '>', Date.now() - longestDurationMs)
|
||||
)
|
||||
return durationsMs.map((duration) => {
|
||||
const cutoff = Date.now() - duration
|
||||
const bets = allBets.filter((b) => b.createdTime > cutoff)
|
||||
return sumBy(bets, (bet) => (bet.isRedemption ? 0 : Math.abs(bet.amount)))
|
||||
})
|
||||
}
|
||||
|
||||
export const updateContractMetricsCore = async () => {
|
||||
const contractDocs = await firestore.collection('contracts').listDocuments()
|
||||
log(`Loaded ${contractDocs.length} contract IDs.`)
|
||||
logMemory()
|
||||
await mapAsync(contractDocs, async (doc) => {
|
||||
const [volume24Hours, volume7Days] = await computeVolumes(doc.id, [
|
||||
oneDay,
|
||||
oneDay * 7,
|
||||
])
|
||||
return await doc.update({
|
||||
volume24Hours,
|
||||
volume7Days,
|
||||
})
|
||||
})
|
||||
log(`Updated metrics for ${contractDocs.length} contracts.`)
|
||||
}
|
||||
|
||||
export const updateContractMetrics = functions
|
||||
.runWith({ memory: '1GB' })
|
||||
.pubsub.schedule('every 15 minutes')
|
||||
.onRun(updateContractMetricsCore)
|
94
functions/src/update-metrics.ts
Normal file
94
functions/src/update-metrics.ts
Normal file
|
@ -0,0 +1,94 @@
|
|||
import * as functions from 'firebase-functions'
|
||||
import * as admin from 'firebase-admin'
|
||||
import { groupBy, sum, sumBy } from 'lodash'
|
||||
|
||||
import { getValues, log, logMemory, writeUpdatesAsync } from './utils'
|
||||
import { Bet } from '../../common/bet'
|
||||
import { Contract } from '../../common/contract'
|
||||
import { User } from '../../common/user'
|
||||
import { calculatePayout } from '../../common/calculate'
|
||||
|
||||
const firestore = admin.firestore()
|
||||
|
||||
const oneDay = 1000 * 60 * 60 * 24
|
||||
|
||||
const computeInvestmentValue = (
|
||||
bets: Bet[],
|
||||
contractsDict: { [k: string]: Contract }
|
||||
) => {
|
||||
return sumBy(bets, (bet) => {
|
||||
const contract = contractsDict[bet.contractId]
|
||||
if (!contract || contract.isResolved) return 0
|
||||
if (bet.sale || bet.isSold) return 0
|
||||
|
||||
const payout = calculatePayout(contract, bet, 'MKT')
|
||||
return payout - (bet.loanAmount ?? 0)
|
||||
})
|
||||
}
|
||||
|
||||
const computeTotalPool = (contracts: Contract[]) => {
|
||||
return sum(contracts.map((contract) => sum(Object.values(contract.pool))))
|
||||
}
|
||||
|
||||
export const updateMetricsCore = async () => {
|
||||
const [users, contracts, bets] = await Promise.all([
|
||||
getValues<User>(firestore.collection('users')),
|
||||
getValues<Contract>(firestore.collection('contracts')),
|
||||
getValues<Bet>(firestore.collectionGroup('bets')),
|
||||
])
|
||||
log(
|
||||
`Loaded ${users.length} users, ${contracts.length} contracts, and ${bets.length} bets.`
|
||||
)
|
||||
logMemory()
|
||||
|
||||
const now = Date.now()
|
||||
const betsByContract = groupBy(bets, (bet) => bet.contractId)
|
||||
const contractUpdates = contracts.map((contract) => {
|
||||
const contractBets = betsByContract[contract.id] ?? []
|
||||
return {
|
||||
doc: firestore.collection('contracts').doc(contract.id),
|
||||
fields: {
|
||||
volume24Hours: computeVolume(contractBets, now - oneDay),
|
||||
volume7Days: computeVolume(contractBets, now - oneDay * 7),
|
||||
},
|
||||
}
|
||||
})
|
||||
await writeUpdatesAsync(firestore, contractUpdates)
|
||||
log(`Updated metrics for ${contracts.length} contracts.`)
|
||||
|
||||
const contractsById = Object.fromEntries(
|
||||
contracts.map((contract) => [contract.id, contract])
|
||||
)
|
||||
const contractsByUser = groupBy(contracts, (contract) => contract.creatorId)
|
||||
const betsByUser = groupBy(bets, (bet) => bet.userId)
|
||||
const userUpdates = users.map((user) => {
|
||||
const investmentValue = computeInvestmentValue(
|
||||
betsByUser[user.id] ?? [],
|
||||
contractsById
|
||||
)
|
||||
const creatorContracts = contractsByUser[user.id] ?? []
|
||||
const creatorVolume = computeTotalPool(creatorContracts)
|
||||
const totalValue = user.balance + investmentValue
|
||||
const totalPnL = totalValue - user.totalDeposits
|
||||
return {
|
||||
doc: firestore.collection('users').doc(user.id),
|
||||
fields: {
|
||||
totalPnLCached: totalPnL,
|
||||
creatorVolumeCached: creatorVolume,
|
||||
},
|
||||
}
|
||||
})
|
||||
await writeUpdatesAsync(firestore, userUpdates)
|
||||
log(`Updated metrics for ${users.length} users.`)
|
||||
}
|
||||
|
||||
const computeVolume = (contractBets: Bet[], since: number) => {
|
||||
return sumBy(contractBets, (b) =>
|
||||
b.createdTime > since && !b.isRedemption ? Math.abs(b.amount) : 0
|
||||
)
|
||||
}
|
||||
|
||||
export const updateMetrics = functions
|
||||
.runWith({ memory: '1GB' })
|
||||
.pubsub.schedule('every 15 minutes')
|
||||
.onRun(updateMetricsCore)
|
|
@ -1,79 +0,0 @@
|
|||
import * as functions from 'firebase-functions'
|
||||
import * as admin from 'firebase-admin'
|
||||
import { groupBy, sum, sumBy } from 'lodash'
|
||||
|
||||
import { getValues, log, logMemory, mapAsync } from './utils'
|
||||
import { Contract } from '../../common/contract'
|
||||
import { Bet } from '../../common/bet'
|
||||
import { User } from '../../common/user'
|
||||
import { calculatePayout } from '../../common/calculate'
|
||||
|
||||
const firestore = admin.firestore()
|
||||
|
||||
const computeInvestmentValue = (
|
||||
bets: Bet[],
|
||||
contractsDict: { [k: string]: Contract }
|
||||
) => {
|
||||
return sumBy(bets, (bet) => {
|
||||
const contract = contractsDict[bet.contractId]
|
||||
if (!contract || contract.isResolved) return 0
|
||||
if (bet.sale || bet.isSold) return 0
|
||||
|
||||
const payout = calculatePayout(contract, bet, 'MKT')
|
||||
return payout - (bet.loanAmount ?? 0)
|
||||
})
|
||||
}
|
||||
|
||||
const computeTotalPool = (
|
||||
user: User,
|
||||
contractsDict: { [k: string]: Contract }
|
||||
) => {
|
||||
const creatorContracts = Object.values(contractsDict).filter(
|
||||
(contract) => contract.creatorId === user.id
|
||||
)
|
||||
const pools = creatorContracts.map((contract) =>
|
||||
sum(Object.values(contract.pool))
|
||||
)
|
||||
return sum(pools)
|
||||
}
|
||||
|
||||
export const updateUserMetricsCore = async () => {
|
||||
const [users, contracts, bets] = await Promise.all([
|
||||
getValues<User>(firestore.collection('users')),
|
||||
getValues<Contract>(firestore.collection('contracts')),
|
||||
firestore.collectionGroup('bets').get(),
|
||||
])
|
||||
log(
|
||||
`Loaded ${users.length} users, ${contracts.length} contracts, and ${bets.docs.length} bets.`
|
||||
)
|
||||
logMemory()
|
||||
|
||||
const contractsDict = Object.fromEntries(
|
||||
contracts.map((contract) => [contract.id, contract])
|
||||
)
|
||||
|
||||
const betsByUser = groupBy(
|
||||
bets.docs.map((doc) => doc.data() as Bet),
|
||||
(bet) => bet.userId
|
||||
)
|
||||
|
||||
await mapAsync(users, async (user) => {
|
||||
const investmentValue = computeInvestmentValue(
|
||||
betsByUser[user.id] ?? [],
|
||||
contractsDict
|
||||
)
|
||||
const creatorVolume = computeTotalPool(user, contractsDict)
|
||||
const totalValue = user.balance + investmentValue
|
||||
const totalPnL = totalValue - user.totalDeposits
|
||||
return await firestore.collection('users').doc(user.id).update({
|
||||
totalPnLCached: totalPnL,
|
||||
creatorVolumeCached: creatorVolume,
|
||||
})
|
||||
})
|
||||
log(`Updated metrics for ${users.length} users.`)
|
||||
}
|
||||
|
||||
export const updateUserMetrics = functions
|
||||
.runWith({ memory: '1GB' })
|
||||
.pubsub.schedule('every 15 minutes')
|
||||
.onRun(updateUserMetricsCore)
|
|
@ -15,18 +15,25 @@ export const logMemory = () => {
|
|||
}
|
||||
}
|
||||
|
||||
export const mapAsync = async <T, U>(
|
||||
xs: T[],
|
||||
fn: (x: T) => Promise<U>,
|
||||
concurrency = 100
|
||||
type UpdateSpec = {
|
||||
doc: admin.firestore.DocumentReference
|
||||
fields: { [k: string]: unknown }
|
||||
}
|
||||
|
||||
export const writeUpdatesAsync = async (
|
||||
db: admin.firestore.Firestore,
|
||||
updates: UpdateSpec[],
|
||||
batchSize = 500 // 500 = Firestore batch limit
|
||||
) => {
|
||||
const results = []
|
||||
const chunks = chunk(xs, concurrency)
|
||||
const chunks = chunk(updates, batchSize)
|
||||
for (let i = 0; i < chunks.length; i++) {
|
||||
log(`${i * concurrency}/${xs.length} processed...`)
|
||||
results.push(...(await Promise.all(chunks[i].map(fn))))
|
||||
log(`${i * batchSize}/${updates.length} updates written...`)
|
||||
const batch = db.batch()
|
||||
for (const { doc, fields } of chunks[i]) {
|
||||
batch.update(doc, fields)
|
||||
}
|
||||
await batch.commit()
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
export const isProd =
|
||||
|
|
Loading…
Reference in New Issue
Block a user