Skip to content

Commit

Permalink
chore(cu): use allDocs to fetch latest eval. Make findAllEvaluations …
Browse files Browse the repository at this point in the history
…from exclusive and to inclusive
  • Loading branch information
TillaTheHun0 committed Dec 22, 2023
1 parent d84a2ce commit 65b2ba9
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 100 deletions.
3 changes: 3 additions & 0 deletions servers/cu/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ NODE_CONFIG_ENV="development"
DEBUG=*
DB_MODE="embedded"
DB_URL="ao-cache"
# If you want to use a local CouchDB, then use these env vars instead
# DB_MODE="remote"
# DB_URL="http://admin:[email protected]:5984/ao-cache"
DB_MAX_LISTENERS=100
DUMP_PATH="static"
NODE_HEAPDUMP_OPTIONS="nosignal"
70 changes: 38 additions & 32 deletions servers/cu/src/domain/client/pouchdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { deflate, inflate } from 'node:zlib'
import { promisify } from 'node:util'

import { fromPromise, of, Rejected, Resolved } from 'hyper-async'
import { always, applySpec, head, isNotNil, lensPath, map, omit, pipe, prop, set } from 'ramda'
import { always, applySpec, head, isNotNil, lensPath, map, omit, pipe, pluck, prop, set } from 'ramda'
import { z } from 'zod'

import PouchDb from 'pouchdb'
Expand All @@ -17,6 +17,8 @@ const inflateP = promisify(inflate)

/**
* An implementation of the db client using pouchDB
*
* @type {PouchDB.Database}
*/
let internalPouchDb
export function createPouchDbClient ({ logger, maxListeners, mode, url }) {
Expand Down Expand Up @@ -162,22 +164,34 @@ export function saveProcessWith ({ pouchDb }) {
}
}

