Skip to content

Commit

Permalink
Merge pull request #1137 from permaweb/VinceJuliano/mu-top-up-1127
Browse files Browse the repository at this point in the history
Vince juliano/mu top up 1127
  • Loading branch information
VinceJuliano authored Feb 6, 2025
2 parents e2158c2 + eec57c7 commit d345cb0
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 38 deletions.
33 changes: 33 additions & 0 deletions servers/mu/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions servers/mu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"debug": "^4.3.4",
"dotenv": "^16.3.1",
"express": "^4.18.2",
"http-message-signatures": "^1.0.4",
"hyper-async": "^1.1.2",
"lru-cache": "^10.2.0",
"node-cron": "^3.0.3",
Expand Down
21 changes: 18 additions & 3 deletions servers/mu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ const positiveIntSchema = z.preprocess((val) => {
return typeof val === 'string' ? parseInt(val.replaceAll('_', '')) : -1
}, z.number().nonnegative())

const jsonObjectSchema = z.preprocess((val) => {
if (typeof val === 'string') {
try {
return JSON.parse(val);
} catch (error) {
return {}; // Default to an empty object if parsing fails
}
}
return val;
}, z.record(z.object({ url: z.string().url() })))


/**
* Some frameworks will implicitly override NODE_ENV
*
Expand Down Expand Up @@ -62,7 +74,8 @@ export const domainConfigSchema = z.object({
GET_RESULT_MAX_RETRIES: positiveIntSchema,
GET_RESULT_RETRY_DELAY: positiveIntSchema,
MESSAGE_RECOVERY_MAX_RETRIES: positiveIntSchema,
MESSAGE_RECOVERY_RETRY_DELAY: positiveIntSchema
MESSAGE_RECOVERY_RETRY_DELAY: positiveIntSchema,
RELAY_MAP: jsonObjectSchema
})

/**
Expand Down Expand Up @@ -112,7 +125,8 @@ const CONFIG_ENVS = {
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 17,
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000,
RELAY_MAP: process.env.RELAY_MAP || ''
},
production: {
MODE,
Expand All @@ -139,7 +153,8 @@ const CONFIG_ENVS = {
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 17,
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000,
RELAY_MAP: process.env.RELAY_MAP || ''
}
}

Expand Down
6 changes: 4 additions & 2 deletions servers/mu/src/domain/api/processMsg.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ export function processMsgWith ({
logger,
writeDataItemArweave,
isWallet,
fetchSchedulerProcess
fetchSchedulerProcess,
topUp,
RELAY_MAP
}) {
const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet })
const writeMessage = writeMessageTxWith({ writeDataItem, logger, writeDataItemArweave })
const writeMessage = writeMessageTxWith({ writeDataItem, logger, writeDataItemArweave, RELAY_MAP, topUp })
const getCuAddress = getCuAddressWith({ selectNode, logger })
const pullResult = pullResultWith({ fetchResult, logger })

Expand Down
65 changes: 65 additions & 0 deletions servers/mu/src/domain/clients/relay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { createPrivateKey, createHash} from 'node:crypto'
import { httpbis, createSigner } from 'http-message-signatures'

import { of, fromPromise } from 'hyper-async'

const { signMessage } = httpbis

function httpSigName (address) {
// Decode the base64 address
const decoded = Buffer.from(address, 'base64url');

// Get the first 8 bytes
const first8Bytes = decoded.subarray(1, 9);

// Convert to hexadecimal
const hexString = [...first8Bytes].map(byte => byte.toString(16).padStart(2, '0')).join('');

return `http-sig-${hexString}`;
}

export function topUpWith ({ fetch, logger, wallet, address }) {
const privateKey = createPrivateKey({ key: wallet, format: 'jwk' })
const s = createSigner(privateKey, 'rsa-pss-sha512', address)
const params = ['alg', 'keyid'].sort()

return async ({ logId, relayUrl, amount, recipient }) => {
let relayUrlObj = new URL(relayUrl)
const urlString = `${relayUrl}?amount=${amount}&recipient=${recipient}`

const request = {
url: new URL(urlString),
method: 'POST',
headers: {
'amount': `${amount}`,
'recipient': `${recipient}`,
'path': relayUrlObj.pathname,
}
}

const { method, headers } = await signMessage({
key: s,
fields: [
...Object.keys(request.headers)
].sort(),
name: httpSigName(address),
params
}, request)

return of()
.map(logger.tap({ log: `Forwarding message to RELAY ${urlString}`, logId }))
.chain(
fromPromise(() =>
fetch(
urlString, { method, headers }
).then(async (response) => {
return {
response,
request: { method, url: urlString, headers }
}
})
)
)
.toPromise()
}
}
59 changes: 59 additions & 0 deletions servers/mu/src/domain/clients/relay.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { createPrivateKey, createPublicKey } from 'node:crypto'
import { httpbis, createSigner, createVerifier } from 'http-message-signatures'
import Arweave from 'arweave'
import { describe, it } from 'node:test'

import { topUpWith } from './relay.js'

const { verifyMessage } = httpbis
const arweave = Arweave.init()
const verifiers = new Map()

let { wallet, address } = await arweave.wallets.generate()
.then((jwk) => arweave.wallets.getAddress(jwk)
.then(address => {
return {
address,
privateKey: createPrivateKey({ key: jwk, format: 'jwk' }),
publicKey: createPublicKey({ key: jwk, format: 'jwk' }),
jwk
};
})
)
.then(({ address, publicKey, jwk }) => {
const verifier = createVerifier(publicKey, 'rsa-pss-sha512')
verifiers.set(address, { verify: verifier })
return { wallet: jwk, address }
})

describe('topUpWith function', function () {
let fetch = async (_url, options) => ({ ok: true, status: 200, json: async () => ({ success: true }), ...options })
const logger = () => undefined
logger.tap = () => (args) => {
return args
}
let topUp = topUpWith({ fetch, logger, wallet, address })

it('should correctly sign and verify a request', async function () {
const params = {
logId: 'test-log',
relayUrl: 'https://relay1.example/path/topup',
amount: 100,
recipient: 'recipient-address'
}

const result = await topUp(params)

const { request } = result

const verified = await verifyMessage({
keyLookup: (params) => {
return verifiers.get(params.keyid)
}
}, request)

if (!verified) {
throw new Error('Signature verification failed')
}
})
})
2 changes: 1 addition & 1 deletion servers/mu/src/domain/clients/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ async function whileTrue (fn) {
}

const broadcastLogger = createBroadcastLogger({ namespace: 'mu-worker-broadcast', config })

export const domain = {
...(domainConfigSchema.parse(config)),
fetch,
logger: broadcastLogger
}


/**
* This program utilizes the business logic for
* processing results but since the worker is also
Expand Down
11 changes: 10 additions & 1 deletion servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import warpArBundles from 'warp-arbundles'
import { connect as schedulerUtilsConnect } from '@permaweb/ao-scheduler-utils'
import { fromPromise } from 'hyper-async'
import workerpool from 'workerpool'
import Arweave from 'arweave'

import cuClient from './clients/cu.js'
import schedulerClient from './clients/scheduler.js'
Expand All @@ -18,6 +19,7 @@ import * as MetricsClient from './clients/metrics.js'
import * as SqliteClient from './clients/sqlite.js'
import cronClient, { deleteCronProcessWith, getCronProcessCursorWith, saveCronProcessWith, updateCronProcessCursorWith } from './clients/cron.js'
import { readTracesWith } from './clients/tracer.js'
import * as RelayClient from './clients/relay.js'

import { processMsgWith } from './api/processMsg.js'
import { processSpawnWith } from './api/processSpawn.js'
Expand Down Expand Up @@ -73,6 +75,8 @@ const errorStageGauge = MetricsClient.gaugeWith({})({
labelNames: ['stage', 'type']
})

const arweave = Arweave.init()

/**
* A set of apis used by the express server
* to send initial items and start the message
Expand Down Expand Up @@ -352,10 +356,13 @@ export const createResultApis = async (ctx) => {
const GRAPHQL_URL = ctx.GRAPHQL_URL
const ARWEAVE_URL = ctx.ARWEAVE_URL
const SPAWN_PUSH_ENABLED = ctx.SPAWN_PUSH_ENABLED
const RELAY_MAP = ctx.RELAY_MAP

const logger = ctx.logger
const fetch = ctx.fetch

const walletAddress = await arweave.wallets.getAddress(MU_WALLET)

const fetchWithCache = cuFetchWithCache({
fetch,
cache: muRedirectCache,
Expand Down Expand Up @@ -385,7 +392,9 @@ export const createResultApis = async (ctx) => {
buildAndSign: signerClient.buildAndSignWith({ MU_WALLET, logger: processMsgLogger }),
fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: processMsgLogger }),
isWallet: gatewayClient.isWalletWith({ fetch, histogram, ARWEAVE_URL, logger: processMsgLogger, setById, getById }),
writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram })
writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram }),
RELAY_MAP,
topUp: RelayClient.topUpWith({ fetch, logger: processMsgLogger, wallet: MU_WALLET, address: walletAddress })
})

const processSpawnLogger = logger.child('processSpawn')
Expand Down
Loading

0 comments on commit d345cb0

Please sign in to comment.