Improve visibility of updateFooMetrics
functions behavior (#485)
* Make updateFooMetrics functions manually testable * Add logging, test script to metrics update functions * Improve on `batchedWaitAll` for update functions
This commit is contained in:
parent
6ac129a0b8
commit
8fce8d5f23
18
functions/src/scripts/update-metrics.ts
Normal file
18
functions/src/scripts/update-metrics.ts
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
import { initAdmin } from './script-init'
|
||||||
|
initAdmin()
|
||||||
|
|
||||||
|
import { log, logMemory } from '../utils'
|
||||||
|
import { updateContractMetricsCore } from '../update-contract-metrics'
|
||||||
|
import { updateUserMetricsCore } from '../update-user-metrics'
|
||||||
|
|
||||||
|
async function updateMetrics() {
|
||||||
|
logMemory()
|
||||||
|
log('Updating contract metrics...')
|
||||||
|
await updateContractMetricsCore()
|
||||||
|
log('Updating user metrics...')
|
||||||
|
await updateUserMetricsCore()
|
||||||
|
}
|
||||||
|
|
||||||
|
if (require.main === module) {
|
||||||
|
updateMetrics().then(() => process.exit())
|
||||||
|
}
|
|
@ -2,33 +2,13 @@ import * as functions from 'firebase-functions'
|
||||||
import * as admin from 'firebase-admin'
|
import * as admin from 'firebase-admin'
|
||||||
import { max, sumBy } from 'lodash'
|
import { max, sumBy } from 'lodash'
|
||||||
|
|
||||||
import { getValues } from './utils'
|
import { getValues, log, logMemory, mapAsync } from './utils'
|
||||||
import { Bet } from '../../common/bet'
|
import { Bet } from '../../common/bet'
|
||||||
import { batchedWaitAll } from '../../common/util/promise'
|
|
||||||
|
|
||||||
const firestore = admin.firestore()
|
const firestore = admin.firestore()
|
||||||
|
|
||||||
const oneDay = 1000 * 60 * 60 * 24
|
const oneDay = 1000 * 60 * 60 * 24
|
||||||
|
|
||||||
export const updateContractMetrics = functions
|
|
||||||
.runWith({ memory: '1GB' })
|
|
||||||
.pubsub.schedule('every 15 minutes')
|
|
||||||
.onRun(async () => {
|
|
||||||
const contractDocs = await firestore.collection('contracts').listDocuments()
|
|
||||||
await batchedWaitAll(
|
|
||||||
contractDocs.map((doc) => async () => {
|
|
||||||
const [volume24Hours, volume7Days] = await computeVolumes(doc.id, [
|
|
||||||
oneDay,
|
|
||||||
oneDay * 7,
|
|
||||||
])
|
|
||||||
return doc.update({
|
|
||||||
volume24Hours,
|
|
||||||
volume7Days,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
)
|
|
||||||
})
|
|
||||||
|
|
||||||
const computeVolumes = async (contractId: string, durationsMs: number[]) => {
|
const computeVolumes = async (contractId: string, durationsMs: number[]) => {
|
||||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||||
const longestDurationMs = max(durationsMs)!
|
const longestDurationMs = max(durationsMs)!
|
||||||
|
@ -43,3 +23,25 @@ const computeVolumes = async (contractId: string, durationsMs: number[]) => {
|
||||||
return sumBy(bets, (bet) => (bet.isRedemption ? 0 : Math.abs(bet.amount)))
|
return sumBy(bets, (bet) => (bet.isRedemption ? 0 : Math.abs(bet.amount)))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const updateContractMetricsCore = async () => {
|
||||||
|
const contractDocs = await firestore.collection('contracts').listDocuments()
|
||||||
|
log(`Loaded ${contractDocs.length} contract IDs.`)
|
||||||
|
logMemory()
|
||||||
|
await mapAsync(contractDocs, async (doc) => {
|
||||||
|
const [volume24Hours, volume7Days] = await computeVolumes(doc.id, [
|
||||||
|
oneDay,
|
||||||
|
oneDay * 7,
|
||||||
|
])
|
||||||
|
return await doc.update({
|
||||||
|
volume24Hours,
|
||||||
|
volume7Days,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
log(`Updated metrics for ${contractDocs.length} contracts.`)
|
||||||
|
}
|
||||||
|
|
||||||
|
export const updateContractMetrics = functions
|
||||||
|
.runWith({ memory: '1GB' })
|
||||||
|
.pubsub.schedule('every 15 minutes')
|
||||||
|
.onRun(updateContractMetricsCore)
|
||||||
|
|
|
@ -2,52 +2,14 @@ import * as functions from 'firebase-functions'
|
||||||
import * as admin from 'firebase-admin'
|
import * as admin from 'firebase-admin'
|
||||||
import { groupBy, sum, sumBy } from 'lodash'
|
import { groupBy, sum, sumBy } from 'lodash'
|
||||||
|
|
||||||
import { getValues } from './utils'
|
import { getValues, log, logMemory, mapAsync } from './utils'
|
||||||
import { Contract } from '../../common/contract'
|
import { Contract } from '../../common/contract'
|
||||||
import { Bet } from '../../common/bet'
|
import { Bet } from '../../common/bet'
|
||||||
import { User } from '../../common/user'
|
import { User } from '../../common/user'
|
||||||
import { batchedWaitAll } from '../../common/util/promise'
|
|
||||||
import { calculatePayout } from '../../common/calculate'
|
import { calculatePayout } from '../../common/calculate'
|
||||||
|
|
||||||
const firestore = admin.firestore()
|
const firestore = admin.firestore()
|
||||||
|
|
||||||
export const updateUserMetrics = functions
|
|
||||||
.runWith({ memory: '1GB' })
|
|
||||||
.pubsub.schedule('every 15 minutes')
|
|
||||||
.onRun(async () => {
|
|
||||||
const [users, contracts, bets] = await Promise.all([
|
|
||||||
getValues<User>(firestore.collection('users')),
|
|
||||||
getValues<Contract>(firestore.collection('contracts')),
|
|
||||||
firestore.collectionGroup('bets').get(),
|
|
||||||
])
|
|
||||||
|
|
||||||
const contractsDict = Object.fromEntries(
|
|
||||||
contracts.map((contract) => [contract.id, contract])
|
|
||||||
)
|
|
||||||
|
|
||||||
const betsByUser = groupBy(
|
|
||||||
bets.docs.map((doc) => doc.data() as Bet),
|
|
||||||
(bet) => bet.userId
|
|
||||||
)
|
|
||||||
|
|
||||||
await batchedWaitAll(
|
|
||||||
users.map((user) => async () => {
|
|
||||||
const investmentValue = computeInvestmentValue(
|
|
||||||
betsByUser[user.id] ?? [],
|
|
||||||
contractsDict
|
|
||||||
)
|
|
||||||
const creatorVolume = computeTotalPool(user, contractsDict)
|
|
||||||
const totalValue = user.balance + investmentValue
|
|
||||||
const totalPnL = totalValue - user.totalDeposits
|
|
||||||
|
|
||||||
await firestore.collection('users').doc(user.id).update({
|
|
||||||
totalPnLCached: totalPnL,
|
|
||||||
creatorVolumeCached: creatorVolume,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
)
|
|
||||||
})
|
|
||||||
|
|
||||||
const computeInvestmentValue = (
|
const computeInvestmentValue = (
|
||||||
bets: Bet[],
|
bets: Bet[],
|
||||||
contractsDict: { [k: string]: Contract }
|
contractsDict: { [k: string]: Contract }
|
||||||
|
@ -75,9 +37,43 @@ const computeTotalPool = (
|
||||||
return sum(pools)
|
return sum(pools)
|
||||||
}
|
}
|
||||||
|
|
||||||
// const computeVolume = async (contract: Contract) => {
|
export const updateUserMetricsCore = async () => {
|
||||||
// const bets = await getValues<Bet>(
|
const [users, contracts, bets] = await Promise.all([
|
||||||
// firestore.collection(`contracts/${contract.id}/bets`)
|
getValues<User>(firestore.collection('users')),
|
||||||
// )
|
getValues<Contract>(firestore.collection('contracts')),
|
||||||
// return sumBy(bets, (bet) => Math.abs(bet.amount))
|
firestore.collectionGroup('bets').get(),
|
||||||
// }
|
])
|
||||||
|
log(
|
||||||
|
`Loaded ${users.length} users, ${contracts.length} contracts, and ${bets.docs.length} bets.`
|
||||||
|
)
|
||||||
|
logMemory()
|
||||||
|
|
||||||
|
const contractsDict = Object.fromEntries(
|
||||||
|
contracts.map((contract) => [contract.id, contract])
|
||||||
|
)
|
||||||
|
|
||||||
|
const betsByUser = groupBy(
|
||||||
|
bets.docs.map((doc) => doc.data() as Bet),
|
||||||
|
(bet) => bet.userId
|
||||||
|
)
|
||||||
|
|
||||||
|
await mapAsync(users, async (user) => {
|
||||||
|
const investmentValue = computeInvestmentValue(
|
||||||
|
betsByUser[user.id] ?? [],
|
||||||
|
contractsDict
|
||||||
|
)
|
||||||
|
const creatorVolume = computeTotalPool(user, contractsDict)
|
||||||
|
const totalValue = user.balance + investmentValue
|
||||||
|
const totalPnL = totalValue - user.totalDeposits
|
||||||
|
return await firestore.collection('users').doc(user.id).update({
|
||||||
|
totalPnLCached: totalPnL,
|
||||||
|
creatorVolumeCached: creatorVolume,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
log(`Updated metrics for ${users.length} users.`)
|
||||||
|
}
|
||||||
|
|
||||||
|
export const updateUserMetrics = functions
|
||||||
|
.runWith({ memory: '1GB' })
|
||||||
|
.pubsub.schedule('every 15 minutes')
|
||||||
|
.onRun(updateUserMetricsCore)
|
||||||
|
|
|
@ -1,8 +1,34 @@
|
||||||
import * as admin from 'firebase-admin'
|
import * as admin from 'firebase-admin'
|
||||||
|
|
||||||
|
import { chunk } from 'lodash'
|
||||||
import { Contract } from '../../common/contract'
|
import { Contract } from '../../common/contract'
|
||||||
import { PrivateUser, User } from '../../common/user'
|
import { PrivateUser, User } from '../../common/user'
|
||||||
|
|
||||||
|
export const log = (...args: unknown[]) => {
|
||||||
|
console.log(`[${new Date().toISOString()}]`, ...args)
|
||||||
|
}
|
||||||
|
|
||||||
|
export const logMemory = () => {
|
||||||
|
const used = process.memoryUsage()
|
||||||
|
for (const [k, v] of Object.entries(used)) {
|
||||||
|
log(`${k} ${Math.round((v / 1024 / 1024) * 100) / 100} MB`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const mapAsync = async <T, U>(
|
||||||
|
xs: T[],
|
||||||
|
fn: (x: T) => Promise<U>,
|
||||||
|
concurrency = 100
|
||||||
|
) => {
|
||||||
|
const results = []
|
||||||
|
const chunks = chunk(xs, concurrency)
|
||||||
|
for (let i = 0; i < chunks.length; i++) {
|
||||||
|
log(`${i * concurrency}/${xs.length} processed...`)
|
||||||
|
results.push(...(await Promise.all(chunks[i].map(fn))))
|
||||||
|
}
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
export const isProd =
|
export const isProd =
|
||||||
admin.instanceId().app.options.projectId === 'mantic-markets'
|
admin.instanceId().app.options.projectId === 'mantic-markets'
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user