export function findLatestEvaluationWith ({ pouchDb }) {
export function findLatestEvaluationWith ({ pouchDb = internalPouchDb }) {
function createSelector ({ processId, to }) {
/**
* By using the max collation sequence, this will give us all docs whose _id
* is prefixed with the processId id
*/
const selector = {
_id: {
$gte: createEvaluationId({ processId, timestamp: '' }),
$lte: createEvaluationId({ processId, timestamp: COLLATION_SEQUENCE_MAX_CHAR })
}
/**
* Because we want a descending order, our startkey is the largest key in the range
* and endkey is the smallest key in the range
*
* By using the max collation sequence char, this will give us all docs whose _id
* is prefixed with the processId id
*/
descending: true,
startkey: createEvaluationId({ processId, timestamp: COLLATION_SEQUENCE_MAX_CHAR }),
endkey: createEvaluationId({ processId, timestamp: '' }),
include_docs: true,
/**
* Only get the latest document within the range,
* aka the latest evaluation
*/
limit: 1
}

/**
* overwrite upper range with actual timestamp, since we have it
* overwrite upper range with actual timestamp, since we have it.
*
* By appending the max collation sequence char, we ensure we capture the latest
* message within the range, including CRON messages
*/
if (to) selector._id.$lte = createEvaluationId({ processId, timestamp: to })
if (to) selector.startkey = `${createEvaluationId({ processId, timestamp: to })}${COLLATION_SEQUENCE_MAX_CHAR}`
return selector
}

Expand All @@ -199,20 +213,11 @@ export function findLatestEvaluationWith ({ pouchDb }) {
return of({ processId, to })
.map(createSelector)
.chain(fromPromise((selector) => {
/**
* Find the most recent evaluation:
* - sort key less than or equal to the sort key we're interested in
*
* This will give us the most recent evaluation
*/
return pouchDb.find({
selector,
sort: [{ _id: 'desc' }],
limit: 1
}).then((res) => {
if (res.warning) console.warn(res.warning)
return res.docs
})
return pouchDb.allDocs(selector)
.then((res) => {
if (res.warning) console.warn(res.warning)
return pluck('doc', res.rows)
})
}))
.map(head)
.chain((doc) => doc ? Resolved(doc) : Rejected(undefined))
Expand Down Expand Up @@ -365,28 +370,29 @@ export function saveEvaluationWith ({ pouchDb, logger: _logger }) {
}

export function findEvaluationsWith ({ pouchDb }) {
function createSelector ({ processId, from, to }) {
function createSelector ({ processId, from, to, cron }) {
/**
* grab all evaluations for the processId, by default
*/
const selector = {
_id: {
$gte: createEvaluationId({ processId, timestamp: '' }),
$lt: createEvaluationId({ processId, timestamp: COLLATION_SEQUENCE_MAX_CHAR })
}
},
...(cron ? { cron: { $exists: true } } : {})
}

/**
* trim range using sort keys, if provided
* trim range using timestamps, if provided.
*/
if (from) selector._id.$gte = `${createEvaluationId({ processId, timestamp: from })},`
if (to) selector._id.$lt = `${createEvaluationId({ processId, timestamp: to })},`
if (to) selector._id.$lt = `${createEvaluationId({ processId, timestamp: to })}`

return selector
}

return ({ processId, from, to }) => {
return of({ processId, from, to })
return ({ processId, from, to, cron }) => {
return of({ processId, from, to, cron })
.map(createSelector)
.chain(fromPromise((selector) => {
return pouchDb.find({
Expand Down
96 changes: 30 additions & 66 deletions servers/cu/src/domain/client/pouchdb.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { promisify } from 'node:util'

import { findEvaluationsSchema, findLatestEvaluationSchema, findMessageHashSchema, findProcessSchema, saveEvaluationSchema, saveProcessSchema } from '../dal.js'
import {
COLLATION_SEQUENCE_MAX_CHAR,
findEvaluationsWith,
findLatestEvaluationWith,
findMessageHashWith,
Expand Down Expand Up @@ -155,29 +154,21 @@ describe('pouchdb', () => {
const findLatestEvaluation = findLatestEvaluationSchema.implement(
findLatestEvaluationWith({
pouchDb: {
find: async (op) => {
assert.deepStrictEqual(op, {
selector: {
_id: {
$gte: 'eval-process-123,',
$lte: 'eval-process-123,1702677252111'
}
},
sort: [{ _id: 'desc' }],
limit: 1
})
allDocs: async (op) => {
return {
docs: [
rows: [
{
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
parent: 'proc-process-123',
output: { Messages: [{ foo: 'bar' }] },
evaluatedAt,
type: 'evaluation'
doc: {
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
parent: 'proc-process-123',
output: { Messages: [{ foo: 'bar' }] },
evaluatedAt,
type: 'evaluation'
}
}
]
}
Expand Down Expand Up @@ -211,29 +202,21 @@ describe('pouchdb', () => {
const findLatestEvaluation = findLatestEvaluationSchema.implement(
findLatestEvaluationWith({
pouchDb: {
find: async (op) => {
assert.deepStrictEqual(op, {
selector: {
_id: {
$gte: 'eval-process-123,',
$lte: `eval-process-123,${COLLATION_SEQUENCE_MAX_CHAR}`
}
},
sort: [{ _id: 'desc' }],
limit: 1
})
allDocs: async (op) => {
return {
docs: [
rows: [
{
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
parent: 'proc-process-123',
output: { Messages: [{ foo: 'bar' }] },
evaluatedAt,
type: 'evaluation'
doc: {
_id: 'eval-process-123,1702677252111',
timestamp: 1702677252111,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
parent: 'proc-process-123',
output: { Messages: [{ foo: 'bar' }] },
evaluatedAt,
type: 'evaluation'
}
}
]
}
Expand Down Expand Up @@ -263,7 +246,7 @@ describe('pouchdb', () => {
const findLatestEvaluation = findLatestEvaluationSchema.implement(
findLatestEvaluationWith({
pouchDb: {
find: async () => ({ docs: [] })
allDocs: async () => ({ rows: [] })
},
logger
})
Expand Down Expand Up @@ -383,16 +366,6 @@ describe('pouchdb', () => {
findEvaluationsWith({
pouchDb: {
find: async (op) => {
assert.deepStrictEqual(op, {
selector: {
_id: {
$gte: 'eval-process-123,',
$lt: `eval-process-123,${COLLATION_SEQUENCE_MAX_CHAR}`
}
},
sort: [{ _id: 'asc' }],
limit: Number.MAX_SAFE_INTEGER
})
return {
docs: [
mockEval,
Expand All @@ -404,7 +377,7 @@ describe('pouchdb', () => {
logger
}))

const res = await findEvaluations({ processId: 'process-123' })
const res = await findEvaluations({ processId: 'process-123', cron: true })

assert.equal(res.length, 2)
})
Expand All @@ -426,16 +399,6 @@ describe('pouchdb', () => {
findEvaluationsWith({
pouchDb: {
find: async (op) => {
assert.deepStrictEqual(op, {
selector: {
_id: {
$gte: 'eval-process-123,1702677252111,',
$lt: 'eval-process-123,1702677252111,'
}
},
sort: [{ _id: 'asc' }],
limit: Number.MAX_SAFE_INTEGER
})
return {
docs: [
mockEval,
Expand All @@ -450,7 +413,8 @@ describe('pouchdb', () => {
const res = await findEvaluations({
processId: 'process-123',
from: 1702677252111,
to: 1702677252111
to: 1702677252111,
cron: true
})

assert.equal(res.length, 2)
Expand Down
3 changes: 2 additions & 1 deletion servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ export const findEvaluationsSchema = z.function()
.args(z.object({
processId: z.string(),
from: z.coerce.number().optional(),
to: z.coerce.number().optional()
to: z.coerce.number().optional(),
cron: z.boolean().default(false)
}))
.returns(z.promise(z.array(evaluationSchema)))

Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/domain/readCronOutboxes.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export function readCronOutboxesWith (env) {

return ({ processId, from, to }) => {
return readState({ processId, to })
.chain(() => gatherCronMessages({ processId, from, to }))
.chain(() => gatherCronMessages({ processId, from, to, cron: true }))
.map(env.logger.tap(
'readCronMessages result for process %s from "%s" to "%s" and appended to ctx %j',
processId,
Expand Down

0 comments on commit 65b2ba9

Please sign in to comment.