diff --git a/pkg/sessionctx/vardef/tidb_vars.go b/pkg/sessionctx/vardef/tidb_vars.go index 13807b903f201..32e3adcbd59f1 100644 --- a/pkg/sessionctx/vardef/tidb_vars.go +++ b/pkg/sessionctx/vardef/tidb_vars.go @@ -721,6 +721,11 @@ const ( // TiDBStmtSummaryMaxSQLLength indicates the max length of displayed normalized sql and sample sql. TiDBStmtSummaryMaxSQLLength = "tidb_stmt_summary_max_sql_length" + // TiDBStmtSummaryGroupByUser, when enabled, adds the executing user to the + // statement summary grouping key so the same digest run by different users + // produces separate rows. Off by default to avoid cardinality growth. + TiDBStmtSummaryGroupByUser = "tidb_stmt_summary_group_by_user" + // TiDBIgnoreInlistPlanDigest enables TiDB to generate the same plan digest with SQL using different in-list arguments. TiDBIgnoreInlistPlanDigest = "tidb_ignore_inlist_plan_digest" @@ -1622,6 +1627,7 @@ const ( DefTiDBStmtSummaryHistorySize = 24 DefTiDBStmtSummaryMaxStmtCount = 3000 DefTiDBStmtSummaryMaxSQLLength = 32768 + DefTiDBStmtSummaryGroupByUser = false DefTiDBCapturePlanBaseline = Off DefTiDBIgnoreInlistPlanDigest = true DefTiDBEnableIndexMerge = true diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 98780f1406ea8..4480c9827a573 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -953,6 +953,10 @@ var defaultSysVars = []*SysVar{ SetGlobal: func(_ context.Context, s *SessionVars, val string) error { return stmtsummaryv2.SetMaxSQLLength(TidbOptInt(val, vardef.DefTiDBStmtSummaryMaxSQLLength)) }}, + {Scope: vardef.ScopeGlobal, Name: vardef.TiDBStmtSummaryGroupByUser, Value: BoolToOnOff(vardef.DefTiDBStmtSummaryGroupByUser), Type: vardef.TypeBool, AllowEmpty: true, + SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + return stmtsummaryv2.SetGroupByUser(TiDBOptOn(val)) + }}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBCapturePlanBaseline, Value: vardef.DefTiDBCapturePlanBaseline, Type: vardef.TypeBool, AllowEmptyAll: true}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBEvolvePlanTaskMaxTime, Value: strconv.Itoa(vardef.DefTiDBEvolvePlanTaskMaxTime), Type: vardef.TypeInt, MinValue: -1, MaxValue: math.MaxInt64}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBEvolvePlanTaskStartTime, Value: vardef.DefTiDBEvolvePlanTaskStartTime, Type: vardef.TypeTime}, diff --git a/pkg/util/stmtsummary/BUILD.bazel b/pkg/util/stmtsummary/BUILD.bazel index 16d48c2c65ec6..f98a374797061 100644 --- a/pkg/util/stmtsummary/BUILD.bazel +++ b/pkg/util/stmtsummary/BUILD.bazel @@ -40,7 +40,7 @@ go_test( ], embed = [":stmtsummary"], flaky = True, - shard_count = 26, + shard_count = 28, deps = [ "//pkg/meta/model", "//pkg/metrics", diff --git a/pkg/util/stmtsummary/evicted_test.go b/pkg/util/stmtsummary/evicted_test.go index 54e572efb4911..73c183efcb7c8 100644 --- a/pkg/util/stmtsummary/evicted_test.go +++ b/pkg/util/stmtsummary/evicted_test.go @@ -52,7 +52,7 @@ func newInduceSsbde(beginTime int64, endTime int64) *stmtSummaryByDigestElement // generate new StmtDigestKey and stmtSummaryByDigest func generateStmtSummaryByDigestKeyValue(schema string, beginTime int64, endTime int64) (*StmtDigestKey, *stmtSummaryByDigest) { key := &StmtDigestKey{} - key.Init(schema, "", "", "", "") + key.Init(schema, "", "", "", "", "") value := newInduceSsbd(beginTime, endTime) return key, value } @@ -191,7 +191,7 @@ func TestSimpleStmtSummaryByDigestEvicted(t *testing.T) { require.Equal(t, "{begin: 8, end: 9, count: 1}, {begin: 5, end: 6, count: 1}, {begin: 2, end: 3, count: 1}", getAllEvicted(ssbde)) evictedKey = &StmtDigestKey{} - evictedKey.Init("b", "", "", "", "") + evictedKey.Init("b", "", "", "", "", "") ssbde.AddEvicted(evictedKey, evictedValue, 4) require.Equal(t, "{begin: 8, end: 9, count: 2}, {begin: 5, end: 6, count: 2}, {begin: 2, end: 3, count: 2}, {begin: 1, end: 2, count: 1}", getAllEvicted(ssbde)) diff --git a/pkg/util/stmtsummary/statement_summary.go b/pkg/util/stmtsummary/statement_summary.go index 06017d458e2e7..6196c1f4cca18 100644 --- a/pkg/util/stmtsummary/statement_summary.go +++ b/pkg/util/stmtsummary/statement_summary.go @@ -18,6 +18,7 @@ import ( "bytes" "cmp" "container/list" + "encoding/binary" "fmt" "math" "slices" @@ -53,8 +54,16 @@ type StmtDigestKey struct { } // Init initialize the hash key. -func (key *StmtDigestKey) Init(schemaName, digest, prevDigest, planDigest, resourceGroupName string) { - length := len(schemaName) + len(digest) + len(prevDigest) + len(planDigest) + len(resourceGroupName) +// When user is empty (group_by_user disabled), the hash is byte-identical to +// the pre-user-dimension layout. When user is non-empty, the hash appends a +// length-prefixed user segment after resourceGroupName so the boundary is +// unambiguous and pairs like ("rg", "alice") and ("rga", "lice") cannot +// collide. +func (key *StmtDigestKey) Init(schemaName, digest, prevDigest, planDigest, resourceGroupName, user string) { + length := len(schemaName) + len(digest) + len(prevDigest) + len(planDigest) + len(resourceGroupName) + len(user) + if len(user) > 0 { + length += 4 + } if cap(key.hash) < length { key.hash = make([]byte, 0, length) } else { @@ -65,6 +74,12 @@ func (key *StmtDigestKey) Init(schemaName, digest, prevDigest, planDigest, resou key.hash = append(key.hash, hack.Slice(prevDigest)...) key.hash = append(key.hash, hack.Slice(planDigest)...) key.hash = append(key.hash, hack.Slice(resourceGroupName)...) + if len(user) > 0 { + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], uint32(len(user))) + key.hash = append(key.hash, buf[:]...) + key.hash = append(key.hash, hack.Slice(user)...) + } } // Hash implements SimpleLRUCache.Key. @@ -90,6 +105,7 @@ type stmtSummaryByDigestMap struct { optRefreshInterval *atomic2.Int64 optHistorySize *atomic2.Int32 optMaxSQLLength *atomic2.Int32 + optGroupByUser *atomic2.Bool // other stores summary of evicted data. other *stmtSummaryByDigestEvicted @@ -322,6 +338,7 @@ func newStmtSummaryByDigestMap() *stmtSummaryByDigestMap { optRefreshInterval: atomic2.NewInt64(1800), optHistorySize: atomic2.NewInt32(24), optMaxSQLLength: atomic2.NewInt32(32768), + optGroupByUser: atomic2.NewBool(false), other: ssbde, } newSsMap.summaryMap.SetOnEvict(func(k kvcache.Key, v kvcache.Value) { @@ -355,8 +372,6 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) { } key := StmtDigestKeyPool.Get().(*StmtDigestKey) - // Init hash value in advance, to reduce the time holding the lock. - key.Init(sei.SchemaName, sei.Digest, sei.PrevSQLDigest, sei.PlanDigest, sei.ResourceGroupName) var exist bool @@ -370,6 +385,15 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) { ssMap.Lock() defer ssMap.Unlock() + // Decide userForKey under the lock so SetGroupByUser's flag flip + Clear + // is atomic w.r.t. AddStatement; otherwise a post-clear insert could land + // under the wrong grouping mode. + userForKey := "" + if ssMap.optGroupByUser.Load() { + userForKey = sei.User + } + key.Init(sei.SchemaName, sei.Digest, sei.PrevSQLDigest, sei.PlanDigest, sei.ResourceGroupName, userForKey) + // Check again. Statements could be added before disabling the flag and after Clear(). if !ssMap.Enabled() { return @@ -411,6 +435,11 @@ func (ssMap *stmtSummaryByDigestMap) Clear() { ssMap.Lock() defer ssMap.Unlock() + ssMap.clearLocked() +} + +// clearLocked removes all statement summaries. ssMap.Lock must be held. +func (ssMap *stmtSummaryByDigestMap) clearLocked() { ssMap.summaryMap.DeleteAll() ssMap.other.Clear() ssMap.beginTimeForCurInterval = 0 @@ -518,6 +547,28 @@ func (ssMap *stmtSummaryByDigestMap) historySize() int { return int(ssMap.optHistorySize.Load()) } +// SetGroupByUser enables or disables grouping statement summaries by the +// executing user. Switching the flag clears existing data because existing +// rows were aggregated under a different grouping key. +func (ssMap *stmtSummaryByDigestMap) SetGroupByUser(value bool) error { + // Hold ssMap.Lock across the flag flip and clear so AddStatement (which + // reads the flag under the same lock) cannot insert a record with the + // old grouping mode after Clear() completes. + ssMap.Lock() + defer ssMap.Unlock() + if ssMap.optGroupByUser.Load() == value { + return nil + } + ssMap.optGroupByUser.Store(value) + ssMap.clearLocked() + return nil +} + +// GroupByUser reports whether statement summaries are grouped by user. +func (ssMap *stmtSummaryByDigestMap) GroupByUser() bool { + return ssMap.optGroupByUser.Load() +} + // SetHistorySize sets the history size for all summaries. func (ssMap *stmtSummaryByDigestMap) SetMaxStmtCount(value uint) error { // `optMaxStmtCount` and `ssMap` don't need to be strictly atomically updated. diff --git a/pkg/util/stmtsummary/statement_summary_test.go b/pkg/util/stmtsummary/statement_summary_test.go index 626de1bd74019..bdf4fac921c78 100644 --- a/pkg/util/stmtsummary/statement_summary_test.go +++ b/pkg/util/stmtsummary/statement_summary_test.go @@ -85,7 +85,7 @@ func TestAddStatement(t *testing.T) { stmtExecInfo1 := generateAnyExecInfo() stmtExecInfo1.ExecDetail.CommitDetail.Mu.PrewriteBackoffTypes = make([]string, 0) key := &StmtDigestKey{} - key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName) + key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName, "") samplePlan, _, _ := stmtExecInfo1.LazyInfo.GetEncodedPlan() stmtExecInfo1.ExecDetail.CommitDetail.Mu.Lock() expectedSummaryElement := stmtSummaryByDigestElement{ @@ -502,7 +502,7 @@ func TestAddStatement(t *testing.T) { stmtExecInfo4.SchemaName = "schema2" stmtExecInfo4.ExecDetail.CommitDetail = nil key = &StmtDigestKey{} - key.Init(stmtExecInfo4.SchemaName, stmtExecInfo4.Digest, "", stmtExecInfo4.PlanDigest, stmtExecInfo4.ResourceGroupName) + key.Init(stmtExecInfo4.SchemaName, stmtExecInfo4.Digest, "", stmtExecInfo4.PlanDigest, stmtExecInfo4.ResourceGroupName, "") ssMap.AddStatement(stmtExecInfo4) require.Equal(t, 2, ssMap.summaryMap.Size()) _, ok = ssMap.summaryMap.Get(key) @@ -512,7 +512,7 @@ func TestAddStatement(t *testing.T) { stmtExecInfo5 := stmtExecInfo1 stmtExecInfo5.Digest = "digest2" key = &StmtDigestKey{} - key.Init(stmtExecInfo5.SchemaName, stmtExecInfo5.Digest, "", stmtExecInfo5.PlanDigest, stmtExecInfo5.ResourceGroupName) + key.Init(stmtExecInfo5.SchemaName, stmtExecInfo5.Digest, "", stmtExecInfo5.PlanDigest, stmtExecInfo5.ResourceGroupName, "") ssMap.AddStatement(stmtExecInfo5) require.Equal(t, 3, ssMap.summaryMap.Size()) _, ok = ssMap.summaryMap.Get(key) @@ -522,7 +522,7 @@ func TestAddStatement(t *testing.T) { stmtExecInfo6 := stmtExecInfo1 stmtExecInfo6.PlanDigest = "plan_digest2" key = &StmtDigestKey{} - key.Init(stmtExecInfo6.SchemaName, stmtExecInfo6.Digest, "", stmtExecInfo6.PlanDigest, stmtExecInfo6.ResourceGroupName) + key.Init(stmtExecInfo6.SchemaName, stmtExecInfo6.Digest, "", stmtExecInfo6.PlanDigest, stmtExecInfo6.ResourceGroupName, "") ssMap.AddStatement(stmtExecInfo6) require.Equal(t, 4, ssMap.summaryMap.Size()) _, ok = ssMap.summaryMap.Get(key) @@ -545,7 +545,7 @@ func TestAddStatement(t *testing.T) { bindingSQL: originalSQL, } key = &StmtDigestKey{} - key.Init(stmtExecInfo7.SchemaName, stmtExecInfo7.Digest, "", stmtExecInfo7.PlanDigest, stmtExecInfo7.ResourceGroupName) + key.Init(stmtExecInfo7.SchemaName, stmtExecInfo7.Digest, "", stmtExecInfo7.PlanDigest, stmtExecInfo7.ResourceGroupName, "") ssMap.AddStatement(stmtExecInfo7) require.Equal(t, 5, ssMap.summaryMap.Size()) v, ok := ssMap.summaryMap.Get(key) @@ -1122,7 +1122,7 @@ func TestMaxStmtCount(t *testing.T) { // LRU cache should work. for i := loops - 10; i < loops; i++ { key := &StmtDigestKey{} - key.Init(stmtExecInfo1.SchemaName, fmt.Sprintf("digest%d", i), "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName) + key.Init(stmtExecInfo1.SchemaName, fmt.Sprintf("digest%d", i), "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName, "") key.Hash() _, ok := sm.Get(key) require.True(t, ok) @@ -1166,7 +1166,7 @@ func TestMaxSQLLength(t *testing.T) { ssMap.AddStatement(stmtExecInfo1) key := &StmtDigestKey{} - key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName) + key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName, "") value, ok := ssMap.summaryMap.Get(key) require.True(t, ok) @@ -1418,7 +1418,7 @@ func TestRefreshCurrentSummary(t *testing.T) { ssMap.beginTimeForCurInterval = now + 10 stmtExecInfo1 := generateAnyExecInfo() key := &StmtDigestKey{} - key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName) + key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName, "") ssMap.AddStatement(stmtExecInfo1) require.Equal(t, 1, ssMap.summaryMap.Size()) value, ok := ssMap.summaryMap.Get(key) @@ -1465,7 +1465,7 @@ func TestSummaryHistory(t *testing.T) { stmtExecInfo1 := generateAnyExecInfo() key := &StmtDigestKey{} - key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName) + key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName, "") for i := range 11 { ssMap.beginTimeForCurInterval = now + int64(i+1)*10 ssMap.AddStatement(stmtExecInfo1) @@ -1534,7 +1534,7 @@ func TestPrevSQL(t *testing.T) { stmtExecInfo1.PrevSQLDigest = "prevSQLDigest" ssMap.AddStatement(stmtExecInfo1) key := &StmtDigestKey{} - key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, stmtExecInfo1.PrevSQLDigest, stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName) + key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, stmtExecInfo1.PrevSQLDigest, stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName, "") require.Equal(t, 1, ssMap.summaryMap.Size()) _, ok := ssMap.summaryMap.Get(key) require.True(t, ok) @@ -1549,7 +1549,7 @@ func TestPrevSQL(t *testing.T) { stmtExecInfo2.PrevSQLDigest = "prevSQLDigest1" ssMap.AddStatement(stmtExecInfo2) require.Equal(t, 2, ssMap.summaryMap.Size()) - key.Init(stmtExecInfo2.SchemaName, stmtExecInfo2.Digest, stmtExecInfo2.PrevSQLDigest, stmtExecInfo2.PlanDigest, stmtExecInfo2.ResourceGroupName) + key.Init(stmtExecInfo2.SchemaName, stmtExecInfo2.Digest, stmtExecInfo2.PrevSQLDigest, stmtExecInfo2.PlanDigest, stmtExecInfo2.ResourceGroupName, "") _, ok = ssMap.summaryMap.Get(key) require.True(t, ok) } @@ -1562,7 +1562,7 @@ func TestEndTime(t *testing.T) { stmtExecInfo1 := generateAnyExecInfo() ssMap.AddStatement(stmtExecInfo1) key := &StmtDigestKey{} - key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName) + key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName, "") require.Equal(t, 1, ssMap.summaryMap.Size()) value, ok := ssMap.summaryMap.Get(key) require.True(t, ok) @@ -1608,7 +1608,7 @@ func TestPointGet(t *testing.T) { stmtExecInfo1.LazyInfo.(*mockLazyInfo).plan = fakePlanDigestGenerator() ssMap.AddStatement(stmtExecInfo1) key := &StmtDigestKey{} - key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", "", stmtExecInfo1.ResourceGroupName) + key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", "", stmtExecInfo1.ResourceGroupName, "") require.Equal(t, 1, ssMap.summaryMap.Size()) value, ok := ssMap.summaryMap.Get(key) require.True(t, ok) @@ -1687,3 +1687,81 @@ func TestAccessPrivilege(t *testing.T) { datums = reader.GetStmtSummaryCurrentRows() require.Len(t, datums, loops) } + +// TestAddStatementGroupByUser verifies that flipping the group-by-user flag +// splits the same digest into per-user rows and fills ssbd.user. The default +// (flag OFF) keeps legacy behavior: one row per digest regardless of user. +func TestAddStatementGroupByUser(t *testing.T) { + ssMap := newStmtSummaryByDigestMap() + + info1 := generateAnyExecInfo() + info1.User = "alice" + info2 := generateAnyExecInfo() + info2.User = "bob" + + // Flag off: both statements collapse into one record. + ssMap.AddStatement(info1) + ssMap.AddStatement(info2) + require.Equal(t, 1, ssMap.summaryMap.Size()) + + // Flipping the flag clears prior data (different grouping key). + require.NoError(t, ssMap.SetGroupByUser(true)) + require.Equal(t, 0, ssMap.summaryMap.Size()) + + ssMap.AddStatement(info1) + ssMap.AddStatement(info2) + ssMap.AddStatement(info1) + require.Equal(t, 2, ssMap.summaryMap.Size()) + + // With grouping ON, each record's authUsers must hold exactly one user — + // the one that groups it — so SAMPLE_USER naturally reflects the grouping + // dimension without a dedicated column. + seen := map[string]bool{} + for _, v := range ssMap.summaryMap.Values() { + ssbd := v.(*stmtSummaryByDigest) + elem := ssbd.history.Front().Value.(*stmtSummaryByDigestElement) + require.Len(t, elem.authUsers, 1) + for u := range elem.authUsers { + seen[u] = true + } + } + require.True(t, seen["alice"]) + require.True(t, seen["bob"]) + + // Flipping back off clears again, and re-emitted records merge users. + require.NoError(t, ssMap.SetGroupByUser(false)) + require.Equal(t, 0, ssMap.summaryMap.Size()) + ssMap.AddStatement(info1) + ssMap.AddStatement(info2) + require.Equal(t, 1, ssMap.summaryMap.Size()) + for _, v := range ssMap.summaryMap.Values() { + ssbd := v.(*stmtSummaryByDigest) + elem := ssbd.history.Front().Value.(*stmtSummaryByDigestElement) + require.Len(t, elem.authUsers, 2) + } +} + +// TestStmtDigestKeyBoundary guards against two regressions: +// 1. Adjacent string fields must not collide across boundary, e.g. +// (resourceGroupName, user) = ("rg", "alice") vs ("rga", "lice"); without +// a boundary marker on user, both produce the same hash. +// 2. With user empty (group_by_user OFF), the hash must stay byte-identical +// to the pre-user-dimension encoding so persisted/in-memory rows from +// older versions match. +func TestStmtDigestKeyBoundary(t *testing.T) { + k1 := &StmtDigestKey{} + k1.Init("schema", "digest", "prev", "plan", "rg", "alice") + k2 := &StmtDigestKey{} + k2.Init("schema", "digest", "prev", "plan", "rga", "lice") + require.NotEqual(t, k1.Hash(), k2.Hash(), "user segment must have an unambiguous boundary") + + // user="" leaves the hash equal to the legacy 5-field layout. + off := &StmtDigestKey{} + off.Init("schema", "digest", "prev", "plan", "rg", "") + legacy := append([]byte{}, hack.Slice("digest")...) + legacy = append(legacy, hack.Slice("schema")...) + legacy = append(legacy, hack.Slice("prev")...) + legacy = append(legacy, hack.Slice("plan")...) + legacy = append(legacy, hack.Slice("rg")...) + require.Equal(t, legacy, off.Hash()) +} diff --git a/pkg/util/stmtsummary/v2/BUILD.bazel b/pkg/util/stmtsummary/v2/BUILD.bazel index 1abfaf8d31663..8e1c5a2554118 100644 --- a/pkg/util/stmtsummary/v2/BUILD.bazel +++ b/pkg/util/stmtsummary/v2/BUILD.bazel @@ -49,7 +49,7 @@ go_test( ], embed = [":stmtsummary"], flaky = True, - shard_count = 15, + shard_count = 16, deps = [ "//pkg/meta/model", "//pkg/metrics", diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index acefa0955e191..8eec704ea7d91 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -86,6 +86,7 @@ type StmtSummary struct { optMaxStmtCount *atomic2.Uint32 optMaxSQLLength *atomic2.Uint32 optRefreshInterval *atomic2.Uint32 + optGroupByUser *atomic2.Bool window *stmtWindow windowLock sync.Mutex @@ -113,6 +114,7 @@ func NewStmtSummary(cfg *Config) (*StmtSummary, error) { optMaxStmtCount: atomic2.NewUint32(defaultMaxStmtCount), optMaxSQLLength: atomic2.NewUint32(defaultMaxSQLLength), optRefreshInterval: atomic2.NewUint32(defaultRefreshInterval), + optGroupByUser: atomic2.NewBool(false), window: newStmtWindow(timeNow(), uint(defaultMaxStmtCount)), storage: newStmtLogStorage(&log.Config{ File: log.FileLogConfig{ @@ -146,6 +148,7 @@ func NewStmtSummary4Test(maxStmtCount uint) *StmtSummary { optMaxStmtCount: atomic2.NewUint32(defaultMaxStmtCount), optMaxSQLLength: atomic2.NewUint32(defaultMaxSQLLength), optRefreshInterval: atomic2.NewUint32(60 * 60 * 24 * 365), // 1 year + optGroupByUser: atomic2.NewBool(false), window: newStmtWindow(timeNow(), maxStmtCount), storage: &mockStmtStorage{}, } @@ -235,6 +238,29 @@ func (s *StmtSummary) SetRefreshInterval(v uint32) error { return nil } +// GroupByUser reports whether statement summaries are grouped by the +// executing user in addition to the usual digest/schema/plan tuple. +func (s *StmtSummary) GroupByUser() bool { + return s.optGroupByUser.Load() +} + +// SetGroupByUser toggles user-dimension grouping. Switching the flag clears +// the in-memory window because existing records were aggregated under a +// different grouping key; persisted records are unaffected. +func (s *StmtSummary) SetGroupByUser(v bool) error { + // Hold windowLock across the flag flip and clear so Add (which reads + // the flag under the same lock) cannot insert a record with the old + // grouping mode after the window is cleared. + s.windowLock.Lock() + defer s.windowLock.Unlock() + if s.optGroupByUser.Load() == v { + return nil + } + s.optGroupByUser.Store(v) + s.window.clear() + return nil +} + // Add adds a single stmtsummary.StmtExecInfo to the current statistics window // of StmtSummary. Before adding, it will check whether the current window has // expired, and if it has expired, the window will be persisted asynchronously @@ -245,11 +271,17 @@ func (s *StmtSummary) Add(info *stmtsummary.StmtExecInfo) { } k := stmtsummary.StmtDigestKeyPool.Get().(*stmtsummary.StmtDigestKey) - // Init hash value in advance, to reduce the time holding the lock. - k.Init(info.SchemaName, info.Digest, info.PrevSQLDigest, info.PlanDigest, info.ResourceGroupName) // Add info to the current statistics window. s.windowLock.Lock() + // Decide userForKey under windowLock so SetGroupByUser's flag flip + clear + // is atomic w.r.t. Add; otherwise a post-clear insert could land under the + // wrong grouping mode. + userForKey := "" + if s.optGroupByUser.Load() { + userForKey = info.User + } + k.Init(info.SchemaName, info.Digest, info.PrevSQLDigest, info.PlanDigest, info.ResourceGroupName, userForKey) var record *lockedStmtRecord v, exist := s.window.lru.Get(k) if exist { @@ -544,3 +576,15 @@ func SetMaxSQLLength(v int) error { } return stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(v) } + +// SetGroupByUser toggles the user dimension on both v1 and v2 so the sysvar +// setter can call one entry point regardless of which backend is active. +func SetGroupByUser(v bool) error { + if err := stmtsummary.StmtSummaryByDigestMap.SetGroupByUser(v); err != nil { + return err + } + if GlobalStmtSummary != nil { + return GlobalStmtSummary.SetGroupByUser(v) + } + return nil +} diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index 062a52e9c41b5..4d48dd30474d6 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/util/stmtsummary" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" @@ -82,6 +83,57 @@ func TestStmtSummary(t *testing.T) { require.Equal(t, 0, w.lru.Size()) } +func TestStmtSummaryGroupByUser(t *testing.T) { + ss := NewStmtSummary4Test(100) + defer ss.Close() + + // Two statements, same digest, different users: without the flag they + // should merge into one record. + ss.Add(stmtExecInfoWithUser("digest1", "alice")) + ss.Add(stmtExecInfoWithUser("digest1", "bob")) + require.Equal(t, 1, ss.window.lru.Size()) + + // Switching the flag on clears the window. Re-emitting produces two rows. + require.NoError(t, ss.SetGroupByUser(true)) + require.Equal(t, 0, ss.window.lru.Size()) + ss.Add(stmtExecInfoWithUser("digest1", "alice")) + ss.Add(stmtExecInfoWithUser("digest1", "bob")) + ss.Add(stmtExecInfoWithUser("digest1", "alice")) + require.Equal(t, 2, ss.window.lru.Size()) + + // When grouping by user, each record's AuthUsers must hold exactly one + // user — the one that groups it — so SAMPLE_USER naturally reflects the + // grouping dimension without a dedicated column. + users := map[string]int64{} + for _, v := range ss.window.lru.Values() { + r := v.(*lockedStmtRecord) + require.Len(t, r.AuthUsers, 1) + for u := range r.AuthUsers { + users[u] = r.ExecCount + } + } + require.Equal(t, int64(2), users["alice"]) + require.Equal(t, int64(1), users["bob"]) + + // Turning the flag off again clears and reverts to single-record merging. + require.NoError(t, ss.SetGroupByUser(false)) + ss.Add(stmtExecInfoWithUser("digest1", "alice")) + ss.Add(stmtExecInfoWithUser("digest1", "bob")) + require.Equal(t, 1, ss.window.lru.Size()) + for _, v := range ss.window.lru.Values() { + r := v.(*lockedStmtRecord) + require.Len(t, r.AuthUsers, 2) // both users merged when grouping is off + } +} + +// stmtExecInfoWithUser returns a StmtExecInfo whose digest and User fields are +// set; everything else is the generic test fixture. +func stmtExecInfoWithUser(digest, user string) *stmtsummary.StmtExecInfo { + info := GenerateStmtExecInfo4Test(digest) + info.User = user + return info +} + func TestWindowEvictedCountResetOnRotate(t *testing.T) { ss := NewStmtSummary4Test(2) defer ss.Close()