Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(dp-server): remove metadata tracker #12882

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 0 additions & 50 deletions pkg/xds/server/callbacks/dataplane_metadata_tracker.go

This file was deleted.

80 changes: 0 additions & 80 deletions pkg/xds/server/callbacks/dataplane_metadata_tracker_test.go

This file was deleted.

45 changes: 31 additions & 14 deletions pkg/xds/server/callbacks/dataplane_sync_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@ package callbacks
import (
"context"
stdsync "sync"
"sync/atomic"

"github.com/kumahq/kuma/pkg/core"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
"github.com/kumahq/kuma/pkg/xds/sync"
)

var dataplaneSyncTrackerLog = core.Log.WithName("xds").WithName("dataplane-sync-tracker")

type NewDataplaneWatchdogFunc func(key core_model.ResourceKey) util_xds_v3.Watchdog

func NewDataplaneSyncTracker(factoryFunc NewDataplaneWatchdogFunc) DataplaneCallbacks {
func NewDataplaneSyncTracker(factoryFunc sync.DataplaneWatchdogFactory) DataplaneCallbacks {
return &dataplaneSyncTracker{
newDataplaneWatchdog: factoryFunc,
watchdogs: map[core_model.ResourceKey]context.CancelFunc{},
watchdogs: map[core_model.ResourceKey]entry{},
}
}

Expand All @@ -29,36 +28,54 @@ var _ DataplaneCallbacks = &dataplaneSyncTracker{}
//
// Node info can be (but does not have to be) carried only on the first XDS request. That's why need streamsAssociation map
// that indicates that the stream was already associated

type entry struct {
cancelFunc context.CancelFunc
meta *atomic.Pointer[core_xds.DataplaneMetadata]
}
type dataplaneSyncTracker struct {
NoopDataplaneCallbacks

newDataplaneWatchdog NewDataplaneWatchdogFunc
newDataplaneWatchdog sync.DataplaneWatchdogFactory

stdsync.RWMutex // protects access to the fields below
watchdogs map[core_model.ResourceKey]context.CancelFunc
watchdogs map[core_model.ResourceKey]entry
}

func (t *dataplaneSyncTracker) OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, _ context.Context, _ core_xds.DataplaneMetadata) error {
func (t *dataplaneSyncTracker) OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, _ context.Context, meta core_xds.DataplaneMetadata) error {
// We use OnProxyConnected because there should be only one watchdog for given dataplane.
t.Lock()
defer t.Unlock()

ctx, cancel := context.WithCancel(context.Background())
t.watchdogs[dpKey] = func() {
dataplaneSyncTrackerLog.V(1).Info("stopping Watchdog for a Dataplane", "dpKey", dpKey, "streamID", streamID)
cancel()
t.watchdogs[dpKey] = entry{
cancelFunc: func() {
dataplaneSyncTrackerLog.V(1).Info("stopping Watchdog for a Dataplane", "dpKey", dpKey, "streamID", streamID)
cancel()
},
meta: &atomic.Pointer[core_xds.DataplaneMetadata]{},
}
t.watchdogs[dpKey].meta.Store(&meta)
dataplaneSyncTrackerLog.V(1).Info("starting Watchdog for a Dataplane", "dpKey", dpKey, "streamID", streamID)
//nolint:contextcheck // it's not clear how the parent go-control-plane context lives
go t.newDataplaneWatchdog(dpKey).Start(ctx)
go t.newDataplaneWatchdog.New(dpKey, t.watchdogs[dpKey].meta.Load).Start(ctx)
return nil
}

func (t *dataplaneSyncTracker) OnProxyReconnected(_ core_xds.StreamID, dpKey core_model.ResourceKey, _ context.Context, meta core_xds.DataplaneMetadata) error {
t.RLock()
defer t.RUnlock()
if e, ok := t.watchdogs[dpKey]; ok {
e.meta.Store(&meta)
}
return nil
}

func (t *dataplaneSyncTracker) OnProxyDisconnected(_ context.Context, _ core_xds.StreamID, dpKey core_model.ResourceKey) {
t.Lock()
defer t.Unlock()
if cancelFn := t.watchdogs[dpKey]; cancelFn != nil {
cancelFn()
if e, exists := t.watchdogs[dpKey]; exists {
e.cancelFunc()
}
delete(t.watchdogs, dpKey)
}
10 changes: 6 additions & 4 deletions pkg/xds/server/callbacks/dataplane_sync_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
. "github.com/onsi/gomega"

core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
"github.com/kumahq/kuma/pkg/test"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
. "github.com/kumahq/kuma/pkg/xds/server/callbacks"
"github.com/kumahq/kuma/pkg/xds/sync"
)

var _ = Describe("Sync", func() {
Expand Down Expand Up @@ -76,13 +78,13 @@ var _ = Describe("Sync", func() {
watchdogCh := make(chan core_model.ResourceKey)

// setup
tracker := NewDataplaneSyncTracker(func(key core_model.ResourceKey) util_xds_v3.Watchdog {
tracker := NewDataplaneSyncTracker(sync.DataplaneWatchdogFactoryFunc(func(key core_model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog {
return WatchdogFunc(func(ctx context.Context) {
watchdogCh <- key
<-ctx.Done()
close(watchdogCh)
})
})
}))
callbacks := util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(tracker))

// given
Expand Down Expand Up @@ -130,13 +132,13 @@ var _ = Describe("Sync", func() {
It("should start only one watchdog per dataplane", func() {
// setup
var activeWatchdogs int32
tracker := NewDataplaneSyncTracker(func(key core_model.ResourceKey) util_xds_v3.Watchdog {
tracker := NewDataplaneSyncTracker(sync.DataplaneWatchdogFactoryFunc(func(key core_model.ResourceKey, _ func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog {
return WatchdogFunc(func(ctx context.Context) {
atomic.AddInt32(&activeWatchdogs, 1)
<-ctx.Done()
atomic.AddInt32(&activeWatchdogs, -1)
})
})
}))
callbacks := util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(tracker))

// when one stream for backend-01 is connected and request is sent
Expand Down
6 changes: 2 additions & 4 deletions pkg/xds/server/v3/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ func RegisterXDS(
authenticator := rt.XDS().PerProxyTypeAuthenticator()
authCallbacks := auth.NewCallbacks(rt.ReadOnlyResourceManager(), authenticator, auth.DPNotFoundRetry{}) // no need to retry on DP Not Found because we are creating DP in DataplaneLifecycle callback

metadataTracker := xds_callbacks.NewDataplaneMetadataTracker()
reconciler := DefaultReconciler(rt, xdsContext, statsCallbacks)
ingressReconciler := DefaultIngressReconciler(rt, xdsContext, statsCallbacks)
egressReconciler := DefaultEgressReconciler(rt, xdsContext, statsCallbacks)
watchdogFactory, err := xds_sync.DefaultDataplaneWatchdogFactory(rt, metadataTracker, reconciler, ingressReconciler, egressReconciler, xdsMetrics, envoyCpCtx, envoy_common.APIV3)
watchdogFactory, err := xds_sync.DefaultDataplaneWatchdogFactory(rt, reconciler, ingressReconciler, egressReconciler, xdsMetrics, envoyCpCtx, envoy_common.APIV3)
if err != nil {
return err
}
Expand All @@ -52,11 +51,10 @@ func RegisterXDS(
util_xds_v3.NewControlPlaneIdCallbacks(rt.GetInstanceId()),
util_xds_v3.AdaptCallbacks(statsCallbacks),
util_xds_v3.AdaptCallbacks(authCallbacks),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(metadataTracker)),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(
xds_callbacks.NewDataplaneLifecycle(rt.AppContext(), rt.ResourceManager(), authenticator, rt.Config().XdsServer.DataplaneDeregistrationDelay.Duration, rt.GetInstanceId(), rt.Config().Store.Cache.ExpirationTime.Duration)),
),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(watchdogFactory.New))),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(watchdogFactory))),
util_xds_v3.AdaptCallbacks(DefaultDataplaneStatusTracker(rt, envoyCpCtx.Secrets)),
util_xds_v3.AdaptCallbacks(xds_callbacks.NewNackBackoff(rt.Config().XdsServer.NACKBackoff.Duration)),
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/xds/sync/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func DefaultEgressProxyBuilder(rt core_runtime.Runtime, apiVersion core_xds.APIV
}
}

// DataplaneWatchdogFactory returns a Watchdog that creates a new XdsContext and Proxy and executes SnapshotReconciler if there is any change
func DefaultDataplaneWatchdogFactory(
rt core_runtime.Runtime,
metadataTracker DataplaneMetadataTracker,
dataplaneReconciler SnapshotReconciler,
ingressReconciler SnapshotReconciler,
egressReconciler SnapshotReconciler,
Expand Down Expand Up @@ -74,7 +74,6 @@ func DefaultDataplaneWatchdogFactory(
EgressReconciler: egressReconciler,
EnvoyCpCtx: envoyCpCtx,
MeshCache: rt.MeshCache(),
MetadataTracker: metadataTracker,
ResManager: rt.ReadOnlyResourceManager(),
}
return NewDataplaneWatchdogFactory(
Expand Down
8 changes: 1 addition & 7 deletions pkg/xds/sync/dataplane_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type DataplaneWatchdogDependencies struct {
EgressReconciler SnapshotReconciler
EnvoyCpCtx *xds_context.ControlPlaneContext
MeshCache *mesh.Cache
MetadataTracker DataplaneMetadataTracker
ResManager core_manager.ReadOnlyResourceManager
}

Expand Down Expand Up @@ -66,12 +65,7 @@ func NewDataplaneWatchdog(deps DataplaneWatchdogDependencies, dpKey core_model.R
}
}

func (d *DataplaneWatchdog) Sync(ctx context.Context) (SyncResult, error) {
metadata := d.MetadataTracker.Metadata(d.key)
if metadata == nil {
return SyncResult{}, errors.New("metadata cannot be nil")
}

func (d *DataplaneWatchdog) Sync(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
if d.dpType == "" {
d.dpType = metadata.GetProxyType()
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/xds/sync/dataplane_watchdog_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package sync

import (
"context"
"errors"
"time"

"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/user"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
xds_metrics "github.com/kumahq/kuma/pkg/xds/metrics"
Expand All @@ -31,7 +33,7 @@ func NewDataplaneWatchdogFactory(
}, nil
}

func (d *dataplaneWatchdogFactory) New(dpKey model.ResourceKey) util_xds_v3.Watchdog {
func (d *dataplaneWatchdogFactory) New(dpKey model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog {
log := xdsServerLog.WithName("dataplane-sync-watchdog").WithValues("dataplaneKey", dpKey)
dataplaneWatchdog := NewDataplaneWatchdog(d.deps, dpKey)
return &util_watchdog.SimpleWatchdog{
Expand All @@ -41,7 +43,11 @@ func (d *dataplaneWatchdogFactory) New(dpKey model.ResourceKey) util_xds_v3.Watc
OnTick: func(ctx context.Context) error {
ctx = user.Ctx(ctx, user.ControlPlane)
start := core.Now()
result, err := dataplaneWatchdog.Sync(ctx)
meta := fetchMeta()
if meta == nil {
return errors.New("metadata cannot be nil")
}
result, err := dataplaneWatchdog.Sync(ctx, meta)
if err != nil {
return err
}
Expand Down
Loading
Loading