Skip to content

Commit

Permalink
feat(mu): inital code for top up #1127
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed Feb 3, 2025
1 parent b3e392e commit 352b88d
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 18 deletions.
20 changes: 17 additions & 3 deletions servers/mu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ const positiveIntSchema = z.preprocess((val) => {
return typeof val === 'string' ? parseInt(val.replaceAll('_', '')) : -1
}, z.number().nonnegative())

const csvArraySchema = z.preprocess((val) => {
if (typeof val === 'string') {
return val.split(',').map(str => str.trim()).filter(Boolean);
}
return [];
}, z.array(z.string()))


/**
* Some frameworks will implicitly override NODE_ENV
*
Expand Down Expand Up @@ -62,7 +70,9 @@ export const domainConfigSchema = z.object({
GET_RESULT_MAX_RETRIES: positiveIntSchema,
GET_RESULT_RETRY_DELAY: positiveIntSchema,
MESSAGE_RECOVERY_MAX_RETRIES: positiveIntSchema,
MESSAGE_RECOVERY_RETRY_DELAY: positiveIntSchema
MESSAGE_RECOVERY_RETRY_DELAY: positiveIntSchema,
RELAY_PROCESSES: csvArraySchema,
RELAY_URL: z.string()
})

/**
Expand Down Expand Up @@ -112,7 +122,9 @@ const CONFIG_ENVS = {
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 17,
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000,
RELAY_PROCESSES: process.env.RELAY_PROCESSES || '',
RELAY_URL: process.env.RELAY_URL || ''
},
production: {
MODE,
Expand All @@ -139,7 +151,9 @@ const CONFIG_ENVS = {
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 17,
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000,
RELAY_PROCESSES: process.env.RELAY_PROCESSES || '',
RELAY_URL: process.env.RELAY_URL || ''
}
}

Expand Down
6 changes: 4 additions & 2 deletions servers/mu/src/domain/api/processMsg.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ export function processMsgWith ({
logger,
writeDataItemArweave,
isWallet,
fetchSchedulerProcess
fetchSchedulerProcess,
topUp,
RELAY_PROCESSES
}) {
const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet })
const writeMessage = writeMessageTxWith({ writeDataItem, logger, writeDataItemArweave })
const writeMessage = writeMessageTxWith({ writeDataItem, logger, writeDataItemArweave, RELAY_PROCESSES, topUp })
const getCuAddress = getCuAddressWith({ selectNode, logger })
const pullResult = pullResultWith({ fetchResult, logger })

Expand Down
24 changes: 24 additions & 0 deletions servers/mu/src/domain/clients/relay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { of, fromPromise, Rejected } from 'hyper-async'

export function topUpWith ({ fetch, logger, RELAY_URL }) {
return async ({ ctx, logId }) => {
return of(ctx)
.map(logger.tap({ log: `Forwarding message to RELAY ${RELAY_URL}`, logId }))
// .chain(
// fromPromise((body) =>
// fetch(RELAY_URL, {
// method: 'POST',
// headers: {
// 'Content-Type': 'application/octet-stream',
// Accept: 'application/json'
// },
// redirect: 'manual',
// body
// }).then(async (response) => {
// return response
// })
// )
// )
.toPromise()
}
}
7 changes: 6 additions & 1 deletion servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import * as MetricsClient from './clients/metrics.js'
import * as SqliteClient from './clients/sqlite.js'
import cronClient, { deleteCronProcessWith, getCronProcessCursorWith, saveCronProcessWith, updateCronProcessCursorWith } from './clients/cron.js'
import { readTracesWith } from './clients/tracer.js'
import * as RelayClient from './clients/relay.js'

import { processMsgWith } from './api/processMsg.js'
import { processSpawnWith } from './api/processSpawn.js'
Expand Down Expand Up @@ -352,6 +353,8 @@ export const createResultApis = async (ctx) => {
const GRAPHQL_URL = ctx.GRAPHQL_URL
const ARWEAVE_URL = ctx.ARWEAVE_URL
const SPAWN_PUSH_ENABLED = ctx.SPAWN_PUSH_ENABLED
const RELAY_PROCESSES = ctx.RELAY_PROCESSES
const RELAY_URL = ctx.RELAY_URL

const logger = ctx.logger
const fetch = ctx.fetch
Expand Down Expand Up @@ -385,7 +388,9 @@ export const createResultApis = async (ctx) => {
buildAndSign: signerClient.buildAndSignWith({ MU_WALLET, logger: processMsgLogger }),
fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: processMsgLogger }),
isWallet: gatewayClient.isWalletWith({ fetch, histogram, ARWEAVE_URL, logger: processMsgLogger, setById, getById }),
writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram })
writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram }),
RELAY_PROCESSES,
topUp: RelayClient.topUpWith({ RELAY_URL })
})

const processSpawnLogger = logger.child('processSpawn')
Expand Down
41 changes: 29 additions & 12 deletions servers/mu/src/domain/lib/write-message-tx.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Resolved, fromPromise } from 'hyper-async'
import { __, assoc } from 'ramda'
import { Resolved, fromPromise, of } from 'hyper-async'
import { __, assoc, T } from 'ramda'
import z from 'zod'
import { checkStage, setStage } from '../utils.js'
import { uploadDataItemSchema, writeDataItemSchema } from '../dal.js'
Expand All @@ -19,7 +19,7 @@ const ctxSchemaArweave = z.object({
}).passthrough()

export function writeMessageTxWith (env) {
let { logger, writeDataItem, writeDataItemArweave } = env
let { logger, writeDataItem, writeDataItemArweave, topUp, RELAY_PROCESSES } = env

writeDataItem = fromPromise(writeDataItemSchema.implement(writeDataItem))
writeDataItemArweave = fromPromise(uploadDataItemSchema.implement(writeDataItemArweave))
Expand All @@ -36,15 +36,32 @@ export function writeMessageTxWith (env) {

if (!checkStage('write-message-su')(ctx)) return Resolved(ctx)

return writeDataItem({ suUrl: ctx.schedLocation.url, data: ctx.tx.data.toString('base64'), logId: ctx.logId })
.map(assoc('schedulerTx', __, ctx))
.map(ctxSchema.parse)
.bimap(
(e) => {
return new Error(e, { cause: { ...ctx, stage: 'write-message' } })
},
logger.tap({ log: 'Added schedulerTx to ctx' })
)
return of()
.chain(() => {
if(RELAY_PROCESSES && RELAY_PROCESSES.includes(ctx.tx.processId)) {
if(ctx.cachedMsg.msg.Tags?.find((t) => t.name === 'Action' && t.value === 'Credit-Notice')) {
return topUp(ctx)
.bimap(
(e) => {
return new Error(e, { cause: { ...ctx, stage: 'write-message' } })
},
logger.tap({ log: 'Topped up relay process' })
)
}
}
return Resolved()
})
.chain((_) => {
return writeDataItem({ suUrl: ctx.schedLocation.url, data: ctx.tx.data.toString('base64'), logId: ctx.logId })
.map(assoc('schedulerTx', __, ctx))
.map(ctxSchema.parse)
.bimap(
(e) => {
return new Error(e, { cause: { ...ctx, stage: 'write-message' } })
},
logger.tap({ log: 'Added schedulerTx to ctx' })
)
})
} else {
ctx = setStage('write-message', 'write-message-arweave')(ctx)

Expand Down

0 comments on commit 352b88d

Please sign in to comment.