Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile.devcontainer
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ release-vmproxy: ## Build vmproxy
make -C vmproxy release

release-qan: ## Build QAN
make -C qan-api2 release
make -C qan release

# used by host Makefile
_bash:
Expand Down Expand Up @@ -47,9 +47,9 @@ run-vmproxy-ci: release-vmproxy ## Replace vmproxy from build and restart (used
run-vmproxy: run-vmproxy-ci ## Replace vmproxy from build, restart and tail logs
tail -f /srv/logs/vmproxy.log

run-qan-ci: release-qan ## Replace qan-api2 from build and restart (used in CI)
run-qan-ci: release-qan ## Replace qan from build and restart (used in CI)
supervisorctl stop qan-api2
install -m 755 $(PMM_RELEASE_PATH)/qan-api2 /usr/sbin/percona-qan-api2
install -m 755 $(PMM_RELEASE_PATH)/qan /usr/sbin/percona-qan
truncate -s 0 /srv/logs/qan-api2.log
supervisorctl start qan-api2

Expand Down
3 changes: 2 additions & 1 deletion Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ release: ## Build release versions of all components
make -C admin release
make -C managed release
make -C qan-api2 release
make -C qan release

gen: clean ## Generate files
make -C api gen
Expand All @@ -40,7 +41,7 @@ gen-mocks:
go tool mockery --config .mockery.yaml

test-common: ## Run tests from API (and other shared) packages only (i.e it ignores directories that are explicitly listed)
go test $(shell go list ./... | grep -v -e admin -e agent -e managed -e api-tests -e qan-api2 -e update)
go test $(shell go list ./... | grep -v -e admin -e agent -e managed -e api-tests -e qan-api2 -e qan -e update)

api-test: ## Run API tests on dev env.
go test -count=1 -race -p 1 -v ./api-tests/... -pmm.server-insecure-tls
Expand Down
13 changes: 13 additions & 0 deletions agent/agents/mongodb/shared/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/percona/pmm/agent/utils/truncate"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
)

// DefaultInterval is interval for aggregator tick.
Expand Down Expand Up @@ -253,6 +254,17 @@ func (a *Aggregator) createResult(_ context.Context) *report.Result {
queryStats := queries.CalcQueriesStats(int64(DefaultInterval))
buckets := make([]*agentv1.MetricsBucket, 0, len(queryStats))

// Build a query_time DDSketch per class from the raw per-op times (ms -> s),
// keyed by class ID (shared with QueryStats.ID below).
sketches := make(map[string]map[uint32]uint64, len(queries))
for i := range queries {
dense := ddsketch.New()
for _, ms := range queries[i].QueryTime {
ddsketch.Add(dense, ms/millisecondsToSeconds)
}
sketches[queries[i].ID] = ddsketch.ToWire(dense)
}

a.logger.Tracef("Queries: %#v", queries)
a.logger.Tracef("Query Stats: %#v", queryStats)

Expand Down Expand Up @@ -293,6 +305,7 @@ func (a *Aggregator) createResult(_ context.Context) *report.Result {
bucket.Common.MQueryTimeMin = float32(v.QueryTime.Min) / millisecondsToSeconds
bucket.Common.MQueryTimeP99 = float32(v.QueryTime.Pct99) / millisecondsToSeconds
bucket.Common.MQueryTimeSum = float32(v.QueryTime.Total) / millisecondsToSeconds
bucket.Common.MQueryTimeSketch = sketches[v.ID]

bucket.Mongodb.MDocsReturnedCnt = float32(v.Count) // PMM-13788
bucket.Mongodb.MDocsReturnedMax = float32(v.Returned.Max)
Expand Down
27 changes: 27 additions & 0 deletions agent/agents/mongodb/shared/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/percona/pmm/agent/utils/truncate"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
)

func TestAggregator(t *testing.T) {
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestAggregator(t *testing.T) {
ExampleType: agentv1.ExampleType_EXAMPLE_TYPE_RANDOM,
NumQueries: 1,
MQueryTimeCnt: 1,
MQueryTimeSketch: map[uint32]uint64{0: 1},
},
Mongodb: &agentv1.MetricsBucket_MongoDB{
MDocsReturnedCnt: 1,
Expand All @@ -102,6 +104,30 @@ func TestAggregator(t *testing.T) {
}, *result)
})

