Skip to content

Commit

Permalink
fix(cu): do not depend on scheduled messages stream to emit boundaries
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Dec 22, 2023
1 parent cd6c2b9 commit 0760ae6
Showing 1 changed file with 30 additions and 29 deletions.
59 changes: 30 additions & 29 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Transform, pipeline } from 'node:stream'
import { Readable, Transform, pipeline } from 'node:stream'

import { Resolved, fromPromise, of } from 'hyper-async'
import { T, always, ascend, cond, equals, ifElse, last, length, mergeRight, pipe, prop, reduce } from 'ramda'
Expand Down Expand Up @@ -363,18 +363,6 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
})
.map(({ leftMost, rightMost, $scheduled, genCronMessages }) => {
return pipeline(
/**
* Each set of cron messages will be generated between a left and right boundary,
* So we need to procure a set of boundaries to use, while ALSO merging with the scheduled messages
* from the SU.
*
* Our messages retrieved from the SU are perfect boundaries, as they each have a
* block height and timestamp, as well as a ordinate set to its nonce.
*
* This will allow the CU to generate cron messages that orderable in and amongst the scheduled message,
* and with accurate block metadata, at least w.r.t the SU's claims.
*/
$scheduled,
/**
* Given a left-most and right-most boundary, return an async generator,
* that given a list of values, emits sequential binary tuples dervied from those values,
Expand All @@ -392,23 +380,36 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
*
* [b1, b2, b3] -> [ [b1, b2], [b2, b3] ]
*/
Transform.from(
async function * genBoundariesAsTuples (boundaries) {
/**
* the initial prev is the left-most boundary
*/
let prev = leftMost

for await (const boundary of boundaries) {
yield [prev, boundary]
prev = boundary
Readable.from(
(
async function * genBoundariesAsTuples () {
/**
* the initial prev is the left-most boundary
*/
let prev = leftMost

/**
* Each set of cron messages will be generated between a left and right boundary,
* So we need to procure a set of boundaries to use, while ALSO merging with the scheduled messages
* from the SU, producing a single ordered sequence of messages to be evaluated by the process.
*
* Our messages retrieved from the SU are perfect boundaries, as they each have a
* block height and timestamp, as well as a ordinate set to its nonce.
*
* This will allow the CU to generate cron messages that orderable in and amongst the scheduled message,
* and with accurate block metadata, at least w.r.t the SU's claims.
*/
for await (const boundary of $scheduled) {
yield [prev, boundary]
prev = boundary
}

/**
* Emit the last boundary
*/
yield [prev, rightMost]
}

/**
* Emit the last boundary
*/
yield [prev, rightMost]
}
)()
),
Transform.from(async function * (boundaries) {
let tuple = await boundaries.next()
Expand Down

0 comments on commit 0760ae6

Please sign in to comment.