diff --git a/servers/cu/src/app.js b/servers/cu/src/app.js index acc6a3f04..6cc1faf28 100644 --- a/servers/cu/src/app.js +++ b/servers/cu/src/app.js @@ -30,6 +30,7 @@ export const server = pipeP( */ const server = app.listen({ port: config.port, host: '0.0.0.0' }, () => { logger(`Server is running on http://localhost:${config.port}`) + logger(`Server in unit mode: "${config.UNIT_MODE}"`) }) const memMonitor = setInterval(async () => { diff --git a/servers/cu/src/bootstrap.js b/servers/cu/src/bootstrap.js index 2f69061b4..268366960 100644 --- a/servers/cu/src/bootstrap.js +++ b/servers/cu/src/bootstrap.js @@ -101,7 +101,8 @@ export const createApis = async (ctx) => { id: workerId, MODE: ctx.MODE, LOG_CONFIG_PATH: ctx.LOG_CONFIG_PATH, - DEFAULT_LOG_LEVEL: ctx.DEFAULT_LOG_LEVEL + DEFAULT_LOG_LEVEL: ctx.DEFAULT_LOG_LEVEL, + DISABLE_PROCESS_EVALUATION_CACHE: ctx.DISABLE_PROCESS_EVALUATION_CACHE } } } diff --git a/servers/cu/src/config.js b/servers/cu/src/config.js index 0c6ca8694..67f09d5d4 100644 --- a/servers/cu/src/config.js +++ b/servers/cu/src/config.js @@ -41,6 +41,11 @@ const DEFAULT_PROCESS_WASM_MODULE_FORMATS = [ */ const serverConfigSchema = domainConfigSchema.extend({ MODE: z.enum(['development', 'production']), + /** + * Whether the unit is operating as a Compute Unit + * or Read Unit. Defaults to 'cu'. + */ + UNIT_MODE: z.enum(['cu', 'ru']), port: positiveIntSchema, ENABLE_METRICS_ENDPOINT: z.preprocess((val) => !!val, z.boolean()) }) @@ -51,13 +56,32 @@ const serverConfigSchema = domainConfigSchema.extend({ */ /* eslint-disable no-throw-literal */ +const preprocessUnitMode = (envConfig) => { + const { UNIT_MODE } = envConfig + + if (UNIT_MODE === 'cu') return envConfig + + /** + * A Read Unit's primary concern is serving dry-runs, + * and so does not create checkpoints and does not cache evaluation + * results to be served later. + */ + return { + ...envConfig, + DISABLE_PROCESS_EVALUATION_CACHE: true, + DISABLE_PROCESS_CHECKPOINT_CREATION: true, + DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: true, + PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: 0 + } +} + /** * If the WALLET is defined, then do nothing. * * Otherwise, check whether the WALLET_FILE env var is defined and load it contents * as WALLET */ -export const preprocessWallet = (envConfig) => { +const preprocessWallet = (envConfig) => { const { WALLET, WALLET_FILE, ...theRestOfTheConfig } = envConfig // WALLET takes precendent. nothing to do here @@ -83,7 +107,7 @@ export const preprocessWallet = (envConfig) => { const preprocessedServerConfigSchema = z.preprocess( (envConfig, zodRefinementContext) => { try { - return pipe(preprocessWallet, preprocessUrls)(envConfig) + return pipe(preprocessUnitMode, preprocessWallet, preprocessUrls)(envConfig) } catch (message) { zodRefinementContext.addIssue({ code: ZodIssueCode.custom, message }) } @@ -100,6 +124,7 @@ const preprocessedServerConfigSchema = z.preprocess( const CONFIG_ENVS = { development: { MODE, + UNIT_MODE: process.env.UNIT_MODE || 'cu', DEFAULT_LOG_LEVEL: process.env.DEFAULT_LOG_LEVEL || 'debug', LOG_CONFIG_PATH: process.env.LOG_CONFIG_PATH || '.loglevel', MODULE_MODE: process.env.MODULE_MODE, @@ -117,6 +142,7 @@ const CONFIG_ENVS = { PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'), DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false', DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION !== 'false', + DISABLE_PROCESS_EVALUATION_CACHE: process.env.DISABLE_PROCESS_EVALUATION_CACHE, /** * EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000) * This was calculated by creating a process built to do continuous compute. After 2 hours, this process used @@ -149,6 +175,7 @@ const CONFIG_ENVS = { }, production: { MODE, + UNIT_MODE: process.env.UNIT_MODE || 'cu', DEFAULT_LOG_LEVEL: process.env.DEFAULT_LOG_LEVEL || 'debug', LOG_CONFIG_PATH: process.env.LOG_CONFIG_PATH || '.loglevel', MODULE_MODE: process.env.MODULE_MODE, @@ -164,8 +191,9 @@ const CONFIG_ENVS = { WALLET_FILE: process.env.WALLET_FILE, MEM_MONITOR_INTERVAL: process.env.MEM_MONITOR_INTERVAL || ms('30s'), PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'), - DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false', // TODO: disabled by default for now. Enable by default later + DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false', DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION !== 'false', + DISABLE_PROCESS_EVALUATION_CACHE: process.env.DISABLE_PROCESS_EVALUATION_CACHE, /** * EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000) * This was calculated by creating a process built to do continuous compute by adding and clearing a table. diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index c6ae74009..ebb3485a4 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -369,7 +369,7 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) { } } -export function maybePrependProcessMessage (ctx, logger) { +export function maybePrependProcessMessage (ctx, logger, loadTransactionData) { return async function * ($messages) { const isColdStart = isNil(ctx.from) @@ -405,6 +405,15 @@ export function maybePrependProcessMessage (ctx, logger) { */ if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) { logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id) + + /** + * data for a process can potentially be very large, and it's only needed + * as part of the very first process message sent to the process (aka. Bootloader). + * + * So in lieu of caching the process data, we fetch it once here, on cold start, + */ + const processData = await loadTransactionData(ctx.id).then(res => res.text()) + yield { /** * Ensure the noSave flag is set, so evaluation does not persist @@ -416,7 +425,7 @@ export function maybePrependProcessMessage (ctx, logger) { message: { Id: ctx.id, Signature: ctx.signature, - Data: ctx.data, + Data: processData, Owner: ctx.owner, /** * the target of the process message is itself @@ -484,7 +493,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) { ) } -function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, saveBlocks, logger }) { +function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, logger }) { loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp)) const reconcileBlocks = reconcileBlocksWith({ findBlocks, loadBlocksMeta, saveBlocks }) @@ -697,7 +706,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save .map($messages => { return composeStreams( $messages, - Transform.from(maybePrependProcessMessage(ctx, logger)) + Transform.from(maybePrependProcessMessage(ctx, logger, loadTransactionData)) ) }) .map(messages => ({ messages })) diff --git a/servers/cu/src/domain/lib/loadMessages.test.js b/servers/cu/src/domain/lib/loadMessages.test.js index 8c92542e2..d9210edc4 100644 --- a/servers/cu/src/domain/lib/loadMessages.test.js +++ b/servers/cu/src/domain/lib/loadMessages.test.js @@ -475,6 +475,11 @@ describe('loadMessages', () => { } } + const loadTransactionData = async (id) => { + assert.equal(id, 'process-123') + return new Response('process data') + } + describe('should prepend the process message on cold start', () => { test('if first stream message is not the process', async () => { const $messages = Readable.from([ @@ -486,13 +491,14 @@ describe('loadMessages', () => { } } ]) - const $merged = maybePrependProcessMessage(ctx, logger)($messages) + const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages) const results = [] for await (const m of $merged) results.push(m) assert.equal(results.length, 2) assert.equal(results[0].name, 'Process Message process-123') + assert.equal(results[0].message.Data, 'process data') }) test('if the first stream message is a cron message', async () => { @@ -506,24 +512,26 @@ describe('loadMessages', () => { } } ]) - const $merged = maybePrependProcessMessage(ctx, logger)($messages) + const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages) const results = [] for await (const m of $merged) results.push(m) assert.equal(results.length, 2) assert.equal(results[0].name, 'Process Message process-123') + assert.equal(results[0].message.Data, 'process data') }) test('if there are no messages', async () => { const $messages = Readable.from([]) - const $merged = maybePrependProcessMessage(ctx, logger)($messages) + const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages) const results = [] for await (const m of $merged) results.push(m) assert.equal(results.length, 1) assert.equal(results[0].name, 'Process Message process-123') + assert.equal(results[0].message.Data, 'process data') }) }) @@ -546,7 +554,7 @@ describe('loadMessages', () => { } } ]) - const $merged = maybePrependProcessMessage(ctx, logger)($messages) + const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages) const results = [] for await (const m of $merged) results.push(m) diff --git a/servers/cu/src/domain/model.js b/servers/cu/src/domain/model.js index 3271e743a..d7ed87371 100644 --- a/servers/cu/src/domain/model.js +++ b/servers/cu/src/domain/model.js @@ -96,6 +96,11 @@ export const domainConfigSchema = z.object({ * Whether to disable File Process Checkpoint creation entirely. */ DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: z.preprocess((val) => !!val, z.boolean()), + /** + * Whether to disable caching process evaluations, useful when operating as + * a RU + */ + DISABLE_PROCESS_EVALUATION_CACHE: z.preprocess((val) => !!val, z.boolean()), /** * If a process uses this amount of * gas, then it will immediately create a Checkpoint at the end of the diff --git a/servers/cu/src/effects/ao-evaluation.js b/servers/cu/src/effects/ao-evaluation.js index 33bcd94c1..d6ef1eec7 100644 --- a/servers/cu/src/effects/ao-evaluation.js +++ b/servers/cu/src/effects/ao-evaluation.js @@ -96,7 +96,7 @@ export function findEvaluationWith ({ db }) { } } -export function saveEvaluationWith ({ db, logger: _logger }) { +export function saveEvaluationWith ({ DISABLE_PROCESS_EVALUATION_CACHE, db, logger: _logger }) { const toEvaluationDoc = pipe( converge( unapply(mergeAll), @@ -138,8 +138,10 @@ export function saveEvaluationWith ({ db, logger: _logger }) { function createQuery (evaluation) { const evalDoc = toEvaluationDoc(evaluation) - const statements = [ - { + const statements = [] + + if (!DISABLE_PROCESS_EVALUATION_CACHE) { + statements.push({ sql: ` INSERT OR IGNORE INTO ${EVALUATIONS_TABLE} (id, "processId", "messageId", "deepHash", nonce, epoch, timestamp, ordinate, "blockHeight", cron, "evaluatedAt", output) @@ -160,8 +162,8 @@ export function saveEvaluationWith ({ db, logger: _logger }) { evalDoc.evaluatedAt.getTime(), JSON.stringify(evalDoc.output) ] - } - ] + }) + } /** * Cron messages are not needed to be saved in the messages table diff --git a/servers/cu/src/effects/ao-evaluation.test.js b/servers/cu/src/effects/ao-evaluation.test.js index d9902e5de..925c24e34 100644 --- a/servers/cu/src/effects/ao-evaluation.test.js +++ b/servers/cu/src/effects/ao-evaluation.test.js @@ -249,6 +249,38 @@ describe('ao-evaluation', () => { evaluatedAt }) }) + + test('noop insert evaluation if DISABLE_PROCESS_EVALUATION_CACHE', async () => { + const saveEvaluation = saveEvaluationSchema.implement( + saveEvaluationWith({ + DISABLE_PROCESS_EVALUATION_CACHE: true, + db: { + transaction: async (statements) => { + assert.equal(statements.length, 1) + const [{ sql: messageDocSql }] = statements + assert.ok(messageDocSql.trim().startsWith(`INSERT OR IGNORE INTO ${MESSAGES_TABLE}`)) + + return Promise.resolve('process-123,1702677252111,1') + } + }, + logger + }) + ) + + await saveEvaluation({ + isAssignment: false, + deepHash: 'deepHash-123', + timestamp: 1702677252111, + nonce: '1', + epoch: 0, + ordinate: 1, + blockHeight: 1234, + processId: 'process-123', + messageId: 'message-123', + output: { Messages: [{ foo: 'bar' }], Memory: 'foo' }, + evaluatedAt + }) + }) }) describe('findEvaluations', () => { diff --git a/servers/cu/src/effects/ao-process.js b/servers/cu/src/effects/ao-process.js index ec6a9dfad..ff4c13692 100644 --- a/servers/cu/src/effects/ao-process.js +++ b/servers/cu/src/effects/ao-process.js @@ -419,6 +419,13 @@ export function saveProcessWith ({ db }) { } return (process) => { return of(process) + /** + * The data for the process could be very large, so we do not persist + * it, and instead hydrate it on the process message later, if needed. + */ + .map(evolve({ + data: () => null + })) /** * Ensure the expected shape before writing to the db */ diff --git a/servers/cu/src/effects/ao-process.test.js b/servers/cu/src/effects/ao-process.test.js index 301af9c9d..278299f29 100644 --- a/servers/cu/src/effects/ao-process.test.js +++ b/servers/cu/src/effects/ao-process.test.js @@ -134,7 +134,7 @@ describe('ao-process', () => { assert.deepStrictEqual(parameters, [ 'process-123', 'sig-123', - 'data-123', + null, // data is nullified null, JSON.stringify({ address: 'owner-123', key: 'key-123' }), JSON.stringify([{ name: 'foo', value: 'bar' }]), diff --git a/servers/cu/src/effects/worker/evaluator/main.js b/servers/cu/src/effects/worker/evaluator/main.js index 69fec8d48..ed01449bc 100644 --- a/servers/cu/src/effects/worker/evaluator/main.js +++ b/servers/cu/src/effects/worker/evaluator/main.js @@ -18,7 +18,11 @@ export const createApis = async (ctx) => { CHECKPOINT_GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL }), bootstrapWasmInstance: WasmClient.bootstrapWasmInstanceWith(), - saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger: ctx.logger }), + saveEvaluation: AoEvaluationClient.saveEvaluationWith({ + DISABLE_PROCESS_EVALUATION_CACHE: ctx.DISABLE_PROCESS_EVALUATION_CACHE, + db, + logger: ctx.logger + }), ARWEAVE_URL: ctx.ARWEAVE_URL, logger: ctx.logger }) diff --git a/servers/cu/src/routes/cron.js b/servers/cu/src/routes/cron.js index 9a5f49ab9..f79be4b2d 100644 --- a/servers/cu/src/routes/cron.js +++ b/servers/cu/src/routes/cron.js @@ -2,6 +2,7 @@ import { always, compose, identity } from 'ramda' import { z } from 'zod' import { withMetrics, withMiddleware, withProcessRestrictionFromPath } from './middleware/index.js' +import { withCuMode } from './middleware/withCuMode.js' /** * TODO: could be moved into a route utils or middleware @@ -37,8 +38,9 @@ export const withCronRoutes = app => { '/cron/:processId', compose( withMiddleware, - withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }), + withCuMode, withProcessRestrictionFromPath, + withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }), always(async (req, res) => { const { params: { processId }, diff --git a/servers/cu/src/routes/middleware/index.js b/servers/cu/src/routes/middleware/index.js index 08e0c9836..a31853bf8 100644 --- a/servers/cu/src/routes/middleware/index.js +++ b/servers/cu/src/routes/middleware/index.js @@ -5,6 +5,7 @@ import { withDomain } from './withDomain.js' export * from './withProcessRestriction.js' export * from './withMetrics.js' +export * from './withCuMode.js' /** * A convenience method that composes common middleware needed on most routes, diff --git a/servers/cu/src/routes/middleware/withCuMode.js b/servers/cu/src/routes/middleware/withCuMode.js new file mode 100644 index 000000000..001d4f845 --- /dev/null +++ b/servers/cu/src/routes/middleware/withCuMode.js @@ -0,0 +1,12 @@ +import { config } from '../../config.js' + +const withUnitMode = (mode) => (handler) => (req, res, next) => { + const { UNIT_MODE } = config + + if (UNIT_MODE !== mode) return res.status(404).send('Not Found') + + return handler(req, res, next) +} + +export const withCuMode = withUnitMode('cu') +export const withRuMode = withUnitMode('ru') diff --git a/servers/cu/src/routes/result.js b/servers/cu/src/routes/result.js index 371a5f20e..d946ce289 100644 --- a/servers/cu/src/routes/result.js +++ b/servers/cu/src/routes/result.js @@ -2,7 +2,7 @@ import { compose } from 'ramda' import { z } from 'zod' import { busyIn } from '../domain/utils.js' -import { withMetrics, withMiddleware, withProcessRestrictionFromQuery } from './middleware/index.js' +import { withMetrics, withMiddleware, withProcessRestrictionFromQuery, withCuMode } from './middleware/index.js' import { withInMemoryCache } from './middleware/withInMemoryCache.js' const inputSchema = z.object({ @@ -15,8 +15,9 @@ export const withResultRoutes = app => { '/result/:messageTxId', compose( withMiddleware, - withMetrics({ tracesFrom: (req) => ({ process_id: req.query['process-id'] }) }), + withCuMode, withProcessRestrictionFromQuery, + withMetrics({ tracesFrom: (req) => ({ process_id: req.query['process-id'] }) }), withInMemoryCache({ keyer: (req) => { const { params: { messageTxId } } = req diff --git a/servers/cu/src/routes/results.js b/servers/cu/src/routes/results.js index ae4c81dbd..9f5a8d456 100644 --- a/servers/cu/src/routes/results.js +++ b/servers/cu/src/routes/results.js @@ -1,8 +1,8 @@ import { always, compose, identity } from 'ramda' - -import { withMetrics, withMiddleware, withProcessRestrictionFromPath } from './middleware/index.js' import { z } from 'zod' +import { withMetrics, withMiddleware, withProcessRestrictionFromPath, withCuMode } from './middleware/index.js' + /** * TODO: could be moved into a route utils or middleware * @@ -47,8 +47,9 @@ export const withResultsRoutes = app => { '/results/:processId', compose( withMiddleware, - withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }), + withCuMode, withProcessRestrictionFromPath, + withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }), always(async (req, res) => { const { params: { processId }, diff --git a/servers/cu/src/routes/state.js b/servers/cu/src/routes/state.js index f6ac2f2ba..fe8ffb853 100644 --- a/servers/cu/src/routes/state.js +++ b/servers/cu/src/routes/state.js @@ -2,7 +2,7 @@ import { always, compose } from 'ramda' import { z } from 'zod' import { arrayBufferFromMaybeView, busyIn } from '../domain/utils.js' -import { withMetrics, withMiddleware, withProcessRestrictionFromPath } from './middleware/index.js' +import { withMetrics, withMiddleware, withProcessRestrictionFromPath, withCuMode } from './middleware/index.js' const inputSchema = z.object({ processId: z.string().min(1, 'an ao process id is required'), @@ -15,8 +15,9 @@ export const withStateRoutes = (app) => { '/state/:processId', compose( withMiddleware, - withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }), + withCuMode, withProcessRestrictionFromPath, + withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }), always(async (req, res) => { const { params: { processId },