fix(cu): handle hashing wasm memory >2GB
TillaTheHun0 committed Sep 10, 2024
1 parent f3acc36 commit 390416a
Showing 3 changed files with 97 additions and 26 deletions.
2 changes: 1 addition & 1 deletion servers/cu/src/bootstrap.js
@@ -214,7 +214,7 @@ export const createApis = async (ctx) => {
     readProcessMemoryFile,
     queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }),
     queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }),
-    hashWasmMemory: WasmClient.hashWasmMemory,
+    hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }),
     buildAndSignDataItem: ArweaveClient.buildAndSignDataItemWith({ WALLET: ctx.WALLET }),
     uploadDataItem: ArweaveClient.uploadDataItemWith({ UPLOADER_URL: ctx.UPLOADER_URL, fetch: ctx.fetch, logger: ctx.logger }),
     writeCheckpointRecord: AoProcessClient.writeCheckpointRecordWith({ db: sqlite }),
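The bootstrap.js change above swaps the bare hashWasmMemory export for the hashWasmMemoryWith({ logger: ctx.logger }) factory, matching the dependency-injection style of the neighbouring clients (queryGatewayWith, uploadDataItemWith, and so on). A minimal sketch of that pattern, with illustrative names (only hashWasmMemoryWith itself mirrors an export touched by this commit) and a plain-function logger assumed:

// Sketch of the "With" factory pattern: inject dependencies once, get back
// the function the rest of the CU calls. Names here are illustrative.
import { Readable } from 'node:stream'
import { createHash } from 'node:crypto'

function hashWasmMemoryWithSketch ({ logger }) {
  // the returned function closes over the injected logger
  return async (memoryStream) => {
    logger('hashing process memory stream')
    const hash = createHash('sha256')
    for await (const chunk of memoryStream) hash.update(chunk)
    return hash.digest('hex')
  }
}

// Wiring mirrors bootstrap.js: inject the logger at startup, call the result later.
const hashWasmMemory = hashWasmMemoryWithSketch({ logger: console.log })
hashWasmMemory(Readable.from(Buffer.from('hello world'))).then(console.log)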
91 changes: 72 additions & 19 deletions servers/cu/src/effects/wasm.js
@@ -1,5 +1,5 @@
 import { promisify } from 'node:util'
-import { PassThrough, Readable, pipeline } from 'node:stream'
+import { PassThrough, Readable, Transform, pipeline } from 'node:stream'
 import { createGunzip, createGzip } from 'node:zlib'
 import { createHash } from 'node:crypto'
 import { createReadStream, createWriteStream } from 'node:fs'
@@ -216,26 +216,79 @@ export function loadWasmModuleWith ({ fetch, ARWEAVE_URL, WASM_BINARY_FILE_DIREC
  *
  * We use a stream, so that we can incrementally compute hash in a non-blocking way
  */
-export async function hashWasmMemory (memoryStream, encoding) {
-  /**
-   * TODO: add more encoding options
-   */
-  if (encoding && encoding !== 'gzip') {
-    throw new Error('Only GZIP encoding of Memory is supported for Process Checkpoints')
+export function hashWasmMemoryWith () {
+  class SubchunkStream extends Transform {
+    constructor (chunkSize) {
+      super()
+      this.chunkSize = chunkSize
+      /**
+       * accumulate received chunks into this buffer.
+       *
+       * It will be subchunked as needed, as the transform stream
+       * is read.
+       */
+      this.buffer = Buffer.alloc(0)
+    }
+
+    _transform (chunk, _encoding, callback) {
+      this.buffer = Buffer.concat([this.buffer, chunk])
+
+      while (this.buffer.length >= this.chunkSize) {
+        const subChunk = this.buffer.subarray(0, this.chunkSize)
+        this.buffer = this.buffer.subarray(this.chunkSize)
+
+        /**
+         * We stop if push returns false in order to respect
+         * backpressure
+         */
+        if (!this.push(subChunk)) return
+      }
+
+      callback()
+    }
+
+    _flush (callback) {
+      if (this.buffer.length > 0) this.push(this.buffer)
+      callback()
+    }
   }
 
-  return Promise.resolve(memoryStream)
-    .then((memoryStream) => {
-      const hash = createHash('sha256')
-      return pipelineP(
-        memoryStream,
-        encoding === 'gzip'
-          ? createGunzip()
-          : new PassThrough(),
-        hash
-      )
-      .then(() => hash.digest('hex'))
-    })
+  return async (memoryStream, encoding) => {
+    /**
+     * TODO: add more encoding options
+     */
+    if (encoding && encoding !== 'gzip') {
+      throw new Error('Only GZIP encoding of Memory is supported for Process Checkpoints')
+    }
+
+    return Promise.resolve(memoryStream)
+      .then((memoryStream) => {
+        const hash = createHash('sha256')
+        return pipelineP(
+          memoryStream,
+          /**
+           * The memoryStream, if derived from an iterable like a Buffer,
+           * may emit the entire data stream as a single chunk.
+           *
+           * This can break things like Crypto hash, which can only handle
+           * data chunks less than 2GB.
+           *
+           * So we use this Transform stream to receive chunks,
+           * then re-emit "sub-chunks" of the given size -- in this case
+           * the default highWaterMark of 64kb
+           *
+           * This allows for hashing to work for any size memory, derived
+           * from any iterable, while respecting backpressure
+           */
+          new SubchunkStream(1024 * 64),
+          encoding === 'gzip'
+            ? createGunzip()
+            : new PassThrough(),
+          hash
+        )
+        .then(() => hash.digest('hex'))
+      })
+  }
 }
 
 export function isModuleMemoryLimitSupportedWith ({ PROCESS_WASM_MEMORY_MAX_LIMIT }) {
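The core of the fix is the SubchunkStream Transform above: a memory snapshot that arrives as a single multi-gigabyte chunk (for example via Readable.from(buffer), which emits a Buffer whole rather than iterating it) is re-emitted as bounded sub-chunks, so each hash.update() call stays far below the roughly 2GB chunk limit the inline comment describes. The standalone sketch below exercises the same idea in isolation, assuming the same 64 KiB sub-chunk size; it deliberately pushes sub-chunks eagerly instead of pausing on backpressure (each sub-chunk is a zero-copy view over memory that is already resident), and it uses a 256 MiB buffer purely to keep the example cheap to run:

// Standalone sketch of sub-chunking a stream before hashing (Node built-ins only).
import { Transform, Readable, pipeline } from 'node:stream'
import { promisify } from 'node:util'
import { createHash } from 'node:crypto'

const pipelineP = promisify(pipeline)

class SubchunkSketch extends Transform {
  constructor (chunkSize) {
    super()
    this.chunkSize = chunkSize
    this.buffer = Buffer.alloc(0)
  }

  _transform (chunk, _encoding, callback) {
    this.buffer = this.buffer.length ? Buffer.concat([this.buffer, chunk]) : chunk
    // Re-emit fixed-size sub-chunks; subarray() returns a view, so nothing is copied.
    while (this.buffer.length >= this.chunkSize) {
      this.push(this.buffer.subarray(0, this.chunkSize))
      this.buffer = this.buffer.subarray(this.chunkSize)
    }
    callback()
  }

  _flush (callback) {
    if (this.buffer.length > 0) this.push(this.buffer)
    callback()
  }
}

async function sha256 (stream) {
  const hash = createHash('sha256')
  // Same pipeline shape as hashWasmMemoryWith: stream -> sub-chunker -> hash
  await pipelineP(stream, new SubchunkSketch(1024 * 64), hash)
  return hash.digest('hex')
}

// Readable.from(buffer) emits the whole buffer as one chunk, which is exactly
// the case the commit guards against; 256 MiB keeps this cheap to run.
sha256(Readable.from(Buffer.alloc(256 * 1024 * 1024))).then(console.log)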
30 changes: 24 additions & 6 deletions servers/cu/src/effects/wasm.test.js
@@ -5,41 +5,59 @@ import { createReadStream, readFileSync } from 'node:fs'
 import { createGzip } from 'node:zlib'
 import { Readable } from 'node:stream'
 
-import { hashWasmMemory } from './wasm.js'
+import { hashWasmMemoryWith } from './wasm.js'
+import { createTestLogger } from '../domain/logger.js'
+
+const logger = createTestLogger({ name: 'ao-cu:ao-process' })
 
 describe('wasm', () => {
   describe('hashWasmMemory', () => {
     test('should hash the array buffer', async () => {
       const s = createReadStream('./test/processes/happy/process.wasm')
 
-      const sha = await hashWasmMemory(s)
+      const sha = await hashWasmMemoryWith({ logger })(s)
 
       assert.ok(typeof sha === 'string')
       assert.equal(sha.length, 64)
     })
 
     test('should hash the array buffer derived from a typed array', async () => {
       const s = createReadStream('./test/processes/happy/process.wasm')
-      const sha = await hashWasmMemory(s)
+      const sha = await hashWasmMemoryWith({ logger })(s)
 
       const tArray = new Uint8Array(readFileSync('./test/processes/happy/process.wasm'))
       const fromTArray = Buffer.from(tArray.buffer, tArray.byteOffset, tArray.byteLength)
-      const fromBuffer = await hashWasmMemory(Readable.from(fromTArray))
+      const fromBuffer = await hashWasmMemoryWith({ logger })(Readable.from(fromTArray))
 
       assert.equal(fromBuffer, sha)
     })
 
     test('should decode the array buffer before hashing', async () => {
       const raw = createReadStream('./test/processes/happy/process.wasm')
-      const rawSha = await hashWasmMemory(raw)
+      const rawSha = await hashWasmMemoryWith({ logger })(raw)
 
       const encoded = createReadStream('./test/processes/happy/process.wasm')
         .pipe(createGzip())
 
-      const encodedSha = await hashWasmMemory(encoded, 'gzip')
+      const encodedSha = await hashWasmMemoryWith({ logger })(encoded, 'gzip')
 
       assert.equal(encodedSha.length, 64)
       assert.equal(rawSha, encodedSha)
     })
+
+    test('should hash large data', async () => {
+      function generate2GBBufferStream () {
+        const buffer = Buffer.allocUnsafe(2 * 1024 * 1024 * 1024)
+        buffer.fill(0)
+
+        return Readable.from(buffer)
+      }
+
+      const s = generate2GBBufferStream()
+      const sha = await hashWasmMemoryWith({ logger })(s)
+
+      assert.ok(typeof sha === 'string')
+      assert.equal(sha.length, 64)
+    })
   })
 })
