Skip to content

Commit

Permalink
fix(cu): get result working
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Dec 19, 2023
1 parent 8cf6ae2 commit f820134
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 30 deletions.
14 changes: 7 additions & 7 deletions servers/cu/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion servers/cu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"dependencies": {
"@permaweb/ao-loader": "0.0.12",
"@permaweb/ao-scheduler-utils": "^0.0.3",
"@permaweb/ao-scheduler-utils": "^0.0.4",
"arweave": "^1.14.4",
"cors": "^2.8.5",
"dataloader": "^2.2.2",
Expand Down
14 changes: 10 additions & 4 deletions servers/cu/src/domain/client/ao-su.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => {
* down the pipeline
*/
block: applySpec({
height: path(['block']),
height: pipe(
path(['block']),
(block) => parseInt(block)
),
timestamp: path(['timestamp'])
}),
AoGlobal: applySpec({
Expand All @@ -160,7 +163,7 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => {
fetchAllPages({ suUrl, processId, from, to }),
Transform.from(mapAoMessage({ processId, processOwner, processTags })),
(err) => {
if (err) logger('Encountered err when mapping Sequencer Messages', err)
if (err) logger('Encountered err when mapping Scheduled Messages', err)
}
)
})
Expand All @@ -175,11 +178,14 @@ export const loadProcessWith = ({ fetch }) => {
owner: path(['owner', 'address']),
tags: path(['tags']),
block: applySpec({
height: path(['block', 'height']),
height: pipe(
path(['block']),
(block) => parseInt(block)
),
/**
* SU is currently sending back timestamp in milliseconds,
*/
timestamp: path(['block', 'timestamp'])
timestamp: path(['timestamp'])
})
}))
}
Expand Down
1 change: 0 additions & 1 deletion servers/cu/src/domain/client/wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ let arweave
export function createWalletClient () {
if (arweave) return arweave
arweave = Arweave.init()
console.log(Arweave)
return arweave
}

