Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: Move metrics recorder from BuildOptions to ClientConn #8027

Merged
merged 7 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ type ClientConn interface {
//
// Deprecated: Use the Target field in the BuildOptions instead.
Target() string

// MetricsRecorder provides the metrics recorder that balancers can use to
// record metrics. Balancer implementations which do not register metrics on
// metrics registry and record on them can ignore this method.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
MetricsRecorder() estats.MetricsRecorder

// EnforceClientConnEmbedding is included to force implementers to embed
// another implementation of this interface, allowing gRPC to add methods
// without breaking users.
internal.EnforceClientConnEmbedding
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}

// BuildOptions contains additional information for Build.
Expand Down Expand Up @@ -198,10 +208,6 @@ type BuildOptions struct {
// same resolver.Target as passed to the resolver. See the documentation for
// the resolver.Target type for details about what it contains.
Target resolver.Target
// MetricsRecorder is the metrics recorder that balancers can use to record
// metrics. Balancer implementations which do not register metrics on
// metrics registry and record on them can ignore this field.
MetricsRecorder estats.MetricsRecorder
}

// Builder creates a balancer.
Expand Down Expand Up @@ -327,6 +333,13 @@ type Picker interface {
// UpdateClientConnState, ResolverError, UpdateSubConnState, and Close are
// guaranteed to be called synchronously from the same goroutine. There's no
// guarantee on picker.Pick, it may be called anytime.
//
// NOTICE: This interface is intended to be implemented by gRPC, or intercepted
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// by custom load balancing polices. Users should not need their own complete
// implementation of this interface -- they should always delegate to a
// ClientConn passed to Builder.Build() by embedding it in their
// implementations. An embedded ClientConn must never be nil, or runtime panics
// will occur.
type Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes. If the error returned is ErrBadResolverState, the ClientConn
Expand Down
5 changes: 2 additions & 3 deletions balancer/pickfirst/pickfirst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)

Expand All @@ -56,7 +55,7 @@ func (s) TestPickFirst_InitialResolverError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
bal.ResolverError(errors.New("resolution failed: test error"))

Expand Down Expand Up @@ -89,7 +88,7 @@ func (s) TestPickFirst_ResolverErrorinTF(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()

// After sending a valid update, the LB policy should report CONNECTING.
Expand Down
5 changes: 2 additions & 3 deletions balancer/pickfirst/pickfirstleaf/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -80,7 +79,7 @@ func (s) TestPickFirstMetrics(t *testing.T) {
Addresses: []resolver.Address{{Addr: ss.Address}}},
)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
Expand Down Expand Up @@ -124,7 +123,7 @@ func (s) TestPickFirstMetricsFailure(t *testing.T) {
Addresses: []resolver.Address{{Addr: "bad address"}}},
)
grpcTarget := r.Scheme() + ":///"
tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions)
b := &pickfirstBalancer{
cc: cc,
target: bo.Target.String(),
metricsRecorder: bo.MetricsRecorder, // ClientConn will always create a Metrics Recorder.
metricsRecorder: cc.MetricsRecorder(),

subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
Expand Down
13 changes: 6 additions & 7 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/pickfirst"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -885,7 +884,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) {
triggerTimer, timeAfter := mockTimer()
pfinternal.TimeAfterFunc = timeAfter

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
dialer := testutils.NewBlockingDialer()
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.Name)),
Expand Down Expand Up @@ -974,7 +973,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) {
triggerTimer, timeAfter := mockTimer()
pfinternal.TimeAfterFunc = timeAfter

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
dialer := testutils.NewBlockingDialer()
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.Name)),
Expand Down Expand Up @@ -1034,7 +1033,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
triggerTimer, timeAfter := mockTimer()
pfinternal.TimeAfterFunc = timeAfter

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
dialer := testutils.NewBlockingDialer()
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.Name)),
Expand Down Expand Up @@ -1099,7 +1098,7 @@ func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down Expand Up @@ -1145,7 +1144,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down Expand Up @@ -1189,7 +1188,7 @@ func (s) TestPickFirstLeaf_InterleavingUnknownPreffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down
3 changes: 1 addition & 2 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -196,7 +195,7 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down
4 changes: 2 additions & 2 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.MetricsRecorder, opts.Target.String())
lb.dataCache = newDataCache(maxCacheSize, lb.logger, cc.MetricsRecorder(), opts.Target.String())
lb.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
Expand Down Expand Up @@ -539,7 +539,7 @@ func (b *rlsBalancer) sendNewPickerLocked() {
bg: b.bg,
rlsServerTarget: b.lbCfg.lookupService,
grpcTarget: b.bopts.Target.String(),
metricsRecorder: b.bopts.MetricsRecorder,
metricsRecorder: b.cc.MetricsRecorder(),
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
Expand Down
14 changes: 7 additions & 7 deletions balancer/rls/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/internal/testutils"
)

