Skip to content

Commit

Permalink
Merge pull request #110 from depot/optimize-reconcile
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobwgillespie authored Sep 13, 2024
2 parents 068612c + 1a463d9 commit 01f7fbe
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 30 deletions.
11 changes: 3 additions & 8 deletions src/handlers/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {client} from '../utils/grpc'
interface CloudProvider<T> {
getCurrentState(): Promise<T>
reportCurrentState(currentState: T): Promise<void>
reconcile(response: GetDesiredStateResponse, state: T): Promise<string[]>
reconcile(response: GetDesiredStateResponse, state: T): Promise<void>
}

export const AwsProvider: CloudProvider<AwsCurrentState> = {
Expand All @@ -35,19 +35,14 @@ export const FlyProvider: CloudProvider<FlyCurrentState> = {
export async function startStateStream<T>(signal: AbortSignal, provider: CloudProvider<T>) {
while (!signal.aborted) {
try {
let currentState = await provider.getCurrentState()
const currentState = await provider.getCurrentState()

await provider.reportCurrentState(currentState)

const response = await client.getDesiredState({clientId: clientID}, {signal})
if (isEmptyResponse(response)) continue

currentState = await provider.getCurrentState()

const errors = await provider.reconcile(response, currentState)
for (const error of errors) {
await reportError(error)
}
await provider.reconcile(response, currentState)
} catch (err: any) {
if (err instanceof ConnectError && err.code === Code.FailedPrecondition) {
// Connection lock was not acquired, sleep and retry
Expand Down
28 changes: 17 additions & 11 deletions src/utils/aws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
additionalSubnetIDs,
} from './env'
import {toPlainObject} from './plain'
import {scheduleTask} from './scheduler'

const client = new EC2Client({})

Expand All @@ -50,17 +51,22 @@ export async function getCurrentState() {
return toPlainObject(state)
}

export async function reconcile(response: GetDesiredStateResponse, state: CurrentState): Promise<string[]> {
const results = await Promise.allSettled([
...response.newVolumes.map((volume) => reconcileNewVolume(state.volumes, volume)),
...response.volumeChanges.map((volume) => reconcileVolume(state.volumes, volume, state.instances)),
...response.newMachines.map((instance) => reconcileNewMachine(state.instances, instance)),
...response.machineChanges.map((instance) => reconcileMachine(state.instances, instance)),
])

return results
.map((r) => (r.status === 'rejected' ? `${r.reason}` : undefined))
.filter((r): r is string => r !== undefined)
export async function reconcile(response: GetDesiredStateResponse, state: CurrentState): Promise<void> {
for (const volume of response.newVolumes) {
void scheduleTask(`volume/new/${volume.id}`, () => reconcileNewVolume(state.volumes, volume))
}

for (const volume of response.volumeChanges) {
void scheduleTask(`volume/change/${volume.id}`, () => reconcileVolume(state.volumes, volume, state.instances))
}

for (const instance of response.newMachines) {
void scheduleTask(`machine/new/${instance.id}`, () => reconcileNewMachine(state.instances, instance))
}

for (const instance of response.machineChanges) {
void scheduleTask(`machine/change/${instance.id}`, () => reconcileMachine(state.instances, instance))
}
}

/** Filter to select only Depot-managed resources */
Expand Down
28 changes: 17 additions & 11 deletions src/utils/fly/reconcile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {CLOUD_AGENT_CONNECTION_ID, FLY_REGION} from '../env'
import {errorMessage} from '../errors'
import {client} from '../grpc'
import {toPlainObject} from '../plain'
import {scheduleTask} from '../scheduler'
import {
createBuildkitGPUVolume,
createBuildkitVolume,
Expand Down Expand Up @@ -51,17 +52,22 @@ export async function getCurrentState() {
return toPlainObject(state)
}

export async function reconcile(response: GetDesiredStateResponse, state: CurrentState): Promise<string[]> {
const results = await Promise.allSettled([
...response.newVolumes.map((volume) => reconcileNewVolume(state.volumes, volume)),
...response.volumeChanges.map((volume) => reconcileVolume(state, volume)),
...response.newMachines.map((machine) => reconcileNewMachine(state.machines, machine, state.volumes)),
...response.machineChanges.map((machine) => reconcileMachine(state.machines, machine)),
])

return results
.map((r) => (r.status === 'rejected' ? `${r.reason}` : undefined))
.filter((r): r is string => r !== undefined)
export async function reconcile(response: GetDesiredStateResponse, state: CurrentState): Promise<void> {
for (const volume of response.newVolumes) {
void scheduleTask(`volume/new/${volume.id}`, () => reconcileNewVolume(state.volumes, volume))
}

for (const volume of response.volumeChanges) {
void scheduleTask(`volume/change/${volume.id}`, () => reconcileVolume(state, volume))
}

for (const machine of response.newMachines) {
void scheduleTask(`machine/new/${machine.id}`, () => reconcileNewMachine(state.machines, machine, state.volumes))
}

for (const machine of response.machineChanges) {
void scheduleTask(`machine/change/${machine.id}`, () => reconcileMachine(state.machines, machine))
}
}

async function reconcileNewVolume(state: Volume[], volume: GetDesiredStateResponse_NewVolume) {
Expand Down
24 changes: 24 additions & 0 deletions src/utils/scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import {reportError} from './errors'

const inProgressTasks = new Set<string>()

/**
* Schedule an update to run, ensuring that only one update is running at a time.
*/
export async function scheduleTask(key: string, task: () => Promise<void>) {
if (inProgressTasks.has(key)) {
console.log(`Skipping ${key} because it is already in progress`)
return
}

try {
inProgressTasks.add(key)
console.log(`Accepted ${key}, starting task`)
return await task()
} catch (err) {
await reportError(err)
} finally {
inProgressTasks.delete(key)
console.log(`Task ${key} completed`)
}
}

0 comments on commit 01f7fbe

Please sign in to comment.