Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cu)!: use new Cron tag format and implement parsing #199 #198 #209

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 46 additions & 56 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Transform, pipeline } from 'node:stream'

import { Rejected, Resolved, fromPromise, of } from 'hyper-async'
import { Resolved, fromPromise, of } from 'hyper-async'
import { T, always, ascend, cond, equals, ifElse, length, mergeRight, pipe, prop, reduce } from 'ramda'
import { z } from 'zod'
import ms from 'ms'
Expand All @@ -10,22 +10,21 @@ import { loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema } from '.
import { padBlockHeight } from './utils.js'

/**
* - { name: 'Scheduled-Interval', value: 'interval' }
* - { name: 'Scheduled-Message', value: 'JSON' }
* - { name: 'Cron-Interval', value: 'interval' }
* - { name: 'Cron-Tag-Foo', value: 'Bar' }
* - { name: 'Cron-Tag-Fizz', value: 'Buzz' }
*
* Interval Format: 'X-Y'
*
* Where X is the value
* Where Y is the unit:
* - 'blocks'
* - 'cron' (X is cron string)
* - time unit ie. 'seconds' 'minutes' 'hours' 'days' 'weeks' 'months' 'years'
*
* - '10-blocks'
* - '10-seconds'
* - '* * * * *-cron'
*/
export function parseSchedules ({ tags }) {
export function parseCrons ({ tags }) {
function parseInterval (interval = '') {
const [value, unit] = interval
.split('-')
Expand All @@ -34,17 +33,16 @@ export function parseSchedules ({ tags }) {
return cond([
[equals('blocks'), always({ interval, unit, value: parseInt(value) })],
[equals('block'), always({ interval, unit, value: parseInt(value) })],
[equals('cron'), always({ interval, unit, value })],
/**
* Assume it's a time, so convert to seconds
*
* TODO: harden
*/
[T, pipe(
always({ interval, unit: 'seconds', value: Math.floor(ms([value, unit].join(' ')) / 1000) }),
(schedule) => {
if (schedule.value <= 0) throw new Error('time-based interval cannot be less than 1 second')
return schedule
(cron) => {
if (cron.value <= 0) throw new Error('time-based cron cannot be less than 1 second')
return cron
}
)]
])(unit)
Expand All @@ -53,66 +51,58 @@ export function parseSchedules ({ tags }) {
return of(tags)
.chain(tags => {
/**
* Build schedules from tags.
* interval is matched with message using a queue
* Build crons from tags.
* interval is matched with most recent Cron-Interval ta
*
* tags like:
*
* [
{ name: 'Foo', value: 'Bar' },
{ name: 'Scheduled-Interval', value: '10-blocks' },
{ name: 'Scheduled-Interval', value: ' 20-seconds ' },
{
name: 'Scheduled-Message',
value: action1
},
{ name: 'Random', value: 'Tag' },
{
name: 'Scheduled-Message',
value: action2
},
{ name: 'Scheduled-Interval', value: '* 1 * * *-cron' },
{ name: 'Another', value: 'Tag' },
{
name: 'Scheduled-Message',
value: action3
}
]
{ name: 'Cron-Interval', value: '5-minutes' },
{ name: 'Cron-Tag-Foo', value: 'Bar' },
{ name: 'Cron-Tag-Fizz', value: 'Buzz' },
{ name: 'Cron-Interval', value: '10-blocks' },
{ name: 'Cron-Tag-Foo', value: 'Bar' },
{ name: 'Cron-Tag-Fizz', value: 'Buzz' },
* ]
*/
const [schedules, queue] = reduce(
(acc, tag) => {
/**
* New interval found, so push to queue
*/
if (tag.name === SCHEDULED_INTERVAL) acc[1].push(parseInterval(tag.value))
const crons = reduce(
(crons, tag) => {
/**
* New message found, so combine with earliest found interval
* and construct the schedule
* New interval found, so push to list
*/
if (tag.name === SCHEDULED_MESSAGE) {
const { value, unit, interval } = acc[1].shift()
acc[0].push({
value,
unit,
interval,
message: JSON.parse(tag.value)
if (tag.name === CRON_INTERVAL) {
crons.push({
...parseInterval(tag.value),
/**
* Cron Messages may only specify tags
*/
message: { tags: [] }
})

return crons
}
return acc

if (CRON_TAG_REGEX.test(tag.name)) {
/**
* If a Cron-Tag-* is not preceded, at some point, by a Cron-Interval tag, then this is invalid
* and we throw an error
*/
if (!crons.length) throw new Error(`Unmatched Cron-Tag with no preceding Cron-Interval: ${tag.name}`)
const [, tagName] = CRON_TAG_REGEX.exec(tag.name)
crons[crons.length - 1].message.tags.push({ name: tagName, value: tag.value })
}

return crons
},
[[], []],
[],
tags
)

if (queue.length) return Rejected(`Unmatched Schedules found: ${queue.join(', ')}`)

if (!schedules.length) return Resolved([])
return Resolved(schedules)
return Resolved(crons)
})
}

export const SCHEDULED_INTERVAL = 'Scheduled-Interval'
export const SCHEDULED_MESSAGE = 'Scheduled-Message'
export const CRON_INTERVAL = 'Cron-Interval'
export const CRON_TAG_REGEX = /^Cron-Tag-(.+)$/

/**
* Whether the block height, relative to the origin block height,
Expand Down Expand Up @@ -364,7 +354,7 @@ function loadScheduledMessagesWith ({ loadTimestamp, loadBlocksMeta, logger }) {
}

return (ctx) => of(ctx)
.chain(parseSchedules)
.chain(parseCrons)
.bimap(
logger.tap('Failed to parse schedules:'),
ifElse(
Expand Down
74 changes: 17 additions & 57 deletions servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,30 @@ import * as assert from 'node:assert'
import ms from 'ms'
import { countBy, prop, uniqBy } from 'ramda'

import { SCHEDULED_INTERVAL, SCHEDULED_MESSAGE, isBlockOnSchedule, isTimestampOnSchedule, parseSchedules, scheduleMessagesBetweenWith } from './loadMessages.js'
import { CRON_INTERVAL, parseCrons, isBlockOnSchedule, isTimestampOnSchedule, scheduleMessagesBetweenWith } from './loadMessages.js'
import { padBlockHeight } from './utils.js'

describe('loadMessages', () => {
describe('parseSchedules', () => {
const [action1, action2, action3] = [
JSON.stringify({
tags: [
{ name: 'function', value: 'notify' },
{ name: 'notify-function', value: 'transfer' }
]
}),
JSON.stringify({
tags: [
{ name: 'function', value: 'notify' },
{ name: 'notify-function', value: 'transfer' }
]
}),
JSON.stringify({
tags: [
{ name: 'function', value: 'transfer' }
]
})
]
describe('parseCrons', () => {
test('parses the schedules from the tags', async () => {
/**
* Purposefully mixed up to test robustness of parsing queue
*/
const tags = [
{ name: 'Foo', value: 'Bar' },
{ name: SCHEDULED_INTERVAL, value: '10-blocks' },
{ name: SCHEDULED_INTERVAL, value: ' 10-minutes ' },
{
name: SCHEDULED_MESSAGE,
value: action1
},
{ name: CRON_INTERVAL, value: '10-blocks' },
{ name: 'Cron-Tag-function', value: 'notify' },
{ name: 'Cron-Tag-notify-function', value: 'transfer' },
{ name: 'Random', value: 'Tag' },
{
name: SCHEDULED_MESSAGE,
value: action2
},
{ name: SCHEDULED_INTERVAL, value: '* 1 * * *-cron' },
{ name: CRON_INTERVAL, value: ' 10-minutes ' },
{ name: 'Cron-Tag-function', value: 'notify' },
{ name: 'Cron-Tag-notify-function', value: 'transfer' },
{ name: CRON_INTERVAL, value: '1 hour' },
{ name: 'Another', value: 'Tag' },
{
name: SCHEDULED_MESSAGE,
value: action3
}
{ name: 'Cron-Tag-Function', value: 'transfer' }
]

const [blocks, staticTime, cron] = await parseSchedules({ tags })
const [blocks, staticTime] = await parseCrons({ tags })
.toPromise()

assert.deepStrictEqual(blocks, {
Expand All @@ -80,39 +54,25 @@ describe('loadMessages', () => {
},
interval: ' 10-minutes '
})

assert.deepStrictEqual(cron, {
value: '* 1 * * *',
unit: 'cron',
message: {
tags: [
{ name: 'function', value: 'transfer' }
]
},
interval: '* 1 * * *-cron'
})
})

test('return an empty array of no schedules are found', async () => {
const schedules = await parseSchedules({
const crons = await parseCrons({
tags: []
}).toPromise()

assert.deepStrictEqual(schedules, [])
assert.deepStrictEqual(crons, [])
})

test('throw if time-based schedule is less than 1 second', async () => {
await parseSchedules({
await parseCrons({
tags: [
{ name: SCHEDULED_INTERVAL, value: '500-Milliseconds' },
{
name: SCHEDULED_MESSAGE,
value: action3
}
{ name: CRON_INTERVAL, value: '500-Milliseconds' },
{ name: 'Cron-Tag-Function', value: 'transfer' }
]
}).toPromise()
.catch(err => {
assert.equal(err.message, 'time-based interval cannot be less than 1 second')
assert.equal(err.message, 'time-based cron cannot be less than 1 second')
})
})
})
Expand Down