Skip to content

Commit

Permalink
chore(cu): cleanup loadMessages pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Dec 21, 2023
1 parent 77078e2 commit c26e32a
Showing 1 changed file with 46 additions and 47 deletions.
93 changes: 46 additions & 47 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,41 +277,6 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp))
loadBlocksMeta = fromPromise(loadBlocksMetaSchema.implement(loadBlocksMeta))

/**
* Given a left-most and right-most boundary, return an async generator,
* that given a list of values, emits sequential binary tuples dervied from those values,
* with an additional element appended and prepended to the list of values.
*
* This effectively places an tuple aperture on the incoming stream of single values,
* and with andditional element at the beginning and end of the list
*
* Since we added our left and right bounds, there should always
* be at least one tuple emitted, which will account for any time
* we have <2 cron messages to use as boundaries.
*
* If our leftMost and rightMost boundary are the only boundaries, this effectively means
* that we have no cron messages to merge and evaluate, and only cron messages to generate
*
* [b1, b2, b3] -> [ [b1, b2], [b2, b3] ]
*/
function genTuplesWithBoundaries ({ left: first, right: last }) {
return async function * genTuples (boundaries) {
/**
* the initial prev is the left-most boundary
*/
let prev = first
for await (const boundary of boundaries) {
yield [prev, boundary]
prev = boundary
}

/**
* Emit the last boundary
*/
yield [prev, last]
}
}

return (ctx) => of(ctx)
.chain(parseCrons)
.bimap(
Expand Down Expand Up @@ -390,20 +355,54 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
return ctx
})
.map(({ leftMost, rightMost, $scheduled, genCronMessages }) => {
/**
* Each set of cron messages will be generated between a left and right boundary,
* So we need to procure a set of boundaries to use, while ALSO merging with the scheduled messages
* from the SU.
*
* Our messages retrieved from the SU are perfect boundaries, as they each have a
* block height and timestamp
*
* This will allow the CU to generate cron messages with monotonically increasing timestamp and accurate block metadata,
* at least w.r.t the SU's claims.
*/
return pipeline(
/**
* Each set of cron messages will be generated between a left and right boundary,
* So we need to procure a set of boundaries to use, while ALSO merging with the scheduled messages
* from the SU.
*
* Our messages retrieved from the SU are perfect boundaries, as they each have a
* block height and timestamp
*
* This will allow the CU to generate cron messages with monotonically increasing timestamp and accurate block metadata,
* at least w.r.t the SU's claims.
*/
$scheduled,
Transform.from(genTuplesWithBoundaries({ left: leftMost, right: rightMost })),
/**
* Given a left-most and right-most boundary, return an async generator,
* that given a list of values, emits sequential binary tuples dervied from those values,
* with an additional element appended and prepended to the list of values.
*
* This effectively places an tuple aperture on the incoming stream of single values,
* and with andditional element at the beginning and end of the list
*
* Since we added our left and right bounds, there should always
* be at least one tuple emitted, which will account for any time
* we have <2 cron messages to use as boundaries.
*
* If our leftMost and rightMost boundary are the only boundaries, this effectively means
* that we have no cron messages to merge and evaluate, and only cron messages to generate
*
* [b1, b2, b3] -> [ [b1, b2], [b2, b3] ]
*/
Transform.from(
async function * genBoundariesAsTuples (boundaries) {
/**
* the initial prev is the left-most boundary
*/
let prev = leftMost

for await (const boundary of boundaries) {
yield [prev, boundary]
prev = boundary
}

/**
* Emit the last boundary
*/
yield [prev, rightMost]
}
),
Transform.from(async function * (boundaries) {
let tuple = await boundaries.next()
while (!tuple.done) {
Expand Down

0 comments on commit c26e32a

Please sign in to comment.