diff --git a/servers/cu/package-lock.json b/servers/cu/package-lock.json index 1091292ea..23eae338c 100644 --- a/servers/cu/package-lock.json +++ b/servers/cu/package-lock.json @@ -18,7 +18,7 @@ "bytes": "^3.1.2", "cors": "^2.8.5", "dataloader": "^2.2.2", - "debug": "^4.3.5", + "debug": "^4.3.6", "dotenv": "^16.4.5", "fast-glob": "^3.3.2", "fastify": "^4.28.1", @@ -26,13 +26,14 @@ "hyper-async": "^1.1.2", "keccak": "^3.0.4", "long-timeout": "^0.1.1", - "lru-cache": "^10.3.0", + "lru-cache": "^11.0.0", "ms": "^2.1.3", "opossum": "^8.1.4", "p-map": "^7.0.2", + "p-queue": "^8.0.1", "prom-client": "^15.1.3", "ramda": "^0.30.1", - "undici": "^6.19.2", + "undici": "^6.19.5", "warp-arbundles": "^1.0.4", "workerpool": "^9.1.3", "zod": "^3.23.8" @@ -162,6 +163,12 @@ "node": ">=18" } }, + "node_modules/@permaweb/ao-scheduler-utils/node_modules/lru-cache": { + "version": "10.4.3", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.4.3.tgz", + "integrity": "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ==", + "license": "ISC" + }, "node_modules/@permaweb/weavedrive": { "version": "0.0.6", "resolved": "https://registry.npmjs.org/@permaweb/weavedrive/-/weavedrive-0.0.6.tgz", @@ -515,9 +522,10 @@ "integrity": "sha512-8YnDaaf7N3k/q5HnTJVuzSyLETjoZjVmHc4AeKAzOvKHEFQKcn64OKBfzHYtE9zGjctNM7V9I0MfnUVLpi7M5g==" }, "node_modules/debug": { - "version": "4.3.5", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.5.tgz", - "integrity": "sha512-pt0bNEmneDIvdL1Xsd9oDQ/wrQRkXDT4AUWlNZNPKvW5x/jyO9VFXkJUP07vQ2upmw5PlaITaPKc31jK13V+jg==", + "version": "4.3.6", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.6.tgz", + "integrity": "sha512-O/09Bd4Z1fBrU4VzkhFqVgpPzaGbw6Sm9FEkBT1A/YBXQFGuuSxa1dN2nxgxS34JmKXqYx8CZAwEVoJFImUXIg==", + "license": "MIT", "dependencies": { "ms": "2.1.2" }, @@ -592,6 +600,12 @@ "node": ">=6" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==", + "license": "MIT" + }, "node_modules/events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -961,12 +975,12 @@ "integrity": "sha512-BFRuQUqc7x2NWxfJBCyUrN8iYUYznzL9JROmRz1gZ6KlOIgmoD+njPVbb+VNn2nGMKggMsK79iUNErillsrx7w==" }, "node_modules/lru-cache": { - "version": "10.3.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.3.0.tgz", - "integrity": "sha512-CQl19J/g+Hbjbv4Y3mFNNXFEL/5t/KCg8POCuUqd4rMKjGG+j1ybER83hxV58zL+dFI1PTkt3GNFSHRt+d8qEQ==", + "version": "11.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-11.0.0.tgz", + "integrity": "sha512-Qv32eSV1RSCfhY3fpPE2GNZ8jgM9X7rdAfemLWqTUxwiyIC4jJ6Sy0fZ8H+oLWevO6i4/bizg7c8d8i6bxrzbA==", "license": "ISC", "engines": { - "node": "14 || >=16.14" + "node": "20 || >=22" } }, "node_modules/merge2": { @@ -1164,6 +1178,34 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.0.1.tgz", + "integrity": "sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==", + "license": "MIT", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-timeout": { + 
"version": "6.1.2", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.2.tgz", + "integrity": "sha512-UbD77BuZ9Bc9aABo74gfXhNvzC9Tx7SxtHSh1fxvx3jTLLYvmVhiQZZrJzqqU0jKbN32kb5VOKiLEQI/3bIjgQ==", + "license": "MIT", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/picomatch": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.1.tgz", @@ -1732,9 +1774,9 @@ "dev": true }, "node_modules/undici": { - "version": "6.19.2", - "resolved": "https://registry.npmjs.org/undici/-/undici-6.19.2.tgz", - "integrity": "sha512-JfjKqIauur3Q6biAtHJ564e3bWa8VvT+7cSiOJHFbX4Erv6CLGDpg8z+Fmg/1OI/47RA+GI2QZaF48SSaLvyBA==", + "version": "6.19.5", + "resolved": "https://registry.npmjs.org/undici/-/undici-6.19.5.tgz", + "integrity": "sha512-LryC15SWzqQsREHIOUybavaIHF5IoL0dJ9aWWxL/PgT1KfqAW5225FZpDUFlt9xiDMS2/S7DOKhFWA7RLksWdg==", "license": "MIT", "engines": { "node": ">=18.17" @@ -1963,6 +2005,13 @@ "lru-cache": "^10.2.2", "ramda": "^0.30.0", "zod": "^3.23.5" + }, + "dependencies": { + "lru-cache": { + "version": "10.4.3", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.4.3.tgz", + "integrity": "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ==" + } } }, "@permaweb/weavedrive": { @@ -2229,9 +2278,9 @@ "integrity": "sha512-8YnDaaf7N3k/q5HnTJVuzSyLETjoZjVmHc4AeKAzOvKHEFQKcn64OKBfzHYtE9zGjctNM7V9I0MfnUVLpi7M5g==" }, "debug": { - "version": "4.3.5", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.5.tgz", - "integrity": "sha512-pt0bNEmneDIvdL1Xsd9oDQ/wrQRkXDT4AUWlNZNPKvW5x/jyO9VFXkJUP07vQ2upmw5PlaITaPKc31jK13V+jg==", + "version": "4.3.6", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.6.tgz", + "integrity": "sha512-O/09Bd4Z1fBrU4VzkhFqVgpPzaGbw6Sm9FEkBT1A/YBXQFGuuSxa1dN2nxgxS34JmKXqYx8CZAwEVoJFImUXIg==", "requires": { "ms": "2.1.2" }, @@ -2279,6 +2328,11 @@ "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==" }, + "eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -2564,9 +2618,9 @@ "integrity": "sha512-BFRuQUqc7x2NWxfJBCyUrN8iYUYznzL9JROmRz1gZ6KlOIgmoD+njPVbb+VNn2nGMKggMsK79iUNErillsrx7w==" }, "lru-cache": { - "version": "10.3.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.3.0.tgz", - "integrity": "sha512-CQl19J/g+Hbjbv4Y3mFNNXFEL/5t/KCg8POCuUqd4rMKjGG+j1ybER83hxV58zL+dFI1PTkt3GNFSHRt+d8qEQ==" + "version": "11.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-11.0.0.tgz", + "integrity": "sha512-Qv32eSV1RSCfhY3fpPE2GNZ8jgM9X7rdAfemLWqTUxwiyIC4jJ6Sy0fZ8H+oLWevO6i4/bizg7c8d8i6bxrzbA==" }, "merge2": { "version": "1.4.1", @@ -2700,6 +2754,20 @@ "resolved": "https://registry.npmjs.org/p-map/-/p-map-7.0.2.tgz", "integrity": "sha512-z4cYYMMdKHzw4O5UkWJImbZynVIo0lSGTXc7bzB1e/rrDqkgGUNysK/o4bTr+0+xKvvLoTyGqYC4Fgljy9qe1Q==" }, + "p-queue": { + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.0.1.tgz", + "integrity": 
"sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==", + "requires": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + } + }, + "p-timeout": { + "version": "6.1.2", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.2.tgz", + "integrity": "sha512-UbD77BuZ9Bc9aABo74gfXhNvzC9Tx7SxtHSh1fxvx3jTLLYvmVhiQZZrJzqqU0jKbN32kb5VOKiLEQI/3bIjgQ==" + }, "picomatch": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.1.tgz", @@ -3093,9 +3161,9 @@ "dev": true }, "undici": { - "version": "6.19.2", - "resolved": "https://registry.npmjs.org/undici/-/undici-6.19.2.tgz", - "integrity": "sha512-JfjKqIauur3Q6biAtHJ564e3bWa8VvT+7cSiOJHFbX4Erv6CLGDpg8z+Fmg/1OI/47RA+GI2QZaF48SSaLvyBA==" + "version": "6.19.5", + "resolved": "https://registry.npmjs.org/undici/-/undici-6.19.5.tgz", + "integrity": "sha512-LryC15SWzqQsREHIOUybavaIHF5IoL0dJ9aWWxL/PgT1KfqAW5225FZpDUFlt9xiDMS2/S7DOKhFWA7RLksWdg==" }, "uri-js": { "version": "4.4.1", diff --git a/servers/cu/package.json b/servers/cu/package.json index 1c6e896f8..62f5cdd05 100644 --- a/servers/cu/package.json +++ b/servers/cu/package.json @@ -21,7 +21,7 @@ "bytes": "^3.1.2", "cors": "^2.8.5", "dataloader": "^2.2.2", - "debug": "^4.3.5", + "debug": "^4.3.6", "dotenv": "^16.4.5", "fast-glob": "^3.3.2", "fastify": "^4.28.1", @@ -29,13 +29,14 @@ "hyper-async": "^1.1.2", "keccak": "^3.0.4", "long-timeout": "^0.1.1", - "lru-cache": "^10.3.0", + "lru-cache": "^11.0.0", "ms": "^2.1.3", "opossum": "^8.1.4", "p-map": "^7.0.2", + "p-queue": "^8.0.1", "prom-client": "^15.1.3", "ramda": "^0.30.1", - "undici": "^6.19.2", + "undici": "^6.19.5", "warp-arbundles": "^1.0.4", "workerpool": "^9.1.3", "zod": "^3.23.8" diff --git a/servers/cu/src/domain/client/ao-module.js b/servers/cu/src/domain/client/ao-module.js index c7dbacfa3..3d67458ee 100644 --- a/servers/cu/src/domain/client/ao-module.js +++ b/servers/cu/src/domain/client/ao-module.js @@ -105,7 +105,7 @@ export function findModuleWith ({ db }) { .toPromise() } -export function evaluatorWith ({ evaluate, loadWasmModule }) { +export function evaluatorWith ({ evaluateWith, loadWasmModule }) { const EVAL_DEFER_BACKPRESSURE = 10 return ({ moduleId, moduleOptions }) => of(moduleOptions) @@ -141,75 +141,79 @@ export function evaluatorWith ({ evaluate, loadWasmModule }) { */ if (defer) await new Promise(resolve => setImmediate(resolve)) - if (args.Memory) { - /** - * The ArrayBuffer is transferred to the worker as part of performing - * an evaluation. This transfer will subsequently detach any views, Buffers, - * and more broadly, references to the ArrayBuffer on this thread. - * - * So if this is the first eval being performed for the eval stream, - * then we copy the contents of the ArrayBuffer. That way, we can be sure - * that no references on the main thread will be affected during the eval stream - * transfers happening back and forth. This effectively give's each eval stream - * it's own ArrayBuffer to pass back and forth. - * - * (this is no worse than the structured clone that was happening before - * as part of message passing. But instead, the clone is only performed once, - * instead of on each evaluation) - * - * TODO: perhaps there is a way to somehow lock the ArrayBuffer usage - * instead of copying on first evaluation. We have to be careful that nothing - * (ie. 
a view of the ArrayBuffer in a Wasm Instnace dryrun) - * inadvertantly mutates the underlying ArrayBuffer - */ - if (args.first) { - let stopTimer = () => {} - if (args.Memory.byteLength > TWO_GB) { - stopTimer = timer('copyLargeMemory', { - streamId, - processId: args.processId, - byteLength: args.Memory.byteLength - }).stop + const prep = () => { + if (args.Memory) { + /** + * The ArrayBuffer is transferred to the worker as part of performing + * an evaluation. This transfer will subsequently detach any views, Buffers, + * and more broadly, references to the ArrayBuffer on this thread. + * + * So if this is the first eval being performed for the eval stream, + * then we copy the contents of the ArrayBuffer. That way, we can be sure + * that no references on the main thread will be affected during the eval stream + * transfers happening back and forth. This effectively gives each eval stream + * its own ArrayBuffer to pass back and forth. + * + * (this is no worse than the structured clone that was happening before + * as part of message passing. But instead, the clone is only performed once, + * instead of on each evaluation) + * + * TODO: perhaps there is a way to somehow lock the ArrayBuffer usage + * instead of copying on first evaluation. We have to be careful that nothing + * (ie. a view of the ArrayBuffer in a Wasm Instance dryrun) + * inadvertently mutates the underlying ArrayBuffer + */ + if (args.first) { + let stopTimer = () => {} + if (args.Memory.byteLength > TWO_GB) { + stopTimer = timer('copyLargeMemory', { + streamId, + processId: args.processId, + byteLength: args.Memory.byteLength + }).stop + } + /** + * We must pass a view into copyBytesFrom, + * + * so we first check whether it already is or not, + * and create one on top of the ArrayBuffer if necessary + * + * (NodeJS' Buffer is a subclass of DataView) + */ + args.Memory = ArrayBuffer.isView(args.Memory) + ? Buffer.copyBytesFrom(args.Memory) + : Buffer.copyBytesFrom(new Uint8Array(args.Memory)) + stopTimer() } + /** - * We must pass a view into copyBytesFrom, + * If Memory is sufficiently large, transferring the View somehow + * causes the underlying ArrayBuffer to be truncated. This truncation + * does not occur when instead the underlying ArrayBuffer is transferred, + * directly. + * + * So we always ensure the Memory transferred to the worker thread + * is the actual ArrayBuffer, and not a View. * - * so we first check whether it already is or not, - * and create one on top of the ArrayBuffer if necessary + * (the same is done in the opposite direction in the worker thread) * - * (NodeJS' Buffer is a subclass of DataView) + * TODO: maybe AoLoader should be made to return the underlying ArrayBuffer - args.Memory = ArrayBuffer.isView(args.Memory) - ? Buffer.copyBytesFrom(args.Memory) - : Buffer.copyBytesFrom(new Uint8Array(args.Memory)) - stopTimer() + * as Memory, instead of a View? 
- */ + args.Memory = arrayBufferFromMaybeView(args.Memory) + + options = { transfer: [args.Memory] } + } args.streamId = streamId args.moduleId = moduleId args.moduleOptions = moduleOptions args.wasmModule = wasmModule - args.streamId = streamId - args.moduleId = moduleId - args.moduleOptions = moduleOptions - args.wasmModule = wasmModule + return [args, options] + } - return evaluate(args, options) + return evaluateWith(prep) }) })) .toPromise() diff --git a/servers/cu/src/domain/client/ao-module.test.js b/servers/cu/src/domain/client/ao-module.test.js index bc3d6d45e..5ff239eeb 100644 --- a/servers/cu/src/domain/client/ao-module.test.js +++ b/servers/cu/src/domain/client/ao-module.test.js @@ -193,25 +193,32 @@ describe('ao-module', () => { test('should eval the message', async () => { const evaluator = evaluatorWith({ loadWasmModule: async () => WebAssembly.compile(readFileSync('./test/processes/happy/process.wasm')), - evaluate: ({ streamId, wasmModule, moduleId: mId, moduleOptions: mOptions, Memory, message, AoGlobal, ...rest }) => { - assert.ok(streamId) - assert.ok(wasmModule) - assert.equal(mId, moduleId) - assert.deepStrictEqual(mOptions, moduleOptions) - - assert.deepStrictEqual(rest, { - name: 'foobar Message', - noSave: false, - deepHash: undefined, - cron: undefined, - ordinate: '1', - isAssignment: false, - processId: 'foobar-process' + evaluateWith: (prep) => Promise.resolve() + .then(prep) + .then(([args, options]) => { + // No options since Memory is falsey, b/c nothing to transfer + assert.equal(options, undefined) + return args }) - - return AoLoader(readFileSync('./test/processes/happy/process.wasm'), mOptions) .then((wasmModule) => wasmModule(Memory, message, AoGlobal)) - }, + .then(({ streamId, wasmModule, moduleId: mId, moduleOptions: mOptions, Memory, message, AoGlobal, ...rest }) => { + assert.ok(streamId) + assert.ok(wasmModule) + assert.equal(mId, moduleId) + assert.deepStrictEqual(mOptions, moduleOptions) + + assert.deepStrictEqual(rest, { + name: 'foobar Message', + noSave: false, + deepHash: undefined, + cron: undefined, + ordinate: '1', + isAssignment: false, + processId: 'foobar-process' + }) + + return AoLoader(readFileSync('./test/processes/happy/process.wasm'), mOptions) + .then((wasmModule) => wasmModule(Memory, message, AoGlobal)) + }), logger }) diff --git a/servers/cu/src/domain/index.js b/servers/cu/src/domain/index.js index 52ae6aff2..1a5a673c4 100644 --- a/servers/cu/src/domain/index.js +++ b/servers/cu/src/domain/index.js @@ -4,6 +4,7 @@ import { randomBytes } from 'node:crypto' import { readFile, writeFile } from 'node:fs/promises' import pMap from 'p-map' +import PQueue from 'p-queue' import Dataloader from 'dataloader' import fastGlob from 'fast-glob' import workerpool from 'workerpool' @@ -81,6 +82,28 @@ export const createApis = async (ctx) => { } } + /** + * Some of the work performed, in prep for sending the task to the worker thread pool, + * ie. copying the process memory so that each eval stream may have its own memory + * to pass back and forth (see below), can be resource intensive. + * + * If the thread pool is fully utilized, the pool will start to queue tasks sent to it on the main thread. + * Subsequently, the amount of time between when the "prep-work" and + * the "actual evaluation work" are performed can grow very large. + * + * This effectively produces a "front-loading" effect, where all the "prep-work" is run + * all up front, then queued for a worker to eventually take on the actual work. 
+ * + * In other words, resource intensive data structures, ie. process memory, can just be + * sitting queued for long periods of time, waiting for an available worker thread. We need to mitigate this. + * + * So for each worker thread pool, we utilize a queue whose concurrency matches the thread pool's, + * which will defer performing the "prep-work" until right before a worker is available to perform the "actual work", + * thereby eliminating the "front-loading" effect. + * + * (see the PQueue instantiations below) + */ + const maxPrimaryWorkerThreads = Math.min( Math.max(1, ctx.WASM_EVALUATION_MAX_WORKERS - 1), Math.ceil(ctx.WASM_EVALUATION_MAX_WORKERS * (ctx.WASM_EVALUATION_PRIMARY_WORKERS_PERCENTAGE / 100)) ) @@ -90,6 +113,7 @@ export const createApis = async (ctx) => { maxWorkers: maxPrimaryWorkerThreads, onCreateWorker: onCreateWorker('primary') }) + const primaryWorkQueue = new PQueue({ concurrency: maxPrimaryWorkerThreads }) const maxDryRunWorkerTheads = Math.max( 1, @@ -100,6 +124,7 @@ export const createApis = async (ctx) => { onCreateWorker: onCreateWorker('dry-run'), maxQueueSize: ctx.WASM_EVALUATION_WORKERS_DRY_RUN_MAX_QUEUE }) + const dryRunWorkQueue = new PQueue({ concurrency: maxDryRunWorkerTheads }) const arweave = ArweaveClient.createWalletClient() const address = ArweaveClient.addressWith({ WALLET: ctx.WALLET, arweave }) @@ -175,8 +200,26 @@ export const createApis = async (ctx) => { const stats = statsWith({ gauge, loadWorkerStats: () => ({ - primary: primaryWorkerPool.stats(), - dryRun: dryRunWorkerPool.stats() + primary: ({ + ...primaryWorkerPool.stats(), + /** + * We use a work queue on the main thread while keeping + * worker pool queues empty (see comment above) + * + * So we use the work queue size to report pending tasks + */ + pendingTasks: primaryWorkQueue.size + }), + dryRun: ({ + ...dryRunWorkerPool.stats(), + /** + * We use a work queue on the main thread while keeping + * worker pool queues empty (see comment above) + * + * So we use the work queue size to report pending tasks + */ + pendingTasks: dryRunWorkQueue.size + }) }), /** * https://nodejs.org/api/process.html#processmemoryusage @@ -256,10 +299,14 @@ export const createApis = async (ctx) => { saveModule: AoModuleClient.saveModuleWith({ db: sqlite, logger }), loadEvaluator: AoModuleClient.evaluatorWith({ loadWasmModule, - /** - * Evaluation will invoke a worker available in the worker pool - */ - evaluate: (args, options) => primaryWorkerPool.exec('evaluate', [args], options), + evaluateWith: (prep) => primaryWorkQueue.add(() => + Promise.resolve() + /** + * prep work is deferred until the work queue task is executed + */ + .then(prep) + .then(([args, options]) => primaryWorkerPool.exec('evaluate', [args], options)) + ), logger }), findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db: sqlite, logger }), @@ -300,25 +347,31 @@ export const createApis = async (ctx) => { */ loadDryRunEvaluator: AoModuleClient.evaluatorWith({ loadWasmModule, - /** - * Evaluation will invoke a worker available in the worker pool - */ - evaluate: (args, options) => Promise.resolve() .then(() => dryRunWorkerPool.exec('evaluate', [args], options)) .catch((err) => { /** - * Hack to detect when the max queue size is being exceeded and to reject - with a more informative error + evaluateWith: (prep) => dryRunWorkQueue.add(() => Promise.resolve() /** * prep work is deferred until the work queue task is executed */ - if (err.message.startsWith('Max queue size of')) { - const dryRunLimitErr = new Error('Dry-Run enqueue 
limit exceeded') - dryRunLimitErr.status = 429 - return Promise.reject(dryRunLimitErr) - } - - // Bubble as normal - return Promise.reject(err) - }), + .then(prep) + .then(([args, options]) => + Promise.resolve() + .then(() => dryRunWorkerPool.exec('evaluate', [args], options)) + .catch((err) => { + /** + * Hack to detect when the max queue size is being exceeded and to reject + * with a more informative error + */ + if (err.message.startsWith('Max queue size of')) { + const dryRunLimitErr = new Error('Dry-Run enqueue limit exceeded') + dryRunLimitErr.status = 429 + return Promise.reject(dryRunLimitErr) + } + + // Bubble as normal + return Promise.reject(err) + }) + ) + ), logger: dryRunLogger }) })
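
Reviewer note on the queueing change above: a minimal sketch, assuming a hypothetical './worker.js' that exposes an 'evaluate' method, of the pattern index.js now uses — a p-queue whose concurrency matches the workerpool size, so the expensive prep runs just-in-time instead of being front-loaded. `PQueue#add`, `workerpool.pool`, and `pool.exec` are the real APIs this PR uses; the worker script path and payload shape are illustrative.

```js
import PQueue from 'p-queue'
import workerpool from 'workerpool'

const MAX_WORKERS = 4
// './worker.js' is a stand-in for the evaluator worker script
const pool = workerpool.pool('./worker.js', { maxWorkers: MAX_WORKERS })

// Queue concurrency matches the pool, so at most MAX_WORKERS tasks have
// run their prep at any moment -- prepped payloads (ie. copied process
// memory) never sit in the pool's internal queue
const queue = new PQueue({ concurrency: MAX_WORKERS })

export const evaluateWith = (prep) => queue.add(() =>
  Promise.resolve()
    .then(prep) // deferred: runs only once a worker slot is about to free up
    .then(([args, options]) => pool.exec('evaluate', [args], options))
)
```

`queue.size` counts tasks still waiting to run their prep, which is what the new `pendingTasks` stat surfaces.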
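Likewise, a small runnable sketch of the transfer semantics the ao-module.js comments describe: transferring an ArrayBuffer detaches it on the sending side, which is why the first eval in a stream copies the memory and why the raw ArrayBuffer, never a view, goes in the transfer list. `structuredClone` stands in here for the `postMessage` transfer that workerpool performs; `Buffer.copyBytesFrom` requires Node >= 19.8.

```js
const memory = new ArrayBuffer(16)
const view = new Uint8Array(memory)

// Copy on "first eval", so references on this thread survive the stream
const copy = Buffer.copyBytesFrom(view)

// Transfer the underlying ArrayBuffer itself, not a view of it
structuredClone(copy.buffer, { transfer: [copy.buffer] })

console.log(copy.byteLength) // 0  -- the copy's buffer is now detached
console.log(view.byteLength) // 16 -- the original memory is unaffected
```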