From af9df6afc54048130c13cf1a5d174c165d38554b Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Tue, 2 Apr 2024 16:36:18 +0000 Subject: [PATCH] feat(cu): consolidate checkpoint logic in queryCheckpoint util and use in saveCheckpointWith #582 --- servers/cu/src/domain/client/ao-process.js | 210 +++++++++--------- .../cu/src/domain/client/ao-process.test.js | 1 + servers/cu/src/domain/index.js | 1 + 3 files changed, 106 insertions(+), 106 deletions(-) diff --git a/servers/cu/src/domain/client/ao-process.js b/servers/cu/src/domain/client/ao-process.js index 21d50941a..d4168f6ff 100644 --- a/servers/cu/src/domain/client/ao-process.js +++ b/servers/cu/src/domain/client/ao-process.js @@ -314,6 +314,80 @@ export function writeCheckpointFileWith ({ DIR, writeFile }) { } } +/** + * ################################ + * ##### Checkpoint query utils ### + * ################################ + */ + +function queryCheckpointsWith ({ queryGateway, queryCheckpointGateway, logger }) { + queryGateway = fromPromise(queryGateway) + queryCheckpointGateway = fromPromise(queryCheckpointGateway) + + return ({ query, variables, processId, timestamp, ordinate, cron }) => { + const queryCheckpoint = (gateway, name = 'gateway') => (attempt) => () => + gateway({ query, variables }) + .bimap( + (err) => { + logger( + 'Error encountered querying %s for Checkpoint for process "%s", before "%j". Attempt %d...', + name, + processId, + { timestamp, ordinate, cron }, + attempt, + err + ) + return variables + }, + identity + ) + + const queryOnDefaultGateway = queryCheckpoint(queryGateway) + /** + * Because the Checkpoint gateway is defaulted to the default gateway, if + * no Checkpoint gateway is configured, this is effectively equivalent to + * queryOnDefaultGateway. 
+ * + * This means we maintain retry logic on default gateway + */ + const queryOnCheckpointGateway = queryCheckpoint(queryCheckpointGateway, 'Checkpoint gateway') + + return of() + /** + * First attempt to query the gateway configured specifically for Checkpoints. + */ + .chain(queryOnCheckpointGateway(1)) + /** + * Retry 1 on Checkpoint gateway + */ + .bichain(queryOnCheckpointGateway(2), Resolved) + .chain((res) => { + /** + * Fallback to the default gateway if no results. In this way, + * we either fallback to the default gateway, or use it as the final + * retry attempt, which is what we want. + */ + if (!path(['data', 'transactions', 'edges', '0'], res)) { + logger( + 'Checkpoint gateway returned no results for process "%s", before "%j". Falling back to default gateway...', + processId, + { timestamp, ordinate, cron } + ) + return Rejected(variables) + } + return Resolved(res) + }) + /** + * Our final attempt will be made against the default gateway. + * + * Since we also fallback to this if no results are returned from the Checkpoint gateway, + * this simultaneously serves as our fallback AND our (max) 3rd retry attempt + * on the default gateway + */ + .bichain(queryOnDefaultGateway(3), Resolved) + } +} + export function findProcessMemoryBeforeWith ({ cache, findCheckpointFileBefore, @@ -326,13 +400,13 @@ export function findProcessMemoryBeforeWith ({ logger: _logger }) { const logger = _logger.child('ao-process:findProcessMemoryBefore') + address = fromPromise(address) findCheckpointFileBefore = fromPromise(findCheckpointFileBefore) readCheckpointFile = fromPromise(readCheckpointFile) - address = fromPromise(address) - queryGateway = fromPromise(queryGateway) - queryCheckpointGateway = fromPromise(queryCheckpointGateway) loadTransactionData = fromPromise(loadTransactionData) + const queryCheckpoints = queryCheckpointsWith({ queryGateway, queryCheckpointGateway, logger }) + const GET_AO_PROCESS_CHECKPOINTS = ` query GetAoProcessCheckpoints( $owner: String! 
@@ -512,72 +586,15 @@ export function findProcessMemoryBeforeWith ({ .chain(Rejected) } - const queryCheckpoint = (gateway, name = 'gateway') => (attempt) => (variables) => - gateway({ query: GET_AO_PROCESS_CHECKPOINTS, variables }) - .bimap( - (err) => { - logger( - 'Error encountered querying %s for Checkpoint for process "%s", before "%j". Attempt %d...', - name, - processId, - { timestamp, ordinate, cron }, - attempt, - err - ) - return variables - }, - identity - ) - - const queryOnDefaultGateway = queryCheckpoint(queryGateway) - /** - * Because the Checkpoint gateway is defaulted to the default gateway, if - * no Checkpoint gateway is configured, this is effectively equivalent to - * queryOnDefaultGateway. - * - * This means we maintain retry logic on default gateway - */ - const queryOnCheckpointGateway = queryCheckpoint(queryCheckpointGateway, 'Checkpoint gateway') - return address() - .map((owner) => ({ owner, processId, limit: 50 })) - .chain((variables) => - of(variables) - /** - * First attempt to query the gateway configured specifically for Checkpoints - * If no Checkpoint gateway was configured, then this will simply be a call - * to the default gateway, which is what we want - */ - .chain(queryOnCheckpointGateway(1)) - /** - * Retry 1 on Checkpoint gateway - */ - .bichain(queryOnCheckpointGateway(2), Resolved) - .chain((res) => { - /** - * Fallback to the default gateway if no results. In this way, - * we either fallback to the default gateway, or use it as the final - * retry attempt, which is what we want. - */ - if (!path(['data', 'transactions', 'edges', '0'], res)) { - logger( - 'Checkpoint gateway returned no results for process "%s", before "%j". Falling back to default gateway...', - processId, - { timestamp, ordinate, cron } - ) - return Rejected(variables) - } - return Resolved(res) - }) - /** - * Our final attempt will be made against the default gateway. 
- * - * Since we also fallback to this if no results are returned from the Checkpoint gateway, - * this simoultaneously serves as our fallback AND our (max) 3rd retry attempt - * on the default gteway - */ - .bichain(queryOnDefaultGateway(3), Resolved) - ) + .chain((owner) => queryCheckpoints({ + query: GET_AO_PROCESS_CHECKPOINTS, + variables: { owner, processId, limit: 50 }, + processId, + timestamp, + ordinate, + cron + })) .map(path(['data', 'transactions', 'edges'])) .map(determineLatestCheckpointBefore({ timestamp, ordinate, cron })) .chain((latestCheckpoint) => { @@ -726,6 +743,7 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA } export function saveCheckpointWith ({ + queryCheckpointGateway, queryGateway, hashWasmMemory, buildAndSignDataItem, @@ -736,7 +754,6 @@ export function saveCheckpointWith ({ PROCESS_CHECKPOINT_CREATION_THROTTLE, DISABLE_PROCESS_CHECKPOINT_CREATION }) { - queryGateway = fromPromise(queryGateway) address = fromPromise(address) hashWasmMemory = fromPromise(hashWasmMemory) buildAndSignDataItem = fromPromise(buildAndSignDataItem) @@ -745,6 +762,8 @@ export function saveCheckpointWith ({ const logger = _logger.child('ao-process:saveCheckpoint') + const queryCheckpoints = queryCheckpointsWith({ queryGateway, queryCheckpointGateway, logger }) + /** * We will first query the gateway to determine if this CU * has already created a Checkpoint for this particular evaluation. @@ -876,47 +895,26 @@ export function saveCheckpointWith ({ } function createCheckpoint ({ Memory, encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron }) { - const queryCheckpoint = (attempt) => (variables) => - queryGateway({ query: GET_AO_PROCESS_CHECKPOINTS(!!cron), variables }) - .bimap( - (err) => { - logger( - 'Error encountered querying gateway for Checkpoint for process "%s", before "%j". 
Attempt %d...', - processId, - { timestamp, ordinate, cron }, - attempt, - err - ) - return variables - }, - identity - ) - return address() - .map((owner) => ({ - owner, + .chain((owner) => queryCheckpoints({ + query: GET_AO_PROCESS_CHECKPOINTS(!!cron), + variables: { + owner, + processId, + /** + * Some messages do not have a nonce (Cron Messages), + * but every message will have an ordinate set to the most recent nonce + * (Scheduled Message ordinate is equal to its nonce) + */ + nonce: `${nonce || ordinate}`, + timestamp: `${timestamp}`, + cron + }, processId, - /** - * Some messages do not have a nonce (Cron Messages), - * but every message will have an ordinate set to the most recent nonce - * (Scheduled Message ordinate is equal to its nonce) - */ - nonce: `${nonce || ordinate}`, - timestamp: `${timestamp}`, + timestamp, + ordinate, cron })) - /** - * The gateway tends to timeout when making this query, - * but then will start working on retries. - * - * (I suspect the gateway is performing work, and times out on the first request, - * but then work is cached, which HITs on subsequent requests) - */ - .chain(queryCheckpoint(1)) - // Retry - .bichain(queryCheckpoint(2), Resolved) - // Retry - .bichain(queryCheckpoint(3), Resolved) .map(path(['data', 'transactions', 'edges', '0'])) .chain((checkpoint) => { /** diff --git a/servers/cu/src/domain/client/ao-process.test.js b/servers/cu/src/domain/client/ao-process.test.js index da778af7d..2c56308de 100644 --- a/servers/cu/src/domain/client/ao-process.test.js +++ b/servers/cu/src/domain/client/ao-process.test.js @@ -782,4 +782,5 @@ describe('ao-process', () => { }) describe.todo('saveLatestProcessMemoryWith') + describe.todo('saveCheckpointWith') }) diff --git a/servers/cu/src/domain/index.js b/servers/cu/src/domain/index.js index b83195074..13c0010bd 100644 --- a/servers/cu/src/domain/index.js +++ b/servers/cu/src/domain/index.js @@ -104,6 +104,7 @@ export const createApis = async (ctx) => { const saveCheckpoint 
= AoProcessClient.saveCheckpointWith({ address, queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }), + queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }), hashWasmMemory: WasmClient.hashWasmMemory, buildAndSignDataItem: ArweaveClient.buildAndSignDataItemWith({ WALLET: ctx.WALLET }), uploadDataItem: ArweaveClient.uploadDataItemWith({ UPLOADER_URL: ctx.UPLOADER_URL, fetch: ctx.fetch, logger: ctx.logger }),