Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/standardize logging #115

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/utils/aws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,23 @@ export async function getCurrentState() {

export async function reconcile(response: GetDesiredStateResponse, state: CurrentState): Promise<void> {
for (const volume of response.newVolumes) {
void scheduleTask(`volume/new/${volume.id}`, () => reconcileNewVolume(state.volumes, volume))
void scheduleTask(`volume/new/${volume.id}`, (key: string) => reconcileNewVolume(state.volumes, volume))
}

for (const volume of response.volumeChanges) {
void scheduleTask(`volume/change/${volume.resourceId}`, () =>
void scheduleTask(`volume/change/${volume.resourceId}`, (key: string) =>
reconcileVolume(state.volumes, volume, state.instances),
)
}

for (const instance of response.newMachines) {
void scheduleTask(`machine/new/${instance.id}`, () => reconcileNewMachine(state.instances, instance))
void scheduleTask(`machine/new/${instance.id}`, (key: string) => reconcileNewMachine(state.instances, instance))
}

for (const instance of response.machineChanges) {
void scheduleTask(`machine/change/${instance.resourceId}`, () => reconcileMachine(state.instances, instance))
void scheduleTask(`machine/change/${instance.resourceId}`, (key: string) =>
reconcileMachine(state.instances, instance),
)
}
}

Expand Down
78 changes: 50 additions & 28 deletions src/utils/fly/reconcile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {promises} from '../common'
import {CLOUD_AGENT_CONNECTION_ID, FLY_REGION} from '../env'
import {errorMessage} from '../errors'
import {client} from '../grpc'
import {logger} from '../logger'
import {toPlainObject} from '../plain'
import {scheduleTask} from '../scheduler'
import {
Expand Down Expand Up @@ -54,45 +53,49 @@ export async function getCurrentState() {
}

export async function reconcile(response: GetDesiredStateResponse, state: CurrentState): Promise<void> {
logger.info('Reconciling state')

for (const volume of response.newVolumes) {
logger.info(`new volume requested: ${JSON.stringify(volume)}`)
void scheduleTask(`volume/new/${volume.id}`, () => reconcileNewVolume(state.volumes, volume))
void scheduleTask(`volume/new/${volume.id}`, (key: string) => reconcileNewVolume(key, state.volumes, volume))
}

for (const volume of response.volumeChanges) {
logger.info(`volume change requested: ${JSON.stringify(volume)}`)
void scheduleTask(`volume/change/${volume.resourceId}`, () => reconcileVolume(state, volume))
void scheduleTask(`volume/change/${volume.resourceId}`, (key: string) => reconcileVolume(key, state, volume))
}

for (const machine of response.newMachines) {
logger.info(`new machine requested: ${JSON.stringify(machine)}`)
void scheduleTask(`machine/new/${machine.id}`, () => reconcileNewMachine(state.machines, machine, state.volumes))
void scheduleTask(`machine/new/${machine.id}`, (key: string) =>
reconcileNewMachine(key, state.machines, machine, state.volumes),
)
}

for (const machine of response.machineChanges) {
logger.info(`machine change requested: ${JSON.stringify(machine)}`)
void scheduleTask(`machine/change/${machine.resourceId}`, () => reconcileMachine(state.machines, machine))
void scheduleTask(`machine/change/${machine.resourceId}`, (key: string) =>
reconcileMachine(key, state.machines, machine),
)
}
}

async function reconcileNewVolume(state: Volume[], volume: GetDesiredStateResponse_NewVolume) {
async function reconcileNewVolume(key: string, state: Volume[], volume: GetDesiredStateResponse_NewVolume) {
console.log(`Launch new volume ${key}: ${volume.id}`)
const existing = state.find((v) => v.name === volume.id)
if (existing) return

if (volume.kind === GetDesiredStateResponse_Kind.BUILDKIT_16X32_GPU) {
console.log(`Creating new gpu volume ${volume.id}`)
console.log(`Launch new volume ${key}: ${volume.id} as gpu volume`)
await createBuildkitGPUVolume({depotID: volume.id, region: volume.zone ?? FLY_REGION, sizeGB: volume.size})
} else {
console.log(`Creating new volume ${volume.id}`)
console.log(`Launch new volume ${key}: ${volume.id} as regular volume`)
await createBuildkitVolume({depotID: volume.id, region: volume.zone ?? FLY_REGION, sizeGB: volume.size})
}
}

// fly volumes are not attached/detatched. The only modification is deleting the volume.
// If the volume is attached to a machine, we delete the machine first.
async function reconcileVolume({volumes, machines}: CurrentState, volume: GetDesiredStateResponse_VolumeChange) {
async function reconcileVolume(
key: string,
{volumes, machines}: CurrentState,
volume: GetDesiredStateResponse_VolumeChange,
) {
console.log(`Delete volume ${key}: ${volume.resourceId}`)
if (volume.desiredState !== GetDesiredStateResponse_VolumeState.DELETED) {
return
}
Expand Down Expand Up @@ -120,15 +123,21 @@ async function reconcileVolume({volumes, machines}: CurrentState, volume: GetDes
})

console.log(`Deleting machine ${machine.id} ${machine.name} attached to volume ${toDelete.id} ${toDelete.name}`)
await reconcileMachine(machines, deleteMachine)
await reconcileMachine(key, machines, deleteMachine)
}
}

await deleteVolume(toDelete.id)
}
}

async function reconcileNewMachine(state: V1Machine[], machine: GetDesiredStateResponse_NewMachine, volumes: Volume[]) {
async function reconcileNewMachine(
key: string,
state: V1Machine[],
machine: GetDesiredStateResponse_NewMachine,
volumes: Volume[],
) {
console.log(`Launch new machine ${key}: ${machine.id}`)
const existing = state.find((m) => m.name === machine.id)
if (existing) return
if (!machine.flyOptions) return
Expand All @@ -141,8 +150,6 @@ async function reconcileNewMachine(state: V1Machine[], machine: GetDesiredStateR
throw new Error('Unsupported architecture, Fly only supports x86 (amd64) machines')
}

console.log(`Launching new machine ${machine.id}`)

const {cpuKind: cpu_kind, cpus, memGBs, needsGPU} = machineKind(machine.kind)
let req = {
cpu_kind,
Expand Down Expand Up @@ -170,12 +177,12 @@ async function reconcileNewMachine(state: V1Machine[], machine: GetDesiredStateR
try {
const flyMachine = await launchBuildkitMachine(req)
if (!flyMachine) throw new Error(`Unable to launch machine ${machine.id}`)
console.log(`Launched new machine ${machine.id} ${flyMachine.id}`)
console.log(`Launched new machine ${key}: ${machine.id} ${flyMachine.id}`)
} catch (err) {
// If we get a capacity error, delete the volume and try again.
// We do this because the volume is tied to the machine and we can't detach it.
if (isCapacityError(err)) {
console.error(`Capacity error, requesting replacement volume and trying again ${err}`)
console.log(`Capacity error, requesting replacement volume and trying again ${err}`)
await client.replaceVolume({id: volume.name})
return
}
Expand All @@ -197,27 +204,42 @@ function currentMachineState(machine: V1Machine): GetDesiredStateResponse_Machin

const timeoutSeconds = 30

async function reconcileMachine(state: V1Machine[], machine: GetDesiredStateResponse_MachineChange) {
async function reconcileMachine(key: string, state: V1Machine[], machine: GetDesiredStateResponse_MachineChange) {
const current = state.find((m) => m.id === machine.resourceId)
const currentState = current ? currentMachineState(current) : 'unknown'
if (!current) {
console.log(`Change machine ${key}: ${machine.resourceId} not found`)
return
}

const currentState = currentMachineState(current)
// Skip if already at the desired state
if (currentState === machine.desiredState) return
if (currentState === machine.desiredState) {
console.log(`Change machine ${key}: ${machine.resourceId} ${current.id} at desired state`)
return
}

if (!current) return
console.log(
`Change machine ${key}: ${machine.resourceId} ${current.id} from ${currentState} to ${machine.desiredState}`,
)

if (machine.desiredState === GetDesiredStateResponse_MachineState.RUNNING) {
if (currentState === GetDesiredStateResponse_MachineState.PENDING) return
if (currentState === GetDesiredStateResponse_MachineState.DELETED) return
console.log('Starting machine', current.id)
await startMachine(current.id)
const begin = Date.now()
console.log(`Change machine ${key}: ${machine.resourceId} starting`)
const res = await startMachine(current.id)
const duration_ms = Date.now() - begin
console.log(`Change machine ${key}: ${machine.resourceId} started in ${duration_ms}ms`)
}

if (machine.desiredState === GetDesiredStateResponse_MachineState.STOPPED) {
if (currentState === GetDesiredStateResponse_MachineState.PENDING) return
if (currentState === GetDesiredStateResponse_MachineState.DELETED) return
console.log('Stopping machine', current.id)
const begin = Date.now()
console.log(`Change machine ${key}: ${machine.resourceId} stopping`)
await stopAndWait(current)
const duration_ms = Date.now() - begin
console.log(`Change machine ${key}: ${machine.resourceId} stopped in ${duration_ms}ms`)
}

if (machine.desiredState === GetDesiredStateResponse_MachineState.DELETED) {
Expand Down
8 changes: 4 additions & 4 deletions src/utils/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ const inProgressTasks = new Set<string>()
/**
* Schedule an update to run, ensuring that only one update is running at a time.
*/
export async function scheduleTask(key: string, task: () => Promise<void>) {
export async function scheduleTask(key: string, task: (key: string) => Promise<void>) {
if (inProgressTasks.has(key)) {
console.log(`Skipping ${key} because it is already in progress`)
return
}

const start = new Date()

try {
inProgressTasks.add(key)
console.log(`Accepted ${key}, starting task`)
return await task()
return await task(key)
} catch (err) {
await reportError(err)
const duration = new Date().getTime() - start.getTime()
console.log(`Task ${key} failed (${duration}ms)`)
} finally {
inProgressTasks.delete(key)
const duration = new Date().getTime() - start.getTime()
Expand Down