Skip to content

Commit

Permalink
feat(ur): add subrouting logic #697
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed May 12, 2024
1 parent edd1341 commit 66823fd
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 24 deletions.
5 changes: 5 additions & 0 deletions servers/ur/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ There are a few environment variables that you MUST set:
- `HOSTS`: a comma-delimited string containing all of the underlying hosts that can Reverse Proxied to. If `AO_UNIT` is `cu`, then `HOSTS` should be a series of `ao` Compute Unit Hosts. Similarly if `AO_UNIT` is `mu` then `HOSTS` should be a series of `ao` Messenger Unit Hosts
- `STRATEGY`: either `redirect` or `proxy` (default). If `STRATEGY` is `redirect`, the service will reply with an [HTTP 307 Temporary Redirect](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/307) to the underlying `ao` unit. If `STRATEGY` is `proxy`, the service will act as a reverse proxy to the underlying `ao` unit and forward the HTTP request itself.

The below environment variables are optional. All 3 must be set for subrouting to work.
- `SUBROUTER_URL` the underlying router to route to
- `SUR_URL` the SU-R url to use to check process owners for subrouter redirection
- `OWNERS` a list of owners to redirect to a subrouter

> In order for the Router's Reverse Proxying to be consistent, the ordering of the `HOST` list MUST be consistent.
## Tests
Expand Down
18 changes: 15 additions & 3 deletions servers/ur/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ const serverConfigSchema = z.object({
),
DUMP_PATH: z.string().min(1),
aoUnit: z.enum(['cu', 'mu']),
strategy: z.enum(['proxy', 'redirect'])
strategy: z.enum(['proxy', 'redirect']),
subrouterUrl: z.string().nullable().optional(),
surUrl: z.string().nullable().optional(),
owners: z.preprocess(
(arg) => (typeof arg === 'string' ? arg.split(',').map(str => str.trim()) : arg),
z.array(z.string())
).nullable().optional()
})

/**
Expand All @@ -54,15 +60,21 @@ const CONFIG_ENVS = {
* but should consider setting explicitly in your .env
*/
aoUnit: process.env.AO_UNIT || 'cu',
strategy: process.env.STRATEGY || 'proxy'
strategy: process.env.STRATEGY || 'proxy',
subrouterUrl: process.env.SUBROUTER_URL,
surUrl: process.env.SUR_URL,
owners: process.env.OWNERS
},
production: {
MODE,
port: process.env.PORT || 3005,
hosts: process.env.HOSTS,
DUMP_PATH: process.env.DUMP_PATH || tmpdir(),
aoUnit: process.env.AO_UNIT,
strategy: process.env.STRATEGY || 'proxy'
strategy: process.env.STRATEGY || 'proxy',
subrouterUrl: process.env.SUBROUTER_URL,
surUrl: process.env.SUR_URL,
owners: process.env.OWNERS
}
}

Expand Down
44 changes: 42 additions & 2 deletions servers/ur/src/domain.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,40 @@
import { LRUCache } from 'lru-cache'

export function bailoutWith ({ fetch, subrouterUrl, surUrl, owners }) {
const cache = new LRUCache({
/**
* 10MB
*/
maxSize: 10_000_000,
/**
* A number is 8 bytes
*/
sizeCalculation: () => 8
})

return async (processId) => {
/**
* All three of these must be set for the
* subrouter logic to work so if any are
* not set just return.
*/
if (!subrouterUrl || !surUrl || !owners) return
let owner = cache.get(processId)
if (!owner) {
const suResponse = await fetch(`${surUrl}/processes/${processId}`)
.then((res) => res.json())
.catch((_e) => null)
if (!suResponse) return
cache.set(processId, suResponse.owner.address)
owner = suResponse.owner.address
}

if (owners.includes(owner)) {
return subrouterUrl
}
}
}

