Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: add ru details in ExecDetails #1070

Merged
merged 2 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_model v0.3.0
github.com/stretchr/testify v1.8.2
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2
github.com/twmb/murmur3 v1.1.3
go.etcd.io/etcd/api/v3 v3.5.10
go.etcd.io/etcd/client/v3 v3.5.10
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e h1:11cWLLmEreKof/VJi6LLQ+Jkav5ZqPJgeI+KX4pc/DE=
github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg=
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.14.1
github.com/tikv/client-go/v2 v2.0.8-0.20231201024404-0ff16620f6c0
github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2
go.uber.org/goleak v1.3.0
)

Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e h1:11cWLLmEreKof/VJi6LLQ+Jkav5ZqPJgeI+KX4pc/DE=
github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg=
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
Expand Down
34 changes: 15 additions & 19 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package client

import (
"context"
"sync"
"sync/atomic"
"time"

Expand All @@ -35,17 +34,16 @@ var _ Client = interceptedClient{}

type interceptedClient struct {
Client
ruRuntimeStatsMap *sync.Map
}

// NewInterceptedClient creates a Client which can execute interceptor.
func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client {
return interceptedClient{client, ruRuntimeStatsMap}
func NewInterceptedClient(client Client) Client {
return interceptedClient{client}
}

func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
// Build the resource control interceptor.
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTS()))
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req)
// Chain the interceptors if there are multiple interceptors.
if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil {
if finalInterceptor != nil {
Expand All @@ -62,16 +60,6 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
return r.Client.SendRequest(ctx, addr, req, timeout)
}

func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats {
if r.ruRuntimeStatsMap == nil || startTS == 0 {
return nil
}
if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok {
return v.(*util.RURuntimeStats)
}
return nil
}

var (
// ResourceControlSwitch is used to control whether to enable the resource control.
ResourceControlSwitch atomic.Value
Expand All @@ -84,7 +72,6 @@ var (
func buildResourceControlInterceptor(
ctx context.Context,
req *tikvrpc.Request,
ruRuntimeStats *util.RURuntimeStats,
) interceptor.RPCInterceptor {
if !ResourceControlSwitch.Load().(bool) {
return nil
Expand All @@ -102,6 +89,8 @@ func buildResourceControlInterceptor(
}
resourceControlInterceptor := *rcInterceptor

ruDetails := ctx.Value(util.RUDetailsCtxKey)

// Make the request info.
reqInfo := resourcecontrol.MakeRequestInfo(req)
// Build the interceptor.
Expand All @@ -116,26 +105,33 @@ func buildResourceControlInterceptor(
return next(target, req)
}

consumption, penalty, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using one struct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there is a best practice. The difference between a tuple and a struct is not large, and I see many other functions that return multiple values.

if err != nil {
return nil, err
}
req.GetResourceControlContext().Penalty = penalty
ruRuntimeStats.Update(consumption)
// override request priority with resource group priority if it's not set.
// Getting the priority on the TiKV side has some performance issues, so we pass it
// from the client side. See: https://github.com/tikv/tikv/issues/15994 for more details.
if req.GetResourceControlContext().OverridePriority == 0 {
req.GetResourceControlContext().OverridePriority = uint64(priority)
}
if ruDetails != nil {
detail := ruDetails.(*util.RUDetails)
detail.Update(consumption, waitDuration)
}

resp, err := next(target, req)
if resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
consumption, err = resourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
ruRuntimeStats.Update(consumption)
if ruDetails != nil {
detail := ruDetails.(*util.RUDetails)
detail.Update(consumption, time.Duration(0))
}
}
return resp, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/client/client_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c emptyClient) CloseAddr(addr string) error {

func TestInterceptedClient(t *testing.T) {
executed := false
client := NewInterceptedClient(emptyClient{}, nil)
client := NewInterceptedClient(emptyClient{})
ctx := interceptor.WithRPCInterceptor(context.Background(), interceptor.NewRPCInterceptor("test", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
executed = true
Expand All @@ -54,7 +54,7 @@ func TestInterceptedClient(t *testing.T) {

func TestAppendChainedInterceptor(t *testing.T) {
executed := make([]int, 0, 10)
client := NewInterceptedClient(emptyClient{}, nil)
client := NewInterceptedClient(emptyClient{})

mkInterceptorFn := func(i int) interceptor.RPCInterceptor {
return interceptor.NewRPCInterceptor(fmt.Sprintf("%d", i), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
Expand Down
50 changes: 2 additions & 48 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
safeTSUpdateInterval = time.Second * 2
// Since the default max transaction TTL is 1 hour, we can use this to
// clean up the RU runtime stats as well.
ruRuntimeStatsCleanThreshold = time.Hour
ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
Expand Down Expand Up @@ -138,9 +134,6 @@ type KVStore struct {

replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled

// StartTS -> RURuntimeStats, stores the RU runtime stats for certain transaction.
ruRuntimeStatsMap sync.Map

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand Down Expand Up @@ -238,14 +231,13 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
cancel: cancel,
gP: NewSpool(128, 10*time.Second),
}
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient, &store.ruRuntimeStatsMap))
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient))
store.lockResolver = txnlock.NewLockResolver(store)
loadOption(store, opt...)

store.wg.Add(3)
store.wg.Add(2)
go store.runSafePointChecker()
go store.safeTSUpdater()
go store.ruRuntimeStatsMapCleaner()

return store, nil
}
Expand Down Expand Up @@ -754,44 +746,6 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
return false
}

func (s *KVStore) ruRuntimeStatsMapCleaner() {
defer s.wg.Done()
t := time.NewTicker(ruRuntimeStatsCleanInterval)
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
defer cancel()

cleanThreshold := ruRuntimeStatsCleanThreshold
if _, e := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); e == nil {
t.Reset(time.Millisecond * 100)
cleanThreshold = time.Millisecond
}

for {
select {
case <-ctx.Done():
return
case now := <-t.C:
s.ruRuntimeStatsMap.Range(
func(key, _ interface{}) bool {
startTSTime := oracle.GetTimeFromTS(key.(uint64))
if now.Sub(startTSTime) >= cleanThreshold {
s.ruRuntimeStatsMap.Delete(key)
}
return true
},
)
}
}
}

// CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it.
func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats {
rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats())
return rrs.(*util.RURuntimeStats)
}

// EnableResourceControl enables the resource control.
func EnableResourceControl() {
client.ResourceControlSwitch.Store(true)
Expand Down
30 changes: 0 additions & 30 deletions tikv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -126,31 +124,3 @@ func (s *testKVSuite) TestMinSafeTs() {
s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2))
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func TestRURuntimeStatsCleanUp(t *testing.T) {
util.EnableFailpoints()
require := require.New(t)
require.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`))
defer func() {
require.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean"))
}()

client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.Nil(err)
testutils.BootstrapWithSingleStore(cluster)
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
require.Nil(err)
defer store.Close()

// Create a ruRuntimeStats first.
startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
ruRuntimeStats := store.CreateRURuntimeStats(startTS)
require.NotNil(ruRuntimeStats)
// Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap.
time.Sleep(time.Millisecond * 150)
// The ruRuntimeStatsMap should be cleaned up.
store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool {
require.Fail("ruRuntimeStatsMap should be cleaned up")
return true
})
}
66 changes: 40 additions & 26 deletions util/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
type commitDetailCtxKeyType struct{}
type lockKeysDetailCtxKeyType struct{}
type execDetailsCtxKeyType struct{}
type ruDetailsCtxKeyType struct{}
type traceExecDetailsCtxKeyType struct{}

var (
Expand All @@ -64,6 +65,9 @@ var (
// ExecDetailsKey presents ExecDetail info key in context.
ExecDetailsKey = execDetailsCtxKeyType{}

// RUDetailsCtxKey presents RUDetails info key in context.
RUDetailsCtxKey = ruDetailsCtxKeyType{}

// traceExecDetailsKey is a context key whose value indicates whether to add ExecDetails to trace.
traceExecDetailsKey = traceExecDetailsCtxKeyType{}
)
Expand Down Expand Up @@ -683,54 +687,64 @@ func (rd *ResolveLockDetail) Merge(resolveLock *ResolveLockDetail) {
rd.ResolveLockTime += resolveLock.ResolveLockTime
}

// RURuntimeStats is the runtime stats collector for RU.
type RURuntimeStats struct {
readRU *uatomic.Float64
writeRU *uatomic.Float64
// RUDetails contains RU detail info.
type RUDetails struct {
readRU *uatomic.Float64
writeRU *uatomic.Float64
ruWaitDuration *uatomic.Duration
}

// NewRURuntimeStats creates a new RURuntimeStats.
func NewRURuntimeStats() *RURuntimeStats {
return &RURuntimeStats{
readRU: uatomic.NewFloat64(0),
writeRU: uatomic.NewFloat64(0),
// NewRUDetails creates a new RUDetails.
func NewRUDetails() *RUDetails {
return &RUDetails{
readRU: uatomic.NewFloat64(0),
writeRU: uatomic.NewFloat64(0),
ruWaitDuration: uatomic.NewDuration(0),
}
}

// Clone implements the RuntimeStats interface.
func (rs *RURuntimeStats) Clone() *RURuntimeStats {
return &RURuntimeStats{
readRU: uatomic.NewFloat64(rs.readRU.Load()),
writeRU: uatomic.NewFloat64(rs.writeRU.Load()),
func (rd *RUDetails) Clone() *RUDetails {
return &RUDetails{
readRU: uatomic.NewFloat64(rd.readRU.Load()),
writeRU: uatomic.NewFloat64(rd.writeRU.Load()),
ruWaitDuration: uatomic.NewDuration(rd.ruWaitDuration.Load()),
}
}

// Merge implements the RuntimeStats interface.
func (rs *RURuntimeStats) Merge(other *RURuntimeStats) {
rs.readRU.Add(other.readRU.Load())
rs.writeRU.Add(other.writeRU.Load())
func (rd *RUDetails) Merge(other *RUDetails) {
rd.readRU.Add(other.readRU.Load())
rd.writeRU.Add(other.writeRU.Load())
rd.ruWaitDuration.Add(other.ruWaitDuration.Load())
}

// String implements fmt.Stringer interface.
func (rs *RURuntimeStats) String() string {
return fmt.Sprintf("RRU:%f, WRU:%f", rs.readRU.Load(), rs.writeRU.Load())
func (rd *RUDetails) String() string {
return fmt.Sprintf("RRU:%f, WRU:%f, WaitDuration:%v", rd.readRU.Load(), rd.writeRU.Load(), rd.ruWaitDuration.Load())
}

// RRU returns the read RU.
func (rs RURuntimeStats) RRU() float64 {
return rs.readRU.Load()
func (rd *RUDetails) RRU() float64 {
return rd.readRU.Load()
}

// WRU returns the write RU.
func (rs RURuntimeStats) WRU() float64 {
return rs.writeRU.Load()
func (rd *RUDetails) WRU() float64 {
return rd.writeRU.Load()
}

// RUWaitDuration returns the time duration waiting for available RU.
func (rd *RUDetails) RUWaitDuration() time.Duration {
return rd.ruWaitDuration.Load()
}

// Update updates the RU details with the given consumption info and RU wait duration.
func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) {
if rs == nil || consumption == nil {
func (rd *RUDetails) Update(consumption *rmpb.Consumption, waitDuration time.Duration) {
if rd == nil || consumption == nil {
return
}
rs.readRU.Add(consumption.RRU)
rs.writeRU.Add(consumption.WRU)
rd.readRU.Add(consumption.RRU)
rd.writeRU.Add(consumption.WRU)
rd.ruWaitDuration.Add(waitDuration)
}