Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile.devcontainer
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ release-vmproxy: ## Build vmproxy
make -C vmproxy release

release-qan: ## Build QAN
make -C qan-api2 release
make -C qan release

# used by host Makefile
_bash:
Expand Down Expand Up @@ -47,9 +47,9 @@ run-vmproxy-ci: release-vmproxy ## Replace vmproxy from build and restart (used
run-vmproxy: run-vmproxy-ci ## Replace vmproxy from build, restart and tail logs
tail -f /srv/logs/vmproxy.log

run-qan-ci: release-qan ## Replace qan-api2 from build and restart (used in CI)
run-qan-ci: release-qan ## Replace qan from build and restart (used in CI)
supervisorctl stop qan-api2
install -m 755 $(PMM_RELEASE_PATH)/qan-api2 /usr/sbin/percona-qan-api2
install -m 755 $(PMM_RELEASE_PATH)/qan /usr/sbin/percona-qan
truncate -s 0 /srv/logs/qan-api2.log
supervisorctl start qan-api2

Expand Down
3 changes: 2 additions & 1 deletion Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ release: ## Build release versions of all components
make -C admin release
make -C managed release
make -C qan-api2 release
make -C qan release

gen: clean ## Generate files
make -C api gen
Expand All @@ -40,7 +41,7 @@ gen-mocks:
go tool mockery --config .mockery.yaml

test-common: ## Run tests from API (and other shared) packages only (i.e it ignores directories that are explicitly listed)
go test $(shell go list ./... | grep -v -e admin -e agent -e managed -e api-tests -e qan-api2 -e update)
go test $(shell go list ./... | grep -v -e admin -e agent -e managed -e api-tests -e qan-api2 -e qan -e update)

api-test: ## Run API tests on dev env.
go test -count=1 -race -p 1 -v ./api-tests/... -pmm.server-insecure-tls
Expand Down
13 changes: 13 additions & 0 deletions agent/agents/mongodb/shared/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/percona/pmm/agent/utils/truncate"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
)

// DefaultInterval is interval for aggregator tick.
Expand Down Expand Up @@ -253,6 +254,17 @@ func (a *Aggregator) createResult(_ context.Context) *report.Result {
queryStats := queries.CalcQueriesStats(int64(DefaultInterval))
buckets := make([]*agentv1.MetricsBucket, 0, len(queryStats))

// Build a query_time DDSketch per class from the raw per-op times (ms -> s),
// keyed by class ID (shared with QueryStats.ID below).
sketches := make(map[string]map[uint32]uint64, len(queries))
for i := range queries {
dense := ddsketch.New()
for _, ms := range queries[i].QueryTime {
ddsketch.Add(dense, ms/millisecondsToSeconds)
}
sketches[queries[i].ID] = ddsketch.ToWire(dense)
}

a.logger.Tracef("Queries: %#v", queries)
a.logger.Tracef("Query Stats: %#v", queryStats)

Expand Down Expand Up @@ -293,6 +305,7 @@ func (a *Aggregator) createResult(_ context.Context) *report.Result {
bucket.Common.MQueryTimeMin = float32(v.QueryTime.Min) / millisecondsToSeconds
bucket.Common.MQueryTimeP99 = float32(v.QueryTime.Pct99) / millisecondsToSeconds
bucket.Common.MQueryTimeSum = float32(v.QueryTime.Total) / millisecondsToSeconds
bucket.Common.MQueryTimeSketch = sketches[v.ID]

bucket.Mongodb.MDocsReturnedCnt = float32(v.Count) // PMM-13788
bucket.Mongodb.MDocsReturnedMax = float32(v.Returned.Max)
Expand Down
27 changes: 27 additions & 0 deletions agent/agents/mongodb/shared/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/percona/pmm/agent/utils/truncate"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
)

func TestAggregator(t *testing.T) {
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestAggregator(t *testing.T) {
ExampleType: agentv1.ExampleType_EXAMPLE_TYPE_RANDOM,
NumQueries: 1,
MQueryTimeCnt: 1,
MQueryTimeSketch: map[uint32]uint64{0: 1},
},
Mongodb: &agentv1.MetricsBucket_MongoDB{
MDocsReturnedCnt: 1,
Expand All @@ -102,6 +104,30 @@ func TestAggregator(t *testing.T) {
}, *result)
})

