From d7d64ee49d0711570efe23d92a9d09b3612253e4 Mon Sep 17 00:00:00 2001 From: zzm Date: Mon, 6 Nov 2023 17:26:36 +0800 Subject: [PATCH 1/2] collecting the RU information by pasing point through context.Value (#1032) Signed-off-by: zzm --- .github/workflows/compatibility.yml | 1 + internal/client/client_interceptor.go | 32 ++++++-------- internal/client/client_interceptor_test.go | 4 +- tikv/kv.go | 50 +--------------------- tikv/kv_test.go | 30 ------------- util/execdetails.go | 48 +++++++++++---------- 6 files changed, 45 insertions(+), 120 deletions(-) diff --git a/.github/workflows/compatibility.yml b/.github/workflows/compatibility.yml index e598e4b61e..e0b82d3a95 100644 --- a/.github/workflows/compatibility.yml +++ b/.github/workflows/compatibility.yml @@ -26,6 +26,7 @@ jobs: with: repository: pingcap/tidb path: tidb + ref: release-7.1 - name: Check build run: | diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index 5afa476ed1..924d611a17 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -16,7 +16,6 @@ package client import ( "context" - "sync" "sync/atomic" "time" @@ -35,17 +34,16 @@ var _ Client = interceptedClient{} type interceptedClient struct { Client - ruRuntimeStatsMap *sync.Map } // NewInterceptedClient creates a Client which can execute interceptor. -func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client { - return interceptedClient{client, ruRuntimeStatsMap} +func NewInterceptedClient(client Client) Client { + return interceptedClient{client} } func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { // Build the resource control interceptor. - var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTS())) + var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req) // Chain the interceptors if there are multiple interceptors. if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { if finalInterceptor != nil { @@ -62,16 +60,6 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti return r.Client.SendRequest(ctx, addr, req, timeout) } -func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats { - if r.ruRuntimeStatsMap == nil || startTS == 0 { - return nil - } - if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok { - return v.(*util.RURuntimeStats) - } - return nil -} - var ( // ResourceControlSwitch is used to control whether to enable the resource control. ResourceControlSwitch atomic.Value @@ -84,7 +72,6 @@ var ( func buildResourceControlInterceptor( ctx context.Context, req *tikvrpc.Request, - ruRuntimeStats *util.RURuntimeStats, ) interceptor.RPCInterceptor { if !ResourceControlSwitch.Load().(bool) { return nil @@ -102,6 +89,8 @@ func buildResourceControlInterceptor( } resourceControlInterceptor := *rcInterceptor + ruDetails := ctx.Value(util.RUDetailsCtxKey) + // Make the request info. reqInfo := resourcecontrol.MakeRequestInfo(req) // Build the interceptor. @@ -121,13 +110,17 @@ func buildResourceControlInterceptor( return nil, err } req.GetResourceControlContext().Penalty = penalty - ruRuntimeStats.Update(consumption) // override request priority with resource group priority if it's not set. // Get the priority at tikv side has some performance issue, so we pass it // at client side. See: https://github.com/tikv/tikv/issues/15994 for more details. if req.GetResourceControlContext().OverridePriority == 0 { req.GetResourceControlContext().OverridePriority = uint64(priority) } + if ruDetails != nil { + detail := ruDetails.(*util.RUDetails) + detail.Update(consumption) + } + resp, err := next(target, req) if resp != nil { respInfo := resourcecontrol.MakeResponseInfo(resp) @@ -135,7 +128,10 @@ func buildResourceControlInterceptor( if err != nil { return nil, err } - ruRuntimeStats.Update(consumption) + if ruDetails != nil { + detail := ruDetails.(*util.RUDetails) + detail.Update(consumption) + } } return resp, err } diff --git a/internal/client/client_interceptor_test.go b/internal/client/client_interceptor_test.go index 128fea5ed4..88fc0af7e8 100644 --- a/internal/client/client_interceptor_test.go +++ b/internal/client/client_interceptor_test.go @@ -41,7 +41,7 @@ func (c emptyClient) CloseAddr(addr string) error { func TestInterceptedClient(t *testing.T) { executed := false - client := NewInterceptedClient(emptyClient{}, nil) + client := NewInterceptedClient(emptyClient{}) ctx := interceptor.WithRPCInterceptor(context.Background(), interceptor.NewRPCInterceptor("test", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { executed = true @@ -54,7 +54,7 @@ func TestInterceptedClient(t *testing.T) { func TestAppendChainedInterceptor(t *testing.T) { executed := make([]int, 0, 10) - client := NewInterceptedClient(emptyClient{}, nil) + client := NewInterceptedClient(emptyClient{}) mkInterceptorFn := func(i int) interceptor.RPCInterceptor { return interceptor.NewRPCInterceptor(fmt.Sprintf("%d", i), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { diff --git a/tikv/kv.go b/tikv/kv.go index fbf45a2de7..5c9aa60055 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -79,10 +79,6 @@ const ( // DCLabelKey indicates the key of label which represents the dc for Store. DCLabelKey = "zone" safeTSUpdateInterval = time.Second * 2 - // Since the default max transaction TTL is 1 hour, we can use this to - // clean up the RU runtime stats as well. - ruRuntimeStatsCleanThreshold = time.Hour - ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2 ) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { @@ -138,9 +134,6 @@ type KVStore struct { replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled - // StartTS -> RURuntimeStats, stores the RU runtime stats for certain transaction. - ruRuntimeStatsMap sync.Map - ctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -238,14 +231,13 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl cancel: cancel, gP: NewSpool(128, 10*time.Second), } - store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient, &store.ruRuntimeStatsMap)) + store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) store.lockResolver = txnlock.NewLockResolver(store) loadOption(store, opt...) - store.wg.Add(3) + store.wg.Add(2) go store.runSafePointChecker() go store.safeTSUpdater() - go store.ruRuntimeStatsMapCleaner() return store, nil } @@ -754,44 +746,6 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { return false } -func (s *KVStore) ruRuntimeStatsMapCleaner() { - defer s.wg.Done() - t := time.NewTicker(ruRuntimeStatsCleanInterval) - defer t.Stop() - ctx, cancel := context.WithCancel(s.ctx) - ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) - defer cancel() - - cleanThreshold := ruRuntimeStatsCleanThreshold - if _, e := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); e == nil { - t.Reset(time.Millisecond * 100) - cleanThreshold = time.Millisecond - } - - for { - select { - case <-ctx.Done(): - return - case now := <-t.C: - s.ruRuntimeStatsMap.Range( - func(key, _ interface{}) bool { - startTSTime := oracle.GetTimeFromTS(key.(uint64)) - if now.Sub(startTSTime) >= cleanThreshold { - s.ruRuntimeStatsMap.Delete(key) - } - return true - }, - ) - } - } -} - -// CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it. -func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats { - rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats()) - return rrs.(*util.RURuntimeStats) -} - // EnableResourceControl enables the resource control. func EnableResourceControl() { client.ResourceControlSwitch.Store(true) diff --git a/tikv/kv_test.go b/tikv/kv_test.go index f439d35594..9f9af85006 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -21,10 +21,8 @@ import ( "testing" "time" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/oracle" @@ -126,31 +124,3 @@ func (s *testKVSuite) TestMinSafeTs() { s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2)) s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) } - -func TestRURuntimeStatsCleanUp(t *testing.T) { - util.EnableFailpoints() - require := require.New(t) - require.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`)) - defer func() { - require.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean")) - }() - - client, cluster, pdClient, err := testutils.NewMockTiKV("", nil) - require.Nil(err) - testutils.BootstrapWithSingleStore(cluster) - store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0) - require.Nil(err) - defer store.Close() - - // Create a ruRuntimeStats first. - startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0) - ruRuntimeStats := store.CreateRURuntimeStats(startTS) - require.NotNil(ruRuntimeStats) - // Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap. - time.Sleep(time.Millisecond * 150) - // The ruRuntimeStatsMap should be cleaned up. - store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool { - require.Fail("ruRuntimeStatsMap should be cleaned up") - return true - }) -} diff --git a/util/execdetails.go b/util/execdetails.go index 75a5181409..7c39bdafa8 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -52,6 +52,7 @@ import ( type commitDetailCtxKeyType struct{} type lockKeysDetailCtxKeyType struct{} type execDetailsCtxKeyType struct{} +type ruDetailsCtxKeyType struct{} type traceExecDetailsCtxKeyType struct{} var ( @@ -64,6 +65,9 @@ var ( // ExecDetailsKey presents ExecDetail info key in context. ExecDetailsKey = execDetailsCtxKeyType{} + // ruDetailsCtxKey presents RUDetals info key in context. + RUDetailsCtxKey = ruDetailsCtxKeyType{} + // traceExecDetailsKey is a context key whose value indicates whether to add ExecDetails to trace. traceExecDetailsKey = traceExecDetailsCtxKeyType{} ) @@ -683,54 +687,54 @@ func (rd *ResolveLockDetail) Merge(resolveLock *ResolveLockDetail) { rd.ResolveLockTime += resolveLock.ResolveLockTime } -// RURuntimeStats is the runtime stats collector for RU. -type RURuntimeStats struct { +// RUDetails contains RU detail info. +type RUDetails struct { readRU *uatomic.Float64 writeRU *uatomic.Float64 } -// NewRURuntimeStats creates a new RURuntimeStats. -func NewRURuntimeStats() *RURuntimeStats { - return &RURuntimeStats{ +// NewRUDetails creates a new RUDetails. +func NewRUDetails() *RUDetails { + return &RUDetails{ readRU: uatomic.NewFloat64(0), writeRU: uatomic.NewFloat64(0), } } // Clone implements the RuntimeStats interface. -func (rs *RURuntimeStats) Clone() *RURuntimeStats { - return &RURuntimeStats{ - readRU: uatomic.NewFloat64(rs.readRU.Load()), - writeRU: uatomic.NewFloat64(rs.writeRU.Load()), +func (rd *RUDetails) Clone() *RUDetails { + return &RUDetails{ + readRU: uatomic.NewFloat64(rd.readRU.Load()), + writeRU: uatomic.NewFloat64(rd.writeRU.Load()), } } // Merge implements the RuntimeStats interface. -func (rs *RURuntimeStats) Merge(other *RURuntimeStats) { - rs.readRU.Add(other.readRU.Load()) - rs.writeRU.Add(other.writeRU.Load()) +func (rd *RUDetails) Merge(other *RUDetails) { + rd.readRU.Add(other.readRU.Load()) + rd.writeRU.Add(other.writeRU.Load()) } // String implements fmt.Stringer interface. -func (rs *RURuntimeStats) String() string { - return fmt.Sprintf("RRU:%f, WRU:%f", rs.readRU.Load(), rs.writeRU.Load()) +func (rd *RUDetails) String() string { + return fmt.Sprintf("RRU:%f, WRU:%f", rd.readRU.Load(), rd.writeRU.Load()) } // RRU returns the read RU. -func (rs RURuntimeStats) RRU() float64 { - return rs.readRU.Load() +func (rd *RUDetails) RRU() float64 { + return rd.readRU.Load() } // WRU returns the write RU. -func (rs RURuntimeStats) WRU() float64 { - return rs.writeRU.Load() +func (rd *RUDetails) WRU() float64 { + return rd.writeRU.Load() } // Update updates the RU runtime stats with the given consumption info. -func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) { - if rs == nil || consumption == nil { +func (rd *RUDetails) Update(consumption *rmpb.Consumption) { + if rd == nil || consumption == nil { return } - rs.readRU.Add(consumption.RRU) - rs.writeRU.Add(consumption.WRU) + rd.readRU.Add(consumption.RRU) + rd.writeRU.Add(consumption.WRU) } From 919ec412b899b2ddfc3fc9456d5d4c20327c97c4 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 4 Dec 2023 15:28:01 +0800 Subject: [PATCH 2/2] add ruWaitDuration to RUDetails and update pd-client Signed-off-by: glorv --- .github/workflows/compatibility.yml | 1 - go.mod | 2 +- go.sum | 4 ++-- integration_tests/go.mod | 2 +- integration_tests/go.sum | 4 ++-- internal/client/client_interceptor.go | 6 +++--- util/execdetails.go | 26 ++++++++++++++++++-------- 7 files changed, 27 insertions(+), 18 deletions(-) diff --git a/.github/workflows/compatibility.yml b/.github/workflows/compatibility.yml index e0b82d3a95..e598e4b61e 100644 --- a/.github/workflows/compatibility.yml +++ b/.github/workflows/compatibility.yml @@ -26,7 +26,6 @@ jobs: with: repository: pingcap/tidb path: tidb - ref: release-7.1 - name: Check build run: | diff --git a/go.mod b/go.mod index 5ea3ef1ee5..87655fb89c 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/stretchr/testify v1.8.2 github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a - github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e + github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 github.com/twmb/murmur3 v1.1.3 go.etcd.io/etcd/api/v3 v3.5.10 go.etcd.io/etcd/client/v3 v3.5.10 diff --git a/go.sum b/go.sum index 12e9478162..39e069e2c8 100644 --- a/go.sum +++ b/go.sum @@ -112,8 +112,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e h1:11cWLLmEreKof/VJi6LLQ+Jkav5ZqPJgeI+KX4pc/DE= -github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= +github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg= +github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index ccb2f0456a..ac8addea46 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -12,7 +12,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tidwall/gjson v1.14.1 github.com/tikv/client-go/v2 v2.0.8-0.20231201024404-0ff16620f6c0 - github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e + github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 go.uber.org/goleak v1.3.0 ) diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 65e3bc9198..59b765013b 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -516,8 +516,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e h1:11cWLLmEreKof/VJi6LLQ+Jkav5ZqPJgeI+KX4pc/DE= -github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= +github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg= +github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index 924d611a17..46dbf4bd29 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -105,7 +105,7 @@ func buildResourceControlInterceptor( return next(target, req) } - consumption, penalty, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo) + consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo) if err != nil { return nil, err } @@ -118,7 +118,7 @@ func buildResourceControlInterceptor( } if ruDetails != nil { detail := ruDetails.(*util.RUDetails) - detail.Update(consumption) + detail.Update(consumption, waitDuration) } resp, err := next(target, req) @@ -130,7 +130,7 @@ func buildResourceControlInterceptor( } if ruDetails != nil { detail := ruDetails.(*util.RUDetails) - detail.Update(consumption) + detail.Update(consumption, time.Duration(0)) } } return resp, err diff --git a/util/execdetails.go b/util/execdetails.go index 7c39bdafa8..e21bd1ce79 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -689,23 +689,26 @@ func (rd *ResolveLockDetail) Merge(resolveLock *ResolveLockDetail) { // RUDetails contains RU detail info. type RUDetails struct { - readRU *uatomic.Float64 - writeRU *uatomic.Float64 + readRU *uatomic.Float64 + writeRU *uatomic.Float64 + ruWaitDuration *uatomic.Duration } // NewRUDetails creates a new RUDetails. func NewRUDetails() *RUDetails { return &RUDetails{ - readRU: uatomic.NewFloat64(0), - writeRU: uatomic.NewFloat64(0), + readRU: uatomic.NewFloat64(0), + writeRU: uatomic.NewFloat64(0), + ruWaitDuration: uatomic.NewDuration(0), } } // Clone implements the RuntimeStats interface. func (rd *RUDetails) Clone() *RUDetails { return &RUDetails{ - readRU: uatomic.NewFloat64(rd.readRU.Load()), - writeRU: uatomic.NewFloat64(rd.writeRU.Load()), + readRU: uatomic.NewFloat64(rd.readRU.Load()), + writeRU: uatomic.NewFloat64(rd.writeRU.Load()), + ruWaitDuration: uatomic.NewDuration(rd.ruWaitDuration.Load()), } } @@ -713,11 +716,12 @@ func (rd *RUDetails) Clone() *RUDetails { func (rd *RUDetails) Merge(other *RUDetails) { rd.readRU.Add(other.readRU.Load()) rd.writeRU.Add(other.writeRU.Load()) + rd.ruWaitDuration.Add(other.ruWaitDuration.Load()) } // String implements fmt.Stringer interface. func (rd *RUDetails) String() string { - return fmt.Sprintf("RRU:%f, WRU:%f", rd.readRU.Load(), rd.writeRU.Load()) + return fmt.Sprintf("RRU:%f, WRU:%f, WaitDuration:%v", rd.readRU.Load(), rd.writeRU.Load(), rd.ruWaitDuration.Load()) } // RRU returns the read RU. @@ -730,11 +734,17 @@ func (rd *RUDetails) WRU() float64 { return rd.writeRU.Load() } +// RUWaitDuration returns the time duration waiting for available RU. +func (rd *RUDetails) RUWaitDuration() time.Duration { + return rd.ruWaitDuration.Load() +} + // Update updates the RU runtime stats with the given consumption info. -func (rd *RUDetails) Update(consumption *rmpb.Consumption) { +func (rd *RUDetails) Update(consumption *rmpb.Consumption, waitDuration time.Duration) { if rd == nil || consumption == nil { return } rd.readRU.Add(consumption.RRU) rd.writeRU.Add(consumption.WRU) + rd.ruWaitDuration.Add(waitDuration) }