Skip to content

Commit

Permalink
feat(ur): add redirect strategy #539
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelBuhler committed Mar 18, 2024
1 parent de0d709 commit b1fe54f
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 156 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.DS_Store
wallet.json
dist
.VSCodeCounter
Expand Down
1 change: 1 addition & 0 deletions servers/ur/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ NODE_CONFIG_ENV="development"
DEBUG=*
HOSTS="https://foo.bar,https://fizz.buzz"
AO_UNIT=cu
STRATEGY=proxy
15 changes: 1 addition & 14 deletions servers/ur/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ This service will deterministically route `ao` Process operations to an underlyi
- [Tests](#tests)
- [Debug Logging](#debug-logging)
- [Project Structure](#project-structure)
- [Entrypoint](#entrypoint)
- [Routing](#routing)
- [System Requirements](#system-requirements)

<!-- tocstop -->
Expand All @@ -32,6 +30,7 @@ There are a few environment variables that you MUST set:
- `NODE_CONFIG_ENV`: whether the service should be ran in `development` or `production` mode. Basically, this loads a separate set of default configuration.
- `AO_UNIT`: which `ao` Unit, either `cu` or `mu`, this Reverse Proxy Service is meant to mirror.
- `HOSTS`: a comma-delimited string containing all of the underlying hosts that can Reverse Proxied to. If `AO_UNIT` is `cu`, then `HOSTS` should be a series of `ao` Compute Unit Hosts. Similarly if `AO_UNIT` is `mu` then `HOSTS` should be a series of `ao` Messenger Unit Hosts
- `STRATEGY`: either `redirect` or `proxy` (default). If `STRATEGY` is `redirect`, the service will reply with an [HTTP 307 Temporary Redirect](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/307) to the underlying `ao` unit. If `STRATEGY` is `proxy`, the service will act as a reverse proxy to the underlying `ao` unit and forward the HTTP request itself.

> In order for the Router's Reverse Proxying to be consistent, the ordering of the `HOST` list MUST be consistent.
Expand All @@ -50,18 +49,6 @@ All logging is scoped under the name `ao-router*`.

This `ao` Unit Router project is simple service, with minimal business logic.

The total service boilerplate is less than ~200 lines while the business logic is less than ~10 lines.

The ONLY custom code that's specific to the `ao` Units exists in the [entrypoint](./src/app.js), where the specific endpoints to Reverse Proxy are specified.

### Entrypoint

The entrypoint is `src/app.js` where the ONLY custom configuration is specified. Here, we mirror the `ao` Unit API for both the CU and MU, and set up the Reverse Proxying handlers.

### Routing

All routing logic can be found in `src/router.js`

## System Requirements

The `ao` Unit Router Server is containerized stateless application, and can be deployed to any containerized environment using its `Dockerfile`. It will also need some way to receive secrets injected from it's environment ie. some sort of Parameter Store. See [Environment Variables](#environment-variables).
Expand Down
113 changes: 36 additions & 77 deletions servers/ur/src/app.js
Original file line number Diff line number Diff line change
@@ -1,84 +1,43 @@
import { Readable } from 'node:stream'
import express from 'express'
import WarpArBundles from 'warp-arbundles'
import { join } from 'node:path'

import { router } from './router.js'
import cors from 'cors'
import express from 'express'
import heapdump from 'heapdump'
import { pipe } from 'ramda'

const { DataItem } = WarpArBundles
import { config } from './config.js'
import { logger } from './logger.js'

/**
* The ONLY custom bits needed for the router.
*
* Mount any custom endpoints, optionally reverse-proxying to the set of
* underyling hosts with automatic failover, by using the injected revProxy handler
*/
import { proxyWith } from './proxy.js'
import { redirectWith } from './redirect.js'

/**
* The Reverse Proxy Configuration for an ao Compute Unit Router
*/
function aoComputeUnitMount ({ app, revProxy }) {
app.get('/', revProxy({ processIdFromRequest: (req) => 'process' }))
app.get('/result/:messageTxId', revProxy({ processIdFromRequest: (req) => req.query['process-id'] }))
app.get('/results/:processId', revProxy({ processIdFromRequest: (req) => req.params.processId }))
app.get('/state/:processId', revProxy({ processIdFromRequest: (req) => req.params.processId }))
app.get('/cron/:processId', revProxy({ processIdFromRequest: (req) => req.params.processId }))
app.post('/dry-run', revProxy({ processIdFromRequest: (req) => req.query['process-id'] }))
const middlewareWithByStrategy = {
proxy: proxyWith,
redirect: redirectWith
}

/**
* The Reverse Proxy Configuration for an ao Messenger Unit Router
*/
function aoMessengerUnitMount ({ app, revProxy }) {
class InvalidDataItemError extends Error {
constructor (...args) {
super(...args)
this.status = 422
}
const middlewareWith = middlewareWithByStrategy[config.strategy]

pipe(
(app) => app.use(cors()),
(app) => app.use(express.static(config.DUMP_PATH)),
middlewareWith({ ...config }),
(app) => {
const server = app.listen(config.port, () => {
logger(`Server is running on http://localhost:${config.port}`)
})

process.on('SIGTERM', () => {
logger('Recevied SIGTERM. Gracefully shutting down server...')
server.close(() => logger('Server Shut Down'))
})

process.on('SIGUSR2', () => {
const name = `${Date.now()}.heapsnapshot`
heapdump.writeSnapshot(join(config.DUMP_PATH, name))
console.log(name)
})

return server
}

const isTagEqualTo = ({ name, value }) => (tag) => tag.name === name && tag.value === value
const isMessage = (dataItem) => !!dataItem.tags.find(isTagEqualTo({ name: 'Type', value: 'Message' }))
const isProcess = (dataItem) => !!dataItem.tags.find(isTagEqualTo({ name: 'Type', value: 'Process' }))

app.get('/', (req, res, next) => {
if (req.query.debug) return res.status(501).send('MU Tracing not implemented on the Router')

/**
* Continue the request, rev proxying with a static value in order to get the roout info response from a MU
*/
return revProxy({ processIdFromRequest: () => 'process' })(req, res, next)
})

/**
* Since the MU receives opaque data items, we have to unpack it, to know which MU
* to route to
*/
app.post('/', express.raw({ type: 'application/octet-stream', limit: '10mb' }), revProxy({
processIdFromRequest: async (req) => {
const dataItem = new DataItem(Buffer.from(req.body))

if (!(await dataItem.isValid())) throw new InvalidDataItemError('A valid and signed data item must be provided as the body')
/**
* The processId is the target on a message data item
*/
if (isMessage(dataItem)) return dataItem.target
/**
* The processId is the dataItem itseld on a process data item
*/
if (isProcess(dataItem)) return dataItem.id

throw new InvalidDataItemError('Could not determine ao type of DataItem based on tag \'Type\'')
},
/**
* Since we consumed the request stream in order to parse the data item and
* determine the processId, we must provide a new request stream, to be sent
* as the body on proxied request
*/
restreamBody: (req) => Readable.from(req.body)
}))

app.post('/monitor/:processId', revProxy({ processIdFromRequest: (req) => req.params.processId }))
app.delete('/monitor/:processId', revProxy({ processIdFromRequest: (req) => req.params.processId }))
}

router({ cu: aoComputeUnitMount, mu: aoMessengerUnitMount })
)(express())
9 changes: 6 additions & 3 deletions servers/ur/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ const serverConfigSchema = z.object({
z.array(z.string().url())
),
DUMP_PATH: z.string().min(1),
aoUnit: z.enum(['cu', 'mu'])
aoUnit: z.enum(['cu', 'mu']),
strategy: z.enum(['proxy', 'redirect'])
})

/**
Expand All @@ -52,14 +53,16 @@ const CONFIG_ENVS = {
*
* but should consider setting explicitly in your .env
*/
aoUnit: process.env.AO_UNIT || 'cu'
aoUnit: process.env.AO_UNIT || 'cu',
strategy: process.env.STRATEGY || 'proxy'
},
production: {
MODE,
port: process.env.PORT || 3005,
hosts: process.env.HOSTS,
DUMP_PATH: process.env.DUMP_PATH || tmpdir(),
aoUnit: process.env.AO_UNIT
aoUnit: process.env.AO_UNIT,
strategy: process.env.STRATEGY || 'proxy'
}
}

Expand Down
33 changes: 23 additions & 10 deletions servers/ur/src/domain.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { LRUCache } from 'lru-cache'

/**
* The pure business logic.
*
Expand All @@ -7,23 +9,34 @@
* If the failoverAttempt exceeds the length of valid hosts list, then every host has
* been attempted, and so return undefined, to be handled upstream
*/
export function determineHostWith ({ hosts = [], cache }) {
function stringToUniqueId (str) {
return str.split('').reduce((acc, char) => acc + char.charCodeAt(0), 0)
}
export function determineHostWith ({ hosts = [] }) {
const cache = new LRUCache({
/**
* 10MB
*/
maxSize: 10_000_000,
/**
* A number is 8 bytes
*/
sizeCalculation: () => 8
})

return ({ processId, failoverAttempt }) => {
return ({ processId, failoverAttempt = 0 }) => {
if (failoverAttempt >= hosts.length) return

/**
* Check cache, and hydrate if necessary
*/
let uniqueId = cache.get(processId)
if (!uniqueId) {
uniqueId = stringToUniqueId(processId)
cache.set(processId, uniqueId)
let hashSum = cache.get(processId)
if (!hashSum) {
hashSum = computeHashSumFromProcessId(processId)
cache.set(processId, hashSum)
}

return hosts[(uniqueId + failoverAttempt) % hosts.length]
return hosts[(hashSum + failoverAttempt) % hosts.length]
}
}

function computeHashSumFromProcessId (processId) {
return processId.split('').reduce((acc, char) => acc + char.charCodeAt(0), 0)
}
68 changes: 16 additions & 52 deletions servers/ur/src/router.js → servers/ur/src/proxy.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,26 @@
import { join } from 'node:path'
import { always, compose, pipe } from 'ramda'
import heapdump from 'heapdump'
import express from 'express'
import cors from 'cors'
import { always, compose } from 'ramda'
/**
* See https://github.com/http-party/node-http-proxy/pull/1559
* the PR that fixes the memory was not merged, so a fork
* was created with the fix
*/
import httpProxy from 'http-proxy-node16'
import { LRUCache } from 'lru-cache'

/**
* TODO: we could inject these, but just keeping simple for now
*/
import { logger } from './logger.js'
import { config } from './config.js'
import { determineHostWith } from './domain.js'
import { logger } from './logger.js'

function withRevProxies ({ aoUnitConfig, hosts, maxSize = 1_000_000 * 10 }) {
const proxy = httpProxy.createProxyServer({})
const cache = new LRUCache({
/**
* Defaulted to 10MB above
*/
maxSize,
/**
* A number is 8 bytes
*/
sizeCalculation: () => 8
})
import { mountRoutesWithByAoUnit } from './routes/byAoUnit.js'

logger('Configuring to reverse proxy ao %s units...', config.aoUnit)
export function proxyWith ({ aoUnit, hosts }) {
const _logger = logger.child('proxy')
_logger('Configuring to reverse proxy ao %s units...', aoUnit)

const determineHost = determineHostWith({ hosts, cache })
const mount = aoUnitConfig[config.aoUnit]
const proxy = httpProxy.createProxyServer({})

const determineHost = determineHostWith({ hosts })

async function trampoline (init) {
let result = init
Expand All @@ -60,7 +46,7 @@ function withRevProxies ({ aoUnitConfig, hosts, maxSize = 1_000_000 * 10 }) {
return Promise.resolve()
.then(() => handler(req, res))
.catch((err) => {
logger(err)
_logger(err)
if (res.writableEnded) return
return res.status(err.status || 500).send(err || 'Internal Server Error')
})
Expand Down Expand Up @@ -99,11 +85,11 @@ function withRevProxies ({ aoUnitConfig, hosts, maxSize = 1_000_000 * 10 }) {
* There are no more hosts to failover to -- we've tried them all
*/
if (!host) {
logger('Exhausted all failover attempts for process %s. Bubbling final error', processId, err)
_logger('Exhausted all failover attempts for process %s. Bubbling final error', processId, err)
return reject(err)
}

logger('Reverse Proxying process %s to host %s', processId, host)
_logger('Reverse Proxying process %s to host %s', processId, host)
/**
* Reverse proxy the request to the underlying selected host.
* If an error occurs, return the next iteration for our trampoline to invoke.
Expand All @@ -118,7 +104,7 @@ function withRevProxies ({ aoUnitConfig, hosts, maxSize = 1_000_000 * 10 }) {
* Return the thunk for our next iteration, incrementing our failoverAttempt,
* so the next host in the list will be used
*/
logger('Error occurred for host %s and process %s', host, processId, err)
_logger('Error occurred for host %s and process %s', host, processId, err)
return resolve(() => revProxy({ failoverAttempt: failoverAttempt + 1, err }))
})
})
Expand All @@ -135,32 +121,10 @@ function withRevProxies ({ aoUnitConfig, hosts, maxSize = 1_000_000 * 10 }) {
)()
}

const mountRoutesWith = mountRoutesWithByAoUnit[aoUnit]

return (app) => {
mount({ app, revProxy: withRevProxyHandler })
mountRoutesWith({ app, middleware: withRevProxyHandler })
return app
}
}

export const router = (aoUnitConfig) => pipe(
(app) => app.use(cors()),
(app) => app.use(express.static(config.DUMP_PATH)),
withRevProxies({ ...config, aoUnitConfig }),
(app) => {
const server = app.listen(config.port, () => {
logger(`Server is running on http://localhost:${config.port}`)
})

process.on('SIGTERM', () => {
logger('Recevied SIGTERM. Gracefully shutting down server...')
server.close(() => logger('Server Shut Down'))
})

process.on('SIGUSR2', () => {
const name = `${Date.now()}.heapsnapshot`
heapdump.writeSnapshot(join(config.DUMP_PATH, name))
console.log(name)
})

return server
}
)(express())
Loading

0 comments on commit b1fe54f

Please sign in to comment.