Skip to content

Commit

Permalink
[Streaming][Bugfix] Fix error(s) on services not being modified befor…
Browse files Browse the repository at this point in the history
…e cache TTL

For services that rarely change, using streaming could cause some
errors, with context.Canceled being returned.

This was caused by TTL of cache being lower than entry TTL.
With streaming, there is no guarantee that data will be updated within
10 minutes, thus the cache was expiring while the request was still in flight.

We thus ensure that at least a NotModified result will be issued before
the cache entry expires.

Another problem occurred when the data was being updated while
the cache background routine was stopped because the entry had expired.

A new error `ErrCacheRefreshRoutineStopped` has been added to ensure
this result cannot pollute the cache with wrong data.

Added unit test

Be sure to avoid returning a valid entry when it is not present

Protect from cache failures

When TTL expires during the initial fetch of streaming, implementation could return invalid data

More complete fix: avoid the case where cache could not be initialized before streaming has been init

Added test case

Use correct condition for invalid cache entry

Minor improvements for new unit test `TestStreamingHealthServices_IntegrationWithCache_Expiry`.
  • Loading branch information
pierresouchay committed Jan 19, 2021
1 parent ca2c3eb commit 78031c6
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 8 deletions.
5 changes: 4 additions & 1 deletion agent/cache-types/streaming_health_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ type StreamingHealthServices struct {
// so using a shorter TTL ensures the cache entry expires sooner.
func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions {
	opts := c.RegisterOptionsBlockingRefresh.RegisterOptions()
	// LastGetTTL has to be greater than opts.QueryTimeout; otherwise the cache
	// entry can be lost while a query is still blocking on it.
	opts.LastGetTTL = opts.QueryTimeout + 10*time.Second

	return opts
}

Expand Down
81 changes: 81 additions & 0 deletions agent/cache-types/streaming_health_services_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cachetype

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -560,3 +561,83 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow()
}
}

// TestStreamingHealthServices_IntegrationWithCache_Expiry runs the streaming
// health services type through a real cache. The type is registered via
// shortTTL, so the entry's LastGetTTL (1.1s) is only slightly longer than the
// request's MaxQueryTime (1s). Each step checks that c.Get returns index 5
// without error, including for blocking queries that run close to the entry's
// expiry.
func TestStreamingHealthServices_IntegrationWithCache_Expiry(t *testing.T) {
namespace := getNamespace("ns2")

client := NewTestStreamingClient(namespace)
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}

c := cache.New(cache.Options{})
// Register the type through shortTTL so the short LastGetTTL applies.
c.RegisterType(StreamingHealthServicesName, &shortTTL{typ})

// Queue a single registration of "web" at index 5, followed by the end of
// the snapshot.
batchEv := newEventBatchWithEvents(
newEventServiceHealthRegister(5, 1, "web"))
client.QueueEvents(
batchEv,
newEndOfSnapshotEvent(5))

req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace),
}
req.MinQueryIndex = 1
req.MaxQueryTime = time.Second

ctx := context.Background()

runStep(t, "initial fetch of results", func(t *testing.T) {
res, meta, err := c.Get(ctx, StreamingHealthServicesName, req)
require.NoError(t, err)
require.Equal(t, uint64(5), meta.Index)
require.NotNil(t, res)
// Use the returned index as MinQueryIndex for the next call.
req.MinQueryIndex = meta.Index
})

runStep(t, "request should block, and hit the cache TTL", func(t *testing.T) {
start := time.Now()
res, meta, err := c.Get(ctx, StreamingHealthServicesName, req)
require.NoError(t, err)
require.Equal(t, uint64(5), meta.Index)
// The call must have taken longer than MaxQueryTime.
require.Greater(t, uint64(time.Since(start)), uint64(req.MaxQueryTime))
require.NotNil(t, res)
// Will force an immediate response for the next call
req.MinQueryIndex = meta.Index - 1
})

runStep(t, "the next request should succeed", func(t *testing.T) {
res, meta, err := c.Get(ctx, StreamingHealthServicesName, req)
require.NoError(t, err)
require.Equal(t, uint64(5), meta.Index)
require.NotNil(t, res)
req.MinQueryIndex = meta.Index
})

// Sleep so that the cache entry will likely expire during the next blocking
// query. If this test is unstable, a bug has probably been introduced.
time.Sleep(1 * time.Second)

runStep(t, "cache might expire during the blocking query", func(t *testing.T) {
start := time.Now()
res, meta, err := c.Get(ctx, StreamingHealthServicesName, req)
require.NoError(t, err)
// The call must have taken longer than MaxQueryTime.
require.Greater(t, uint64(time.Since(start)), uint64(req.MaxQueryTime))
require.Equal(t, uint64(5), meta.Index)
require.NotNil(t, res)
req.MinQueryIndex = meta.Index - 1
})
}

// shortTTL embeds StreamingHealthServices so that its RegisterOptions method
// can be overridden with a much shorter LastGetTTL in tests.
type shortTTL struct {
StreamingHealthServices
}

