Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix heal after failed refresh #1564

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func TestNSMGRHealEndpoint_DatapathHealthy_CtrlPlaneBroken(t *testing.T) {

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken,
nsclient.WithHealClient(heal.NewClient(ctx,
heal.WithoutRetry(),
heal.WithLivenessCheck(livenessCheck))))

_, err = nsc.Request(ctx, request.Clone())
Expand Down
1 change: 1 addition & 0 deletions pkg/networkservice/chains/nsmgr/select_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ func Test_DiscoverForwarder_Should_KeepSelectedForwarderWhileConnectionIsFine(t

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken,
nsclient.WithHealClient(heal.NewClient(ctx,
heal.WithoutRetry(),
heal.WithLivenessCheck(livenessChecker))))

conn, err := nsc.Request(ctx, request.Clone())
Expand Down
106 changes: 94 additions & 12 deletions pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -22,9 +24,12 @@ import (
"net"
"net/url"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/edwarnicke/serialize"
"github.com/golang-jwt/jwt/v4"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
Expand All @@ -41,9 +46,14 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint"
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/excludedprefixes"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/ipam/point2pointipam"
countutils "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkrequest"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
"github.com/networkservicemesh/sdk/pkg/registry"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
Expand Down Expand Up @@ -74,10 +84,10 @@ func Test_AwareNSEs(t *testing.T) {
_, ipNet, err := net.ParseCIDR("172.16.0.96/29")
require.NoError(t, err)

const count = 3
var nseRegs [count]*registryapi.NetworkServiceEndpoint
var nses [count]*sandbox.EndpointEntry
var requests [count]*networkservice.NetworkServiceRequest
const nseCount = 3
var nseRegs [nseCount]*registryapi.NetworkServiceEndpoint
var nses [nseCount]*sandbox.EndpointEntry
var requests [nseCount]*networkservice.NetworkServiceRequest

ns1 := defaultRegistryService("my-ns-1")
ns2 := defaultRegistryService("my-ns-2")
Expand All @@ -88,7 +98,7 @@ func Test_AwareNSEs(t *testing.T) {
nsurl2, err := url.Parse(fmt.Sprintf("kernel://%s?%s=%s", ns2.Name, "color", "red"))
require.NoError(t, err)

nsInfo := [count]struct {
nsInfo := [nseCount]struct {
ns *registryapi.NetworkService
labelKey string
labelValue string
Expand All @@ -110,7 +120,7 @@ func Test_AwareNSEs(t *testing.T) {
},
}

for i := 0; i < count; i++ {
for i := 0; i < nseCount; i++ {
nseRegs[i] = &registryapi.NetworkServiceEndpoint{
Name: fmt.Sprintf("nse-%s", uuid.New().String()),
NetworkServiceNames: []string{nsInfo[i].ns.Name},
Expand Down Expand Up @@ -161,8 +171,8 @@ func Test_AwareNSEs(t *testing.T) {
},
))))

var conns [count]*networkservice.Connection
for i := 0; i < count; i++ {
var conns [nseCount]*networkservice.Connection
for i := 0; i < nseCount; i++ {
conns[i], err = nsc.Request(ctx, requests[i])
require.NoError(t, err)
require.Equal(t, conns[0].NetworkServiceEndpointName, nses[0].Name)
Expand All @@ -176,12 +186,12 @@ func Test_AwareNSEs(t *testing.T) {
require.NotEqual(t, srcIP1[0], srcIP3[0])
require.NotEqual(t, srcIP2[0], srcIP3[0])

for i := 0; i < count; i++ {
for i := 0; i < nseCount; i++ {
_, err = nsc.Close(ctx, conns[i])
require.NoError(t, err)
}

for i := 0; i < count; i++ {
for i := 0; i < nseCount; i++ {
_, err = nses[i].Unregister(ctx, nseRegs[i])
require.NoError(t, err)
}
Expand Down Expand Up @@ -604,6 +614,78 @@ func createAuthorizedEndpoint(ctx context.Context, t *testing.T, ns string, nsmg
require.NoError(t, err)
}

// Test_RestartDuringRefresh checks that the client keeps re-requesting after
// refreshes that fail because the forwarders are taken down mid-request.
//
// While destroyFwd is set, the endpoint-side checkrequest hook cancels every
// forwarder of the node and the client-side checkresponse hook restarts them.
// Each loop iteration triggers a refresh that must fail, then waits until both
// the endpoint and the client have observed at least one more Request.
func Test_RestartDuringRefresh(t *testing.T) {
	t.Cleanup(func() { goleak.VerifyNone(t) })

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
	defer cancel()

	domain := sandbox.NewBuilder(ctx, t).SetNodesCount(1).Build()

	nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)
	_, err := nsRegistryClient.Register(ctx, defaultRegistryService("ns"))
	require.NoError(t, err)

	var (
		countServer   count.Server
		countClient   count.Client
		factoryOnce   sync.Once
		clientFactory begin.EventFactory
		destroyFwd    atomic.Bool
		executor      serialize.Executor
	)

	domain.Nodes[0].NewEndpoint(ctx, &registryapi.NetworkServiceEndpoint{
		Name:                "nse-1",
		NetworkServiceNames: []string{"ns"},
	}, sandbox.GenerateTestToken, &countServer, checkrequest.NewServer(t, func(_ *testing.T, _ *networkservice.NetworkServiceRequest) {
		if !destroyFwd.Load() {
			return
		}
		// Forwarder shutdown is queued on the executor so it is serialized
		// with the restart queued by the client-side hook.
		executor.AsyncExec(func() {
			for _, fwd := range domain.Nodes[0].Forwarders {
				fwd.Cancel()
			}
		})
	}))

	nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(
		&countClient,
		checkcontext.NewClient(t, func(_ *testing.T, requestCtx context.Context) {
			// Capture the event factory once; it is used below to trigger refreshes by hand.
			factoryOnce.Do(func() {
				clientFactory = begin.FromContext(requestCtx)
			})
		}),
		checkresponse.NewClient(t, func(_ *testing.T, _ *networkservice.Connection) {
			if !destroyFwd.Load() {
				return
			}
			executor.AsyncExec(func() {
				for _, fwd := range domain.Nodes[0].Forwarders {
					fwd.Restart()
				}
			})
		}),
		heal.NewClient(ctx),
	))

	_, err = nsc.Request(ctx, &networkservice.NetworkServiceRequest{
		Connection: &networkservice.Connection{
			Id:             uuid.NewString(),
			NetworkService: "ns",
		},
	})
	require.NoError(t, err)
	require.NotNil(t, clientFactory)

	// A refresh with healthy forwarders must succeed and must not cause extra requests.
	require.NoError(t, <-clientFactory.Request())
	require.Equal(t, 2, countServer.Requests())
	require.Never(t, func() bool { return countServer.Requests() > 2 }, time.Second/2, time.Second/20)

	for i := 0; i < 15; i++ {
		serverRequests := countServer.Requests()
		destroyFwd.Store(true)
		err = <-clientFactory.Request()
		require.Error(t, err)
		destroyFwd.Store(false)
		clientRequests := countClient.Requests()
		require.Eventually(t, func() bool { return serverRequests < countServer.Requests() }, time.Second*2, time.Second/20)
		require.Eventually(t, func() bool { return clientRequests < countClient.Requests() }, time.Second*2, time.Second/20)
	}
}

// This test checks timeout on sandbox
// We run nsmgr and NSE with networkservice authorize chain element (tokens_expired.rego)
func Test_Timeout(t *testing.T) {
Expand Down Expand Up @@ -636,7 +718,7 @@ func Test_Timeout(t *testing.T) {
nsReg, err := nsRegistryClient.Register(chainCtx, ns)
require.NoError(t, err)

counter := new(countutils.Server)
counter := new(count.Server)

createAuthorizedEndpoint(chainCtx, t, ns.Name, domain.Nodes[0].NSMgr.URL, counter)

Expand Down
16 changes: 15 additions & 1 deletion pkg/networkservice/common/heal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal/retry"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/log"
Expand All @@ -43,16 +45,28 @@ func NewClient(chainCtx context.Context, opts ...Option) networkservice.NetworkS
o := &options{
livenessCheckInterval: livenessCheckInterval,
livenessCheckTimeout: livenessCheckTimeout,
retryOnRequestFail: true,
}
for _, opt := range opts {
opt(o)
}
return &healClient{

var result networkservice.NetworkServiceClient = &healClient{
chainCtx: chainCtx,
livenessCheck: o.livenessCheck,
livenessCheckInterval: o.livenessCheckInterval,
livenessCheckTimeout: o.livenessCheckTimeout,
}

// TODO: rework within https://github.com/networkservicemesh/sdk/issues/1565
if o.retryOnRequestFail {
result = chain.NewNetworkServiceClient(
retry.NewClient(chainCtx),
result,
)
}

return result
}

func (h *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
Expand Down
10 changes: 9 additions & 1 deletion pkg/networkservice/common/heal/options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2022 Cisco and/or its affiliates.
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -35,11 +35,19 @@ type options struct {
livenessCheck LivenessCheck
livenessCheckInterval time.Duration
livenessCheckTimeout time.Duration
retryOnRequestFail bool
}

// Option is a functional option for heal.NewClient(): a function that
// adjusts the options the heal client is built from.
type Option func(o *options)

// WithoutRetry returns an Option that switches off retrying after a failed
// refresh by clearing the retryOnRequestFail flag.
func WithoutRetry() Option {
	disableRetry := func(cfg *options) {
		cfg.retryOnRequestFail = false
	}
	return disableRetry
}

// WithLivenessCheck - sets the data plane liveness checker
func WithLivenessCheck(livenessCheck LivenessCheck) Option {
return func(o *options) {
Expand Down
78 changes: 78 additions & 0 deletions pkg/networkservice/common/heal/retry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package retry provides a chain element that manages retries for failed requests
package retry

import (
"context"

"github.com/edwarnicke/genericsync"
"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

// cancelableContext bundles a context with the function that cancels it, so
// both can be stored and retrieved together as a single value.
type cancelableContext struct {
	context.Context
	// cancel cancels the embedded Context.
	cancel context.CancelFunc
}

// retryClient is the chain element returned by NewClient. Its Request method
// schedules another Request through the begin event factory when the
// downstream Request fails.
type retryClient struct {
	// chainCtx is the parent of every per-connection retry context.
	chainCtx context.Context
	// contextMap holds, per connection ID, the cancelable context that is
	// passed to scheduled retries; entries are removed and canceled on a
	// successful Request or Close.
	contextMap genericsync.Map[string, *cancelableContext]
}

// NewClient builds the retry chain element: a client that issues the request
// again when the previous attempt failed. The supplied ctx is kept as the
// parent of the contexts handed to scheduled retries.
func NewClient(ctx context.Context) networkservice.NetworkServiceClient {
	c := new(retryClient)
	c.chainCtx = ctx
	return c
}

// Request forwards the request down the chain and, if that fails, asks the
// begin.EventFactory found in ctx to issue another Request.
//
// One cancelable context derived from chainCtx is kept per connection ID and
// passed to every scheduled retry via begin.CancelContext. It is canceled and
// removed as soon as a Request for that connection succeeds. When the failed
// request already named an endpoint and was not itself a reselect request,
// the retry is scheduled with begin.WithReselect.
//
// The response and error of the downstream call are returned unchanged.
func (n *retryClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
	factory := begin.FromContext(ctx)

	retryCtx, cancelRetry := context.WithCancel(n.chainCtx)
	stored, loaded := n.contextMap.LoadOrStore(request.GetConnection().GetId(), &cancelableContext{
		Context: retryCtx,
		cancel:  cancelRetry,
	})
	if loaded {
		// A context for this connection is already registered, so the one
		// just created will never be used; cancel it to avoid leaking it.
		cancelRetry()
	}

	resp, err := next.Client(ctx).Request(ctx, request, opts...)
	if err == nil {
		if v, ok := n.contextMap.LoadAndDelete(request.GetConnection().GetId()); ok {
			v.cancel()
		}
		return resp, nil
	}

	retryOpts := []begin.Option{begin.CancelContext(stored.Context)}
	if request.GetConnection().GetNetworkServiceEndpointName() != "" && request.GetConnection().GetState() != networkservice.State_RESELECT_REQUESTED {
		retryOpts = append(retryOpts, begin.WithReselect())
	}
	factory.Request(retryOpts...)

	return resp, err
}

// Close forwards the Close down the chain. When the downstream Close
// succeeds, the retry context stored for the connection (if any) is removed
// and canceled. The downstream response and error are returned unchanged.
func (n *retryClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
	reply, closeErr := next.Client(ctx).Close(ctx, conn, opts...)
	if closeErr != nil {
		return reply, closeErr
	}
	if pending, found := n.contextMap.LoadAndDelete(conn.GetId()); found {
		pending.cancel()
	}
	return reply, closeErr
}
2 changes: 1 addition & 1 deletion pkg/tools/sandbox/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (n *Node) NewClient(
client.WithClientURL(CloneURL(n.NSMgr.URL)),
client.WithDialOptions(DialOptions(WithTokenGenerator(generatorFunc))...),
client.WithAuthorizeClient(authorize.NewClient(authorize.Any())),
client.WithHealClient(heal.NewClient(ctx)),
client.WithHealClient(heal.NewClient(ctx, heal.WithoutRetry())),
client.WithDialTimeout(DialTimeout),
}

Expand Down
Loading