From c186c0a588e419e635235795d4ec01d314fcfb28 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Tue, 3 Sep 2024 11:21:17 -0400 Subject: [PATCH] perf(cu): broadcast to all worker threads when eval stream is closed #991 --- servers/cu/src/bootstrap.js | 32 +++++++++++++++++-- servers/cu/src/domain/lib/evaluate.js | 12 +++---- .../cu/src/effects/worker/evaluator/index.js | 11 ++++++- .../cu/src/effects/worker/evaluator/main.js | 18 +++-------- 4 files changed, 50 insertions(+), 23 deletions(-) diff --git a/servers/cu/src/bootstrap.js b/servers/cu/src/bootstrap.js index da74bec90..423789a2a 100644 --- a/servers/cu/src/bootstrap.js +++ b/servers/cu/src/bootstrap.js @@ -3,6 +3,7 @@ import { fileURLToPath } from 'node:url' import { randomBytes } from 'node:crypto' import { writeFile, mkdir } from 'node:fs/promises' import { createReadStream } from 'node:fs' +import { BroadcastChannel } from 'node:worker_threads' import pMap from 'p-map' import PQueue from 'p-queue' @@ -76,6 +77,10 @@ export const createApis = async (ctx) => { const DB_URL = `${ctx.DB_URL}.sqlite` const sqlite = await SqliteClient.createSqliteClient({ url: DB_URL, bootstrap: true }) + const BROADCAST = 'workers' + const workerBroadcast = new BroadcastChannel(BROADCAST).unref() + const broadcastCloseStream = (streamId) => workerBroadcast.postMessage({ type: 'close-stream', streamId }) + const onCreateWorker = (type) => () => { const workerId = randomBytes(8).toString('hex') ctx.logger('Spinning up "%s" pool worker with id "%s"...', type, workerId) @@ -83,6 +88,7 @@ export const createApis = async (ctx) => { return { workerThreadOpts: { workerData: { + BROADCAST, WASM_MODULE_CACHE_MAX_SIZE: ctx.WASM_MODULE_CACHE_MAX_SIZE, WASM_INSTANCE_CACHE_MAX_SIZE: ctx.WASM_INSTANCE_CACHE_MAX_SIZE, WASM_BINARY_FILE_DIRECTORY: ctx.WASM_BINARY_FILE_DIRECTORY, @@ -334,7 +340,18 @@ export const createApis = async (ctx) => { * prep work is deferred until the work queue tasks is executed */ .then(prep) - .then(([args, options]) => primaryWorkerPool.exec('evaluate', [args], options)) + .then(([args, options]) => { + /** + * TODO: is this the best place for this? + * + * It keeps it abstracted away from business logic, + * and tied to the specific evaluator, so seems kosher, + * but also feels kind of misplaced + */ + if (args.close) return broadcastCloseStream(args.streamId) + + return primaryWorkerPool.exec('evaluate', [args], options) + }) ), logger }), @@ -377,7 +394,18 @@ export const createApis = async (ctx) => { .then(prep) .then(([args, options]) => Promise.resolve() - .then(() => dryRunWorkerPool.exec('evaluate', [args], options)) + .then(() => { + /** + * TODO: is this the best place for this? + * + * It keeps it abstracted away from business logic, + * and tied to the specific evaluator, so seems kosher, + * but also feels kind of misplaced + */ + if (args.close) return broadcastCloseStream(args.streamId) + + return dryRunWorkerPool.exec('evaluate', [args], options) + }) .catch((err) => { /** * Hack to detect when the max queue size is being exceeded and to reject diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index ac8f5f297..79c80ba9e 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -278,13 +278,11 @@ export function evaluateWith (env) { * See https://nodejs.org/api/stream.html#streamfinishedstream-options-callback */ cleanup() - if (!err) { - /** - * Send a flag to the evaluator that the eval stream is finished. - * This will allow for the WASM instance to be removed from the cache. - */ - ctx.evaluator({ close: true }) - } + /** + * Signal the evaluator to close any resources spun up as part + * of handling this eval stream + */ + ctx.evaluator({ close: true }) err ? reject(err) : resolve() } ) diff --git a/servers/cu/src/effects/worker/evaluator/index.js b/servers/cu/src/effects/worker/evaluator/index.js index fd4fcff47..e6e798614 100644 --- a/servers/cu/src/effects/worker/evaluator/index.js +++ b/servers/cu/src/effects/worker/evaluator/index.js @@ -1,4 +1,4 @@ -import { workerData } from 'node:worker_threads' +import { BroadcastChannel, workerData } from 'node:worker_threads' import { hostname } from 'node:os' import { fetch, setGlobalDispatcher, Agent } from 'undici' @@ -24,6 +24,15 @@ const apis = await createApis({ logger }) +const broadcast = new BroadcastChannel(workerData.BROADCAST) + +broadcast.onmessage = (event) => { + const data = event.data + if (data.type === 'close-stream') return apis.close(data.streamId) + + logger.warn('Unrecognized event type "%s". Doing nothing...', data.type) +} + /** * Expose our worker api */ diff --git a/servers/cu/src/effects/worker/evaluator/main.js b/servers/cu/src/effects/worker/evaluator/main.js index 08b9b4bcc..2a078aaf2 100644 --- a/servers/cu/src/effects/worker/evaluator/main.js +++ b/servers/cu/src/effects/worker/evaluator/main.js @@ -6,20 +6,12 @@ import { evaluateWith } from '../evaluate.js' export const createApis = async (ctx) => { const sqlite = await SqliteClient.createSqliteClient({ url: ctx.DB_URL, bootstrap: false }) + const wasmInstanceCache = WasmClient.createWasmInstanceCache({ MAX_SIZE: ctx.WASM_INSTANCE_CACHE_MAX_SIZE }) + + const close = async (streamId) => wasmInstanceCache.delete(streamId) const evaluate = evaluateWith({ - /** - * TODO: no longer needed since the wasmModule - * is passed in. Eventually remove - */ - loadWasmModule: WasmClient.loadWasmModuleWith({ - fetch, - ARWEAVE_URL: ctx.ARWEAVE_URL, - WASM_BINARY_FILE_DIRECTORY: ctx.WASM_BINARY_FILE_DIRECTORY, - logger: ctx.logger, - cache: WasmClient.createWasmModuleCache({ MAX_SIZE: ctx.WASM_MODULE_CACHE_MAX_SIZE }) - }), - wasmInstanceCache: WasmClient.createWasmInstanceCache({ MAX_SIZE: ctx.WASM_INSTANCE_CACHE_MAX_SIZE }), + wasmInstanceCache, addExtension: WasmClient.addExtensionWith({ ARWEAVE_URL: ctx.ARWEAVE_URL }), bootstrapWasmInstance: WasmClient.bootstrapWasmInstanceWith(), saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db: sqlite, logger: ctx.logger }), @@ -27,5 +19,5 @@ export const createApis = async (ctx) => { logger: ctx.logger }) - return { evaluate } + return { evaluate, close } }