From 8757139233b792a75aefce337b00169369188993 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 07:23:44 +0000 Subject: [PATCH 01/17] fix: cache --- lib/interceptor/cache.js | 84 ++++++++++++++++++------------------ types/cache-interceptor.d.ts | 2 +- 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 63d56b6880b..9406a0b722f 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -1,5 +1,6 @@ 'use strict' +const assert = require('node:assert') const util = require('../core/util') const CacheHandler = require('../handler/cache-handler') const MemoryCacheStore = require('../cache/memory-cache-store') @@ -39,41 +40,37 @@ module.exports = (opts = {}) => { return dispatch(opts, handler) } + // TODO (perf): For small entries support returning a Buffer instead of a stream. const stream = store.createReadStream(opts) if (!stream) { // Request isn't cached return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } - let onErrorCalled = false - /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable} stream * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value */ const respondWithCachedValue = (stream, value) => { - const ac = new AbortController() - const signal = ac.signal + let completed = false - signal.onabort = (_, err) => { - stream.destroy() - if (!onErrorCalled) { - handler.onError(err) - onErrorCalled = true - } - } + assert(!stream.destroyed, 'stream should not be destroyed') + assert(!stream.readableDidRead, 'stream should not be readableDidRead') stream.on('error', (err) => { - if (!onErrorCalled) { + if (!completed && typeof handler.onError === 'function') { handler.onError(err) - onErrorCalled = true } }) try { if (typeof handler.onConnect === 'function') { - handler.onConnect(ac.abort) - signal.throwIfAborted() + handler.onConnect((err) => { + stream.destroy(err) + }) + if (stream.destroyed) { + return + } } if (typeof handler.onHeaders === 'function') { @@ -83,36 +80,33 @@ module.exports = (opts = {}) => { value.rawHeaders.push(AGE_HEADER, Buffer.from(`${age}`)) - handler.onHeaders(value.statusCode, value.rawHeaders, stream.resume, value.statusMessage) - signal.throwIfAborted() + handler.onHeaders(value.statusCode, value.rawHeaders, () => stream.resume(), value.statusMessage) + if (stream.destroyed) { + return + } } if (opts.method === 'HEAD') { if (typeof handler.onComplete === 'function') { + completed = true handler.onComplete(null) - stream.destroy() } + stream.destroy() } else { - if (typeof handler.onData === 'function') { - stream.on('data', chunk => { - if (!handler.onData(chunk)) { - stream.pause() - } - }) - } - - if (typeof handler.onComplete === 'function') { - stream.on('end', () => { + stream.on('data', chunk => { + if (typeof handler.onData === 'function' && !handler.onData(chunk)) { + stream.pause() + } + }) + + stream.on('end', () => { + if (typeof handler.onComplete === 'function') { handler.onComplete(value.rawTrailers ?? []) - }) - } + } + }) } } catch (err) { stream.destroy(err) - if (!onErrorCalled && typeof handler.onError === 'function') { - handler.onError(err) - onErrorCalled = true - } } } @@ -125,9 +119,11 @@ module.exports = (opts = {}) => { return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } + // TODO (fix): It's weird that "value" lives on stream. const { value } = stream - // Dump body on error + // Dump body if cached... + // TODO (fix): This is a bit suspect. if (util.isStream(opts.body)) { opts.body?.on('error', () => {}).resume() } @@ -135,11 +131,13 @@ module.exports = (opts = {}) => { // Check if the response is stale const now = Date.now() if (now >= value.staleAt) { + // TODO (fix): This whole bit is a bit suspect. In particular given that + // we dumped the body above. + if (now >= value.deleteAt) { // Safety check in case the store gave us a response that should've been - // deleted already - dispatch(opts, new CacheHandler(globalOpts, opts, handler)) - return + // deleted already + return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } if (!opts.headers) { @@ -149,21 +147,23 @@ module.exports = (opts = {}) => { opts.headers['if-modified-since'] = new Date(value.cachedAt).toUTCString() // Need to revalidate the response - dispatch( + return dispatch( opts, new CacheRevalidationHandler( () => respondWithCachedValue(stream, value), new CacheHandler(globalOpts, opts, handler) ) ) - - return } respondWithCachedValue(stream, value) } - Promise.resolve(stream).then(handleStream).catch(handler.onError) + Promise.resolve(stream).then(handleStream, err => { + if (typeof handler.onError === 'function') { + handler.onError(err) + } + }) return true } diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts index 7fa1aeab0ed..b05e0c871cb 100644 --- a/types/cache-interceptor.d.ts +++ b/types/cache-interceptor.d.ts @@ -49,7 +49,7 @@ declare namespace CacheHandler { export interface CacheStoreValue { statusCode: number; statusMessage: string; - rawHeaders: (Buffer | Buffer[])[]; + rawHeaders: Buffer[]; rawTrailers?: string[]; /** * Headers defined by the Vary header and their respective values for From 5fad8a737dcf2b029e8e2384a400075ddabd57ba Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 09:30:02 +0000 Subject: [PATCH 02/17] WIP --- lib/handler/cache-revalidation-handler.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 53c58773022..7589666c4d6 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -60,7 +60,6 @@ class CacheRevalidationHandler extends DecoratorHandler { // https://www.rfc-editor.org/rfc/rfc9111.html#name-handling-a-validation-respo if (statusCode === 304) { this.#successful = true - this.#successCallback() return true } @@ -72,7 +71,8 @@ class CacheRevalidationHandler extends DecoratorHandler { statusMessage ) } - return false + + return true } /** @@ -90,7 +90,7 @@ class CacheRevalidationHandler extends DecoratorHandler { return this.#handler.onData(chunk) } - return false + return true } /** @@ -99,7 +99,9 @@ class CacheRevalidationHandler extends DecoratorHandler { * @param {string[] | null} rawTrailers */ onComplete (rawTrailers) { - if (!this.#successful && typeof this.#handler.onComplete === 'function') { + if (this.#successful) { + this.#successCallback() + } else if (typeof this.#handler.onComplete === 'function') { this.#handler.onComplete(rawTrailers) } } From 6f76cce24b698d18640531005d128949492364cf Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 09:39:06 +0000 Subject: [PATCH 03/17] WIP --- lib/interceptor/cache.js | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 9406a0b722f..b8adfd16b2d 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -93,17 +93,17 @@ module.exports = (opts = {}) => { } stream.destroy() } else { - stream.on('data', chunk => { - if (typeof handler.onData === 'function' && !handler.onData(chunk)) { - stream.pause() - } - }) - - stream.on('end', () => { - if (typeof handler.onComplete === 'function') { - handler.onComplete(value.rawTrailers ?? []) - } - }) + stream + .on('data', chunk => { + if (typeof handler.onData === 'function' && !handler.onData(chunk)) { + stream.pause() + } + }) + .on('end', () => { + if (typeof handler.onComplete === 'function') { + handler.onComplete(value.rawTrailers ?? []) + } + }) } } catch (err) { stream.destroy(err) From 3eab9955173786c8bb732c8cb096cf0395d700d0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 09:40:44 +0000 Subject: [PATCH 04/17] WIP --- lib/handler/cache-handler.js | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index d48675c3760..419523f7e10 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -7,6 +7,8 @@ const { parseVaryHeader } = require('../util/cache') +function noop () {} + /** * Writes a response to a CacheStore and then passes it on to the next handler */ @@ -77,15 +79,11 @@ class CacheHandler extends DecoratorHandler { // Try/catch for if it's synchronous try { const result = this.#store.deleteByOrigin(this.#requestOptions.origin) - if ( - result && - typeof result.catch === 'function' && - typeof this.#handler.onError === 'function' - ) { + if (typeof result?.catch === 'function') { // Fail silently - result.catch(_ => {}) + result.catch(noop) } - } catch (err) { + } catch { // Fail silently } @@ -139,6 +137,7 @@ class CacheHandler extends DecoratorHandler { if (this.#writeStream) { this.#writeStream.on('drain', resume) this.#writeStream.on('error', () => { + // TODO (fix): Make this error somehow observable? this.#writeStream = undefined resume() }) @@ -148,6 +147,7 @@ class CacheHandler extends DecoratorHandler { if (typeof this.#handler.onHeaders === 'function') { return downstreamOnHeaders() } + return false } @@ -178,10 +178,7 @@ class CacheHandler extends DecoratorHandler { */ onComplete (rawTrailers) { if (this.#writeStream) { - if (rawTrailers) { - this.#writeStream.rawTrailers = rawTrailers - } - + this.#writeStream.rawTrailers = rawTrailers ?? [] this.#writeStream.end() } From efb57a46b1bd627bd1001065baab9d0d68e56929 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 09:49:11 +0000 Subject: [PATCH 05/17] WIP --- lib/cache/memory-cache-store.js | 1 + lib/handler/cache-handler.js | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/lib/cache/memory-cache-store.js b/lib/cache/memory-cache-store.js index ae6d6614d82..bd8f6e33d97 100644 --- a/lib/cache/memory-cache-store.js +++ b/lib/cache/memory-cache-store.js @@ -294,6 +294,7 @@ class MemoryStoreReadableStream extends Readable { * @type {MemoryStoreValue} */ #value + /** * @type {Buffer[]} */ diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index 419523f7e10..e4b361693ab 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -63,26 +63,27 @@ class CacheHandler extends DecoratorHandler { resume, statusMessage ) { - const downstreamOnHeaders = () => this.#handler.onHeaders( - statusCode, - rawHeaders, - resume, - statusMessage - ) + const downstreamOnHeaders = () => { + if (typeof this.#handler.onHeaders === 'function') { + return this.#handler.onHeaders( + statusCode, + rawHeaders, + resume, + statusMessage + ) + } else { + return true + } + } if ( !util.safeHTTPMethods.includes(this.#requestOptions.method) && statusCode >= 200 && statusCode <= 399 ) { - // https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-respons - // Try/catch for if it's synchronous + // https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-response try { - const result = this.#store.deleteByOrigin(this.#requestOptions.origin) - if (typeof result?.catch === 'function') { - // Fail silently - result.catch(noop) - } + this.#store.deleteByOrigin(this.#requestOptions.origin.toString())?.catch?.(noop) } catch { // Fail silently } From ea5e09c9ec8ce1827068fd90f4e8e8f9d6f2b156 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 09:52:36 +0000 Subject: [PATCH 06/17] WIP --- lib/handler/cache-handler.js | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index e4b361693ab..8e56a5769b6 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -87,7 +87,6 @@ class CacheHandler extends DecoratorHandler { } catch { // Fail silently } - return downstreamOnHeaders() } @@ -138,18 +137,15 @@ class CacheHandler extends DecoratorHandler { if (this.#writeStream) { this.#writeStream.on('drain', resume) this.#writeStream.on('error', () => { - // TODO (fix): Make this error somehow observable? + // TODO (fix): Make error somehow observable? this.#writeStream = undefined + // TODO (fix): Should we resume even if was paused downstream? resume() }) } } - if (typeof this.#handler.onHeaders === 'function') { - return downstreamOnHeaders() - } - - return false + return downstreamOnHeaders() } /** @@ -330,7 +326,7 @@ function stripNecessaryHeaders (rawHeaders, parsedHeaders, cacheControlDirective for (let i = 0; i < headerNames.length; i++) { const header = headerNames[i] - if (headersToRemove.indexOf(header) !== -1) { + if (headersToRemove.includes(header)) { // We have a at least one header we want to remove if (!strippedHeaders) { // This is the first header we want to remove, let's create the object From d247aba444fd51464c6752cfb72f89cfce06018c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 09:58:38 +0000 Subject: [PATCH 07/17] WIP --- lib/handler/cache-handler.js | 11 ++++++----- lib/interceptor/cache.js | 19 +++++++++---------- types/cache-interceptor.d.ts | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index 8e56a5769b6..9a116ffa2d7 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -93,15 +93,16 @@ class CacheHandler extends DecoratorHandler { const headers = util.parseHeaders(rawHeaders) const cacheControlHeader = headers['cache-control'] - const contentLengthHeader = headers['content-length'] - - if (!cacheControlHeader || !contentLengthHeader || this.#store.isFull) { - // Don't have the headers we need, can't cache + if (!cacheControlHeader || typeof cacheControlHeader !== 'string') { + // Don't have cache-control, can't cache. return downstreamOnHeaders() } - const contentLength = Number(contentLengthHeader) + const contentLengthHeader = headers['content-length'] + const contentLength = contentLengthHeader ? Number(contentLengthHeader) : null if (!Number.isInteger(contentLength)) { + // Don't know the final size, don't cache. + // TODO (fix): Why not cache? return downstreamOnHeaders() } diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index b8adfd16b2d..c3264a1f0af 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -8,6 +8,7 @@ const CacheRevalidationHandler = require('../handler/cache-revalidation-handler' const { assertCacheStore, assertCacheMethods } = require('../util/cache.js') const AGE_HEADER = Buffer.from('age') +function noop () {} /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions} [opts] @@ -52,17 +53,9 @@ module.exports = (opts = {}) => { * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value */ const respondWithCachedValue = (stream, value) => { - let completed = false - assert(!stream.destroyed, 'stream should not be destroyed') assert(!stream.readableDidRead, 'stream should not be readableDidRead') - stream.on('error', (err) => { - if (!completed && typeof handler.onError === 'function') { - handler.onError(err) - } - }) - try { if (typeof handler.onConnect === 'function') { handler.onConnect((err) => { @@ -88,10 +81,11 @@ module.exports = (opts = {}) => { if (opts.method === 'HEAD') { if (typeof handler.onComplete === 'function') { - completed = true handler.onComplete(null) } - stream.destroy() + stream + .on('error', noop) + .destroy() } else { stream .on('data', chunk => { @@ -104,6 +98,11 @@ module.exports = (opts = {}) => { handler.onComplete(value.rawTrailers ?? []) } }) + .on('error', (err) => { + if (!stream.readableEnded && typeof handler.onError === 'function') { + handler.onError(err) + } + }) } } catch (err) { stream.destroy(err) diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts index b05e0c871cb..7fa1aeab0ed 100644 --- a/types/cache-interceptor.d.ts +++ b/types/cache-interceptor.d.ts @@ -49,7 +49,7 @@ declare namespace CacheHandler { export interface CacheStoreValue { statusCode: number; statusMessage: string; - rawHeaders: Buffer[]; + rawHeaders: (Buffer | Buffer[])[]; rawTrailers?: string[]; /** * Headers defined by the Vary header and their respective values for From 0d01767000d55d686a888dc63cb4e503e909a7e5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 10:07:10 +0000 Subject: [PATCH 08/17] WIP --- lib/interceptor/cache.js | 46 +++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index c3264a1f0af..f63ea0d27d8 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -8,7 +8,6 @@ const CacheRevalidationHandler = require('../handler/cache-revalidation-handler' const { assertCacheStore, assertCacheMethods } = require('../util/cache.js') const AGE_HEADER = Buffer.from('age') -function noop () {} /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions} [opts] @@ -57,6 +56,18 @@ module.exports = (opts = {}) => { assert(!stream.readableDidRead, 'stream should not be readableDidRead') try { + stream + .on('error', function (err) { + if (!this.readableEnded && typeof handler.onError === 'function') { + handler.onError(err) + } + }) + .on('close', function () { + if (!this.errored && typeof handler.onComplete === 'function') { + handler.onComplete(value.rawTrailers ?? []) + } + }) + if (typeof handler.onConnect === 'function') { handler.onConnect((err) => { stream.destroy(err) @@ -73,36 +84,19 @@ module.exports = (opts = {}) => { value.rawHeaders.push(AGE_HEADER, Buffer.from(`${age}`)) - handler.onHeaders(value.statusCode, value.rawHeaders, () => stream.resume(), value.statusMessage) - if (stream.destroyed) { - return + if (handler.onHeaders(value.statusCode, value.rawHeaders, () => stream.resume(), value.statusMessage) === false) { + stream.pause() } } if (opts.method === 'HEAD') { - if (typeof handler.onComplete === 'function') { - handler.onComplete(null) - } - stream - .on('error', noop) - .destroy() + stream.destroy() } else { - stream - .on('data', chunk => { - if (typeof handler.onData === 'function' && !handler.onData(chunk)) { - stream.pause() - } - }) - .on('end', () => { - if (typeof handler.onComplete === 'function') { - handler.onComplete(value.rawTrailers ?? []) - } - }) - .on('error', (err) => { - if (!stream.readableEnded && typeof handler.onError === 'function') { - handler.onError(err) - } - }) + stream.on('data', function (chunk) { + if (typeof handler.onData === 'function' && !handler.onData(chunk)) { + stream.pause() + } + }) } } catch (err) { stream.destroy(err) From b48d306768989c146700065296013903c9fc23b4 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 10:20:26 +0000 Subject: [PATCH 09/17] WIP --- lib/handler/cache-handler.js | 32 ++++++++++++++++++----- lib/handler/cache-revalidation-handler.js | 8 ++++++ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index 9a116ffa2d7..9bc9a776747 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -48,6 +48,17 @@ class CacheHandler extends DecoratorHandler { this.#handler = handler } + onConnect (abort) { + if (this.#writeStream) { + this.#writeStream.destroy() + this.#writeStream = undefined + } + + if (typeof this.#handler.onConnect === 'function') { + this.#handler.onConnect(abort) + } + } + /** * @see {DispatchHandlers.onHeaders} * @@ -136,13 +147,20 @@ class CacheHandler extends DecoratorHandler { }) if (this.#writeStream) { - this.#writeStream.on('drain', resume) - this.#writeStream.on('error', () => { - // TODO (fix): Make error somehow observable? - this.#writeStream = undefined - // TODO (fix): Should we resume even if was paused downstream? - resume() - }) + const handler = this + this.#writeStream + .on('drain', resume) + .on('error', function () { + // TODO (fix): Make error somehow observable? + }) + .on('close', function () { + if (handler.#writeStream === this) { + handler.#writeStream = undefined + } + + // TODO (fix): Should we resume even if was paused downstream? + resume() + }) } } diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 7589666c4d6..01a809e487e 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -42,6 +42,14 @@ class CacheRevalidationHandler extends DecoratorHandler { this.#handler = handler } + onConnect (abort) { + this.#successful = false + + if (typeof this.#handler.onConnect === 'function') { + this.#handler.onConnect(abort) + } + } + /** * @see {DispatchHandlers.onHeaders} * From 74c159905102cfde883e1fe8bc887f743e12d100 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 12:04:57 +0000 Subject: [PATCH 10/17] WIP --- lib/interceptor/cache.js | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index f63ea0d27d8..4087d38bae3 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -124,24 +124,15 @@ module.exports = (opts = {}) => { // Check if the response is stale const now = Date.now() if (now >= value.staleAt) { - // TODO (fix): This whole bit is a bit suspect. In particular given that - // we dumped the body above. - - if (now >= value.deleteAt) { - // Safety check in case the store gave us a response that should've been - // deleted already - return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) - } - - if (!opts.headers) { - opts.headers = {} - } - - opts.headers['if-modified-since'] = new Date(value.cachedAt).toUTCString() - // Need to revalidate the response return dispatch( - opts, + { + ...opts, + headers: { + ...opts.headers, + 'if-modified-since': new Date(value.cachedAt).toUTCString() + } + }, new CacheRevalidationHandler( () => respondWithCachedValue(stream, value), new CacheHandler(globalOpts, opts, handler) From 5cb8b0c5091de318a89e818dcd0d4ee1e5ce0365 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 12:07:30 +0000 Subject: [PATCH 11/17] WIP --- lib/interceptor/cache.js | 67 ++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 4087d38bae3..2a80d529c69 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -108,46 +108,59 @@ module.exports = (opts = {}) => { */ const handleStream = (stream) => { if (!stream) { - // Request isn't cached return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } + assert(util.isStream(stream)) + // TODO (fix): It's weird that "value" lives on stream. const { value } = stream - // Dump body if cached... - // TODO (fix): This is a bit suspect. - if (util.isStream(opts.body)) { - opts.body?.on('error', () => {}).resume() - } - // Check if the response is stale const now = Date.now() - if (now >= value.staleAt) { + if (now < value.staleAt) { + // Dump body. + if (util.isStream(opts.body)) { + opts.body.on('error', () => {}).resume() + } + respondWithCachedValue(stream, value) + } else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { + // If body is is stream we can't revalidate... + // TODO (fix): This could be less strict... + dispatch(opts, new CacheHandler(globalOpts, opts, handler)) + } else { // Need to revalidate the response - return dispatch( - { - ...opts, - headers: { - ...opts.headers, - 'if-modified-since': new Date(value.cachedAt).toUTCString() - } - }, - new CacheRevalidationHandler( - () => respondWithCachedValue(stream, value), - new CacheHandler(globalOpts, opts, handler) + try { + return dispatch( + { + ...opts, + headers: { + ...opts.headers, + 'if-modified-since': new Date(value.cachedAt).toUTCString() + } + }, + new CacheRevalidationHandler( + () => respondWithCachedValue(stream, value), + new CacheHandler(globalOpts, opts, handler) + ) ) - ) + } catch (err) { + if (typeof handler.onError === 'function') { + handler.onError(err) + } + } } - - respondWithCachedValue(stream, value) } - Promise.resolve(stream).then(handleStream, err => { - if (typeof handler.onError === 'function') { - handler.onError(err) - } - }) + if (util.isStream(stream)) { + handleStream(stream) + } else { + Promise.resolve(stream).then(handleStream, err => { + if (typeof handler.onError === 'function') { + handler.onError(err) + } + }) + } return true } From d92b39e1f148a99ec9017b3252d225c5362410fb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 12:15:41 +0000 Subject: [PATCH 12/17] WIP --- lib/interceptor/cache.js | 52 +++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 2a80d529c69..70ed899bb0b 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -58,8 +58,12 @@ module.exports = (opts = {}) => { try { stream .on('error', function (err) { - if (!this.readableEnded && typeof handler.onError === 'function') { - handler.onError(err) + if (!this.readableEnded) { + if (typeof handler.onError === 'function') { + handler.onError(err) + } else { + throw err + } } }) .on('close', function () { @@ -116,22 +120,22 @@ module.exports = (opts = {}) => { // TODO (fix): It's weird that "value" lives on stream. const { value } = stream - // Check if the response is stale - const now = Date.now() - if (now < value.staleAt) { - // Dump body. - if (util.isStream(opts.body)) { - opts.body.on('error', () => {}).resume() - } - respondWithCachedValue(stream, value) - } else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { - // If body is is stream we can't revalidate... - // TODO (fix): This could be less strict... - dispatch(opts, new CacheHandler(globalOpts, opts, handler)) - } else { - // Need to revalidate the response - try { - return dispatch( + try { + // Check if the response is stale + const now = Date.now() + if (now < value.staleAt) { + // Dump body. + if (util.isStream(opts.body)) { + opts.body.on('error', () => {}).resume() + } + respondWithCachedValue(stream, value) + } else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { + // If body is is stream we can't revalidate... + // TODO (fix): This could be less strict... + dispatch(opts, new CacheHandler(globalOpts, opts, handler)) + } else { + // Need to revalidate the response + dispatch( { ...opts, headers: { @@ -144,10 +148,12 @@ module.exports = (opts = {}) => { new CacheHandler(globalOpts, opts, handler) ) ) - } catch (err) { - if (typeof handler.onError === 'function') { - handler.onError(err) - } + } + } catch (err) { + if (typeof handler.onError === 'function') { + handler.onError(err) + } else { + throw err } } } @@ -158,6 +164,8 @@ module.exports = (opts = {}) => { Promise.resolve(stream).then(handleStream, err => { if (typeof handler.onError === 'function') { handler.onError(err) + } else { + throw err } }) } From 46aac2c792ab981846df32179098dfdbfd5d669a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 12:17:29 +0000 Subject: [PATCH 13/17] WIP --- lib/interceptor/cache.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 70ed899bb0b..dababaf4b0d 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -9,6 +9,10 @@ const { assertCacheStore, assertCacheMethods } = require('../util/cache.js') const AGE_HEADER = Buffer.from('age') +/** + * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} CacheStoreValue + */ + /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions} [opts] * @returns {import('../../types/dispatcher.d.ts').default.DispatcherComposeInterceptor} @@ -48,7 +52,7 @@ module.exports = (opts = {}) => { } /** - * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable} stream + * @param {import('node:stream').Readable} stream * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value */ const respondWithCachedValue = (stream, value) => { @@ -108,7 +112,7 @@ module.exports = (opts = {}) => { } /** - * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable | undefined} stream + * @param {import('node:stream').Readable & { value: CacheStoreValue } | undefined} stream */ const handleStream = (stream) => { if (!stream) { From 05634ea28747add98a5304e14de8a030b1a9157b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 12:19:06 +0000 Subject: [PATCH 14/17] WIP --- lib/interceptor/cache.js | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index dababaf4b0d..dcd40330f81 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -112,18 +112,15 @@ module.exports = (opts = {}) => { } /** - * @param {import('node:stream').Readable & { value: CacheStoreValue } | undefined} stream + * @param {import('node:stream').Readable | undefined} stream + * @param {CacheStoreValue | undefined} value */ - const handleStream = (stream) => { - if (!stream) { + const handleStream = (stream, value) => { + if (!stream || !value) { + stream?.on('error', () => {}).destroy() return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } - assert(util.isStream(stream)) - - // TODO (fix): It's weird that "value" lives on stream. - const { value } = stream - try { // Check if the response is stale const now = Date.now() @@ -163,9 +160,9 @@ module.exports = (opts = {}) => { } if (util.isStream(stream)) { - handleStream(stream) + handleStream(stream, stream.value) } else { - Promise.resolve(stream).then(handleStream, err => { + Promise.resolve(stream).then(stream => handleStream(stream, stream?.value), err => { if (typeof handler.onError === 'function') { handler.onError(err) } else { From cd343bdc7281fddb213a9ce5be8da702af931222 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 12:20:58 +0000 Subject: [PATCH 15/17] fixuP --- lib/interceptor/cache.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index dcd40330f81..c7409cc141c 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -90,9 +90,10 @@ module.exports = (opts = {}) => { // https://www.rfc-editor.org/rfc/rfc9111.html#name-age const age = Math.round((Date.now() - value.cachedAt) / 1000) - value.rawHeaders.push(AGE_HEADER, Buffer.from(`${age}`)) + // TODO (fix): What if rawHeaders already contains age header? + const rawHeaders = [...value.rawHeaders, AGE_HEADER, Buffer.from(`${age}`)] - if (handler.onHeaders(value.statusCode, value.rawHeaders, () => stream.resume(), value.statusMessage) === false) { + if (handler.onHeaders(value.statusCode, rawHeaders, () => stream.resume(), value.statusMessage) === false) { stream.pause() } } From 27674560ba61cf0e0a5eca7c32bf133d1d980447 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 12:28:07 +0000 Subject: [PATCH 16/17] WIP --- lib/handler/cache-revalidation-handler.js | 23 +++++++++++++---------- lib/interceptor/cache.js | 8 +++++++- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 01a809e487e..14560baf859 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -19,32 +19,33 @@ const DecoratorHandler = require('../handler/decorator-handler') class CacheRevalidationHandler extends DecoratorHandler { #successful = false /** - * @type {(() => void)} + * @type {((boolean) => void)} */ - #successCallback + #callback /** * @type {(import('../../types/dispatcher.d.ts').default.DispatchHandlers)} */ #handler /** - * @param {() => void} successCallback Function to call if the cached value is valid + * @param {(boolean) => void} callback Function to call if the cached value is valid * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler */ - constructor (successCallback, handler) { - if (typeof successCallback !== 'function') { - throw new TypeError('successCallback must be a function') + constructor (callback, handler) { + if (typeof callback !== 'function') { + throw new TypeError('callback must be a function') } super(handler) - this.#successCallback = successCallback + this.#callback = callback this.#handler = handler } onConnect (abort) { this.#successful = false + // TODO (fix): This is suspect... if (typeof this.#handler.onConnect === 'function') { this.#handler.onConnect(abort) } @@ -107,9 +108,9 @@ class CacheRevalidationHandler extends DecoratorHandler { * @param {string[] | null} rawTrailers */ onComplete (rawTrailers) { - if (this.#successful) { - this.#successCallback() - } else if (typeof this.#handler.onComplete === 'function') { + this.#callback(this.#successful) + + if (!this.#successful && typeof this.#handler.onComplete === 'function') { this.#handler.onComplete(rawTrailers) } } @@ -120,6 +121,8 @@ class CacheRevalidationHandler extends DecoratorHandler { * @param {Error} err */ onError (err) { + this.#callback(false) + if (typeof this.#handler.onError === 'function') { this.#handler.onError(err) } diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index c7409cc141c..b56c32c5545 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -146,7 +146,13 @@ module.exports = (opts = {}) => { } }, new CacheRevalidationHandler( - () => respondWithCachedValue(stream, value), + (success) => { + if (success) { + respondWithCachedValue(stream, value) + } else { + stream.on('error', () => {}).destroy() + } + }, new CacheHandler(globalOpts, opts, handler) ) ) From 3241ad92013c7f750424cdf4a18b97d3d6ece993 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Nov 2024 12:38:13 +0000 Subject: [PATCH 17/17] WIP --- lib/handler/cache-revalidation-handler.js | 40 ++++++++---- lib/interceptor/cache.js | 80 ++++++++++++----------- 2 files changed, 70 insertions(+), 50 deletions(-) diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 14560baf859..729cb57d03d 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -1,5 +1,6 @@ 'use strict' +const assert = require('node:assert') const DecoratorHandler = require('../handler/decorator-handler') /** @@ -19,7 +20,7 @@ const DecoratorHandler = require('../handler/decorator-handler') class CacheRevalidationHandler extends DecoratorHandler { #successful = false /** - * @type {((boolean) => void)} + * @type {((boolean) => void) | null} */ #callback /** @@ -27,6 +28,7 @@ class CacheRevalidationHandler extends DecoratorHandler { */ #handler + #abort /** * @param {(boolean) => void} callback Function to call if the cached value is valid * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler @@ -44,11 +46,7 @@ class CacheRevalidationHandler extends DecoratorHandler { onConnect (abort) { this.#successful = false - - // TODO (fix): This is suspect... - if (typeof this.#handler.onConnect === 'function') { - this.#handler.onConnect(abort) - } + this.#abort = abort } /** @@ -66,12 +64,21 @@ class CacheRevalidationHandler extends DecoratorHandler { resume, statusMessage ) { + assert(this.#callback != null) + // https://www.rfc-editor.org/rfc/rfc9111.html#name-handling-a-validation-respo - if (statusCode === 304) { - this.#successful = true + this.#successful = statusCode === 304 + this.#callback(this.#successful) + this.#callback = null + + if (this.#successful) { return true } + if (typeof this.#handler.onConnect === 'function') { + this.#handler.onConnect(this.#abort) + } + if (typeof this.#handler.onHeaders === 'function') { return this.#handler.onHeaders( statusCode, @@ -108,9 +115,11 @@ class CacheRevalidationHandler extends DecoratorHandler { * @param {string[] | null} rawTrailers */ onComplete (rawTrailers) { - this.#callback(this.#successful) + if (this.#successful) { + return + } - if (!this.#successful && typeof this.#handler.onComplete === 'function') { + if (typeof this.#handler.onComplete === 'function') { this.#handler.onComplete(rawTrailers) } } @@ -121,10 +130,19 @@ class CacheRevalidationHandler extends DecoratorHandler { * @param {Error} err */ onError (err) { - this.#callback(false) + if (this.#successful) { + return + } + + if (this.#callback) { + this.#callback(false) + this.#callback = null + } if (typeof this.#handler.onError === 'function') { this.#handler.onError(err) + } else { + throw err } } } diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index b56c32c5545..945eeeb924a 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -39,12 +39,18 @@ module.exports = (opts = {}) => { return dispatch => { return (opts, handler) => { + // TODO (fix): What if e.g. opts.headers has if-modified-since header? Or other headers + // that make things ambigious? + if (!opts.origin || safeMethodsToNotCache.includes(opts.method)) { // Not a method we want to cache or we don't have the origin, skip return dispatch(opts, handler) } // TODO (perf): For small entries support returning a Buffer instead of a stream. + // Maybe store should return { staleAt, headers, body, etc... } instead of a stream + stream.value? + // Where body can be a Buffer, string, stream or blob? + const stream = store.createReadStream(opts) if (!stream) { // Request isn't cached @@ -66,8 +72,12 @@ module.exports = (opts = {}) => { if (typeof handler.onError === 'function') { handler.onError(err) } else { - throw err + process.nextTick(() => { + throw err + }) } + } else { + // Ignore error... } }) .on('close', function () { @@ -122,47 +132,39 @@ module.exports = (opts = {}) => { return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } - try { - // Check if the response is stale - const now = Date.now() - if (now < value.staleAt) { - // Dump body. - if (util.isStream(opts.body)) { - opts.body.on('error', () => {}).resume() - } - respondWithCachedValue(stream, value) - } else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { - // If body is is stream we can't revalidate... - // TODO (fix): This could be less strict... - dispatch(opts, new CacheHandler(globalOpts, opts, handler)) - } else { - // Need to revalidate the response - dispatch( - { - ...opts, - headers: { - ...opts.headers, - 'if-modified-since': new Date(value.cachedAt).toUTCString() + // Check if the response is stale + const now = Date.now() + if (now < value.staleAt) { + // Dump request body. + if (util.isStream(opts.body)) { + opts.body.on('error', () => {}).destroy() + } + respondWithCachedValue(stream, value) + } else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { + // If body is is stream we can't revalidate... + // TODO (fix): This could be less strict... + dispatch(opts, new CacheHandler(globalOpts, opts, handler)) + } else { + // Need to revalidate the response + dispatch( + { + ...opts, + headers: { + ...opts.headers, + 'if-modified-since': new Date(value.cachedAt).toUTCString() + } + }, + new CacheRevalidationHandler( + (success) => { + if (success) { + respondWithCachedValue(stream, value) + } else { + stream.on('error', () => {}).destroy() } }, - new CacheRevalidationHandler( - (success) => { - if (success) { - respondWithCachedValue(stream, value) - } else { - stream.on('error', () => {}).destroy() - } - }, - new CacheHandler(globalOpts, opts, handler) - ) + new CacheHandler(globalOpts, opts, handler) ) - } - } catch (err) { - if (typeof handler.onError === 'function') { - handler.onError(err) - } else { - throw err - } + ) } }