diff --git a/servers/cu/package-lock.json b/servers/cu/package-lock.json index a0efb0155..f55d2179d 100644 --- a/servers/cu/package-lock.json +++ b/servers/cu/package-lock.json @@ -9,7 +9,7 @@ "version": "1.0.0", "dependencies": { "@permaweb/ao-loader": "0.0.12", - "@permaweb/ao-scheduler-utils": "^0.0.3", + "@permaweb/ao-scheduler-utils": "^0.0.4", "arweave": "^1.14.4", "cors": "^2.8.5", "dataloader": "^2.2.2", @@ -51,9 +51,9 @@ } }, "node_modules/@permaweb/ao-scheduler-utils": { - "version": "0.0.3", - "resolved": "https://registry.npmjs.org/@permaweb/ao-scheduler-utils/-/ao-scheduler-utils-0.0.3.tgz", - "integrity": "sha512-L21vKcKK9Sat+SSJheuU6/EmBjA6XUMIxfLPCd0zWW14s6haQYJhIT00fT1ZiihtLfMEXHVnIa8jFu1c4EdZdw==", + "version": "0.0.4", + "resolved": "https://registry.npmjs.org/@permaweb/ao-scheduler-utils/-/ao-scheduler-utils-0.0.4.tgz", + "integrity": "sha512-EmQUmobr87jPwBGuNUwHj3yLG/zL3B9n4tmqRx8OefTNTNWEJH45a0+maN/K582BZOQ5016zocRdYnXvgZsDXQ==", "dependencies": { "mnemonist": "^0.39.6", "ramda": "^0.29.1" @@ -2076,9 +2076,9 @@ "integrity": "sha512-ttYTYds0bxf4JDV4kwg/lJWBOaR5x6Mibm0KBXrOL5b8KOlqBwQhZhQE8lFKkzRp8iY/bXZLeb82DUDBn88AHA==" }, "@permaweb/ao-scheduler-utils": { - "version": "0.0.3", - "resolved": "https://registry.npmjs.org/@permaweb/ao-scheduler-utils/-/ao-scheduler-utils-0.0.3.tgz", - "integrity": "sha512-L21vKcKK9Sat+SSJheuU6/EmBjA6XUMIxfLPCd0zWW14s6haQYJhIT00fT1ZiihtLfMEXHVnIa8jFu1c4EdZdw==", + "version": "0.0.4", + "resolved": "https://registry.npmjs.org/@permaweb/ao-scheduler-utils/-/ao-scheduler-utils-0.0.4.tgz", + "integrity": "sha512-EmQUmobr87jPwBGuNUwHj3yLG/zL3B9n4tmqRx8OefTNTNWEJH45a0+maN/K582BZOQ5016zocRdYnXvgZsDXQ==", "requires": { "mnemonist": "^0.39.6", "ramda": "^0.29.1" diff --git a/servers/cu/package.json b/servers/cu/package.json index e374adc7a..8222a6a31 100644 --- a/servers/cu/package.json +++ b/servers/cu/package.json @@ -14,7 +14,7 @@ }, "dependencies": { "@permaweb/ao-loader": "0.0.12", - "@permaweb/ao-scheduler-utils": "^0.0.3", + "@permaweb/ao-scheduler-utils": "^0.0.4", "arweave": "^1.14.4", "cors": "^2.8.5", "dataloader": "^2.2.2", diff --git a/servers/cu/src/domain/client/ao-su.js b/servers/cu/src/domain/client/ao-su.js index 260549d3c..93f164135 100644 --- a/servers/cu/src/domain/client/ao-su.js +++ b/servers/cu/src/domain/client/ao-su.js @@ -141,7 +141,10 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { * down the pipeline */ block: applySpec({ - height: path(['block']), + height: pipe( + path(['block']), + (block) => parseInt(block) + ), timestamp: path(['timestamp']) }), AoGlobal: applySpec({ @@ -160,7 +163,7 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { fetchAllPages({ suUrl, processId, from, to }), Transform.from(mapAoMessage({ processId, processOwner, processTags })), (err) => { - if (err) logger('Encountered err when mapping Sequencer Messages', err) + if (err) logger('Encountered err when mapping Scheduled Messages', err) } ) }) @@ -175,11 +178,14 @@ export const loadProcessWith = ({ fetch }) => { owner: path(['owner', 'address']), tags: path(['tags']), block: applySpec({ - height: path(['block', 'height']), + height: pipe( + path(['block']), + (block) => parseInt(block) + ), /** * SU is currently sending back timestamp in milliseconds, */ - timestamp: path(['block', 'timestamp']) + timestamp: path(['timestamp']) }) })) } diff --git a/servers/cu/src/domain/client/wallet.js b/servers/cu/src/domain/client/wallet.js index ad37ce8d7..c3b920c49 100644 --- a/servers/cu/src/domain/client/wallet.js +++ b/servers/cu/src/domain/client/wallet.js @@ -4,7 +4,6 @@ let arweave export function createWalletClient () { if (arweave) return arweave arweave = Arweave.init() - console.log(Arweave) return arweave } diff --git a/servers/cu/src/domain/dal.js b/servers/cu/src/domain/dal.js index 4e0e4efee..8017f0b26 100644 --- a/servers/cu/src/domain/dal.js +++ b/servers/cu/src/domain/dal.js @@ -74,6 +74,7 @@ export const loadMessagesSchema = z.function() suUrl: z.string().url(), processId: z.string(), owner: z.string(), + tags: z.array(rawTagSchema), from: z.coerce.string().optional(), to: z.coerce.string().optional() }) diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index ba89c546f..3eb05b182 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -171,7 +171,7 @@ export function evaluateWith (env) { * if their deepHash is found in the cache, this prevents duplicate evals */ if (deepHash) { - logger('Checking if "%s" has already been evaluated...', message.Id) + logger('Checking if "%s" has already been evaluated...', message.Id || `Cron Message ${cron}`) const found = await doesMessageHashExist(deepHash).toPromise() if (found) { logger('Message with deepHash "%s" was found in cache and therefore has already been evaluated. Removing from eval stream', deepHash) @@ -184,7 +184,7 @@ export function evaluateWith (env) { .then(prev => Promise.resolve(prev.Memory) .then(Memory => { - logger('Evaluating message with id "%s" to process "%s"', message.Id, ctx.id) + logger('Evaluating message with id "%s" to process "%s"', message.Id || `Cron Message ${cron}`, ctx.id) return Memory }) /** @@ -206,7 +206,7 @@ export function evaluateWith (env) { : Promise.resolve(output) }) .then(output => { - logger('Applied message with id "%s" to process "%s"', message.Id, ctx.id) + logger('Applied message with id "%s" to process "%s"', message.Id || `Cron Message ${cron}`, ctx.id) return output }) /** @@ -227,7 +227,7 @@ export function evaluateWith (env) { ) .catch(logger.tap( 'Error occurred when applying message with id "%s" to process "%s" %o', - message.Id, + message.Id || `Cron Message ${cron}`, ctx.id )) ) diff --git a/servers/cu/src/domain/lib/hydrateMessages.js b/servers/cu/src/domain/lib/hydrateMessages.js index 85bb21439..3e6b0cfd9 100644 --- a/servers/cu/src/domain/lib/hydrateMessages.js +++ b/servers/cu/src/domain/lib/hydrateMessages.js @@ -113,7 +113,7 @@ export function maybeAoLoadWith ({ loadTransactionData, loadTransactionMeta, log continue } - logger('Hydrating ao-load message for "%s" from transaction "%s"', cur.message.Id, tag.value) + logger('Hydrating Load message for "%s" from transaction "%s"', cur.message.Id, tag.value) /** * - Fetch raw data and meta from gateway * - contruct the data item JSON, encoding the raw data as base64 @@ -124,7 +124,7 @@ export function maybeAoLoadWith ({ loadTransactionData, loadTransactionMeta, log loadTransactionMeta(tag.value) ]).then(([data, meta]) => messageFromParts({ data, meta })) - logger('Hydrated ao-load message for "%s" from transaction "%s" and attached as data', cur.message.Id, tag.value) + logger('Hydrated Load message for "%s" from transaction "%s" and attached as data', cur.message.Id, tag.value) yield cur } @@ -160,7 +160,7 @@ export function hydrateMessagesWith (env) { Transform.from(maybeMessageId), Transform.from(maybeAoLoad), (err) => { - if (err) logger('Encountered err when hydrating ao-load and forwarded-for messages', err) + if (err) logger('Encountered err when hydrating Load and forwarded-for messages', err) } ) }) diff --git a/servers/cu/src/domain/lib/loadMessageMeta.js b/servers/cu/src/domain/lib/loadMessageMeta.js index fe1a15870..300a8025a 100644 --- a/servers/cu/src/domain/lib/loadMessageMeta.js +++ b/servers/cu/src/domain/lib/loadMessageMeta.js @@ -2,6 +2,7 @@ import { fromPromise } from 'hyper-async' import { z } from 'zod' import { loadMessageMetaSchema, locateSchedulerSchema } from '../dal.js' +import { trimSlash } from '../utils.js' /** * The result that is produced from this step @@ -44,7 +45,7 @@ export function loadMessageMetaWith (env) { return (ctx) => { return locateScheduler(ctx.processId) .chain(({ url }) => loadMessageMeta({ - suUrl: url, + suUrl: trimSlash(url), processId: ctx.processId, messageTxId: ctx.messageTxId })) diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index 76e4c0ab4..d515aba7b 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -7,6 +7,7 @@ import ms from 'ms' import { messageSchema, streamSchema } from '../model.js' import { loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, locateSchedulerSchema } from '../dal.js' +import { trimSlash } from '../utils.js' /** * - { name: 'Cron-Interval', value: 'interval' } @@ -242,13 +243,13 @@ function loadScheduledMessagesWith ({ locateScheduler, loadMessages, loadBlocksM return (ctx) => of(ctx) .map(ctx => { - logger('Initializing AsyncIterable of Sequenced messages for process "%s" between "%s" and "%s"', ctx.id, ctx.from || 'initial', ctx.to || 'latest') + logger('Initializing AsyncIterable of Scheduled messages for process "%s" between "%s" and "%s"', ctx.id, ctx.from || 'initial', ctx.to || 'latest') return ctx }) .chain((ctx) => locateScheduler(ctx.id) .chain(({ url }) => loadMessages({ - suUrl: url, + suUrl: trimSlash(url), processId: ctx.id, owner: ctx.owner, tags: ctx.tags, @@ -320,7 +321,7 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta, * producing a single merged stream */ return locateScheduler(ctx.id) - .chain(({ url }) => loadTimestamp({ processId: ctx.id, suUrl: url })) + .chain(({ url }) => loadTimestamp({ processId: ctx.id, suUrl: trimSlash(url) })) .map(logger.tap('loaded current block and tiemstamp from SU')) /** * In order to generate cron messages and merge them with the @@ -366,7 +367,7 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta, }) ) .map(ctx => { - logger('Merging Streams of Sequenced and Scheduled Messages...') + logger('Merging Streams of Scheduled and Cron Messages...') return ctx }) .map(({ leftMost, rightMost, $scheduled, genCronMessages }) => { diff --git a/servers/cu/src/domain/lib/loadProcess.js b/servers/cu/src/domain/lib/loadProcess.js index df1a3a75a..151ce4d5f 100644 --- a/servers/cu/src/domain/lib/loadProcess.js +++ b/servers/cu/src/domain/lib/loadProcess.js @@ -4,7 +4,7 @@ import { z } from 'zod' import { findLatestEvaluationSchema, findProcessSchema, loadProcessSchema, locateSchedulerSchema, saveProcessSchema } from '../dal.js' import { rawBlockSchema, rawTagSchema } from '../model.js' -import { eqOrIncludes, parseTags } from '../utils.js' +import { eqOrIncludes, parseTags, trimSlash } from '../utils.js' function getProcessMetaWith ({ loadProcess, locateScheduler, findProcess, saveProcess, logger }) { locateScheduler = fromPromise(locateSchedulerSchema.implement(locateScheduler)) @@ -22,7 +22,7 @@ function getProcessMetaWith ({ loadProcess, locateScheduler, findProcess, savePr */ function loadFromSu (processId) { return locateScheduler(processId) - .chain(({ url }) => loadProcess({ suUrl: url, processId })) + .chain(({ url }) => loadProcess({ suUrl: trimSlash(url), processId })) /** * Verify the process by examining the tags */ diff --git a/servers/cu/src/domain/model.js b/servers/cu/src/domain/model.js index ffb758c91..826c74969 100644 --- a/servers/cu/src/domain/model.js +++ b/servers/cu/src/domain/model.js @@ -69,8 +69,8 @@ export const messageSchema = z.object({ 'Forwarded-By': z.string().optional(), 'Forwarded-For': z.string().optional(), Tags: z.array(rawTagSchema), - Epoch: z.number(), - Nonce: z.number(), + Epoch: z.number().optional(), + Nonce: z.number().optional(), Timestamp: z.coerce.number(), 'Block-Height': z.coerce.number(), /** diff --git a/servers/cu/src/domain/utils.js b/servers/cu/src/domain/utils.js index d642f5dba..ba3807b49 100644 --- a/servers/cu/src/domain/utils.js +++ b/servers/cu/src/domain/utils.js @@ -120,6 +120,11 @@ export function eqOrIncludes (val) { ]) } +export function trimSlash (str = '') { + if (!str.endsWith('/')) return str + return trimSlash(str.slice(0, -1)) +} + export function findRawTag (name, tags) { return pipe( defaultTo([]), diff --git a/servers/cu/src/routes/middleware/withInMemoryCache.js b/servers/cu/src/routes/middleware/withInMemoryCache.js index 455e40d45..c11cfd1ed 100644 --- a/servers/cu/src/routes/middleware/withInMemoryCache.js +++ b/servers/cu/src/routes/middleware/withInMemoryCache.js @@ -38,7 +38,7 @@ export const withInMemoryCache = ({ const logger = _logger.child('InMemoryCache') const dataloader = new Dataloader(loader, { - cacheKeyFn: keyer, + cacheKeyFn: ({ req }) => keyer(req), cacheMap: { get: (key) => { /**