diff --git a/servers/cu/src/bootstrap.js b/servers/cu/src/bootstrap.js
index 151d9d2ed..ed9489246 100644
--- a/servers/cu/src/bootstrap.js
+++ b/servers/cu/src/bootstrap.js
@@ -214,7 +214,7 @@ export const createApis = async (ctx) => {
     readProcessMemoryFile,
     queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }),
     queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }),
-    hashWasmMemory: WasmClient.hashWasmMemory,
+    hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }),
     buildAndSignDataItem: ArweaveClient.buildAndSignDataItemWith({ WALLET: ctx.WALLET }),
     uploadDataItem: ArweaveClient.uploadDataItemWith({ UPLOADER_URL: ctx.UPLOADER_URL, fetch: ctx.fetch, logger: ctx.logger }),
     writeCheckpointRecord: AoProcessClient.writeCheckpointRecordWith({ db: sqlite }),
diff --git a/servers/cu/src/effects/wasm.js b/servers/cu/src/effects/wasm.js
index f47316181..3dc98b4b6 100644
--- a/servers/cu/src/effects/wasm.js
+++ b/servers/cu/src/effects/wasm.js
@@ -1,5 +1,5 @@
 import { promisify } from 'node:util'
-import { PassThrough, Readable, pipeline } from 'node:stream'
+import { PassThrough, Readable, Transform, pipeline } from 'node:stream'
 import { createGunzip, createGzip } from 'node:zlib'
 import { createHash } from 'node:crypto'
 import { createReadStream, createWriteStream } from 'node:fs'
@@ -216,26 +216,79 @@ export function loadWasmModuleWith ({ fetch, ARWEAVE_URL, WASM_BINARY_FILE_DIREC
  *
  * We use a stream, so that we can incrementally compute hash in a non-blocking way
  */
-export async function hashWasmMemory (memoryStream, encoding) {
-  /**
-   * TODO: add more encoding options
-   */
-  if (encoding && encoding !== 'gzip') {
-    throw new Error('Only GZIP encoding of Memory is supported for Process Checkpoints')
+export function hashWasmMemoryWith () {
+  class SubchunkStream extends Transform {
+    constructor (chunkSize) {
+      super()
+      this.chunkSize = chunkSize
+      /**
+       * accumulate received chunks into this buffer.
+       *
+       * It will be subchunked as needed, as the transform stream
+       * is read.
+       */
+      this.buffer = Buffer.alloc(0)
+    }
+
+    _transform (chunk, _encoding, callback) {
+      this.buffer = Buffer.concat([this.buffer, chunk])
+
+      while (this.buffer.length >= this.chunkSize) {
+        const subChunk = this.buffer.subarray(0, this.chunkSize)
+        this.buffer = this.buffer.subarray(this.chunkSize)
+
+        /**
+         * We stop if push returns false in order to respect
+         * backpressure
+         */
+        if (!this.push(subChunk)) return
+      }
+
+      callback()
+    }
+
+    _flush (callback) {
+      if (this.buffer.length > 0) this.push(this.buffer)
+      callback()
+    }
   }
 
-  return Promise.resolve(memoryStream)
-    .then((memoryStream) => {
-      const hash = createHash('sha256')
-      return pipelineP(
-        memoryStream,
-        encoding === 'gzip'
-          ? createGunzip()
-          : new PassThrough(),
-        hash
-      )
-        .then(() => hash.digest('hex'))
-    })
+  return async (memoryStream, encoding) => {
+    /**
+     * TODO: add more encoding options
+     */
+    if (encoding && encoding !== 'gzip') {
+      throw new Error('Only GZIP encoding of Memory is supported for Process Checkpoints')
+    }
+
+    return Promise.resolve(memoryStream)
+      .then((memoryStream) => {
+        const hash = createHash('sha256')
+        return pipelineP(
+          memoryStream,
+          /**
+           * The memoryStream, if derived from an iterable like a Buffer,
+           * may emit the entire data stream as a single chunk.
+           *
+           * This can break things like Crypto hash, which can only handle
+           * data chunks less than 2GB.
+           *
+           * So we use this Transform stream to receive chunks,
+           * then re-emit "sub-chunks" of the given size -- in this case
+           * the default highWaterMark of 64kb
+           *
+           * This allows for hashing to work for any size memory, derived
+           * from any iterable, while respecting backpressure
+           */
+          new SubchunkStream(1024 * 64),
+          encoding === 'gzip'
+            ? createGunzip()
+            : new PassThrough(),
+          hash
+        )
+          .then(() => hash.digest('hex'))
+      })
+  }
 }
 
 export function isModuleMemoryLimitSupportedWith ({ PROCESS_WASM_MEMORY_MAX_LIMIT }) {
diff --git a/servers/cu/src/effects/wasm.test.js b/servers/cu/src/effects/wasm.test.js
index 4a591623e..cd265d0cb 100644
--- a/servers/cu/src/effects/wasm.test.js
+++ b/servers/cu/src/effects/wasm.test.js
@@ -5,14 +5,17 @@ import { createReadStream, readFileSync } from 'node:fs'
 import { createGzip } from 'node:zlib'
 import { Readable } from 'node:stream'
 
-import { hashWasmMemory } from './wasm.js'
+import { hashWasmMemoryWith } from './wasm.js'
+import { createTestLogger } from '../domain/logger.js'
+
+const logger = createTestLogger({ name: 'ao-cu:ao-process' })
 
 describe('wasm', () => {
   describe('hashWasmMemory', () => {
     test('should hash the array buffer', async () => {
       const s = createReadStream('./test/processes/happy/process.wasm')
 
-      const sha = await hashWasmMemory(s)
+      const sha = await hashWasmMemoryWith({ logger })(s)
 
       assert.ok(typeof sha === 'string')
       assert.equal(sha.length, 64)
@@ -20,26 +23,41 @@ describe('wasm', () => {
 
     test('should hash the array buffer derived from a typed array', async () => {
       const s = createReadStream('./test/processes/happy/process.wasm')
-      const sha = await hashWasmMemory(s)
+      const sha = await hashWasmMemoryWith({ logger })(s)
 
       const tArray = new Uint8Array(readFileSync('./test/processes/happy/process.wasm'))
       const fromTArray = Buffer.from(tArray.buffer, tArray.byteOffset, tArray.byteLength)
-      const fromBuffer = await hashWasmMemory(Readable.from(fromTArray))
+      const fromBuffer = await hashWasmMemoryWith({ logger })(Readable.from(fromTArray))
 
       assert.equal(fromBuffer, sha)
     })
 
     test('should decode the array buffer before hashing', async () => {
       const raw = createReadStream('./test/processes/happy/process.wasm')
-      const rawSha = await hashWasmMemory(raw)
+      const rawSha = await hashWasmMemoryWith({ logger })(raw)
 
       const encoded = createReadStream('./test/processes/happy/process.wasm')
         .pipe(createGzip())
 
-      const encodedSha = await hashWasmMemory(encoded, 'gzip')
+      const encodedSha = await hashWasmMemoryWith({ logger })(encoded, 'gzip')
 
       assert.equal(encodedSha.length, 64)
       assert.equal(rawSha, encodedSha)
     })
+
+    test('should hash large data', async () => {
+      function generate2GBBufferStream () {
+        const buffer = Buffer.allocUnsafe(2 * 1024 * 1024 * 1024)
+        buffer.fill(0)
+
+        return Readable.from(buffer)
+      }
+
+      const s = generate2GBBufferStream()
+      const sha = await hashWasmMemoryWith({ logger })(s)
+
+      assert.ok(typeof sha === 'string')
+      assert.equal(sha.length, 64)
+    })
   })
 })
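A minimal usage sketch of the rewired hasher, assuming a gzip-encoded memory snapshot on disk; the file path and console logger are illustrative, and per the patch the factory currently takes no dependencies, so the injected logger is simply ignored:

  import { createReadStream } from 'node:fs'
  import { hashWasmMemoryWith } from './effects/wasm.js'

  // Build the hasher once, as bootstrap.js does, then hash a gzip-encoded
  // memory snapshot; chunks are re-emitted in 64KB sub-chunks internally
  // before being gunzipped and fed to the SHA-256 hash.
  const hashWasmMemory = hashWasmMemoryWith({ logger: console })

  const sha = await hashWasmMemory(createReadStream('./memory.dat.gz'), 'gzip')
  console.log(sha) // 64-character hex SHA-256 of the decompressed memory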