Skip to content

Commit

Permalink
perf(cu): use cached readState on dryRun if within allowed max age #984
Browse files Browse the repository at this point in the history
… (#1009)

* perf(cu): optimize chain locking and re-use of previous message evals

* perf(cu): use cached readState on dryRun if within allowed max age #984
  • Loading branch information
TillaTheHun0 authored Sep 10, 2024
1 parent 33724cb commit f3acc36
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 336 deletions.
9 changes: 7 additions & 2 deletions servers/cu/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ async function readFile (file) {
export const createApis = async (ctx) => {
ctx.logger('Creating business logic apis')

const setTimeout = (...args) => lt.setTimeout(...args)
const clearTimeout = (...args) => lt.clearTimeout(...args)

const { locate } = schedulerUtilsConnect({
cacheSize: 100,
GRAPHQL_URL: ctx.GRAPHQL_URL,
Expand Down Expand Up @@ -230,8 +233,8 @@ export const createApis = async (ctx) => {
TTL: ctx.PROCESS_MEMORY_CACHE_TTL,
writeProcessMemoryFile,
logger: ctx.logger,
setTimeout: (...args) => lt.setTimeout(...args),
clearTimeout: (...args) => lt.clearTimeout(...args)
setTimeout,
clearTimeout
})

const loadWasmModule = WasmClient.loadWasmModuleWith({
Expand Down Expand Up @@ -381,6 +384,8 @@ export const createApis = async (ctx) => {
const dryRunLogger = ctx.logger.child('dryRun')
const dryRun = dryRunWith({
...sharedDeps(dryRunLogger),
setTimeout,
clearTimeout,
loadMessageMeta: AoSuClient.loadMessageMetaWith({ fetch: ctx.fetch, logger: dryRunLogger }),
/**
* Dry-runs use a separate worker thread pool, so as to not block
Expand Down
256 changes: 158 additions & 98 deletions servers/cu/src/domain/api/dryRun.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Readable } from 'node:stream'
import { omit } from 'ramda'
import { Resolved, of } from 'hyper-async'
import { omit, pick } from 'ramda'
import { Rejected, Resolved, of } from 'hyper-async'

import { loadMessageMetaWith } from '../lib/loadMessageMeta.js'
import { evaluateWith } from '../lib/evaluate.js'
Expand All @@ -9,6 +9,39 @@ import { loadModuleWith } from '../lib/loadModule.js'
import { mapFrom } from '../utils.js'
import { readStateWith } from './readState.js'

const DEFAULT_MAX_PROCESS_AGE = 2000
/**
* TODO: should this be an effect or shared util?
* just keeping here for sake of locality for now
*/
const TtlCache = ({ setTimeout, clearTimeout }) => {
const cache = {
data: new Map(),
timers: new Map(),
set: (k, v, ttl) => {
if (cache.timers.has(k)) clearTimeout(cache.timers.get(k))
const t = setTimeout(() => cache.delete(k), ttl)
t.unref()
cache.timers.set(k, t)
cache.data.set(k, v)
},
get: k => cache.data.get(k),
has: k => cache.data.has(k),
delete: k => {
if (cache.timers.has(k)) clearTimeout(cache.timers.get(k))
cache.timers.delete(k)
return cache.data.delete(k)
},
clear: () => {
cache.data.clear()
for (const v of cache.timers.values()) clearTimeout(v)
cache.timers.clear()
}
}

return cache
}

/**
* @typedef Env
*
Expand All @@ -25,6 +58,7 @@ import { readStateWith } from './readState.js'
* @returns {ReadResult}
*/
export function dryRunWith (env) {
const logger = env.logger
const loadMessageMeta = loadMessageMetaWith(env)
const loadModule = loadModuleWith(env)
const readState = readStateWith(env)
Expand All @@ -37,30 +71,48 @@ export function dryRunWith (env) {
loadEvaluator: env.loadDryRunEvaluator
})

return ({ processId, messageTxId, dryRun }) => {
return of({ messageTxId })
.chain(({ messageTxId }) => {
/**
* Load the metadata associated with the messageId ie.
* it's timestamp and ordinate, so readState can evaluate
* up to that point (if it hasn't already)
*/
if (messageTxId) return loadMessageMeta({ processId, messageTxId })
const readStateCache = TtlCache(env)

function loadMessageCtx ({ messageTxId, processId }) {
/**
* Load the metadata associated with the messageId ie.
* it's timestamp and ordinate, so readState can evaluate
* up to that point (if it hasn't already)
*/
if (messageTxId) return loadMessageMeta({ processId, messageTxId })

/**
* No messageTxId provided so evaluate up to latest
*/
return Resolved({
processId,
timestamp: undefined,
nonce: undefined
})
}

function ensureProcessLoaded ({ maxProcessAge }) {
return (ctx) => of(ctx)
.chain((ctx) => {
/**
* No messageTxId provided so evaluate up to latest
* Check to see whether process memory from a previous readState,
* executed by a previous dryRun, can be used to perform this dryRun
*/
return Resolved({
processId,
to: undefined,
ordinate: undefined
})
const cached = readStateCache.get(ctx.processId)

if (cached && new Date().getTime() - cached.age <= maxProcessAge) {
logger.debug(
'Using recently cached process memory for dry-run to process "%s": "%j"',
ctx.processId,
pick(['from', 'ordinate', 'fromBlockHeight', 'fromCron'], cached.ctx)
)
return Resolved(cached.ctx)
}

return Rejected(ctx)
})
/**
* Read up to the specified 'to', or latest
*/
.chain((res) =>
readState({
.bichain(
(res) => readState({
processId: res.processId,
to: res.timestamp,
/**
Expand All @@ -74,97 +126,105 @@ export function dryRunWith (env) {
* So we explicitly set cron to undefined, for posterity
*/
cron: undefined,
needsOnlyMemory: true
}).map((res) => {
const cached = { age: new Date().getTime(), ctx: res }
/**
* If we are evaluating up to a specific message, as indicated by
* the presence of messageTxId, then we make sure we get an exact match.
*
* Otherwise, we are evaluating up to the latest
* Cache the readState. Since we are not copying ctx,
* this should have a fairly minimal footprint, only adding
* the overhead of maintaining the map, timers, for the specified
* age
*/
exact: !!messageTxId,
needsMemory: true
})
readStateCache.set(res.id, cached, 4000)
return res
}),
Resolved
)
}

function ensureModuleLoaded (ctx) {
/**
* If a cached evaluation was found and immediately returned,
* then we will have not loaded the module and attached it to ctx.
*
* So we check if ctx.module is set, and load the Module if not.
*
* This check will prevent us from unnecessarily loading the module
* from Arweave, twice.
*/
if (!ctx.moduleId) return loadModule(ctx)

/**
* The module was loaded by readState, as part of evaluation,
* so no need to load it again. Just reuse it
*/
return Resolved(ctx)
}

return ({ processId, messageTxId, maxProcessAge = DEFAULT_MAX_PROCESS_AGE, dryRun }) => {
return of({ processId, messageTxId })
.chain(loadMessageCtx)
.chain(ensureProcessLoaded({ maxProcessAge }))
.chain(ensureModuleLoaded)
/**
* We've read up to 'to', now inject the dry-run message
*
* { id, owner, tags, output: { Memory, Error, Messages, Spawns, Output } }
*/
.chain((readStateRes) => {
return of(readStateRes)
.chain((ctx) => {
/**
* If a cached evaluation was found and immediately returned,
* then we will have not loaded the module and attached it to ctx.
*
* So we check if ctx.module is set, and load the Module if not.
*
* This check will prevent us from unnecessarily loading the module
* from Arweave, twice.
*/
if (!ctx.moduleId) return loadModule(ctx)

.chain((ctx) => {
async function * dryRunMessage () {
/**
* Dry run messages are not signed, and therefore
* will not have a verifiable Id, Signature, Owner, etc.
*
* NOTE:
* Dry Run messages are not signed, therefore not verifiable.
*
* This is generally okay, because dry-run message evaluations
* are Read-Only and not persisted -- the primary use-case for Dry Run is to enable
* retrieving a view of a processes state, without having to send a bonafide message.
*
* However, we should keep in mind the implications. One implication is that spoofing
* Owner or other fields on a Dry-Run message (unverifiable context) exposes a way to
* "poke and prod" a process modules for vulnerabilities.
*/
yield messageSchema.parse({
/**
* The module was loaded by readState, as part of evaluation,
* so no need to load it again. Just reuse it
* Don't save the dryRun message
*/
return Resolved(ctx)
})
.chain((ctx) => {
async function * dryRunMessage () {
noSave: true,
deepHash: undefined,
cron: undefined,
ordinate: ctx.ordinate,
name: 'Dry Run Message',
message: {
/**
* Dry run messages are not signed, and therefore
* will not have a verifiable Id, Signature, Owner, etc.
*
* NOTE:
* Dry Run messages are not signed, therefore not verifiable.
*
* This is generally okay, because dry-run message evaluations
* are Read-Only and not persisted -- the primary use-case for Dry Run is to enable
* retrieving a view of a processes state, without having to send a bonafide message.
* We default timestamp and block-height using
* the current evaluation.
*
* However, we should keep in mind the implications. One implication is that spoofing
* Owner or other fields on a Dry-Run message (unverifiable context) exposes a way to
* "poke and prod" a process modules for vulnerabilities.
* The Dry-Run message can overwrite them
*/
yield messageSchema.parse({
/**
* Don't save the dryRun message
*/
noSave: true,
deepHash: undefined,
cron: undefined,
ordinate: ctx.ordinate,
name: 'Dry Run Message',
message: {
/**
* We default timestamp and block-height using
* the current evaluation.
*
* The Dry-Run message can overwrite them
*/
Timestamp: ctx.from,
'Block-Height': ctx.fromBlockHeight,
Cron: false,
Target: processId,
...dryRun,
From: mapFrom({ tags: dryRun.Tags, owner: dryRun.Owner }),
'Read-Only': true
},
AoGlobal: {
Process: { Id: processId, Owner: ctx.owner, Tags: ctx.tags },
Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags }
}
})
Timestamp: ctx.from,
'Block-Height': ctx.fromBlockHeight,
Cron: false,
Target: processId,
...dryRun,
From: mapFrom({ tags: dryRun.Tags, owner: dryRun.Owner }),
'Read-Only': true
},
AoGlobal: {
Process: { Id: processId, Owner: ctx.owner, Tags: ctx.tags },
Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags }
}

/**
* Pass a messages stream to evaluate that only emits the single dry-run
* message and then completes
*/
return evaluate({ ...ctx, dryRun: true, messages: Readable.from(dryRunMessage()) })
})
}

/**
* Pass a messages stream to evaluate that only emits the single dry-run
* message and then completes
*/
return evaluate({ ...ctx, dryRun: true, messages: Readable.from(dryRunMessage()) })
})
.map((res) => res.output)
.map(omit(['Memory']))
.map((res) => omit(['Memory'], res.output))
}
}
5 changes: 1 addition & 4 deletions servers/cu/src/domain/api/readResult.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ export function readResultWith (env) {
* So we explicitly set cron to undefined, for posterity
*/
cron: undefined,
/**
* We want an exact match to this messages evaluation
*/
exact: true
needsOnlyMemory: false
}))
.map((res) => res.output)
.map(omit(['Memory']))
Expand Down
Loading

0 comments on commit f3acc36

Please sign in to comment.