Skip to content

Commit

Permalink
feat(cu): consolidate checkpoint logic in queryCheckpoint util and us…
Browse files Browse the repository at this point in the history
…e in saveCheckpointWith #582
  • Loading branch information
TillaTheHun0 committed Apr 2, 2024
1 parent 8ab8497 commit af9df6a
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 106 deletions.
210 changes: 104 additions & 106 deletions servers/cu/src/domain/client/ao-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,80 @@ export function writeCheckpointFileWith ({ DIR, writeFile }) {
}
}

/**
* ################################
* ##### Checkpoint query utils ###
* ################################
*/

function queryCheckpointsWith ({ queryGateway, queryCheckpointGateway, logger }) {
queryGateway = fromPromise(queryGateway)
queryCheckpointGateway = fromPromise(queryCheckpointGateway)

return ({ query, variables, processId, timestamp, ordinate, cron }) => {
const queryCheckpoint = (gateway, name = 'gateway') => (attempt) => () =>
gateway({ query, variables })
.bimap(
(err) => {
logger(
'Error encountered querying %s for Checkpoint for process "%s", before "%j". Attempt %d...',
name,
processId,
{ timestamp, ordinate, cron },
attempt,
err
)
return variables
},
identity
)

const queryOnDefaultGateway = queryCheckpoint(queryGateway)
/**
* Because the Checkpoint gateway is defaulted to the default gateway, if
* no Checkpoint gateway is configured, this is effectively equivalent to
* queryOnDefaultGateway.
*
* This means we maintain retry logic on default gateway
*/
const queryOnCheckpointGateway = queryCheckpoint(queryCheckpointGateway, 'Checkpoint gateway')

return of()
/**
* First attempt to query the gateway configured specifically for Checkpoints.
*/
.chain(queryOnCheckpointGateway(1))
/**
* Retry 1 on Checkpoint gateway
*/
.bichain(queryOnCheckpointGateway(2), Resolved)
.chain((res) => {
/**
* Fallback to the default gateway if no results. In this way,
* we either fallback to the default gateway, or use it as the final
* retry attempt, which is what we want.
*/
if (!path(['data', 'transactions', 'edges', '0'], res)) {
logger(
'Checkpoint gateway returned no results for process "%s", before "%j". Falling back to default gateway...',
processId,
{ timestamp, ordinate, cron }
)
return Rejected(variables)
}
return Resolved(res)
})
/**
* Our final attempt will be made against the default gateway.
*
* Since we also fallback to this if no results are returned from the Checkpoint gateway,
* this simoultaneously serves as our fallback AND our (max) 3rd retry attempt
* on the default gteway
*/
.bichain(queryOnDefaultGateway(3), Resolved)
}
}