t.Run("queryTimeSketch", func(t *testing.T) {
aggregator := New(time.Now(), "test-agent", logrus.WithField("component", "test"), truncate.GetMongoDBDefaultMaxQueryLength())
aggregator.Start()
defer aggregator.Stop()
ctx := context.TODO()
// 100 ops of the same class with 1..100 ms latency.
for i := 1; i <= 100; i++ {
err := aggregator.Add(ctx, proto.SystemProfile{Ns: "collection.people", Op: "insert", Millis: i})
require.NoError(t, err)
}

result := aggregator.createResult(ctx)
require.Len(t, result.Buckets, 1)

wire := result.Buckets[0].Common.MQueryTimeSketch
require.NotEmpty(t, wire)
dense := ddsketch.New()
for idx, c := range wire {
dense[idx] = c
}
// p99 of 1..100 ms is ~0.099 s, recovered within the DDSketch error bound.
require.InDelta(t, 0.099, ddsketch.Quantile(dense, 0.99), 0.099*ddsketch.Alpha+1e-3)
})

t.Run("createResultInvalidUTF8", func(t *testing.T) {
agentID := "test-agent"
startPeriod := time.Now()
Expand Down Expand Up @@ -147,6 +173,7 @@ func TestAggregator(t *testing.T) {
ExampleType: agentv1.ExampleType_EXAMPLE_TYPE_RANDOM,
NumQueries: 1,
MQueryTimeCnt: 1,
MQueryTimeSketch: map[uint32]uint64{0: 1},
},
Mongodb: &agentv1.MetricsBucket_MongoDB{
MDocsReturnedCnt: 1,
Expand Down
33 changes: 32 additions & 1 deletion agent/agents/mysql/slowlog/slowlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/percona/pmm/agent/utils/truncate"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
)

const (
Expand Down Expand Up @@ -349,6 +350,9 @@ func (s *SlowLog) processFile(ctx context.Context, file string, outlierTime floa
s.changes <- agents.Change{Status: inventoryv1.AgentStatus_AGENT_STATUS_RUNNING}

aggregator := event.NewAggregator(true, 0, outlierTime)
// Per-query-class DDSketch of query_time, built from individual events alongside
// the aggregator so the server can compute mergeable percentiles.
sketches := make(map[string][]uint64)
ctxDone := ctx.Done()

// aggregate every minute at 00 seconds
Expand All @@ -375,15 +379,22 @@ func (s *SlowLog) processFile(ctx context.Context, file string, outlierTime floa
fingerprint := query.Fingerprint(e.Query)
digest := hashIntoQueryID(fingerprint)
aggregator.AddEvent(e, digest, e.User, e.Host, e.Db, e.Server, e.Query)
if qt, ok := e.TimeMetrics["Query_time"]; ok {
if sketches[digest] == nil {
sketches[digest] = ddsketch.New()
}
ddsketch.Add(sketches[digest], qt)
}

case <-t.C:
lengthS := uint32(math.Round(wait.Seconds())) // round 59.9s/60.1s to 60s
res := aggregator.Finalize()
buckets := makeBuckets(s.params.AgentID, res, start, lengthS, s.params.DisableCommentsParsing, s.params.DisableQueryExamples, s.params.MaxQueryLength, s.l)
buckets := makeBuckets(s.params.AgentID, res, start, lengthS, s.params.DisableCommentsParsing, s.params.DisableQueryExamples, s.params.MaxQueryLength, sketches, s.l)
s.l.Debugf("Made %d buckets out of %d classes in %s+%d interval. Wait time: %s.",
len(buckets), len(res.Class), start.Format("15:04:05"), lengthS, time.Since(start))

aggregator = event.NewAggregator(true, 0, outlierTime)
sketches = make(map[string][]uint64)
start = time.Now()
wait = start.Truncate(aggregateInterval).Add(aggregateInterval).Sub(start)
s.l.Debugf("Scheduling next aggregation in %s at %s.", wait, start.Add(wait).Format("15:04:05"))
Expand All @@ -406,6 +417,24 @@ func hashIntoQueryID(fingerprint string) string {
return strings.ToUpper(hex.EncodeToString(id.Sum(nil)))
}

// sketchToWire converts a dense DDSketch into the wire map (bucket index -> count),
// returning nil when there is no data so older behavior (no sketch) is preserved.
func sketchToWire(dense []uint64) map[uint32]uint64 {
if dense == nil {
return nil
}
out := make(map[uint32]uint64)
for i, c := range dense {
if c > 0 {
out[uint32(i)] = c
}
}
if len(out) == 0 {
return nil
}
return out
}

// makeBuckets is a pure function for easier testing.
//
//nolint:gocognit,cyclop,maintidx
Expand All @@ -417,6 +446,7 @@ func makeBuckets(
disableCommentsParsing bool,
disableQueryExamples bool,
maxQueryLength int32,
sketches map[string][]uint64,
l *logrus.Entry,
) []*agentv1.MetricsBucket {
buckets := make([]*agentv1.MetricsBucket, 0, len(res.Class))
Expand Down Expand Up @@ -498,6 +528,7 @@ func makeBuckets(
mb.Common.MQueryTimeMin = float32(*m.Min)
mb.Common.MQueryTimeP99 = float32(*m.P99)
}
mb.Common.MQueryTimeSketch = sketchToWire(sketches[v.Id])
// lock_time - Lock_time
if m, ok := v.Metrics.TimeMetrics["Lock_time"]; ok {
mb.Mysql.MLockTimeCnt = float32(m.Cnt)
Expand Down
26 changes: 24 additions & 2 deletions agent/agents/mysql/slowlog/slowlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/percona/pmm/agent/utils/version"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
)

func getDataFromFile(t *testing.T, filePath string, data any) {
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestSlowLogMakeBucketsInvalidUTF8(t *testing.T) {
},
}

actualBuckets := makeBuckets(agentID, parsingResult, periodStart, 60, false, false, truncate.GetDefaultMaxQueryLength(), logrus.NewEntry(logrus.New()))
actualBuckets := makeBuckets(agentID, parsingResult, periodStart, 60, false, false, truncate.GetDefaultMaxQueryLength(), nil, logrus.NewEntry(logrus.New()))
expectedBuckets := []*agentv1.MetricsBucket{
{
Common: &agentv1.MetricsBucket_Common{
Expand All @@ -87,6 +88,27 @@ func TestSlowLogMakeBucketsInvalidUTF8(t *testing.T) {
tests.AssertBucketsEqual(t, expectedBuckets[0], actualBuckets[0])
}

func TestSlowLogMakeBucketsSketch(t *testing.T) {
t.Parallel()

const agentID = "73ee2f92-d5aa-45f0-8b09-6d3df605fd44"
parsingResult := event.Result{
Class: map[string]*event.Class{
"q1": {Id: "q1", Metrics: &event.Metrics{}, Fingerprint: "SELECT 1"},
},
}
dense := ddsketch.New()
for i := 1; i <= 100; i++ {
ddsketch.Add(dense, float64(i)/1000.0)
}
sketches := map[string][]uint64{"q1": dense}

buckets := makeBuckets(agentID, parsingResult, time.Unix(1557137220, 0), 60, false, false, truncate.GetDefaultMaxQueryLength(), sketches, logrus.NewEntry(logrus.New()))
require.Len(t, buckets, 1)
require.NotEmpty(t, buckets[0].Common.MQueryTimeSketch)
require.Equal(t, sketchToWire(dense), buckets[0].Common.MQueryTimeSketch)
}

func TestSlowLogMakeBuckets(t *testing.T) {
t.Parallel()

Expand All @@ -96,7 +118,7 @@ func TestSlowLogMakeBuckets(t *testing.T) {
parsingResult := event.Result{}
getDataFromFile(t, "slowlog_fixture.json", &parsingResult)

actualBuckets := makeBuckets(agentID, parsingResult, periodStart, 60, false, false, truncate.GetDefaultMaxQueryLength(), logrus.NewEntry(logrus.New()))
actualBuckets := makeBuckets(agentID, parsingResult, periodStart, 60, false, false, truncate.GetDefaultMaxQueryLength(), nil, logrus.NewEntry(logrus.New()))

var expectedBuckets []*agentv1.MetricsBucket
getDataFromFile(t, "slowlog_expected.json", &expectedBuckets)
Expand Down
51 changes: 51 additions & 0 deletions agent/agents/postgres/pgstatmonitor/pgstatmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"database/sql"
"fmt"
"io"
"math"
"strconv"
"strings"
"time"

"github.com/AlekSi/pointer"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/percona/pmm/agent/utils/version"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
"github.com/percona/pmm/utils/sqlmetrics"
)

Expand Down Expand Up @@ -631,6 +634,7 @@ func (m *PGStatMonitorQAN) makeBuckets(current, cache map[time.Time]map[string]*
m.l.WithError(err).Warnf("failed to parse histogram from resp calls")
} else {
mb.Postgresql.HistogramItems = histogram
mb.Common.MQueryTimeSketch = querySketchFromHistogram(histogram)
}

if (currentPSM.TotalPlanTime - prevPSM.TotalPlanTime) != 0 {
Expand Down Expand Up @@ -789,5 +793,52 @@ func getHistogramRangesArray(vPGSM pgStatMonitorVersion) []*agentv1.HistogramIte
}
}

// querySketchFromHistogram remaps the pg_stat_monitor response-time histogram
// (per-bucket call counts over millisecond ranges) onto the frozen DDSketch
// layout, using each bucket's representative latency. It is the pg_stat_monitor
// source of mergeable query_time percentiles.
func querySketchFromHistogram(histogram []*agentv1.HistogramItem) map[uint32]uint64 {
dense := ddsketch.New()
for _, item := range histogram {
if item.Frequency == 0 {
continue
}
rep, ok := histogramBucketSeconds(item.Range)
if !ok {
continue
}
ddsketch.AddN(dense, rep, uint64(item.Frequency))
}
return ddsketch.ToWire(dense)
}

// histogramBucketSeconds parses a pg_stat_monitor range such as "(1 - 2)" or
// "(100000 - ...)" (milliseconds) and returns a representative latency in
// seconds: the geometric mean for finite buckets, the upper bound for the
// first [0, hi) bucket, and the lower bound for the open-ended top bucket.
func histogramBucketSeconds(rng string) (float64, bool) {
loStr, hiStr, ok := strings.Cut(strings.TrimSuffix(strings.TrimPrefix(rng, "("), ")"), " - ")
if !ok {
return 0, false
}
lo, err := strconv.ParseFloat(strings.TrimSpace(loStr), 64)
if err != nil {
return 0, false
}
hiStr = strings.TrimSpace(hiStr)
if hiStr == "..." {
return lo / 1000, true
}
hi, err := strconv.ParseFloat(hiStr, 64)
if err != nil {
return 0, false
}
rep := math.Sqrt(lo * hi)
if lo == 0 {
rep = hi
}
return rep / 1000, true
}

// check interfaces.
var _ prometheus.Collector = (*PGStatMonitorQAN)(nil)
Loading
Loading