Expand Down
1 change: 1 addition & 0 deletions servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export const loadMessagesSchema = z.function()
suUrl: z.string().url(),
processId: z.string(),
owner: z.string(),
tags: z.array(rawTagSchema),
from: z.coerce.string().optional(),
to: z.coerce.string().optional()
})
Expand Down
8 changes: 4 additions & 4 deletions servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export function evaluateWith (env) {
* if their deepHash is found in the cache, this prevents duplicate evals
*/
if (deepHash) {
logger('Checking if "%s" has already been evaluated...', message.Id)
logger('Checking if "%s" has already been evaluated...', message.Id || `Cron Message ${cron}`)
const found = await doesMessageHashExist(deepHash).toPromise()
if (found) {
logger('Message with deepHash "%s" was found in cache and therefore has already been evaluated. Removing from eval stream', deepHash)
Expand All @@ -184,7 +184,7 @@ export function evaluateWith (env) {
.then(prev =>
Promise.resolve(prev.Memory)
.then(Memory => {
logger('Evaluating message with id "%s" to process "%s"', message.Id, ctx.id)
logger('Evaluating message with id "%s" to process "%s"', message.Id || `Cron Message ${cron}`, ctx.id)
return Memory
})
/**
Expand All @@ -206,7 +206,7 @@ export function evaluateWith (env) {
: Promise.resolve(output)
})
.then(output => {
logger('Applied message with id "%s" to process "%s"', message.Id, ctx.id)
logger('Applied message with id "%s" to process "%s"', message.Id || `Cron Message ${cron}`, ctx.id)
return output
})
/**
Expand All @@ -227,7 +227,7 @@ export function evaluateWith (env) {
)
.catch(logger.tap(
'Error occurred when applying message with id "%s" to process "%s" %o',
message.Id,
message.Id || `Cron Message ${cron}`,
ctx.id
))
)
Expand Down
6 changes: 3 additions & 3 deletions servers/cu/src/domain/lib/hydrateMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export function maybeAoLoadWith ({ loadTransactionData, loadTransactionMeta, log
continue
}

logger('Hydrating ao-load message for "%s" from transaction "%s"', cur.message.Id, tag.value)
logger('Hydrating Load message for "%s" from transaction "%s"', cur.message.Id, tag.value)
/**
* - Fetch raw data and meta from gateway
* - contruct the data item JSON, encoding the raw data as base64
Expand All @@ -124,7 +124,7 @@ export function maybeAoLoadWith ({ loadTransactionData, loadTransactionMeta, log
loadTransactionMeta(tag.value)
]).then(([data, meta]) => messageFromParts({ data, meta }))

logger('Hydrated ao-load message for "%s" from transaction "%s" and attached as data', cur.message.Id, tag.value)
logger('Hydrated Load message for "%s" from transaction "%s" and attached as data', cur.message.Id, tag.value)

yield cur
}
Expand Down Expand Up @@ -160,7 +160,7 @@ export function hydrateMessagesWith (env) {
Transform.from(maybeMessageId),
Transform.from(maybeAoLoad),
(err) => {
if (err) logger('Encountered err when hydrating ao-load and forwarded-for messages', err)
if (err) logger('Encountered err when hydrating Load and forwarded-for messages', err)
}
)
})
Expand Down
3 changes: 2 additions & 1 deletion servers/cu/src/domain/lib/loadMessageMeta.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { fromPromise } from 'hyper-async'
import { z } from 'zod'

import { loadMessageMetaSchema, locateSchedulerSchema } from '../dal.js'
import { trimSlash } from '../utils.js'

/**
* The result that is produced from this step
Expand Down Expand Up @@ -44,7 +45,7 @@ export function loadMessageMetaWith (env) {
return (ctx) => {
return locateScheduler(ctx.processId)
.chain(({ url }) => loadMessageMeta({
suUrl: url,
suUrl: trimSlash(url),
processId: ctx.processId,
messageTxId: ctx.messageTxId
}))
Expand Down
9 changes: 5 additions & 4 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ms from 'ms'

import { messageSchema, streamSchema } from '../model.js'
import { loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, locateSchedulerSchema } from '../dal.js'
import { trimSlash } from '../utils.js'

/**
* - { name: 'Cron-Interval', value: 'interval' }
Expand Down Expand Up @@ -242,13 +243,13 @@ function loadScheduledMessagesWith ({ locateScheduler, loadMessages, loadBlocksM
return (ctx) =>
of(ctx)
.map(ctx => {
logger('Initializing AsyncIterable of Sequenced messages for process "%s" between "%s" and "%s"', ctx.id, ctx.from || 'initial', ctx.to || 'latest')
logger('Initializing AsyncIterable of Scheduled messages for process "%s" between "%s" and "%s"', ctx.id, ctx.from || 'initial', ctx.to || 'latest')
return ctx
})
.chain((ctx) =>
locateScheduler(ctx.id)
.chain(({ url }) => loadMessages({
suUrl: url,
suUrl: trimSlash(url),
processId: ctx.id,
owner: ctx.owner,
tags: ctx.tags,
Expand Down Expand Up @@ -320,7 +321,7 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
* producing a single merged stream
*/
return locateScheduler(ctx.id)
.chain(({ url }) => loadTimestamp({ processId: ctx.id, suUrl: url }))
.chain(({ url }) => loadTimestamp({ processId: ctx.id, suUrl: trimSlash(url) }))
.map(logger.tap('loaded current block and tiemstamp from SU'))
/**
* In order to generate cron messages and merge them with the
Expand Down Expand Up @@ -366,7 +367,7 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
})
)
.map(ctx => {
logger('Merging Streams of Sequenced and Scheduled Messages...')
logger('Merging Streams of Scheduled and Cron Messages...')
return ctx
})
.map(({ leftMost, rightMost, $scheduled, genCronMessages }) => {
Expand Down
4 changes: 2 additions & 2 deletions servers/cu/src/domain/lib/loadProcess.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { z } from 'zod'

import { findLatestEvaluationSchema, findProcessSchema, loadProcessSchema, locateSchedulerSchema, saveProcessSchema } from '../dal.js'
import { rawBlockSchema, rawTagSchema } from '../model.js'
import { eqOrIncludes, parseTags } from '../utils.js'
import { eqOrIncludes, parseTags, trimSlash } from '../utils.js'

function getProcessMetaWith ({ loadProcess, locateScheduler, findProcess, saveProcess, logger }) {
locateScheduler = fromPromise(locateSchedulerSchema.implement(locateScheduler))
Expand All @@ -22,7 +22,7 @@ function getProcessMetaWith ({ loadProcess, locateScheduler, findProcess, savePr
*/
function loadFromSu (processId) {
return locateScheduler(processId)
.chain(({ url }) => loadProcess({ suUrl: url, processId }))
.chain(({ url }) => loadProcess({ suUrl: trimSlash(url), processId }))
/**
* Verify the process by examining the tags
*/
Expand Down
4 changes: 2 additions & 2 deletions servers/cu/src/domain/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ export const messageSchema = z.object({
'Forwarded-By': z.string().optional(),
'Forwarded-For': z.string().optional(),
Tags: z.array(rawTagSchema),
Epoch: z.number(),
Nonce: z.number(),
Epoch: z.number().optional(),
Nonce: z.number().optional(),
Timestamp: z.coerce.number(),
'Block-Height': z.coerce.number(),
/**
Expand Down
5 changes: 5 additions & 0 deletions servers/cu/src/domain/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ export function eqOrIncludes (val) {
])
}

export function trimSlash (str = '') {
if (!str.endsWith('/')) return str
return trimSlash(str.slice(0, -1))
}

export function findRawTag (name, tags) {
return pipe(
defaultTo([]),
Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/routes/middleware/withInMemoryCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const withInMemoryCache = ({
const logger = _logger.child('InMemoryCache')

const dataloader = new Dataloader(loader, {
cacheKeyFn: keyer,
cacheKeyFn: ({ req }) => keyer(req),
cacheMap: {
get: (key) => {
/**
Expand Down

0 comments on commit f820134

Please sign in to comment.