export function findProcessMemoryBeforeWith ({
cache,
findCheckpointFileBefore,
Expand All @@ -326,13 +400,13 @@ export function findProcessMemoryBeforeWith ({
logger: _logger
}) {
const logger = _logger.child('ao-process:findProcessMemoryBefore')
address = fromPromise(address)
findCheckpointFileBefore = fromPromise(findCheckpointFileBefore)
readCheckpointFile = fromPromise(readCheckpointFile)
address = fromPromise(address)
queryGateway = fromPromise(queryGateway)
queryCheckpointGateway = fromPromise(queryCheckpointGateway)
loadTransactionData = fromPromise(loadTransactionData)

const queryCheckpoints = queryCheckpointsWith({ queryGateway, queryCheckpointGateway, logger })

const GET_AO_PROCESS_CHECKPOINTS = `
query GetAoProcessCheckpoints(
$owner: String!
Expand Down Expand Up @@ -512,72 +586,15 @@ export function findProcessMemoryBeforeWith ({
.chain(Rejected)
}

const queryCheckpoint = (gateway, name = 'gateway') => (attempt) => (variables) =>
gateway({ query: GET_AO_PROCESS_CHECKPOINTS, variables })
.bimap(
(err) => {
logger(
'Error encountered querying %s for Checkpoint for process "%s", before "%j". Attempt %d...',
name,
processId,
{ timestamp, ordinate, cron },
attempt,
err
)
return variables
},
identity
)

const queryOnDefaultGateway = queryCheckpoint(queryGateway)
/**
* Because the Checkpoint gateway is defaulted to the default gateway, if
* no Checkpoint gateway is configured, this is effectively equivalent to
* queryOnDefaultGateway.
*
* This means we maintain retry logic on default gateway
*/
const queryOnCheckpointGateway = queryCheckpoint(queryCheckpointGateway, 'Checkpoint gateway')

return address()
.map((owner) => ({ owner, processId, limit: 50 }))
.chain((variables) =>
of(variables)
/**
* First attempt to query the gateway configured specifically for Checkpoints
* If no Checkpoint gateway was configured, then this will simply be a call
* to the default gateway, which is what we want
*/
.chain(queryOnCheckpointGateway(1))
/**
* Retry 1 on Checkpoint gateway
*/
.bichain(queryOnCheckpointGateway(2), Resolved)
.chain((res) => {
/**
* Fallback to the default gateway if no results. In this way,
* we either fallback to the default gateway, or use it as the final
* retry attempt, which is what we want.
*/
if (!path(['data', 'transactions', 'edges', '0'], res)) {
logger(
'Checkpoint gateway returned no results for process "%s", before "%j". Falling back to default gateway...',
processId,
{ timestamp, ordinate, cron }
)
return Rejected(variables)
}
return Resolved(res)
})
/**
* Our final attempt will be made against the default gateway.
*
* Since we also fallback to this if no results are returned from the Checkpoint gateway,
* this simoultaneously serves as our fallback AND our (max) 3rd retry attempt
* on the default gteway
*/
.bichain(queryOnDefaultGateway(3), Resolved)
)
.chain((owner) => queryCheckpoints({
query: GET_AO_PROCESS_CHECKPOINTS,
variables: { owner, processId, limit: 50 },
processId,
timestamp,
ordinate,
cron
}))
.map(path(['data', 'transactions', 'edges']))
.map(determineLatestCheckpointBefore({ timestamp, ordinate, cron }))
.chain((latestCheckpoint) => {
Expand Down Expand Up @@ -726,6 +743,7 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA
}

export function saveCheckpointWith ({
queryCheckpointGateway,
queryGateway,
hashWasmMemory,
buildAndSignDataItem,
Expand All @@ -736,7 +754,6 @@ export function saveCheckpointWith ({
PROCESS_CHECKPOINT_CREATION_THROTTLE,
DISABLE_PROCESS_CHECKPOINT_CREATION
}) {
queryGateway = fromPromise(queryGateway)
address = fromPromise(address)
hashWasmMemory = fromPromise(hashWasmMemory)
buildAndSignDataItem = fromPromise(buildAndSignDataItem)
Expand All @@ -745,6 +762,8 @@ export function saveCheckpointWith ({

const logger = _logger.child('ao-process:saveCheckpoint')

const queryCheckpoints = queryCheckpointsWith({ queryGateway, queryCheckpointGateway, logger })

/**
* We will first query the gateway to determine if this CU
* has already created a Checkpoint for this particular evaluation.
Expand Down Expand Up @@ -876,47 +895,26 @@ export function saveCheckpointWith ({
}

function createCheckpoint ({ Memory, encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron }) {
const queryCheckpoint = (attempt) => (variables) =>
queryGateway({ query: GET_AO_PROCESS_CHECKPOINTS(!!cron), variables })
.bimap(
(err) => {
logger(
'Error encountered querying gateway for Checkpoint for process "%s", before "%j". Attempt %d...',
processId,
{ timestamp, ordinate, cron },
attempt,
err
)
return variables
},
identity
)

return address()
.map((owner) => ({
owner,
.chain((owner) => queryCheckpoints({
query: GET_AO_PROCESS_CHECKPOINTS(!!cron),
variables: {
owner,
processId,
/**
* Some messages do not have a nonce (Cron Messages),
* but every message will have an ordinate set to the most recent nonce
* (Scheduled Message ordinate is equal to its nonce)
*/
nonce: `${nonce || ordinate}`,
timestamp: `${timestamp}`,
cron
},
processId,
/**
* Some messages do not have a nonce (Cron Messages),
* but every message will have an ordinate set to the most recent nonce
* (Scheduled Message ordinate is equal to its nonce)
*/
nonce: `${nonce || ordinate}`,
timestamp: `${timestamp}`,
timestamp,
ordinate,
cron
}))
/**
* The gateway tends to timeout when making this query,
* but then will start working on retries.
*
* (I suspect the gateway is performing work, and times out on the first request,
* but then work is cached, which HITs on subsequent requests)
*/
.chain(queryCheckpoint(1))
// Retry
.bichain(queryCheckpoint(2), Resolved)
// Retry
.bichain(queryCheckpoint(3), Resolved)
.map(path(['data', 'transactions', 'edges', '0']))
.chain((checkpoint) => {
/**
Expand Down
1 change: 1 addition & 0 deletions servers/cu/src/domain/client/ao-process.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -782,4 +782,5 @@ describe('ao-process', () => {
})

describe.todo('saveLatestProcessMemoryWith')
describe.todo('saveCheckpointWith')
})
1 change: 1 addition & 0 deletions servers/cu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export const createApis = async (ctx) => {
const saveCheckpoint = AoProcessClient.saveCheckpointWith({
address,
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }),
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }),
hashWasmMemory: WasmClient.hashWasmMemory,
buildAndSignDataItem: ArweaveClient.buildAndSignDataItemWith({ WALLET: ctx.WALLET }),
uploadDataItem: ArweaveClient.uploadDataItemWith({ UPLOADER_URL: ctx.UPLOADER_URL, fetch: ctx.fetch, logger: ctx.logger }),
Expand Down

0 comments on commit af9df6a

Please sign in to comment.