Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix goaway #3835

Merged
merged 5 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ class Request {
}

this.completed = false

this.aborted = false

this.upgrade = upgrade || null
Expand Down Expand Up @@ -272,6 +271,7 @@ class Request {
this.onFinally()

assert(!this.aborted)
assert(!this.completed)

this.completed = true
if (channels.trailers.hasSubscribers) {
Expand Down
43 changes: 26 additions & 17 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,19 @@ function onHttp2SessionEnd () {
* @param {number} errorCode
*/
function onHttp2SessionGoAway (errorCode) {
// We cannot recover, so best to close the session and the socket
// TODO(mcollina): Verify if GOAWAY implements the spec correctly:
// https://datatracker.ietf.org/doc/html/rfc7540#section-6.8
// Specifically, we do not verify the "valid" stream id.

const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket]))
const client = this[kClient]

client[kSocket] = null
client[kHTTPContext] = null

if (this[kHTTP2Session] !== null) {
this[kHTTP2Session].close()
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
}
// this is an HTTP2 session
this.close()
this[kHTTP2Session] = null

util.destroy(this[kSocket], err)

Expand Down Expand Up @@ -321,13 +322,15 @@ function writeH2 (client, request) {
util.errorRequest(client, request, err)

if (stream != null) {
// Some chunks might still come after abort,
// let's ignore them
stream.removeAllListeners('data')

// On Abort, we close the stream to send RST_STREAM frame
stream.close()
// We delay the destroy to allow the stream to send the RST_STREAM frame
queueMicrotask(() => util.destroy(stream, err))

// We move the running index to the next request
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kOnError](err)
client[kResume]()
}

Expand Down Expand Up @@ -356,7 +359,7 @@ function writeH2 (client, request) {
// We disabled endStream to allow the user to write to the stream
stream = session.request(headers, { endStream: false, signal })

if (stream.id && !stream.pending) {
if (!stream.pending) {
request.onUpgrade(null, null, stream)
++session[kOpenStreams]
client[kQueue][client[kRunningIdx]++] = null
Expand Down Expand Up @@ -463,26 +466,31 @@ function writeH2 (client, request) {
// for those scenarios, best effort is to destroy the stream immediately
// as there's no value to keep it open.
if (request.aborted) {
stream.removeAllListeners('data')
return
}

if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
stream.pause()
}
})

stream.on('data', (chunk) => {
if (request.onData(chunk) === false) {
stream.pause()
}
})
stream.on('data', (chunk) => {
if (request.onData(chunk) === false) {
stream.pause()
}
})

stream.once('end', (err) => {
stream.removeAllListeners('data')
// When state is null, it means we haven't consumed body and the stream still do not have
// a state.
// Present specially when using pipeline or stream
if (stream.state?.state == null || stream.state.state < 6) {
request.onComplete([])
// Do not complete the request if it was aborted
if (!request.aborted) {
request.onComplete([])
}

client[kQueue][client[kRunningIdx]++] = null
client[kResume]()
Expand All @@ -503,6 +511,7 @@ function writeH2 (client, request) {
})

stream.once('close', () => {
stream.removeAllListeners('data')
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
Expand Down
17 changes: 13 additions & 4 deletions test/http2.js
Original file line number Diff line number Diff line change
Expand Up @@ -1621,12 +1621,21 @@ test('#3753 - Handle GOAWAY Gracefully', async (t) => {
'x-my-header': 'foo'
}
}, (err, response) => {
if (i === 9 || i === 8) {
t.strictEqual(err?.message, 'HTTP/2: "GOAWAY" frame received with code 0')
t.strictEqual(err?.code, 'UND_ERR_SOCKET')
if (err) {
t.strictEqual(err.message, 'HTTP/2: "GOAWAY" frame received with code 0')
t.strictEqual(err.code, 'UND_ERR_SOCKET')
} else {
t.ifError(err)
t.strictEqual(response.statusCode, 200)
;(async function () {
let body
try {
body = await response.body.text()
} catch (err) {
t.strictEqual(err.code, 'UND_ERR_SOCKET')
return
}
t.strictEqual(body, 'hello world')
})()
}
})
}
Expand Down
Loading