diff --git a/servers/mu/package-lock.json b/servers/mu/package-lock.json index c8c6f8191..5e945fb0f 100644 --- a/servers/mu/package-lock.json +++ b/servers/mu/package-lock.json @@ -18,6 +18,7 @@ "debug": "^4.3.4", "dotenv": "^16.3.1", "express": "^4.18.2", + "http-message-signatures": "^1.0.4", "hyper-async": "^1.1.2", "lru-cache": "^10.2.0", "node-cron": "^3.0.3", @@ -1705,6 +1706,15 @@ "node": ">= 0.8" } }, + "node_modules/http-message-signatures": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/http-message-signatures/-/http-message-signatures-1.0.4.tgz", + "integrity": "sha512-gavCQWnxHFg0BVlKs6CmYK7hNSH1o0x0mHTC68yBAHYOYuTVXPv52mEE7QuT5TenfiagTdOa/zPJzen4lEX7Rg==", + "license": "ISC", + "dependencies": { + "structured-headers": "^1.0.1" + } + }, "node_modules/hyper-async": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/hyper-async/-/hyper-async-1.1.2.tgz", @@ -2582,6 +2592,16 @@ "node": ">=0.10.0" } }, + "node_modules/structured-headers": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/structured-headers/-/structured-headers-1.0.1.tgz", + "integrity": "sha512-QYBxdBtA4Tl5rFPuqmbmdrS9kbtren74RTJTcs0VSQNVV5iRhJD4QlYTLD0+81SBwUQctjEQzjTRI3WG4DzICA==", + "license": "MIT", + "engines": { + "node": ">= 14", + "npm": ">=6" + } + }, "node_modules/supports-color": { "version": "5.5.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", @@ -4030,6 +4050,14 @@ "toidentifier": "1.0.1" } }, + "http-message-signatures": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/http-message-signatures/-/http-message-signatures-1.0.4.tgz", + "integrity": "sha512-gavCQWnxHFg0BVlKs6CmYK7hNSH1o0x0mHTC68yBAHYOYuTVXPv52mEE7QuT5TenfiagTdOa/zPJzen4lEX7Rg==", + "requires": { + "structured-headers": "^1.0.1" + } + }, "hyper-async": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/hyper-async/-/hyper-async-1.1.2.tgz", @@ -4647,6 +4675,11 @@ "resolved": "https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-2.0.1.tgz", "integrity": "sha512-4gB8na07fecVVkOI6Rs4e7T6NOTki5EmL7TUduTs6bu3EdnSycntVJ4re8kgZA+wx9IueI2Y11bfbgwtzuE0KQ==" }, + "structured-headers": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/structured-headers/-/structured-headers-1.0.1.tgz", + "integrity": "sha512-QYBxdBtA4Tl5rFPuqmbmdrS9kbtren74RTJTcs0VSQNVV5iRhJD4QlYTLD0+81SBwUQctjEQzjTRI3WG4DzICA==" + }, "supports-color": { "version": "5.5.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", diff --git a/servers/mu/package.json b/servers/mu/package.json index 7fc1fc537..a4a6b482e 100644 --- a/servers/mu/package.json +++ b/servers/mu/package.json @@ -28,6 +28,7 @@ "debug": "^4.3.4", "dotenv": "^16.3.1", "express": "^4.18.2", + "http-message-signatures": "^1.0.4", "hyper-async": "^1.1.2", "lru-cache": "^10.2.0", "node-cron": "^3.0.3", diff --git a/servers/mu/src/config.js b/servers/mu/src/config.js index 454cab4d6..72664609a 100644 --- a/servers/mu/src/config.js +++ b/servers/mu/src/config.js @@ -16,6 +16,18 @@ const positiveIntSchema = z.preprocess((val) => { return typeof val === 'string' ? parseInt(val.replaceAll('_', '')) : -1 }, z.number().nonnegative()) +const jsonObjectSchema = z.preprocess((val) => { + if (typeof val === 'string') { + try { + return JSON.parse(val); + } catch (error) { + return {}; // Default to an empty object if parsing fails + } + } + return val; +}, z.record(z.object({ url: z.string().url() }))) + + /** * Some frameworks will implicitly override NODE_ENV * @@ -62,7 +74,8 @@ export const domainConfigSchema = z.object({ GET_RESULT_MAX_RETRIES: positiveIntSchema, GET_RESULT_RETRY_DELAY: positiveIntSchema, MESSAGE_RECOVERY_MAX_RETRIES: positiveIntSchema, - MESSAGE_RECOVERY_RETRY_DELAY: positiveIntSchema + MESSAGE_RECOVERY_RETRY_DELAY: positiveIntSchema, + RELAY_MAP: jsonObjectSchema }) /** @@ -112,7 +125,8 @@ const CONFIG_ENVS = { GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5, GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000, MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 17, - MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000 + MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000, + RELAY_MAP: process.env.RELAY_MAP || '' }, production: { MODE, @@ -139,7 +153,8 @@ const CONFIG_ENVS = { GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5, GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000, MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 17, - MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000 + MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000, + RELAY_MAP: process.env.RELAY_MAP || '' } } diff --git a/servers/mu/src/domain/api/processMsg.js b/servers/mu/src/domain/api/processMsg.js index f2364f922..b62556c80 100644 --- a/servers/mu/src/domain/api/processMsg.js +++ b/servers/mu/src/domain/api/processMsg.js @@ -19,10 +19,12 @@ export function processMsgWith ({ logger, writeDataItemArweave, isWallet, - fetchSchedulerProcess + fetchSchedulerProcess, + topUp, + RELAY_MAP }) { const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet }) - const writeMessage = writeMessageTxWith({ writeDataItem, logger, writeDataItemArweave }) + const writeMessage = writeMessageTxWith({ writeDataItem, logger, writeDataItemArweave, RELAY_MAP, topUp }) const getCuAddress = getCuAddressWith({ selectNode, logger }) const pullResult = pullResultWith({ fetchResult, logger }) diff --git a/servers/mu/src/domain/clients/relay.js b/servers/mu/src/domain/clients/relay.js new file mode 100644 index 000000000..cb1243b7a --- /dev/null +++ b/servers/mu/src/domain/clients/relay.js @@ -0,0 +1,65 @@ +import { createPrivateKey, createHash} from 'node:crypto' +import { httpbis, createSigner } from 'http-message-signatures' + +import { of, fromPromise } from 'hyper-async' + +const { signMessage } = httpbis + +function httpSigName (address) { + // Decode the base64 address + const decoded = Buffer.from(address, 'base64url'); + + // Get the first 8 bytes + const first8Bytes = decoded.subarray(1, 9); + + // Convert to hexadecimal + const hexString = [...first8Bytes].map(byte => byte.toString(16).padStart(2, '0')).join(''); + + return `http-sig-${hexString}`; +} + +export function topUpWith ({ fetch, logger, wallet, address }) { + const privateKey = createPrivateKey({ key: wallet, format: 'jwk' }) + const s = createSigner(privateKey, 'rsa-pss-sha512', address) + const params = ['alg', 'keyid'].sort() + + return async ({ logId, relayUrl, amount, recipient }) => { + let relayUrlObj = new URL(relayUrl) + const urlString = `${relayUrl}?amount=${amount}&recipient=${recipient}` + + const request = { + url: new URL(urlString), + method: 'POST', + headers: { + 'amount': `${amount}`, + 'recipient': `${recipient}`, + 'path': relayUrlObj.pathname, + } + } + + const { method, headers } = await signMessage({ + key: s, + fields: [ + ...Object.keys(request.headers) + ].sort(), + name: httpSigName(address), + params + }, request) + + return of() + .map(logger.tap({ log: `Forwarding message to RELAY ${urlString}`, logId })) + .chain( + fromPromise(() => + fetch( + urlString, { method, headers } + ).then(async (response) => { + return { + response, + request: { method, url: urlString, headers } + } + }) + ) + ) + .toPromise() + } +} \ No newline at end of file diff --git a/servers/mu/src/domain/clients/relay.test.js b/servers/mu/src/domain/clients/relay.test.js new file mode 100644 index 000000000..b11358420 --- /dev/null +++ b/servers/mu/src/domain/clients/relay.test.js @@ -0,0 +1,59 @@ +import { createPrivateKey, createPublicKey } from 'node:crypto' +import { httpbis, createSigner, createVerifier } from 'http-message-signatures' +import Arweave from 'arweave' +import { describe, it } from 'node:test' + +import { topUpWith } from './relay.js' + +const { verifyMessage } = httpbis +const arweave = Arweave.init() +const verifiers = new Map() + +let { wallet, address } = await arweave.wallets.generate() + .then((jwk) => arweave.wallets.getAddress(jwk) + .then(address => { + return { + address, + privateKey: createPrivateKey({ key: jwk, format: 'jwk' }), + publicKey: createPublicKey({ key: jwk, format: 'jwk' }), + jwk + }; + }) + ) + .then(({ address, publicKey, jwk }) => { + const verifier = createVerifier(publicKey, 'rsa-pss-sha512') + verifiers.set(address, { verify: verifier }) + return { wallet: jwk, address } + }) + +describe('topUpWith function', function () { + let fetch = async (_url, options) => ({ ok: true, status: 200, json: async () => ({ success: true }), ...options }) + const logger = () => undefined + logger.tap = () => (args) => { + return args + } + let topUp = topUpWith({ fetch, logger, wallet, address }) + + it('should correctly sign and verify a request', async function () { + const params = { + logId: 'test-log', + relayUrl: 'https://relay1.example/path/topup', + amount: 100, + recipient: 'recipient-address' + } + + const result = await topUp(params) + + const { request } = result + + const verified = await verifyMessage({ + keyLookup: (params) => { + return verifiers.get(params.keyid) + } + }, request) + + if (!verified) { + throw new Error('Signature verification failed') + } + }) +}) diff --git a/servers/mu/src/domain/clients/worker.js b/servers/mu/src/domain/clients/worker.js index f38fc2d39..108cafb9b 100644 --- a/servers/mu/src/domain/clients/worker.js +++ b/servers/mu/src/domain/clients/worker.js @@ -46,13 +46,13 @@ async function whileTrue (fn) { } const broadcastLogger = createBroadcastLogger({ namespace: 'mu-worker-broadcast', config }) - export const domain = { ...(domainConfigSchema.parse(config)), fetch, logger: broadcastLogger } + /** * This program utilizes the business logic for * processing results but since the worker is also diff --git a/servers/mu/src/domain/index.js b/servers/mu/src/domain/index.js index 0aa2c1854..8aabba1eb 100644 --- a/servers/mu/src/domain/index.js +++ b/servers/mu/src/domain/index.js @@ -7,6 +7,7 @@ import warpArBundles from 'warp-arbundles' import { connect as schedulerUtilsConnect } from '@permaweb/ao-scheduler-utils' import { fromPromise } from 'hyper-async' import workerpool from 'workerpool' +import Arweave from 'arweave' import cuClient from './clients/cu.js' import schedulerClient from './clients/scheduler.js' @@ -18,6 +19,7 @@ import * as MetricsClient from './clients/metrics.js' import * as SqliteClient from './clients/sqlite.js' import cronClient, { deleteCronProcessWith, getCronProcessCursorWith, saveCronProcessWith, updateCronProcessCursorWith } from './clients/cron.js' import { readTracesWith } from './clients/tracer.js' +import * as RelayClient from './clients/relay.js' import { processMsgWith } from './api/processMsg.js' import { processSpawnWith } from './api/processSpawn.js' @@ -73,6 +75,8 @@ const errorStageGauge = MetricsClient.gaugeWith({})({ labelNames: ['stage', 'type'] }) +const arweave = Arweave.init() + /** * A set of apis used by the express server * to send initial items and start the message @@ -352,10 +356,13 @@ export const createResultApis = async (ctx) => { const GRAPHQL_URL = ctx.GRAPHQL_URL const ARWEAVE_URL = ctx.ARWEAVE_URL const SPAWN_PUSH_ENABLED = ctx.SPAWN_PUSH_ENABLED + const RELAY_MAP = ctx.RELAY_MAP const logger = ctx.logger const fetch = ctx.fetch + const walletAddress = await arweave.wallets.getAddress(MU_WALLET) + const fetchWithCache = cuFetchWithCache({ fetch, cache: muRedirectCache, @@ -385,7 +392,9 @@ export const createResultApis = async (ctx) => { buildAndSign: signerClient.buildAndSignWith({ MU_WALLET, logger: processMsgLogger }), fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: processMsgLogger }), isWallet: gatewayClient.isWalletWith({ fetch, histogram, ARWEAVE_URL, logger: processMsgLogger, setById, getById }), - writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram }) + writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram }), + RELAY_MAP, + topUp: RelayClient.topUpWith({ fetch, logger: processMsgLogger, wallet: MU_WALLET, address: walletAddress }) }) const processSpawnLogger = logger.child('processSpawn') diff --git a/servers/mu/src/domain/lib/write-message-tx.js b/servers/mu/src/domain/lib/write-message-tx.js index 376ce7ea4..23b395f25 100644 --- a/servers/mu/src/domain/lib/write-message-tx.js +++ b/servers/mu/src/domain/lib/write-message-tx.js @@ -1,5 +1,5 @@ -import { Resolved, fromPromise } from 'hyper-async' -import { __, assoc } from 'ramda' +import { Rejected, Resolved, fromPromise, of } from 'hyper-async' +import { __, assoc, T } from 'ramda' import z from 'zod' import { checkStage, setStage } from '../utils.js' import { uploadDataItemSchema, writeDataItemSchema } from '../dal.js' @@ -19,10 +19,11 @@ const ctxSchemaArweave = z.object({ }).passthrough() export function writeMessageTxWith (env) { - let { logger, writeDataItem, writeDataItemArweave } = env + let { logger, writeDataItem, writeDataItemArweave, topUp, RELAY_MAP } = env writeDataItem = fromPromise(writeDataItemSchema.implement(writeDataItem)) writeDataItemArweave = fromPromise(uploadDataItemSchema.implement(writeDataItemArweave)) + topUp = fromPromise(topUp) return (ctx) => { /* @@ -31,34 +32,59 @@ export function writeMessageTxWith (env) { */ if (!checkStage('write-message')(ctx)) return Resolved(ctx) - if (ctx.schedLocation) { - ctx = setStage('write-message', 'write-message-su')(ctx) - if (!checkStage('write-message-su')(ctx)) return Resolved(ctx) + return of() + .chain(() => { + if(RELAY_MAP && Object.keys(RELAY_MAP).includes(ctx.tx.processId)) { + if(ctx.cachedMsg?.msg?.Tags?.find((t) => t.name === 'Action' && t.value === 'Credit-Notice')) { + let sender = ctx.cachedMsg?.msg?.Tags?.find((t) => t.name === 'Sender')?.value; + let amount = ctx.cachedMsg?.msg?.Tags?.find((t) => t.name === 'Quantity')?.value; - return writeDataItem({ suUrl: ctx.schedLocation.url, data: ctx.tx.data.toString('base64'), logId: ctx.logId }) - .map(assoc('schedulerTx', __, ctx)) - .map(ctxSchema.parse) - .bimap( - (e) => { - return new Error(e, { cause: { ...ctx, stage: 'write-message' } }) - }, - logger.tap({ log: 'Added schedulerTx to ctx' }) - ) - } else { - ctx = setStage('write-message', 'write-message-arweave')(ctx) - - if (!checkStage('write-message-arweave')(ctx)) return Resolved(ctx) - - return writeDataItemArweave(ctx.tx.data) - .map(assoc('arweaveTx', __, ctx)) - .map(ctxSchemaArweave.parse) - .bimap( - (e) => { - return new Error(e, { cause: { ...ctx, stage: 'write-message' } }) - }, - logger.tap({ log: 'Added "arweaveTx" to ctx' }) - ) - } + if(!amount || !sender) { + return Rejected(new Error('Must set Sender and Quantity to top up.', { cause: ctx })) + } + + return topUp({ctx, relayUrl: RELAY_MAP[ctx.tx.processId].url, amount, recipient: sender}) + .bimap( + (e) => { + return new Error(e, { cause: { ...ctx, stage: 'write-message' } }) + }, + logger.tap({ log: `Topped up relay id ${ctx.tx.processId}` }) + ) + } + } + return Resolved() + }) + .chain(() => { + if (ctx.schedLocation) { + ctx = setStage('write-message', 'write-message-su')(ctx) + + if (!checkStage('write-message-su')(ctx)) return Resolved(ctx) + + return writeDataItem({ suUrl: ctx.schedLocation.url, data: ctx.tx.data.toString('base64'), logId: ctx.logId }) + .map(assoc('schedulerTx', __, ctx)) + .map(ctxSchema.parse) + .bimap( + (e) => { + return new Error(e, { cause: { ...ctx, stage: 'write-message' } }) + }, + logger.tap({ log: 'Added schedulerTx to ctx' }) + ) + } else { + ctx = setStage('write-message', 'write-message-arweave')(ctx) + + if (!checkStage('write-message-arweave')(ctx)) return Resolved(ctx) + + return writeDataItemArweave(ctx.tx.data) + .map(assoc('arweaveTx', __, ctx)) + .map(ctxSchemaArweave.parse) + .bimap( + (e) => { + return new Error(e, { cause: { ...ctx, stage: 'write-message' } }) + }, + logger.tap({ log: 'Added "arweaveTx" to ctx' }) + ) + } + }) } -} +} \ No newline at end of file diff --git a/servers/mu/src/domain/lib/write-message-tx.test.js b/servers/mu/src/domain/lib/write-message-tx.test.js index 682f3a9d8..3480e144f 100644 --- a/servers/mu/src/domain/lib/write-message-tx.test.js +++ b/servers/mu/src/domain/lib/write-message-tx.test.js @@ -77,4 +77,123 @@ describe('writeMessageTxWith', () => { timestamp: 1234567 }) }) + + test('top up a process from a relay', async () => { + const writeMessageTx = writeMessageTxWith({ + writeDataItem: async ({ data }) => { + assert.equal(data, 'raw-123') + + return { + id: 'id-3', + timestamp: 1234567 + } + }, + logger, + writeDataItemArweave: async (buffer) => { + return { + id: 'arweave-id-3', + timestamp: 1234567 + } + }, + RELAY_MAP: { + 'mappedid1': { 'url': 'url1' }, + 'mappedid2': { 'url': 'url1' } + }, + topUp: async (ctx) => { + assert.equal(ctx.relayUrl, 'url1') + return {} + }, + }) + + const result = await writeMessageTx({ + tx: { + processId: 'mappedid1', + id: 'id-2', + data: 'raw-123' + }, + schedLocation: { + url: 'https://foo.bar' + }, + cachedMsg: { + msg: { + Tags: [{ + name: 'Action', + value: 'Credit-Notice' + }, + { + name: 'Sender', + value: 's1' + }, + { + name: 'Quantity', + value: '5000' + }] + } + } + }).toPromise() + + assert.deepStrictEqual(result.schedulerTx, { + id: 'id-3', + timestamp: 1234567 + }) + }) + + test('top up a process from a relay', async () => { + const writeMessageTx = writeMessageTxWith({ + writeDataItem: async ({ data }) => { + assert.equal(data, 'raw-123') + + return { + id: 'id-3', + timestamp: 1234567 + } + }, + logger, + writeDataItemArweave: async (buffer) => { + return { + id: 'arweave-id-3', + timestamp: 1234567 + } + }, + RELAY_MAP: { + 'mappedid1': { 'url': 'url1' }, + 'mappedid2': { 'url': 'url1' } + }, + topUp: async () => { + assert.fail('Should not top up this process') + }, + }) + + const result = await writeMessageTx({ + tx: { + processId: 'nonmappedid1', + id: 'id-2', + data: 'raw-123' + }, + schedLocation: { + url: 'https://foo.bar' + }, + cachedMsg: { + msg: { + Tags: [{ + name: 'Action', + value: 'Credit-Notice' + }, + { + name: 'Sender', + value: 's1' + }, + { + name: 'Quantity', + value: '5000' + }] + } + } + }).toPromise() + + assert.deepStrictEqual(result.schedulerTx, { + id: 'id-3', + timestamp: 1234567 + }) + }) })