/**
* The pure business logic.
*
Expand All @@ -9,7 +44,7 @@ import { LRUCache } from 'lru-cache'
* If the failoverAttempt exceeds the length of valid hosts list, then every host has
* been attempted, and so return undefined, to be handled upstream
*/
export function determineHostWith ({ hosts = [] }) {
export function determineHostWith ({ hosts = [], bailout }) {
const cache = new LRUCache({
/**
* 10MB
Expand All @@ -21,9 +56,14 @@ export function determineHostWith ({ hosts = [] }) {
sizeCalculation: () => 8
})

return ({ processId, failoverAttempt = 0 }) => {
return async ({ processId, failoverAttempt = 0 }) => {
if (failoverAttempt >= hosts.length) return

if (bailout) {
const bail = await bailout(processId)
if (bail) return bail
}

/**
* Check cache, and hydrate if necessary
*/
Expand Down
62 changes: 52 additions & 10 deletions servers/ur/src/domain.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, test } from 'node:test'
import assert from 'node:assert'

import { determineHostWith } from './domain.js'
import { determineHostWith, bailoutWith } from './domain.js'

const HOSTS = ['http://foo.bar', 'http://fizz.buzz']
const cache = {
Expand All @@ -11,31 +11,73 @@ const cache = {

describe('domain', () => {
describe('determineHostWith', () => {
test('should deterministically return a valid host', () => {
test('should deterministically return a valid host', async () => {
const determineHost = determineHostWith({ hosts: HOSTS, cache })

assert(determineHost({ processId: 'process-123', failoverAttempt: 0 }))
assert.equal(determineHost({ processId: 'process-123', failoverAttempt: 0 }), determineHost({ processId: 'process-123', failoverAttempt: 0 }))
assert(await determineHost({ processId: 'process-123', failoverAttempt: 0 }))
assert.equal(await determineHost({ processId: 'process-123', failoverAttempt: 0 }), await determineHost({ processId: 'process-123', failoverAttempt: 0 }))
})

test('should shift the determined host according to failoverAttempt', () => {
test('should shift the determined host according to failoverAttempt', async () => {
const determineHost = determineHostWith({ hosts: HOSTS, cache })

assert.notEqual(determineHost({ processId: 'process-123', failoverAttempt: 0 }), determineHost({ processId: 'process-123', failoverAttempt: 1 }))
assert.notEqual(await determineHost({ processId: 'process-123', failoverAttempt: 0 }), await determineHost({ processId: 'process-123', failoverAttempt: 1 }))
})

test('should return undefined if all hosts have been attempted', () => {
test('should return undefined if all hosts have been attempted', async () => {
const determineHost = determineHostWith({ hosts: HOSTS, cache })
assert.equal(determineHost({ processId: 'process-123', failoverAttempt: HOSTS.length }), undefined)
assert.equal(await determineHost({ processId: 'process-123', failoverAttempt: HOSTS.length }), undefined)
})

test('should serve from the cache, if found', () => {
test('should serve from the cache, if found', async () => {
const determineHost = determineHostWith({
hosts: HOSTS,
cache: { ...cache, get: () => 10 }
})

assert.equal(determineHost({ processId: 'process-123', failoverAttempt: HOSTS.length }), HOSTS[HOSTS.length & 10])
assert.equal(await determineHost({ processId: 'process-123', failoverAttempt: HOSTS.length }), HOSTS[HOSTS.length & 10])
})

test('should redirect to the subrouterUrl', async () => {
const fetchMock = async (url) => {
assert.equal(url, 'surUrl1/processes/process-123')
return new Response(JSON.stringify({ owner: { address: 'owner2' } }))
}

const bailout = bailoutWith({
fetch: fetchMock,
surUrl: 'surUrl1',
subrouterUrl: 'subrouterUrl1',
owners: ['owner1', 'owner2']
})

const determineHost = determineHostWith({ hosts: HOSTS, cache, bailout })

assert(await determineHost({ processId: 'process-123', failoverAttempt: 0 }))
assert.equal(await determineHost({ processId: 'process-123', failoverAttempt: 0 }), 'subrouterUrl1')
})

test('should not redirect to the subrouterUrl', async () => {
const fetchMock = async (url) => {
assert.equal(url, 'surUrl1/processes/process-123')
/**
* Here the owner does not match any in the list
* this will cause it to not redirect to the subrouter
*/
return new Response(JSON.stringify({ owner: { address: 'owner3' } }))
}

const bailout = bailoutWith({
fetch: fetchMock,
surUrl: 'surUrl1',
subrouterUrl: 'subrouterUrl1',
owners: ['owner1', 'owner2']
})

const determineHost = determineHostWith({ hosts: HOSTS, cache, bailout })

assert(await determineHost({ processId: 'process-123', failoverAttempt: 0 }))
assert.equal(await determineHost({ processId: 'process-123', failoverAttempt: 0 }), 'http://foo.bar')
})
})
})
11 changes: 6 additions & 5 deletions servers/ur/src/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ import httpProxy from 'http-proxy-node16'
/**
* TODO: we could inject these, but just keeping simple for now
*/
import { determineHostWith } from './domain.js'
import { determineHostWith, bailoutWith } from './domain.js'
import { logger } from './logger.js'

import { mountRoutesWithByAoUnit } from './routes/byAoUnit.js'

export function proxyWith ({ aoUnit, hosts }) {
export function proxyWith ({ aoUnit, hosts, subrouterUrl, surUrl, owners }) {
const _logger = logger.child('proxy')
_logger('Configuring to reverse proxy ao %s units...', aoUnit)

const proxy = httpProxy.createProxyServer({})

const determineHost = determineHostWith({ hosts })
const bailout = bailoutWith({ fetch, subrouterUrl, surUrl, owners })
const determineHost = determineHostWith({ hosts, bailout })

async function trampoline (init) {
let result = init
Expand Down Expand Up @@ -78,9 +79,9 @@ export function proxyWith ({ aoUnit, hosts }) {
*/
const buffer = restreamBody ? await restreamBody(req) : undefined

return new Promise((resolve, reject) => {
const host = determineHost({ processId, failoverAttempt })
const host = await determineHost({ processId, failoverAttempt })

return new Promise((resolve, reject) => {
/**
* There are no more hosts to failover to -- we've tried them all
*/
Expand Down
9 changes: 5 additions & 4 deletions servers/ur/src/redirect.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
/**
* TODO: we could inject these, but just keeping simple for now
*/
import { determineHostWith } from './domain.js'
import { determineHostWith, bailoutWith } from './domain.js'
import { logger } from './logger.js'

import { mountRoutesWithByAoUnit } from './routes/byAoUnit.js'

export function redirectWith ({ aoUnit, hosts }) {
export function redirectWith ({ aoUnit, hosts, subrouterUrl, surUrl, owners }) {
const _logger = logger.child('redirect')
_logger('Configuring to redirect ao %s units...', aoUnit)

const determineHost = determineHostWith({ hosts })
const bailout = bailoutWith({ fetch, subrouterUrl, surUrl, owners })
const determineHost = determineHostWith({ hosts, bailout })

/**
* A middleware that will redirect the request to the host determined
Expand All @@ -19,7 +20,7 @@ export function redirectWith ({ aoUnit, hosts }) {
const redirectHandler = ({ processIdFromRequest }) => {
return async (req, res) => {
const processId = await processIdFromRequest(req)
const host = determineHost({ processId })
const host = await determineHost({ processId })

_logger('Redirecting process %s to host %s', processId, host)

Expand Down

0 comments on commit 66823fd

Please sign in to comment.