Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support request cache control directives #3658

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 104 additions & 8 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,74 @@ const util = require('../core/util')
const CacheHandler = require('../handler/cache-handler')
const MemoryCacheStore = require('../cache/memory-cache-store')
const CacheRevalidationHandler = require('../handler/cache-revalidation-handler')
const { assertCacheStore, assertCacheMethods } = require('../util/cache.js')
const { assertCacheStore, assertCacheMethods, parseCacheControlHeader } = require('../util/cache.js')

const AGE_HEADER = Buffer.from('age')

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler
*/
function sendGatewayTimeout (handler) {
const ac = new AbortController()
const signal = ac.signal

try {
if (typeof handler.onConnect === 'function') {
handler.onConnect(ac.abort)
signal.throwIfAborted()
}

if (typeof handler.onHeaders === 'function') {
handler.onHeaders(504, [], () => {}, 'Gateway Timeout')
signal.throwIfAborted()
}

if (typeof handler.onComplete === 'function') {
handler.onComplete([])
}
} catch (err) {
if (typeof handler.onError === 'function') {
handler.onError(err)
}
}
}

/**
* @param {number} now
* @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value
* @param {number} age
* @param {import('../util/cache.js').CacheControlDirectives} cacheControlDirectives
*/
function needsRevalidation (now, value, age, cacheControlDirectives) {
if (cacheControlDirectives?.['no-cache']) {
// Always revalidate requests with the no-cache parameter
return true
}

if (now > value.staleAt) {
// Response is stale
if (cacheControlDirectives?.['max-stale']) {
// https://www.rfc-editor.org/rfc/rfc9111.html#name-max-stale
const gracePeriod = value.staleAt + (cacheControlDirectives['max-stale'] * 1000)
return now > gracePeriod
}

return true
}

if (cacheControlDirectives?.['min-fresh']) {
// https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.1.3

// At this point, staleAt is always > now
const timeLeftTillStale = value.staleAt - now
const threshold = cacheControlDirectives['min-fresh'] * 1000

return timeLeftTillStale <= threshold
}

return false
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions} [opts]
* @returns {import('../../types/dispatcher.d.ts').default.DispatcherComposeInterceptor}
Expand Down Expand Up @@ -39,22 +103,41 @@ module.exports = (opts = {}) => {
return dispatch(opts, handler)
}

const requestCacheControl = opts.headers?.['cache-control']
? parseCacheControlHeader(opts.headers['cache-control'])
: undefined

if (requestCacheControl?.['no-store']) {
return dispatch(opts, handler)
}

const stream = store.createReadStream(opts)
if (!stream) {
// Request isn't cached

if (requestCacheControl?.['only-if-cached']) {
// We only want cached responses
// https://www.rfc-editor.org/rfc/rfc9111.html#name-only-if-cached
sendGatewayTimeout(handler)
return true
}

return dispatch(opts, new CacheHandler(globalOpts, opts, handler))
}

let onErrorCalled = false
const now = Date.now()

/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable} stream
* @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value
* @param {number} age
*/
const respondWithCachedValue = (stream, value) => {
const respondWithCachedValue = (stream, value, age) => {
const ac = new AbortController()
const signal = ac.signal

let onErrorCalled = false

signal.onabort = (_, err) => {
stream.destroy()
if (!onErrorCalled) {
Expand All @@ -79,8 +162,6 @@ module.exports = (opts = {}) => {
if (typeof handler.onHeaders === 'function') {
// Add the age header
// https://www.rfc-editor.org/rfc/rfc9111.html#name-age
const age = Math.round((Date.now() - value.cachedAt) / 1000)

value.rawHeaders.push(AGE_HEADER, Buffer.from(`${age}`))

handler.onHeaders(value.statusCode, value.rawHeaders, stream.resume, value.statusMessage)
Expand Down Expand Up @@ -122,19 +203,34 @@ module.exports = (opts = {}) => {
const handleStream = (stream) => {
if (!stream) {
// Request isn't cached

if (requestCacheControl?.['only-if-cached']) {
// We only want cached responses
// https://www.rfc-editor.org/rfc/rfc9111.html#name-only-if-cached
sendGatewayTimeout(handler)
return
}

return dispatch(opts, new CacheHandler(globalOpts, opts, handler))
}

const { value } = stream

const age = Math.round((now - value.cachedAt) / 1000)
if (requestCacheControl?.['max-age'] && age >= requestCacheControl['max-age']) {
// Response is considered expired for this specific request
// https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.1.1
dispatch(opts, handler)
return
}

// Dump body on error
if (util.isStream(opts.body)) {
opts.body?.on('error', () => {}).resume()
}

// Check if the response is stale
const now = Date.now()
if (now >= value.staleAt) {
if (needsRevalidation(now, value, age, requestCacheControl)) {
if (now >= value.deleteAt) {
// Safety check in case the store gave us a response that should've been
// deleted already
Expand All @@ -160,7 +256,7 @@ module.exports = (opts = {}) => {
return
}

respondWithCachedValue(stream, value)
respondWithCachedValue(stream, value, age)
}

Promise.resolve(stream).then(handleStream).catch(handler.onError)
Expand Down
Loading
Loading