Skip to content

Commit

Permalink
fix(cu): transfer memory between worker and main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Jul 3, 2024
1 parent 155c0bb commit f047ea9
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 3 deletions.
50 changes: 50 additions & 0 deletions servers/cu/src/domain/client/ao-module.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { z } from 'zod'

import { moduleSchema } from '../model.js'
import { MODULES_TABLE } from './sqlite.js'
import { timer } from './metrics.js'

const TWO_GB = 2 * 1024 * 1024 * 1024

const moduleDocSchema = z.object({
id: z.string().min(1),
Expand Down Expand Up @@ -116,6 +119,53 @@ export function evaluatorWith ({ evaluate, loadWasmModule }) {
*/
if (defer) await new Promise(resolve => setImmediate(resolve))

if (args.Memory) {
/**
* The ArrayBuffer is transferred to the worker as part of performing
* an evaluation. This transfer will subsequently detach any views, Buffers,
* and more broadly, references to the ArrayBuffer on this thread.
*
* So if this is the first eval being performed for the eval stream,
* then we copy the contents of the ArrayBuffer. That way, we can be sure
* that no references on the main thread will be affected during the eval stream
* transfers happening back and forth. This effectively give's each eval stream
* it's own ArrayBuffer to pass back and forth.
*
* (this is no worse than the structured clone that was happening before
* as part of message passing. But instead, the clone is only performed once,
* instead of on each evaluation)
*
* TODO: perhaps there is a way to somehow lock the ArrayBuffer usage
* instead of copying on first evaluation. We have to be careful that nothing
* (ie. a view of the ArrayBuffer in a Wasm Instnace dryrun)
* inadvertantly mutates the underlying ArrayBuffer
*/
if (args.first) {
let stopTimer = () => {}
if (args.Memory.byteLength > TWO_GB) {
stopTimer = timer('copyLargeMemory', {
streamId,
processId: args.processId,
byteLength: args.Memory.byteLength
}).stop
}
/**
* We must pass a view into copyBytesFrom,
*
* so we first check whether it already is or not,
* and create one on top of the ArrayBuffer if necessary
*
* (NodeJS' Buffer is a subclass of DataView)
*/
args.Memory = ArrayBuffer.isView(args.Memory)
? Buffer.copyBytesFrom(args.Memory)
: Buffer.copyBytesFrom(new Uint8Array(args.Memory))
stopTimer()
}

options = { transfer: [ArrayBuffer.isView(args.Memory) ? args.Memory.buffer : args.Memory] }
}

args.streamId = streamId
args.moduleId = moduleId
args.moduleOptions = moduleOptions
Expand Down
8 changes: 7 additions & 1 deletion servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ export function evaluateWith (env) {
* Iterate over the async iterable of messages,
* and evaluate each one
*/
let first = true
for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, AoGlobal } of messages) {
if (cron) {
const key = toEvaledCron({ timestamp: message.Timestamp, cron })
Expand Down Expand Up @@ -208,14 +209,19 @@ export function evaluateWith (env) {
/**
* Where the actual evaluation is performed
*/
.then((Memory) => ctx.evaluator({ noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal }))
.then((Memory) => ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal }))
/**
* These values are folded,
* so that we can potentially update the process memory cache
* at the end of evaluation
*/
.then(mergeLeft({ noSave, message, cron, ordinate }))
.then(async (output) => {
/**
* Make sure to set first to false
* for all subsequent evaluations for this evaluation stream
*/
if (first) first = false
if (output.GasUsed) totalGasUsed += BigInt(output.GasUsed ?? 0)

if (cron) ctx.stats.messages.cron++
Expand Down
4 changes: 4 additions & 0 deletions servers/cu/src/domain/worker/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import { saveEvaluationSchema } from '../dal.js'
const WASM_64_FORMAT = 'wasm64-unknown-emscripten-draft_2024_02_15'

export function evaluateWith ({
/**
* TODO: no longer needed since the wasmModule
* is passed in. Eventually remove usage and injection
*/
loadWasmModule,
wasmInstanceCache,
bootstrapWasmInstance,
Expand Down
31 changes: 29 additions & 2 deletions servers/cu/src/domain/worker/evaluator/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { workerData } from 'node:worker_threads'
import { hostname } from 'node:os'

import { fetch, setGlobalDispatcher, Agent } from 'undici'
import { worker } from 'workerpool'
import { worker, Transfer } from 'workerpool'

import { createLogger } from '../../logger.js'

Expand All @@ -26,4 +26,31 @@ const apis = await createApis({
/**
* Expose our worker api
*/
worker(apis)
worker({
evaluate: (...args) => apis.evaluate(...args)
/**
* Transfer the ownership of the underlying ArrayBuffer back to the main thread
* to prevent copying it over
*/
.then((output) => {
/**
* If the very first evaluation produces
* an error, the resultant Memory will be null
* (prevMemory is used, which initializes as null)
*
* So in this edge-case, there's nothing to transfer,
* so we simply return output
*/
if (!output.Memory) return output

/**
* We make sure to only transfer the underlying ArrayBuffer
* back to the main thread.
*/
output.Memory = ArrayBuffer.isView(output.Memory)
? output.Memory.buffer
: output.Memory

return new Transfer(output, [output.Memory])
})
})
4 changes: 4 additions & 0 deletions servers/cu/src/domain/worker/evaluator/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ export const createApis = async (ctx) => {
const sqlite = await SqliteClient.createSqliteClient({ url: ctx.DB_URL, bootstrap: false })

const evaluate = evaluateWith({
/**
* TODO: no longer needed since the wasmModule
* is passed in. Eventually remove
*/
loadWasmModule: WasmClient.loadWasmModuleWith({
fetch,
ARWEAVE_URL: ctx.ARWEAVE_URL,
Expand Down

0 comments on commit f047ea9

Please sign in to comment.