Skip to content

Commit

Permalink
fix(cu): prevent stuck stream by destroying all and bubbling error
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Feb 5, 2025
1 parent a0d509d commit e420c10
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,24 +281,28 @@ export function evaluateWith (env) {
* THEN we add an error listener such that any stream erroring
* results in all streams being cleaned up and destroyed with that error.
*
* Finally, if an error was thrown from any stream, we re-throw such that the eval promise rejects,
* bubbling the error to the caller
* Finally, if an error was thrown from any stream, we reject a top level promise that
* then triggers Promise.race to also reject, bubbling the error
*/
if (!Array.isArray(ctx.messages)) ctx.messages = [ctx.messages]
const streams = [...ctx.messages, removeInvalidTags]
streams.push(composeStreams(...streams))
let e
const { promise: bailout, resolve: pResolve, reject: pReject } = Promise.withResolvers()
streams.forEach(s => {
s.on('error', (err) => {
e = err
streams.forEach(s => {
s.emit('end')
s.destroy(err)
setImmediate(() => s.removeAllListeners())
})

pReject(err)
})
})
await pipeline(streams[streams.length - 1], evalStream)
if (e) throw e
await Promise.race([
pipeline(streams[streams.length - 1], evalStream).then(pResolve),
bailout
])

/**
* Make sure to attempt to cache the last result
Expand Down

0 comments on commit e420c10

Please sign in to comment.