Skip to content

Commit

Permalink
perf(cu): optimize chain locking and re-use of previous message evals
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Sep 9, 2024
1 parent 33a48b6 commit fd6cf74
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 243 deletions.
9 changes: 1 addition & 8 deletions servers/cu/src/domain/api/dryRun.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,7 @@ export function dryRunWith (env) {
* So we explicitly set cron to undefined, for posterity
*/
cron: undefined,
/**
* If we are evaluating up to a specific message, as indicated by
* the presence of messageTxId, then we make sure we get an exact match.
*
* Otherwise, we are evaluating up to the latest
*/
exact: !!messageTxId,
needsMemory: true
needsOnlyMemory: true
})
)
/**
Expand Down
5 changes: 1 addition & 4 deletions servers/cu/src/domain/api/readResult.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ export function readResultWith (env) {
* So we explicitly set cron to undefined, for posterity
*/
cron: undefined,
/**
* We want an exact match to this messages evaluation
*/
exact: true
needsOnlyMemory: false
}))
.map((res) => res.output)
.map(omit(['Memory']))
Expand Down
131 changes: 80 additions & 51 deletions servers/cu/src/domain/api/readState.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isNotNil } from 'ramda'
import { isNotNil, join, split } from 'ramda'
import { Resolved, of, fromPromise } from 'hyper-async'

import { chainEvaluationWith } from '../lib/chainEvaluation.js'
Expand All @@ -9,24 +9,26 @@ import { evaluateWith } from '../lib/evaluate.js'
import { hydrateMessagesWith } from '../lib/hydrateMessages.js'
import { loadProcessMetaWith } from '../lib/loadProcessMeta.js'

export { pendingReadStates } from '../lib/chainEvaluation.js'

/**
* @typedef State
* @property {any} state
* @property {any} result
*
* @typedef ReadStateArgs
* @property {string} processId
* @property {string} to
* We will maintain a Map of currently executing readState calls.
*
* @callback ReadState
* @param {ReadStateArgs} args
* @returns {Promise<State>} result
* If another request comes in to invoke a readState that is already
* pending, then we will just return that one, instead of spinning up a new one
*
* @returns {ReadState}
* @type {Map<string, { startTime: Date, pending: Promise<any> }}
*/
export const pendingReadState = new Map()
const removePendingReadState = (key) => (res) => {
pendingReadState.delete(key)
return res
}

export const pendingReadStates = () => Object.fromEntries(pendingReadState.entries())

export function readStateWith (env) {
env.pendingReadState = pendingReadState
env.fromPendingKey = split(',')

const chainEvaluation = chainEvaluationWith(env)
const loadProcessMeta = loadProcessMetaWith(env)
const loadProcess = loadProcessWith(env)
Expand All @@ -35,7 +37,7 @@ export function readStateWith (env) {
const loadModule = loadModuleWith(env)
const evaluate = evaluateWith(env)

return ({ processId, messageId, to, ordinate, cron, exact, needsMemory }) => {
return ({ processId, messageId, to, ordinate, cron, needsOnlyMemory }) => {
messageId = messageId || [to, ordinate, cron].filter(isNotNil).join(':') || 'latest'

const stats = {
Expand All @@ -60,6 +62,8 @@ export function readStateWith (env) {
return res
}

const key = join(',', [processId, to, ordinate, cron, needsOnlyMemory])

/**
* The potential Promise that encapsulates the evaluation stream
* for this readState
Expand All @@ -70,45 +74,70 @@ export function readStateWith (env) {
*/
let pending
function next () {
if (!pending) {
/**
* The Async is forked into a Promise, which then wrapped
* into another Async.
*
* Since there is only one instance of the underlying Promise,
* there is only one instance of the work used to resolve each Async,
* every time, thus preventing duplication of work
*/
pending = of({ id: processId, messageId, to, ordinate, cron, stats, needsMemory })
.chain(loadProcessMeta)
.chain(loadProcess)
.chain(loadModule)
.chain(loadMessages)
.chain(hydrateMessages)
.chain(evaluate)
.chain((ctx) => Resolved({
...ctx,
result: ctx.output,
from: ctx.last.timestamp,
fromBlockHeight: ctx.last.blockHeight,
ordinate: ctx.last.ordinate
}))
.bimap(logStats, logStats)
.toPromise()
}
if (pending) return pending

/**
* The Async is forked into a Promise, which then wrapped
* into another Async.
*
* Since there is only one instance of the underlying Promise,
* there is only one instance of the work used to resolve each Async,
* every time, thus preventing duplication of work
*/
pending = of({ id: processId, messageId, to, ordinate, cron, stats, needsOnlyMemory })
.chain(loadProcessMeta)
.chain(loadProcess)
.chain((ctx) => {
/**
* An exact match was found, either a cached evaluation
* or the cache memory needed. So just return it without
* spinning up a new eval stream.
*
* The shape of ctx should match the below shape at the end
* of an eval stream, which loadProcess does
*/
if (ctx.exact) return Resolved({ ...ctx, result: ctx.result, output: ctx.result })

return of(ctx)
.chain(loadModule)
.chain(loadMessages)
.chain(hydrateMessages)
.chain(evaluate)
.chain((ctx) => Resolved({
...ctx,
result: ctx.output,
from: ctx.last.timestamp,
fromBlockHeight: ctx.last.blockHeight,
ordinate: ctx.last.ordinate,
fromCron: ctx.last.cron
}))
})
.bimap(logStats, logStats)
.toPromise()

return pending
}

return chainEvaluation({
processId,
messageId,
to,
ordinate,
cron,
exact,
needsMemory,
next: of().chain(fromPromise(next))
})
return chainEvaluation({ pendingKey: key, processId, messageId, to, ordinate, cron, needsOnlyMemory })
.map(([isNewEntry, res]) => {
if (!isNewEntry) return res

/**
* New evaluations must be performed, so place it
* in the map of pendingReadStates, and chain the new work to be performed
* off of the pending work, then clean up by removing itself from the map
*/
const newEntry = {
startTime: res.startTime,
chainedTo: res.chainedTo,
pending: res.pending
.then(next)
.finally(removePendingReadState(key))
}
pendingReadState.set(key, newEntry)

return newEntry
})
.chain(fromPromise((res) => res.pending))
}
}
Loading

0 comments on commit fd6cf74

Please sign in to comment.