From 6931691131785bfc6f7e10ee3ffa61b3b45be727 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Tue, 12 Dec 2023 00:31:32 +0000 Subject: [PATCH] feat(cu)!: use new Cron tag format and implement parsing #199 #198 --- servers/cu/src/domain/lib/loadMessages.js | 102 ++++++++---------- .../cu/src/domain/lib/loadMessages.test.js | 74 +++---------- 2 files changed, 63 insertions(+), 113 deletions(-) diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index 0920fc379..f85c810b2 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -1,6 +1,6 @@ import { Transform, pipeline } from 'node:stream' -import { Rejected, Resolved, fromPromise, of } from 'hyper-async' +import { Resolved, fromPromise, of } from 'hyper-async' import { T, always, ascend, cond, equals, ifElse, length, mergeRight, pipe, prop, reduce } from 'ramda' import { z } from 'zod' import ms from 'ms' @@ -10,22 +10,21 @@ import { loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema } from '. import { padBlockHeight } from './utils.js' /** - * - { name: 'Scheduled-Interval', value: 'interval' } - * - { name: 'Scheduled-Message', value: 'JSON' } + * - { name: 'Cron-Interval', value: 'interval' } + * - { name: 'Cron-Tag-Foo', value: 'Bar' } + * - { name: 'Cron-Tag-Fizz', value: 'Buzz' } * * Interval Format: 'X-Y' * * Where X is the value * Where Y is the unit: * - 'blocks' - * - 'cron' (X is cron string) * - time unit ie. 'seconds' 'minutes' 'hours' 'days' 'weeks' 'months' 'years' * * - '10-blocks' * - '10-seconds' - * - '* * * * *-cron' */ -export function parseSchedules ({ tags }) { +export function parseCrons ({ tags }) { function parseInterval (interval = '') { const [value, unit] = interval .split('-') @@ -34,7 +33,6 @@ export function parseSchedules ({ tags }) { return cond([ [equals('blocks'), always({ interval, unit, value: parseInt(value) })], [equals('block'), always({ interval, unit, value: parseInt(value) })], - [equals('cron'), always({ interval, unit, value })], /** * Assume it's a time, so convert to seconds * @@ -42,9 +40,9 @@ export function parseSchedules ({ tags }) { */ [T, pipe( always({ interval, unit: 'seconds', value: Math.floor(ms([value, unit].join(' ')) / 1000) }), - (schedule) => { - if (schedule.value <= 0) throw new Error('time-based interval cannot be less than 1 second') - return schedule + (cron) => { + if (cron.value <= 0) throw new Error('time-based cron cannot be less than 1 second') + return cron } )] ])(unit) @@ -53,66 +51,58 @@ export function parseSchedules ({ tags }) { return of(tags) .chain(tags => { /** - * Build schedules from tags. - * interval is matched with message using a queue + * Build crons from tags. + * interval is matched with most recent Cron-Interval ta * * tags like: - * * [ - { name: 'Foo', value: 'Bar' }, - { name: 'Scheduled-Interval', value: '10-blocks' }, - { name: 'Scheduled-Interval', value: ' 20-seconds ' }, - { - name: 'Scheduled-Message', - value: action1 - }, - { name: 'Random', value: 'Tag' }, - { - name: 'Scheduled-Message', - value: action2 - }, - { name: 'Scheduled-Interval', value: '* 1 * * *-cron' }, - { name: 'Another', value: 'Tag' }, - { - name: 'Scheduled-Message', - value: action3 - } - ] + { name: 'Cron-Interval', value: '5-minutes' }, + { name: 'Cron-Tag-Foo', value: 'Bar' }, + { name: 'Cron-Tag-Fizz', value: 'Buzz' }, + { name: 'Cron-Interval', value: '10-blocks' }, + { name: 'Cron-Tag-Foo', value: 'Bar' }, + { name: 'Cron-Tag-Fizz', value: 'Buzz' }, + * ] */ - const [schedules, queue] = reduce( - (acc, tag) => { - /** - * New interval found, so push to queue - */ - if (tag.name === SCHEDULED_INTERVAL) acc[1].push(parseInterval(tag.value)) + const crons = reduce( + (crons, tag) => { /** - * New message found, so combine with earliest found interval - * and construct the schedule + * New interval found, so push to list */ - if (tag.name === SCHEDULED_MESSAGE) { - const { value, unit, interval } = acc[1].shift() - acc[0].push({ - value, - unit, - interval, - message: JSON.parse(tag.value) + if (tag.name === CRON_INTERVAL) { + crons.push({ + ...parseInterval(tag.value), + /** + * Cron Messages may only specify tags + */ + message: { tags: [] } }) + + return crons } - return acc + + if (CRON_TAG_REGEX.test(tag.name)) { + /** + * If a Cron-Tag-* is not preceded, at some point, by a Cron-Interval tag, then this is invalid + * and we throw an error + */ + if (!crons.length) throw new Error(`Unmatched Cron-Tag with no preceding Cron-Interval: ${tag.name}`) + const [, tagName] = CRON_TAG_REGEX.exec(tag.name) + crons[crons.length - 1].message.tags.push({ name: tagName, value: tag.value }) + } + + return crons }, - [[], []], + [], tags ) - if (queue.length) return Rejected(`Unmatched Schedules found: ${queue.join(', ')}`) - - if (!schedules.length) return Resolved([]) - return Resolved(schedules) + return Resolved(crons) }) } -export const SCHEDULED_INTERVAL = 'Scheduled-Interval' -export const SCHEDULED_MESSAGE = 'Scheduled-Message' +export const CRON_INTERVAL = 'Cron-Interval' +export const CRON_TAG_REGEX = /^Cron-Tag-(.+)$/ /** * Whether the block height, relative to the origin block height, @@ -364,7 +354,7 @@ function loadScheduledMessagesWith ({ loadTimestamp, loadBlocksMeta, logger }) { } return (ctx) => of(ctx) - .chain(parseSchedules) + .chain(parseCrons) .bimap( logger.tap('Failed to parse schedules:'), ifElse( diff --git a/servers/cu/src/domain/lib/loadMessages.test.js b/servers/cu/src/domain/lib/loadMessages.test.js index e698a55a8..ce851c070 100644 --- a/servers/cu/src/domain/lib/loadMessages.test.js +++ b/servers/cu/src/domain/lib/loadMessages.test.js @@ -5,56 +5,30 @@ import * as assert from 'node:assert' import ms from 'ms' import { countBy, prop, uniqBy } from 'ramda' -import { SCHEDULED_INTERVAL, SCHEDULED_MESSAGE, isBlockOnSchedule, isTimestampOnSchedule, parseSchedules, scheduleMessagesBetweenWith } from './loadMessages.js' +import { CRON_INTERVAL, parseCrons, isBlockOnSchedule, isTimestampOnSchedule, scheduleMessagesBetweenWith } from './loadMessages.js' import { padBlockHeight } from './utils.js' describe('loadMessages', () => { - describe('parseSchedules', () => { - const [action1, action2, action3] = [ - JSON.stringify({ - tags: [ - { name: 'function', value: 'notify' }, - { name: 'notify-function', value: 'transfer' } - ] - }), - JSON.stringify({ - tags: [ - { name: 'function', value: 'notify' }, - { name: 'notify-function', value: 'transfer' } - ] - }), - JSON.stringify({ - tags: [ - { name: 'function', value: 'transfer' } - ] - }) - ] + describe('parseCrons', () => { test('parses the schedules from the tags', async () => { /** * Purposefully mixed up to test robustness of parsing queue */ const tags = [ { name: 'Foo', value: 'Bar' }, - { name: SCHEDULED_INTERVAL, value: '10-blocks' }, - { name: SCHEDULED_INTERVAL, value: ' 10-minutes ' }, - { - name: SCHEDULED_MESSAGE, - value: action1 - }, + { name: CRON_INTERVAL, value: '10-blocks' }, + { name: 'Cron-Tag-function', value: 'notify' }, + { name: 'Cron-Tag-notify-function', value: 'transfer' }, { name: 'Random', value: 'Tag' }, - { - name: SCHEDULED_MESSAGE, - value: action2 - }, - { name: SCHEDULED_INTERVAL, value: '* 1 * * *-cron' }, + { name: CRON_INTERVAL, value: ' 10-minutes ' }, + { name: 'Cron-Tag-function', value: 'notify' }, + { name: 'Cron-Tag-notify-function', value: 'transfer' }, + { name: CRON_INTERVAL, value: '1 hour' }, { name: 'Another', value: 'Tag' }, - { - name: SCHEDULED_MESSAGE, - value: action3 - } + { name: 'Cron-Tag-Function', value: 'transfer' } ] - const [blocks, staticTime, cron] = await parseSchedules({ tags }) + const [blocks, staticTime] = await parseCrons({ tags }) .toPromise() assert.deepStrictEqual(blocks, { @@ -80,39 +54,25 @@ describe('loadMessages', () => { }, interval: ' 10-minutes ' }) - - assert.deepStrictEqual(cron, { - value: '* 1 * * *', - unit: 'cron', - message: { - tags: [ - { name: 'function', value: 'transfer' } - ] - }, - interval: '* 1 * * *-cron' - }) }) test('return an empty array of no schedules are found', async () => { - const schedules = await parseSchedules({ + const crons = await parseCrons({ tags: [] }).toPromise() - assert.deepStrictEqual(schedules, []) + assert.deepStrictEqual(crons, []) }) test('throw if time-based schedule is less than 1 second', async () => { - await parseSchedules({ + await parseCrons({ tags: [ - { name: SCHEDULED_INTERVAL, value: '500-Milliseconds' }, - { - name: SCHEDULED_MESSAGE, - value: action3 - } + { name: CRON_INTERVAL, value: '500-Milliseconds' }, + { name: 'Cron-Tag-Function', value: 'transfer' } ] }).toPromise() .catch(err => { - assert.equal(err.message, 'time-based interval cannot be less than 1 second') + assert.equal(err.message, 'time-based cron cannot be less than 1 second') }) }) })