var (
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) {

func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, &testutils.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand All @@ -134,7 +134,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) {

func (s) TestDataCache_AddForcesResize(t *testing.T) {
initCacheEntries()
dc := newDataCache(1, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(1, nil, &testutils.NoopMetricsRecorder{}, "")

// The first entry in cacheEntries has a minimum expiry time in the future.
// This entry would stop the resize operation since we do not evict entries
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s) TestDataCache_AddForcesResize(t *testing.T) {

func (s) TestDataCache_Resize(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, &testutils.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (s) TestDataCache_Resize(t *testing.T) {

func (s) TestDataCache_EvictExpiredEntries(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, &testutils.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand All @@ -221,7 +221,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
}

initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, &testutils.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s) TestDataCache_Metrics(t *testing.T) {
{size: 4},
{size: 5},
}
tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
dc := newDataCache(50, nil, tmr, "")

dc.updateRLSServerTarget("rls-server-target")
Expand Down
8 changes: 4 additions & 4 deletions balancer/rls/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -266,7 +266,7 @@ func (s) Test_RLSDefaultTargetPicksMetric(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s) Test_RLSTargetPicksMetric(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
// Dial the backend.
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
Expand Down Expand Up @@ -350,7 +350,7 @@ func (s) Test_RLSFailedPicksMetric(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
// Dial the backend.
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion balancer/subconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
// should only use a single address.
//
// NOTICE: This interface is intended to be implemented by gRPC, or intercepted
// by custom load balancing poilices. Users should not need their own complete
// by custom load balancing polices. Users should not need their own complete
// implementation of this interface -- they should always delegate to a SubConn
// returned by ClientConn.NewSubConn() by embedding it in their implementations.
// An embedded SubConn must never be nil, or runtime panics will occur.
Expand Down
2 changes: 1 addition & 1 deletion balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
b := &wrrBalancer{
ClientConn: cc,
target: bOpts.Target.String(),
metricsRecorder: bOpts.MetricsRecorder,
metricsRecorder: cc.MetricsRecorder(),
addressWeights: resolver.NewAddressMap(),
endpointToWeight: resolver.NewEndpointMap(),
scToWeight: make(map[balancer.SubConn]*endpointWeight),
Expand Down
4 changes: 2 additions & 2 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
srv := startServer(t, reportCall)
sc := svcConfig(t, testMetricsConfig)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(tmr)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/internal/testutils"
)

type s struct {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
wsc := &endpointWeight{
metricsRecorder: tmr,
weightVal: 3,
Expand Down Expand Up @@ -136,7 +136,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
// with no weights. Both of these should emit a count metric for round robin
// fallback.
func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
ew := &endpointWeight{
metricsRecorder: tmr,
weightVal: 0,
Expand Down
7 changes: 6 additions & 1 deletion balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
Expand Down Expand Up @@ -59,6 +60,7 @@ var (
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
internal.EnforceClientConnEmbedding
// The following fields are initialized when the wrapper is created and are
// read-only afterwards, and therefore can be accessed without a mutex.
cc *ClientConn
Expand Down Expand Up @@ -92,7 +94,6 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParent: cc.channelz,
Target: cc.parsedTarget,
MetricsRecorder: cc.metricsRecorderList,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
Expand All @@ -101,6 +102,10 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
return ccb
}

func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder {
return ccb.cc.metricsRecorderList
}

// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer. This is always executed from the serializer, so
// it is safe to call into the balancer here.
Expand Down
Loading
Loading