t.Run("queryTimeSketch", func(t *testing.T) {
aggregator := New(time.Now(), "test-agent", logrus.WithField("component", "test"), truncate.GetMongoDBDefaultMaxQueryLength())
aggregator.Start()
defer aggregator.Stop()
ctx := context.TODO()
// 100 ops of the same class with 1..100 ms latency.
for i := 1; i <= 100; i++ {
err := aggregator.Add(ctx, proto.SystemProfile{Ns: "collection.people", Op: "insert", Millis: i})
require.NoError(t, err)
}

result := aggregator.createResult(ctx)
require.Len(t, result.Buckets, 1)

wire := result.Buckets[0].Common.MQueryTimeSketch
require.NotEmpty(t, wire)
dense := ddsketch.New()
for idx, c := range wire {
dense[idx] = c
}
// p99 of 1..100 ms is ~0.099 s, recovered within the DDSketch error bound.
require.InDelta(t, 0.099, ddsketch.Quantile(dense, 0.99), 0.099*ddsketch.Alpha+1e-3)
})

t.Run("createResultInvalidUTF8", func(t *testing.T) {
agentID := "test-agent"
startPeriod := time.Now()
Expand Down Expand Up @@ -147,6 +173,7 @@ func TestAggregator(t *testing.T) {
ExampleType: agentv1.ExampleType_EXAMPLE_TYPE_RANDOM,
NumQueries: 1,
MQueryTimeCnt: 1,
MQueryTimeSketch: map[uint32]uint64{0: 1},
},
Mongodb: &agentv1.MetricsBucket_MongoDB{
MDocsReturnedCnt: 1,
Expand Down
34 changes: 33 additions & 1 deletion agent/agents/mysql/slowlog/slowlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/percona/pmm/agent/utils/truncate"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
)

const (
Expand Down Expand Up @@ -349,6 +350,9 @@ func (s *SlowLog) processFile(ctx context.Context, file string, outlierTime floa
s.changes <- agents.Change{Status: inventoryv1.AgentStatus_AGENT_STATUS_RUNNING}

aggregator := event.NewAggregator(true, 0, outlierTime)
// Per-query-class DDSketch of query_time, built from individual events alongside
// the aggregator so the server can compute mergeable percentiles.
sketches := make(map[string][]uint64)
ctxDone := ctx.Done()

// aggregate every minute at 00 seconds
Expand All @@ -375,15 +379,23 @@ func (s *SlowLog) processFile(ctx context.Context, file string, outlierTime floa
fingerprint := query.Fingerprint(e.Query)
digest := hashIntoQueryID(fingerprint)
aggregator.AddEvent(e, digest, e.User, e.Host, e.Db, e.Server, e.Query)
if qt, ok := e.TimeMetrics["Query_time"]; ok {
if sketches[digest] == nil {
sketches[digest] = ddsketch.New()
}
ddsketch.Add(sketches[digest], qt)
}

case <-t.C:
lengthS := uint32(math.Round(wait.Seconds())) // round 59.9s/60.1s to 60s
res := aggregator.Finalize()
buckets := makeBuckets(s.params.AgentID, res, start, lengthS, s.params.DisableCommentsParsing, s.params.DisableQueryExamples, s.params.MaxQueryLength, s.l)
buckets := makeBuckets(s.params.AgentID, res, start, lengthS, s.params.DisableCommentsParsing,
s.params.DisableQueryExamples, s.params.MaxQueryLength, sketches, s.l)
s.l.Debugf("Made %d buckets out of %d classes in %s+%d interval. Wait time: %s.",
len(buckets), len(res.Class), start.Format("15:04:05"), lengthS, time.Since(start))

aggregator = event.NewAggregator(true, 0, outlierTime)
sketches = make(map[string][]uint64)
start = time.Now()
wait = start.Truncate(aggregateInterval).Add(aggregateInterval).Sub(start)
s.l.Debugf("Scheduling next aggregation in %s at %s.", wait, start.Add(wait).Format("15:04:05"))
Expand All @@ -406,6 +418,24 @@ func hashIntoQueryID(fingerprint string) string {
return strings.ToUpper(hex.EncodeToString(id.Sum(nil)))
}

// sketchToWire converts a dense DDSketch into the wire map (bucket index -> count),
// returning nil when there is no data so older behavior (no sketch) is preserved.
func sketchToWire(dense []uint64) map[uint32]uint64 {
if dense == nil {
return nil
}
out := make(map[uint32]uint64)
for i, c := range dense {
if c > 0 {
out[uint32(i)] = c
}
}
if len(out) == 0 {
return nil
}
return out
}

// makeBuckets is a pure function for easier testing.
//
//nolint:gocognit,cyclop,maintidx
Expand All @@ -417,6 +447,7 @@ func makeBuckets(
disableCommentsParsing bool,
disableQueryExamples bool,
maxQueryLength int32,
sketches map[string][]uint64,
l *logrus.Entry,
) []*agentv1.MetricsBucket {
buckets := make([]*agentv1.MetricsBucket, 0, len(res.Class))
Expand Down Expand Up @@ -498,6 +529,7 @@ func makeBuckets(
mb.Common.MQueryTimeMin = float32(*m.Min)
mb.Common.MQueryTimeP99 = float32(*m.P99)
}
mb.Common.MQueryTimeSketch = sketchToWire(sketches[v.Id])
// lock_time - Lock_time
if m, ok := v.Metrics.TimeMetrics["Lock_time"]; ok {
mb.Mysql.MLockTimeCnt = float32(m.Cnt)
Expand Down
26 changes: 24 additions & 2 deletions agent/agents/mysql/slowlog/slowlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/percona/pmm/agent/utils/version"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
)

func getDataFromFile(t *testing.T, filePath string, data any) {
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestSlowLogMakeBucketsInvalidUTF8(t *testing.T) {
},
}

actualBuckets := makeBuckets(agentID, parsingResult, periodStart, 60, false, false, truncate.GetDefaultMaxQueryLength(), logrus.NewEntry(logrus.New()))
actualBuckets := makeBuckets(agentID, parsingResult, periodStart, 60, false, false, truncate.GetDefaultMaxQueryLength(), nil, logrus.NewEntry(logrus.New()))
expectedBuckets := []*agentv1.MetricsBucket{
{
Common: &agentv1.MetricsBucket_Common{
Expand All @@ -87,6 +88,27 @@ func TestSlowLogMakeBucketsInvalidUTF8(t *testing.T) {
tests.AssertBucketsEqual(t, expectedBuckets[0], actualBuckets[0])
}

func TestSlowLogMakeBucketsSketch(t *testing.T) {
t.Parallel()

const agentID = "73ee2f92-d5aa-45f0-8b09-6d3df605fd44"
parsingResult := event.Result{
Class: map[string]*event.Class{
"q1": {Id: "q1", Metrics: &event.Metrics{}, Fingerprint: "SELECT 1"},
},
}
dense := ddsketch.New()
for i := 1; i <= 100; i++ {
ddsketch.Add(dense, float64(i)/1000.0)
}
sketches := map[string][]uint64{"q1": dense}

buckets := makeBuckets(agentID, parsingResult, time.Unix(1557137220, 0), 60, false, false, truncate.GetDefaultMaxQueryLength(), sketches, logrus.NewEntry(logrus.New()))
require.Len(t, buckets, 1)
require.NotEmpty(t, buckets[0].Common.MQueryTimeSketch)
require.Equal(t, sketchToWire(dense), buckets[0].Common.MQueryTimeSketch)
}

func TestSlowLogMakeBuckets(t *testing.T) {
t.Parallel()

Expand All @@ -96,7 +118,7 @@ func TestSlowLogMakeBuckets(t *testing.T) {
parsingResult := event.Result{}
getDataFromFile(t, "slowlog_fixture.json", &parsingResult)

actualBuckets := makeBuckets(agentID, parsingResult, periodStart, 60, false, false, truncate.GetDefaultMaxQueryLength(), logrus.NewEntry(logrus.New()))
actualBuckets := makeBuckets(agentID, parsingResult, periodStart, 60, false, false, truncate.GetDefaultMaxQueryLength(), nil, logrus.NewEntry(logrus.New()))

var expectedBuckets []*agentv1.MetricsBucket
getDataFromFile(t, "slowlog_expected.json", &expectedBuckets)
Expand Down
52 changes: 52 additions & 0 deletions agent/agents/postgres/pgstatmonitor/pgstatmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"database/sql"
"fmt"
"io"
"math"
"strconv"
"strings"
"time"

"github.com/AlekSi/pointer"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/percona/pmm/agent/utils/version"
agentv1 "github.com/percona/pmm/api/agent/v1"
inventoryv1 "github.com/percona/pmm/api/inventory/v1"
"github.com/percona/pmm/utils/ddsketch"
"github.com/percona/pmm/utils/sqlmetrics"
)

Expand Down Expand Up @@ -631,6 +634,7 @@ func (m *PGStatMonitorQAN) makeBuckets(current, cache map[time.Time]map[string]*
m.l.WithError(err).Warnf("failed to parse histogram from resp calls")
} else {
mb.Postgresql.HistogramItems = histogram
mb.Common.MQueryTimeSketch = querySketchFromHistogram(histogram)
}

if (currentPSM.TotalPlanTime - prevPSM.TotalPlanTime) != 0 {
Expand Down Expand Up @@ -789,5 +793,53 @@ func getHistogramRangesArray(vPGSM pgStatMonitorVersion) []*agentv1.HistogramIte
}
}

// querySketchFromHistogram remaps the pg_stat_monitor response-time histogram
// (per-bucket call counts over millisecond ranges) onto the frozen DDSketch
// layout, using each bucket's representative latency. It is the pg_stat_monitor
// source of mergeable query_time percentiles.
func querySketchFromHistogram(histogram []*agentv1.HistogramItem) map[uint32]uint64 {
dense := ddsketch.New()
for _, item := range histogram {
if item.Frequency == 0 {
continue
}
rep, ok := histogramBucketSeconds(item.Range)
if !ok {
continue
}
ddsketch.AddN(dense, rep, uint64(item.Frequency))
}
return ddsketch.ToWire(dense)
}

// histogramBucketSeconds parses a pg_stat_monitor range such as "(1 - 2)" or
// "(100000 - ...)" (milliseconds) and returns a representative latency in
// seconds: the geometric mean for finite buckets, the upper bound for the
// first [0, hi) bucket, and the lower bound for the open-ended top bucket.
func histogramBucketSeconds(rng string) (float64, bool) {
const msPerSec = 1000.0
loStr, hiStr, ok := strings.Cut(strings.TrimSuffix(strings.TrimPrefix(rng, "("), ")"), " - ")
if !ok {
return 0, false
}
lo, err := strconv.ParseFloat(strings.TrimSpace(loStr), 64)
if err != nil {
return 0, false
}
hiStr = strings.TrimSpace(hiStr)
if hiStr == "..." {
return lo / msPerSec, true
}
hi, err := strconv.ParseFloat(hiStr, 64)
if err != nil {
return 0, false
}
rep := math.Sqrt(lo * hi)
if lo == 0 {
rep = hi
}
return rep / msPerSec, true
}

// check interfaces.
var _ prometheus.Collector = (*PGStatMonitorQAN)(nil)
Loading
Loading