From 366848a718dce0e3ee6186faed88ad756e96985f Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 14 Nov 2024 10:42:50 +0100 Subject: [PATCH 1/5] fix goaway Signed-off-by: Matteo Collina --- lib/core/request.js | 17 ++++++++- lib/dispatcher/client-h2.js | 71 +++++++++++++++++++++++++++++++------ test/http2.js | 24 ++++++++++--- 3 files changed, 95 insertions(+), 17 deletions(-) diff --git a/lib/core/request.js b/lib/core/request.js index 6cd9b2f8307..450c78d9fee 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -26,7 +26,9 @@ const { headerNameLowerCasedRecord } = require('./constants') const invalidPathRegex = /[^\u0021-\u00ff]/ const kHandler = Symbol('handler') +const kCompleted = Symbol('completed') +let nextRequestId = 0 class Request { constructor (origin, { path, @@ -44,6 +46,7 @@ class Request { servername, throwOnError }, handler) { + this.id = nextRequestId++ if (typeof path !== 'string') { throw new InvalidArgumentError('path must be a string') } else if ( @@ -129,7 +132,18 @@ class Request { throw new InvalidArgumentError('body must be a string, a Buffer, a Readable stream, an iterable, or an async iterable') } - this.completed = false + this[kCompleted] = false + + + Object.defineProperty(this, 'completed', { + get () { + return this[kCompleted] + }, + set (value) { + process._rawDebug(this.id, 'completed set to', value, 'from', this[kCompleted], Error().stack) + this[kCompleted] = value + } + }) this.aborted = false @@ -272,6 +286,7 @@ class Request { this.onFinally() assert(!this.aborted) + assert(!this.completed) this.completed = true if (channels.trailers.hasSubscribers) { diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index ef6d47a0c9c..e77f24f1d5f 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -76,6 +76,8 @@ function parseH2Headers (headers) { return result } +let sessionId = 0 + async function connectH2 (client, socket) { client[kSocket] = socket @@ -91,6 +93,8 @@ async function connectH2 (client, socket) { peerMaxConcurrentStreams: client[kMaxConcurrentStreams] }) + session.id = sessionId++ + session[kOpenStreams] = 0 session[kClient] = client session[kSocket] = socket @@ -184,18 +188,21 @@ function onHttp2SessionEnd () { * @param {number} errorCode */ function onHttp2SessionGoAway (errorCode) { - // We cannot recover, so best to close the session and the socket + // TODO(mcollina): Verify if GOAWAY implements the spec correctly: + // https://datatracker.ietf.org/doc/html/rfc7540#section-6.8 + // Specifically, we do not verify the "valid" stream id. + const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket])) const client = this[kClient] client[kSocket] = null client[kHTTPContext] = null - if (this[kHTTP2Session] !== null) { - this[kHTTP2Session].close() - this[kHTTP2Session].destroy(err) - this[kHTTP2Session] = null - } + process._rawDebug('goaway', this) + + // this is an HTTP2 session + this.close() + this[kHTTP2Session] = null util.destroy(this[kSocket], err) @@ -276,7 +283,14 @@ function shouldSendContentLength (method) { return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' } +const seen = new Set() + function writeH2 (client, request) { + assert(!request.completed) + if (seen.has(request.id)) { + throw new Error('This request was already seen ' + request.id) + } + seen.add(request.id) const session = client[kHTTP2Session] const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request @@ -321,13 +335,20 @@ function writeH2 (client, request) { util.errorRequest(client, request, err) if (stream != null) { + // Some chunks might still come after abort, + // let's ignore them + stream.removeAllListeners('data') + // On Abort, we close the stream to send RST_STREAM frame stream.close() // We delay the destroy to allow the stream to send the RST_STREAM frame queueMicrotask(() => util.destroy(stream, err)) + + console.log('kRunningIdx', client[kRunningIdx]) + console.log('kQueue', client[kQueue]) + // We move the running index to the next request - client[kQueue][client[kRunningIdx]++] = null - client[kPendingIdx] = client[kRunningIdx] + client[kOnError](err) client[kResume]() } @@ -356,7 +377,7 @@ function writeH2 (client, request) { // We disabled endStream to allow the user to write to the stream stream = session.request(headers, { endStream: false, signal }) - if (stream.id && !stream.pending) { + if (session.id, stream.id, request.id && !stream.pending) { request.onUpgrade(null, null, stream) ++session[kOpenStreams] client[kQueue][client[kRunningIdx]++] = null @@ -453,6 +474,10 @@ function writeH2 (client, request) { // Increment counter as we have new streams open ++session[kOpenStreams] + // Unfortunately, there is a bug in HTTP/2 that have 'data' being + // emitted after 'end' + let endEmitted = false + stream.once('response', headers => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers request.onResponseStarted() @@ -463,30 +488,50 @@ function writeH2 (client, request) { // for those scenarios, best effort is to destroy the stream immediately // as there's no value to keep it open. if (request.aborted) { + stream.removeAllListeners('data') return } if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) { stream.pause() } + }) - stream.on('data', (chunk) => { + stream.on('data', (chunk) => { + try { + process._rawDebug(session.id, stream.id, request.id, 'completed', request.completed) + if (request.completed) { + process._rawDebug('---') + process._rawDebug(session.id, stream.id, request.id, endEmitted) + process._rawDebug(stream) + process._rawDebug(session) + process._rawDebug(request) + process._rawDebug('---') + } if (request.onData(chunk) === false) { stream.pause() } - }) + } catch (err) { + process._rawDebug('caught', err) + stream.destroy(err) + } }) stream.once('end', (err) => { + process._rawDebug(session.id, stream.id, request.id, 'end emitted') + endEmitted = true + stream.removeAllListeners('data') // When state is null, it means we haven't consumed body and the stream still do not have // a state. // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { + process._rawDebug(session.id, stream.id, request.id, 'calling on complete') request.onComplete([]) client[kQueue][client[kRunningIdx]++] = null client[kResume]() } else { + process._rawDebug(session.id, stream.id, request.id, 'not calling on complete') // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side @@ -503,6 +548,7 @@ function writeH2 (client, request) { }) stream.once('close', () => { + stream.removeAllListeners('data') session[kOpenStreams] -= 1 if (session[kOpenStreams] === 0) { session.unref() @@ -510,6 +556,7 @@ function writeH2 (client, request) { }) stream.once('error', function (err) { + process._rawDebug(session.id, stream.id, request.id, 'error', err) stream.removeAllListeners('data') abort(err) }) @@ -520,7 +567,9 @@ function writeH2 (client, request) { }) stream.on('aborted', () => { + process._rawDebug(session.id, stream.id, request.id, 'aborted') stream.removeAllListeners('data') + endEmitted = true }) // stream.on('timeout', () => { diff --git a/test/http2.js b/test/http2.js index 83ea5f62cbf..d1f7164e9f0 100644 --- a/test/http2.js +++ b/test/http2.js @@ -14,6 +14,11 @@ const { Client, Agent } = require('..') const isGreaterThanv20 = process.versions.node.split('.').map(Number)[0] >= 20 +process.once('uncaughtException', function (err) { + console.log(new Error().stack) + throw err +}) + test('Should support H2 connection', async t => { const body = [] const server = createSecureServer(pem) @@ -1342,7 +1347,7 @@ test('#2364 - Concurrent aborts', async t => { await t.completed }) -test('#2364 - Concurrent aborts (2nd variant)', async t => { +test('#2364 - Concurrent aborts (2nd variant)', { only: true }, async t => { const server = createSecureServer(pem) let counter = 0 @@ -1621,12 +1626,21 @@ test('#3753 - Handle GOAWAY Gracefully', async (t) => { 'x-my-header': 'foo' } }, (err, response) => { - if (i === 9 || i === 8) { - t.strictEqual(err?.message, 'HTTP/2: "GOAWAY" frame received with code 0') - t.strictEqual(err?.code, 'UND_ERR_SOCKET') + if (err) { + t.strictEqual(err.message, 'HTTP/2: "GOAWAY" frame received with code 0') + t.strictEqual(err.code, 'UND_ERR_SOCKET') } else { - t.ifError(err) t.strictEqual(response.statusCode, 200) + ;(async function () { + let body + try { + body = await response.body.text() + } catch (err) { + t.strictEqual(err.code, 'UND_ERR_SOCKET') + return + } + t.strictEqual(body, 'hello world') + })() } }) } From de68be4ff387b15abe495cd364b7826d6660f295 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 14 Nov 2024 10:52:55 +0100 Subject: [PATCH 2/5] fixup Signed-off-by: Matteo Collina --- lib/dispatcher/client-h2.js | 7 ++++--- test/http2.js | 11 +++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index e77f24f1d5f..7f327f0b0dc 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -341,8 +341,6 @@ function writeH2 (client, request) { // On Abort, we close the stream to send RST_STREAM frame stream.close() - // We delay the destroy to allow the stream to send the RST_STREAM frame - queueMicrotask(() => util.destroy(stream, err)) console.log('kRunningIdx', client[kRunningIdx]) console.log('kQueue', client[kQueue]) @@ -526,7 +524,10 @@ function writeH2 (client, request) { // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { process._rawDebug(session.id, stream.id, request.id, 'calling on complete') - request.onComplete([]) + // The request was aborted + if (!request.aborted) { + request.onComplete([]) + } client[kQueue][client[kRunningIdx]++] = null client[kResume]() diff --git a/test/http2.js b/test/http2.js index d1f7164e9f0..7e7ebf2ed6a 100644 --- a/test/http2.js +++ b/test/http2.js @@ -14,11 +14,6 @@ const { Client, Agent } = require('..') const isGreaterThanv20 = process.versions.node.split('.').map(Number)[0] >= 20 -process.once('uncaughtException', function (err) { - console.log(new Error().stack) - throw err -}) - test('Should support H2 connection', async t => { const body = [] const server = createSecureServer(pem) @@ -1347,7 +1342,7 @@ test('#2364 - Concurrent aborts', async t => { await t.completed }) -test('#2364 - Concurrent aborts (2nd variant)', { only: true }, async t => { +test('#2364 - Concurrent aborts (2nd variant)', async t => { const server = createSecureServer(pem) let counter = 0 @@ -1405,6 +1400,7 @@ test('#2364 - Concurrent aborts (2nd variant)', { only: true }, async t => { } }, (err, response) => { + process._rawDebug('response 1') t.ifError(err) t.strictEqual( response.headers['content-type'], @@ -1425,6 +1421,7 @@ test('#2364 - Concurrent aborts (2nd variant)', { only: true }, async t => { signal }, (err, response) => { + process._rawDebug('response 2') t.strictEqual(err.name, 'TimeoutError') } ) @@ -1438,6 +1435,7 @@ test('#2364 - Concurrent aborts (2nd variant)', { only: true }, async t => { } }, (err, response) => { + process._rawDebug('response 3') t.ifError(err) t.strictEqual( response.headers['content-type'], @@ -1458,6 +1456,7 @@ test('#2364 - Concurrent aborts (2nd variant)', { only: true }, async t => { signal }, (err, response) => { + process._rawDebug('response 4') t.strictEqual(err.name, 'TimeoutError') } ) From b471d202824c2b24088389d729e1f4e46f0f9808 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 14 Nov 2024 11:05:23 +0100 Subject: [PATCH 3/5] linting Signed-off-by: Matteo Collina --- lib/core/request.js | 1 - lib/dispatcher/client-h2.js | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/core/request.js b/lib/core/request.js index 450c78d9fee..e567eab0386 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -134,7 +134,6 @@ class Request { this[kCompleted] = false - Object.defineProperty(this, 'completed', { get () { return this[kCompleted] diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 7f327f0b0dc..a1706fd8d66 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -94,7 +94,7 @@ async function connectH2 (client, socket) { }) session.id = sessionId++ - + session[kOpenStreams] = 0 session[kClient] = client session[kSocket] = socket @@ -375,7 +375,7 @@ function writeH2 (client, request) { // We disabled endStream to allow the user to write to the stream stream = session.request(headers, { endStream: false, signal }) - if (session.id, stream.id, request.id && !stream.pending) { + if (!stream.pending) { request.onUpgrade(null, null, stream) ++session[kOpenStreams] client[kQueue][client[kRunningIdx]++] = null From 7b54febe691d2cecb4501e213adb674b3a93a269 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 14 Nov 2024 11:10:31 +0100 Subject: [PATCH 4/5] cleanup Signed-off-by: Matteo Collina --- lib/core/request.js | 16 +------------ lib/dispatcher/client-h2.js | 45 ++----------------------------------- test/http2.js | 4 ---- 3 files changed, 3 insertions(+), 62 deletions(-) diff --git a/lib/core/request.js b/lib/core/request.js index e567eab0386..a8680b5057f 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -26,9 +26,7 @@ const { headerNameLowerCasedRecord } = require('./constants') const invalidPathRegex = /[^\u0021-\u00ff]/ const kHandler = Symbol('handler') -const kCompleted = Symbol('completed') -let nextRequestId = 0 class Request { constructor (origin, { path, @@ -46,7 +44,6 @@ class Request { servername, throwOnError }, handler) { - this.id = nextRequestId++ if (typeof path !== 'string') { throw new InvalidArgumentError('path must be a string') } else if ( @@ -132,18 +129,7 @@ class Request { throw new InvalidArgumentError('body must be a string, a Buffer, a Readable stream, an iterable, or an async iterable') } - this[kCompleted] = false - - Object.defineProperty(this, 'completed', { - get () { - return this[kCompleted] - }, - set (value) { - process._rawDebug(this.id, 'completed set to', value, 'from', this[kCompleted], Error().stack) - this[kCompleted] = value - } - }) - + this.completed = false this.aborted = false this.upgrade = upgrade || null diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index a1706fd8d66..a44684d6a3c 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -76,8 +76,6 @@ function parseH2Headers (headers) { return result } -let sessionId = 0 - async function connectH2 (client, socket) { client[kSocket] = socket @@ -93,8 +91,6 @@ async function connectH2 (client, socket) { peerMaxConcurrentStreams: client[kMaxConcurrentStreams] }) - session.id = sessionId++ - session[kOpenStreams] = 0 session[kClient] = client session[kSocket] = socket @@ -198,8 +194,6 @@ function onHttp2SessionGoAway (errorCode) { client[kSocket] = null client[kHTTPContext] = null - process._rawDebug('goaway', this) - // this is an HTTP2 session this.close() this[kHTTP2Session] = null @@ -283,14 +277,7 @@ function shouldSendContentLength (method) { return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' } -const seen = new Set() - function writeH2 (client, request) { - assert(!request.completed) - if (seen.has(request.id)) { - throw new Error('This request was already seen ' + request.id) - } - seen.add(request.id) const session = client[kHTTP2Session] const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request @@ -342,9 +329,6 @@ function writeH2 (client, request) { // On Abort, we close the stream to send RST_STREAM frame stream.close() - console.log('kRunningIdx', client[kRunningIdx]) - console.log('kQueue', client[kQueue]) - // We move the running index to the next request client[kOnError](err) client[kResume]() @@ -472,10 +456,6 @@ function writeH2 (client, request) { // Increment counter as we have new streams open ++session[kOpenStreams] - // Unfortunately, there is a bug in HTTP/2 that have 'data' being - // emitted after 'end' - let endEmitted = false - stream.once('response', headers => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers request.onResponseStarted() @@ -496,34 +476,17 @@ function writeH2 (client, request) { }) stream.on('data', (chunk) => { - try { - process._rawDebug(session.id, stream.id, request.id, 'completed', request.completed) - if (request.completed) { - process._rawDebug('---') - process._rawDebug(session.id, stream.id, request.id, endEmitted) - process._rawDebug(stream) - process._rawDebug(session) - process._rawDebug(request) - process._rawDebug('---') - } - if (request.onData(chunk) === false) { - stream.pause() - } - } catch (err) { - process._rawDebug('caught', err) - stream.destroy(err) + if (request.onData(chunk) === false) { + stream.pause() } }) stream.once('end', (err) => { - process._rawDebug(session.id, stream.id, request.id, 'end emitted') - endEmitted = true stream.removeAllListeners('data') // When state is null, it means we haven't consumed body and the stream still do not have // a state. // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { - process._rawDebug(session.id, stream.id, request.id, 'calling on complete') // The request was aborted if (!request.aborted) { request.onComplete([]) @@ -532,7 +495,6 @@ function writeH2 (client, request) { client[kQueue][client[kRunningIdx]++] = null client[kResume]() } else { - process._rawDebug(session.id, stream.id, request.id, 'not calling on complete') // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side @@ -557,7 +519,6 @@ function writeH2 (client, request) { }) stream.once('error', function (err) { - process._rawDebug(session.id, stream.id, request.id, 'error', err) stream.removeAllListeners('data') abort(err) }) @@ -568,9 +529,7 @@ function writeH2 (client, request) { }) stream.on('aborted', () => { - process._rawDebug(session.id, stream.id, request.id, 'aborted') stream.removeAllListeners('data') - endEmitted = true }) // stream.on('timeout', () => { diff --git a/test/http2.js b/test/http2.js index 7e7ebf2ed6a..51f66b10ced 100644 --- a/test/http2.js +++ b/test/http2.js @@ -1400,7 +1400,6 @@ test('#2364 - Concurrent aborts (2nd variant)', async t => { } }, (err, response) => { - process._rawDebug('response 1') t.ifError(err) t.strictEqual( response.headers['content-type'], @@ -1421,7 +1420,6 @@ test('#2364 - Concurrent aborts (2nd variant)', async t => { signal }, (err, response) => { - process._rawDebug('response 2') t.strictEqual(err.name, 'TimeoutError') } ) @@ -1435,7 +1433,6 @@ test('#2364 - Concurrent aborts (2nd variant)', async t => { } }, (err, response) => { - process._rawDebug('response 3') t.ifError(err) t.strictEqual( response.headers['content-type'], @@ -1456,7 +1453,6 @@ test('#2364 - Concurrent aborts (2nd variant)', async t => { signal }, (err, response) => { - process._rawDebug('response 4') t.strictEqual(err.name, 'TimeoutError') } ) From 2b293cbbdb3f8b2097154f26dc25443e4789d1af Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 14 Nov 2024 15:53:52 +0100 Subject: [PATCH 5/5] fixup Signed-off-by: Matteo Collina --- lib/dispatcher/client-h2.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index a44684d6a3c..bb06ab957c5 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -487,7 +487,7 @@ function writeH2 (client, request) { // a state. // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { - // The request was aborted + // Do not complete the request if it was aborted if (!request.aborted) { request.onComplete([]) }