Skip to content

Commit

Permalink
feat(cu)!: use new Cron tag format and implement parsing #199 #198
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Dec 12, 2023
1 parent fc9f22e commit 6931691
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 113 deletions.
102 changes: 46 additions & 56 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Transform, pipeline } from 'node:stream'

import { Rejected, Resolved, fromPromise, of } from 'hyper-async'
import { Resolved, fromPromise, of } from 'hyper-async'
import { T, always, ascend, cond, equals, ifElse, length, mergeRight, pipe, prop, reduce } from 'ramda'
import { z } from 'zod'
import ms from 'ms'
Expand All @@ -10,22 +10,21 @@ import { loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema } from '.
import { padBlockHeight } from './utils.js'

/**
* - { name: 'Scheduled-Interval', value: 'interval' }
* - { name: 'Scheduled-Message', value: 'JSON' }
* - { name: 'Cron-Interval', value: 'interval' }
* - { name: 'Cron-Tag-Foo', value: 'Bar' }
* - { name: 'Cron-Tag-Fizz', value: 'Buzz' }
*
* Interval Format: 'X-Y'
*
* Where X is the value
* Where Y is the unit:
* - 'blocks'
* - 'cron' (X is cron string)
* - time unit ie. 'seconds' 'minutes' 'hours' 'days' 'weeks' 'months' 'years'
*
* - '10-blocks'
* - '10-seconds'
* - '* * * * *-cron'
*/
export function parseSchedules ({ tags }) {
export function parseCrons ({ tags }) {
function parseInterval (interval = '') {
const [value, unit] = interval
.split('-')
Expand All @@ -34,17 +33,16 @@ export function parseSchedules ({ tags }) {
return cond([
[equals('blocks'), always({ interval, unit, value: parseInt(value) })],
[equals('block'), always({ interval, unit, value: parseInt(value) })],
[equals('cron'), always({ interval, unit, value })],
/**
* Assume it's a time, so convert to seconds
*
* TODO: harden
*/
[T, pipe(
always({ interval, unit: 'seconds', value: Math.floor(ms([value, unit].join(' ')) / 1000) }),
(schedule) => {
if (schedule.value <= 0) throw new Error('time-based interval cannot be less than 1 second')
return schedule
(cron) => {
if (cron.value <= 0) throw new Error('time-based cron cannot be less than 1 second')
return cron
}
)]
])(unit)
Expand All @@ -53,66 +51,58 @@ export function parseSchedules ({ tags }) {
return of(tags)
.chain(tags => {
/**
* Build schedules from tags.
* interval is matched with message using a queue
* Build crons from tags.
* interval is matched with most recent Cron-Interval ta
*
* tags like:
*
* [
{ name: 'Foo', value: 'Bar' },
{ name: 'Scheduled-Interval', value: '10-blocks' },
{ name: 'Scheduled-Interval', value: ' 20-seconds ' },
{
name: 'Scheduled-Message',
value: action1
},
{ name: 'Random', value: 'Tag' },
{
name: 'Scheduled-Message',
value: action2
},
{ name: 'Scheduled-Interval', value: '* 1 * * *-cron' },
{ name: 'Another', value: 'Tag' },
{
name: 'Scheduled-Message',
value: action3
}
]
{ name: 'Cron-Interval', value: '5-minutes' },
{ name: 'Cron-Tag-Foo', value: 'Bar' },
{ name: 'Cron-Tag-Fizz', value: 'Buzz' },
{ name: 'Cron-Interval', value: '10-blocks' },
{ name: 'Cron-Tag-Foo', value: 'Bar' },
{ name: 'Cron-Tag-Fizz', value: 'Buzz' },
* ]
*/
const [schedules, queue] = reduce(
(acc, tag) => {
/**
* New interval found, so push to queue
*/
if (tag.name === SCHEDULED_INTERVAL) acc[1].push(parseInterval(tag.value))
const crons = reduce(
(crons, tag) => {
/**
* New message found, so combine with earliest found interval
* and construct the schedule
* New interval found, so push to list
*/
if (tag.name === SCHEDULED_MESSAGE) {
const { value, unit, interval } = acc[1].shift()
acc[0].push({
value,
unit,
interval,
message: JSON.parse(tag.value)
if (tag.name === CRON_INTERVAL) {
crons.push({
...parseInterval(tag.value),
/**
* Cron Messages may only specify tags
*/
message: { tags: [] }
})

return crons
}
return acc

if (CRON_TAG_REGEX.test(tag.name)) {
/**
* If a Cron-Tag-* is not preceded, at some point, by a Cron-Interval tag, then this is invalid
* and we throw an error
*/
if (!crons.length) throw new Error(`Unmatched Cron-Tag with no preceding Cron-Interval: ${tag.name}`)
const [, tagName] = CRON_TAG_REGEX.exec(tag.name)
crons[crons.length - 1].message.tags.push({ name: tagName, value: tag.value })
}

return crons
},
[[], []],
[],
tags
)

if (queue.length) return Rejected(`Unmatched Schedules found: ${queue.join(', ')}`)

if (!schedules.length) return Resolved([])
return Resolved(schedules)
return Resolved(crons)
})
}

export const SCHEDULED_INTERVAL = 'Scheduled-Interval'
export const SCHEDULED_MESSAGE = 'Scheduled-Message'
export const CRON_INTERVAL = 'Cron-Interval'
export const CRON_TAG_REGEX = /^Cron-Tag-(.+)$/

/**
* Whether the block height, relative to the origin block height,
Expand Down Expand Up @@ -364,7 +354,7 @@ function loadScheduledMessagesWith ({ loadTimestamp, loadBlocksMeta, logger }) {
}

return (ctx) => of(ctx)
.chain(parseSchedules)
.chain(parseCrons)
.bimap(
logger.tap('Failed to parse schedules:'),
ifElse(
Expand Down
74 changes: 17 additions & 57 deletions servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,30 @@ import * as assert from 'node:assert'
import ms from 'ms'
import { countBy, prop, uniqBy } from 'ramda'

import { SCHEDULED_INTERVAL, SCHEDULED_MESSAGE, isBlockOnSchedule, isTimestampOnSchedule, parseSchedules, scheduleMessagesBetweenWith } from './loadMessages.js'
import { CRON_INTERVAL, parseCrons, isBlockOnSchedule, isTimestampOnSchedule, scheduleMessagesBetweenWith } from './loadMessages.js'
import { padBlockHeight } from './utils.js'

describe('loadMessages', () => {
describe('parseSchedules', () => {
const [action1, action2, action3] = [
JSON.stringify({
tags: [
{ name: 'function', value: 'notify' },
{ name: 'notify-function', value: 'transfer' }
]
}),
JSON.stringify({
tags: [
{ name: 'function', value: 'notify' },
{ name: 'notify-function', value: 'transfer' }
]
}),
JSON.stringify({
tags: [
{ name: 'function', value: 'transfer' }
]
})
]
describe('parseCrons', () => {
test('parses the schedules from the tags', async () => {
/**
* Purposefully mixed up to test robustness of parsing queue
*/
const tags = [
{ name: 'Foo', value: 'Bar' },
{ name: SCHEDULED_INTERVAL, value: '10-blocks' },
{ name: SCHEDULED_INTERVAL, value: ' 10-minutes ' },
{
name: SCHEDULED_MESSAGE,
value: action1
},
{ name: CRON_INTERVAL, value: '10-blocks' },
{ name: 'Cron-Tag-function', value: 'notify' },
{ name: 'Cron-Tag-notify-function', value: 'transfer' },
{ name: 'Random', value: 'Tag' },
{
name: SCHEDULED_MESSAGE,
value: action2
},
{ name: SCHEDULED_INTERVAL, value: '* 1 * * *-cron' },
{ name: CRON_INTERVAL, value: ' 10-minutes ' },
{ name: 'Cron-Tag-function', value: 'notify' },
{ name: 'Cron-Tag-notify-function', value: 'transfer' },
{ name: CRON_INTERVAL, value: '1 hour' },
{ name: 'Another', value: 'Tag' },
{
name: SCHEDULED_MESSAGE,
value: action3
}
{ name: 'Cron-Tag-Function', value: 'transfer' }
]

const [blocks, staticTime, cron] = await parseSchedules({ tags })
const [blocks, staticTime] = await parseCrons({ tags })
.toPromise()

assert.deepStrictEqual(blocks, {
Expand All @@ -80,39 +54,25 @@ describe('loadMessages', () => {
},
interval: ' 10-minutes '
})

assert.deepStrictEqual(cron, {
value: '* 1 * * *',
unit: 'cron',
message: {
tags: [
{ name: 'function', value: 'transfer' }
]
},
interval: '* 1 * * *-cron'
})
})

test('return an empty array of no schedules are found', async () => {
const schedules = await parseSchedules({
const crons = await parseCrons({
tags: []
}).toPromise()

assert.deepStrictEqual(schedules, [])
assert.deepStrictEqual(crons, [])
})

test('throw if time-based schedule is less than 1 second', async () => {
await parseSchedules({
await parseCrons({
tags: [
{ name: SCHEDULED_INTERVAL, value: '500-Milliseconds' },
{
name: SCHEDULED_MESSAGE,
value: action3
}
{ name: CRON_INTERVAL, value: '500-Milliseconds' },
{ name: 'Cron-Tag-Function', value: 'transfer' }
]
}).toPromise()
.catch(err => {
assert.equal(err.message, 'time-based interval cannot be less than 1 second')
assert.equal(err.message, 'time-based cron cannot be less than 1 second')
})
})
})
Expand Down

0 comments on commit 6931691

Please sign in to comment.