// RegisterOptions returns the blocking-refresh register options with
// LastGetTTL shortened to 1.1 seconds.
func (s *shortTTL) RegisterOptions() cache.RegisterOptions {
	const lastGetTTL = 1100 * time.Millisecond

	registerOpts := s.RegisterOptionsBlockingRefresh.RegisterOptions()
	registerOpts.LastGetTTL = lastGetTTL
	return registerOpts
}
43 changes: 37 additions & 6 deletions agent/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (c *Cache) getEntryLocked(
info RequestInfo,
) (entryExists bool, entryValid bool, entry cacheEntry) {
entry, ok := c.entries[key]
if !entry.Valid {
if !entry.Valid || !ok {
return ok, false, entry
}

Expand All @@ -357,22 +357,22 @@ func (c *Cache) getEntryLocked(
if tEntry.Opts.SupportsBlocking && info.MinIndex > 0 && info.MinIndex >= entry.Index {
// MinIndex was given and matches or is higher than current value so we
// ignore the cache and fallthrough to blocking on a new value below.
return true, false, entry
return ok, false, entry
}

// Check MaxAge is not exceeded if this is not a background refreshing type
// and MaxAge was specified.
if !tEntry.Opts.Refresh && info.MaxAge > 0 && entryExceedsMaxAge(info.MaxAge, entry) {
return true, false, entry
return ok, false, entry
}

// Check if re-validate is requested. If so the first time round the
// loop is not a hit but subsequent ones should be treated normally.
if !tEntry.Opts.Refresh && info.MustRevalidate {
return true, false, entry
return ok, false, entry
}

return true, true, entry
return ok, ok, entry
}

func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool {
Expand Down Expand Up @@ -491,6 +491,13 @@ RETRY_GET:
goto RETRY_GET

case <-timeoutCh:
if entry.Index == 0 {
// This might happen with streaming: the fetch may have been stopped before
// fully receiving data (if the fetch is very close to the end of the TTL).
// In any case, this looks like a legitimate safety guard because it avoids
// returning an index with an empty result; an HTTP 500 is returned instead.
return nil, ResultMeta{}, ErrCacheFetchFailure
}
// Timeout on the cache read, just return whatever we have.
return entry.Value, ResultMeta{Index: entry.Index}, nil
}
Expand All @@ -510,9 +517,15 @@ func makeEntryKey(t, dc, token, key string) string {
// if the entry doesn't exist. This latter case is to support refreshing.
func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ignoreExisting bool) <-chan struct{} {
// We acquire a write lock because we may have to set Fetching to true.
tEntry := r.TypeEntry
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info)
expiryTime := entry.Expiry.Expiry()
if ok && entryValid && tEntry.Opts.SupportsBlocking && expiryTime.Sub(time.Now()) < time.Second {
// This prevents the entry from expiring while performing streaming fetches
c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL)
}

// This handles the case where a fetch succeeded after checking for its existence in
// getWithIndex. This ensures that we don't miss updates.
Expand Down Expand Up @@ -557,7 +570,6 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
c.entries[key] = entry
metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))

tEntry := r.TypeEntry
// The actual Fetch must be performed in a goroutine.
go func() {
// If we have background refresh and currently are in "disconnected" state,
Expand Down Expand Up @@ -589,6 +601,22 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
if fOpts.Timeout == 0 {
fOpts.Timeout = 10 * time.Minute
}
// We want to refresh before the TTL of cache entry in any case
refreshBefore := expiryTime.Sub(time.Now())
if refreshBefore <= 0 {
refreshBefore = tEntry.Opts.LastGetTTL
}

if refreshBefore > time.Second {
// Lets timeout at least 1 second before the cache entry ends
refreshBefore -= time.Second
} else {
// Less than 1 second, let's try to go fast
refreshBefore /= 2
}
if refreshBefore < fOpts.Timeout {
fOpts.Timeout = refreshBefore
}
}
if entry.Valid {
fOpts.LastResult = &FetchResult{
Expand All @@ -609,6 +637,9 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
if connectedTimer != nil {
connectedTimer.Stop()
}
if err == ErrCacheRefreshRoutineStopped {
return
}

// Copy the existing entry to start.
newEntry := entry
Expand Down
14 changes: 14 additions & 0 deletions agent/cache/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@ import (
"time"
)

// cacheError is a string-backed error type; being a string, its values can be
// declared as constants.
type cacheError string

// Error implements the error interface by returning the underlying string.
func (e cacheError) Error() string {
	msg := string(e)
	return msg
}

// Sentinel errors of type cacheError.
const (
	// ErrCacheFetchFailure is returned when an entry could not be retrieved
	// from the cache in time.
	ErrCacheFetchFailure cacheError = "Could not fetch entry from cache"

	// ErrCacheRefreshRoutineStopped signals that a fetch result must be
	// discarded instead of being stored in the cache.
	ErrCacheRefreshRoutineStopped cacheError = "Cache Refresh Routine Stopped"
)

// Type implements the logic to fetch certain types of data.
type Type interface {
// Fetch fetches a single unique item.
Expand Down
4 changes: 3 additions & 1 deletion agent/submatview/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,12 @@ func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cac

case <-timeoutCh:
// Just return whatever we got originally, might still be empty
result.NotModified = true
return result, nil

case <-done:
return result, context.Canceled
// This happens when cache entry is cleared and will avoid repopulating cache
return result, cache.ErrCacheRefreshRoutineStopped
}
}
}
12 changes: 12 additions & 0 deletions lib/ttlcache/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ func (e *Entry) Key() string {
return e.key
}

// ExpiryNotSet is the value returned by Expiry() when no expiry is set:
// the Unix epoch (1970-01-01 00:00:00) in UTC.
var ExpiryNotSet = time.Unix(0, 0).UTC()

// Expiry reports the expiration time recorded on the entry.
// A nil entry has no expiry, in which case ExpiryNotSet is returned.
func (e *Entry) Expiry() time.Time {
	if e != nil {
		return e.expiry
	}
	return ExpiryNotSet
}

// ExpiryHeap is a heap that is ordered by the expiry time of entries. It may
// be used by a cache or storage to expiry items after a TTL.
//
Expand Down

0 comments on commit 78031c6

Please sign in to comment.