Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/ansible/roles/clickhouse/files/default-users.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<profiles>
<!-- Default settings. -->
<default>
<async_insert>1</async_insert>
Comment thread
ademidoff marked this conversation as resolved.
</default>

<!-- Profile that allows only read queries. -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<profiles>
<!-- Default settings. -->
<default>
<async_insert>1</async_insert>
Comment thread
ademidoff marked this conversation as resolved.
<!-- PMM: Low-memory tuning: block size, download threads, parallel parsing/formatting -->
<max_block_size>8192</max_block_size> <!-- PMM: Before 65409, now 8192 rows -->
<max_download_threads>1</max_download_threads> <!-- PMM: Default no limit (0), now 1-->
Expand Down
4 changes: 2 additions & 2 deletions qan-api2/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ The `metrics` table stores one row per query fingerprint per collection period:
- Use prepared statements for inserts
- Add ClickHouse migrations as numbered SQL files in `migrations/sql/`
- Support cluster mode: use template conditions for ReplicatedMergeTree vs MergeTree
- Use LBAC (Label-Based Access Control) filters from `X-PMM-LBAC-Filters` header when building reports
- Use LBAC (Label-Based Access Control) filters from `X-Proxy-Filter` header when building reports

### Don't
- Don't use an ORM for ClickHouse — raw SQL with sqlx is the pattern
Expand All @@ -91,7 +91,7 @@ CLI flags (parsed via `kingpin`):
- `--json-bind` (default `:9922`) — JSON/REST listen address
- `--listen-debug-addr` (default `127.0.0.1:9933`) — debug endpoint
- `--dsn` — ClickHouse DSN (alternative to individual `--clickhouse-*` flags)
- `--clickhouse-addr`, `--clickhouse-database`, `--clickhouse-pool-size`
- `--clickhouse-addr`, `--clickhouse-database`, `--clickhouse-user`, `--clickhouse-password` — ClickHouse connection parameters
- `--clickhouse-cluster` — enable cluster mode
- `--clickhouse-cluster-name` — cluster name for ReplicatedMergeTree
- `--data-retention` — how long to keep data (default 30 days)
Expand Down
13 changes: 6 additions & 7 deletions qan-api2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,15 @@ install-race: ## Install qan-api2 binary with race detector
TEST_CONTAINER_NAME := pmm-clickhouse-test
TEST_IMAGE_NAME := clickhouse/clickhouse-server:25.3.6.56

_start-clickhouse:
start-clickhouse:
@if [ "$$(uname)" = "Darwin" ]; then \
if ! command -v timeout > /dev/null 2>&1; then \
echo "Error: missing timeout (run 'brew install coreutils')"; exit 1; \
fi; \
fi
@echo "-> Starting Clickhouse container $(TEST_CONTAINER_NAME)"
@docker run -d \
--platform=linux/amd64 \
--name $(TEST_CONTAINER_NAME) \
--name $(TEST_CONTAINER_NAME) \
-p 19000:9000 \
-p 18123:8123 \
-e CLICKHOUSE_USER=default \
Expand All @@ -60,22 +59,22 @@ _start-clickhouse:
(echo "Timed out waiting for ClickHouse to become ready" && exit 1)
@echo "✅ Clickhouse container is ready for use"

_stop-clickhouse:
stop-clickhouse:
@echo "-> Terminating Clickhouse container $(TEST_CONTAINER_NAME)"
docker stop $(TEST_CONTAINER_NAME)
docker rm $(TEST_CONTAINER_NAME)

_import-test-data: ## Create pmm_test DB and load test data
import-test-data: ## Create pmm_test DB and load test data
@echo "-> Importing test data..."
docker exec $(TEST_CONTAINER_NAME) clickhouse client --password=clickhouse --query="CREATE DATABASE IF NOT EXISTS pmm_test;"
go run $(CURDIR)/cmd/render-migrations | docker exec -i $(TEST_CONTAINER_NAME) clickhouse client --password=clickhouse -d pmm_test --multiline --multiquery
cat $(CURDIR)/fixture/metrics.part_*.json | docker exec -i $(TEST_CONTAINER_NAME) clickhouse client --password=clickhouse -d pmm_test --query="INSERT INTO metrics FORMAT JSONEachRow"
@echo "✅ Test data is ready for use"

test-env-up: _start-clickhouse _import-test-data ## Start docker containers used for testing and import test data
test-env-up: start-clickhouse import-test-data ## Start docker containers used for testing and import test data
@echo "✅ Clickhouse is ready for use on address 127.0.0.1 ports 18123,19000"

test-env-down: _stop-clickhouse ## Stop and remove docker containers used for testing
test-env-down: stop-clickhouse ## Stop and remove docker containers used for testing
@echo "✅ Clickhouse container has been cleaned up"

test: ## Run tests (run `make test-env-up` beforehand)
Expand Down
6 changes: 6 additions & 0 deletions qan-api2/models/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (

// queryTimeout is a fixed 30-second duration.
// NOTE(review): presumably the deadline applied to individual ClickHouse queries
// issued from this package — confirm at the call sites.
const queryTimeout = 30 * time.Second

// MaxParallelQueries bounds the number of ClickHouse queries a single request may
// run concurrently. It is kept well below the connection pool size (see maxOpenConns
// in main.go) so one request cannot monopolize the pool — which is shared with the
// data ingestion writer — or flood ClickHouse with concurrent scans.
const MaxParallelQueries = 4

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, is maxOpenConns enough? It is 10 and is shared by the ingest writer, report Select(), filter queries, and up to 4 parallel sparklines per report. I believe that in this case a few concurrent QAN users could exhaust the pool and time out.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — this is moot now. I've removed the sparkline parallelization (and MaxParallelQueries) in 1434e43.

Running several sparkline aggregations concurrently multiplies peak ClickHouse memory per report, which works against the OOM/crash fix this PR targets — most acutely on the low-memory profile — and it coupled read concurrency to the shared pool (a burst of reports could starve the single ingest writer). Sparklines are serial again and maxOpenConns/maxIdleConns stay at baseline (10/5), so the pool-exhaustion concern no longer applies.

The report-latency win will be revisited separately, likely via a pre-aggregated rollup rather than more concurrency.


var sparklinePointAllFields = []string{
"point",
"timestamp",
Expand Down
52 changes: 19 additions & 33 deletions qan-api2/models/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,40 +458,26 @@ func (r *Reporter) SelectSparklines(ctx context.Context, dimensionVal string,
return results, err
}

// queryDimension lists every value of a dimension in the period (so the filter panel
// can show values that currently match no active filter) together with the main metric
// summed only over rows matching the other selected dimensions. A single scan with
// sumIf replaces the previous filtered + "enumerate with 0" UNION ALL (two scans).
const queryDimension = `
SELECT
key,
value,
sum(main_metric_sum) AS main_metric_sum
FROM
(
SELECT
'{{ .DimensionName }}' AS key,
{{ .DimensionName }} AS value,
SUM({{ .MainMetric }}) AS main_metric_sum
FROM metrics
WHERE (period_start >= ?) AND (period_start <= ?)
{{range $key, $vals := .Dimensions }} AND ({{ $key }} IN ('{{ StringsJoin $vals "', '" }}')){{ end }}
{{ if .LbacFilter }}
AND ({{ .LbacFilter }})
{{ end }}
GROUP BY {{ .DimensionName }}
UNION ALL
SELECT
'{{ .DimensionName }}' AS key,
{{ .DimensionName }} AS value,
0 AS main_metric_sum
FROM metrics
WHERE (period_start >= ?) AND (period_start <= ?)
{{ if .LbacFilter }}
AND ({{ .LbacFilter }})
{{ end }}
GROUP BY {{ .DimensionName }}
)
GROUP BY
key,
value
WITH TOTALS
'{{ .DimensionName }}' AS key,
{{ .DimensionName }} AS value,
sumIf({{ .MainMetric }},
{{ if .Dimensions }}{{ $i := 0 }}
{{ range $key, $vals := .Dimensions }}{{ $i = inc $i }}{{ if gt $i 1 }} AND {{ end }}{{ $key }} IN ('{{ StringsJoin $vals "', '" }}'){{ end }}
{{ else }}1{{ end }}
) AS main_metric_sum
FROM metrics
WHERE (period_start >= ?) AND (period_start <= ?)
{{ if .LbacFilter }}
AND ({{ .LbacFilter }})
{{ end }}
GROUP BY {{ .DimensionName }}
WITH TOTALS
ORDER BY
main_metric_sum DESC,
value ASC
Expand Down Expand Up @@ -622,7 +608,7 @@ func (r *Reporter) queryFilters(ctx context.Context, periodStartFromSec,
return nil, 0, fmt.Errorf("cannot execute tmplQueryFilter %s: %w", queryBuffer.String(), err)
}

rows, err := r.db.QueryContext(ctx, queryBuffer.String(), periodStartFromSec, periodStartToSec, periodStartFromSec, periodStartToSec)
rows, err := r.db.QueryContext(ctx, queryBuffer.String(), periodStartFromSec, periodStartToSec)
if err != nil {
return nil, 0, fmt.Errorf("failed to select for QueryFilter %s: %w", queryBuffer.String(), err)
}
Expand Down
81 changes: 81 additions & 0 deletions qan-api2/models/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package models

import (
"bytes"
"context"
"encoding/base64"
"regexp"
"strings"
"testing"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -282,3 +285,81 @@ func TestMatchersToSQL(t *testing.T) {
require.Contains(t, err.Error(), "unsupported matcher type")
})
}

func TestQueryDimensionTemplate(t *testing.T) {
type tmplArgs struct {
MainMetric string
DimensionName string
Dimensions map[string][]string
Labels map[string][]string
LbacFilter string
}

whitespace := regexp.MustCompile(`\s+`)
render := func(t *testing.T, args tmplArgs) string {
t.Helper()
var buf bytes.Buffer
require.NoError(t, queryDimensionTmpl.Execute(&buf, args))
return strings.TrimSpace(whitespace.ReplaceAllString(buf.String(), " "))
}

testCases := []struct {
name string
args tmplArgs
expected string
}{
{
// No filters: enumerate every value (sumIf predicate is the constant 1).
name: "no dimension filters",
args: tmplArgs{MainMetric: "m_query_time_sum", DimensionName: "service_name"},
expected: "SELECT 'service_name' AS key, service_name AS value, " +
"sumIf(m_query_time_sum, 1 ) AS main_metric_sum " +
"FROM metrics WHERE (period_start >= ?) AND (period_start <= ?) " +
"GROUP BY service_name WITH TOTALS ORDER BY main_metric_sum DESC, value ASC",
},
{
name: "single dimension filter",
args: tmplArgs{
MainMetric: "m_query_time_sum",
DimensionName: "service_name",
Dimensions: map[string][]string{"environment": {"prod", "dev"}},
},
expected: "SELECT 'service_name' AS key, service_name AS value, " +
"sumIf(m_query_time_sum, environment IN ('prod', 'dev') ) AS main_metric_sum " +
"FROM metrics WHERE (period_start >= ?) AND (period_start <= ?) " +
"GROUP BY service_name WITH TOTALS ORDER BY main_metric_sum DESC, value ASC",
},
{
// text/template ranges maps in sorted key order, so the predicate is deterministic.
name: "multiple dimension filters ANDed in sorted key order",
args: tmplArgs{
MainMetric: "num_queries",
DimensionName: "username",
Dimensions: map[string][]string{"environment": {"prod"}, "cluster": {"c1", "c2"}},
},
expected: "SELECT 'username' AS key, username AS value, " +
"sumIf(num_queries, cluster IN ('c1', 'c2') AND environment IN ('prod') ) AS main_metric_sum " +
"FROM metrics WHERE (period_start >= ?) AND (period_start <= ?) " +
"GROUP BY username WITH TOTALS ORDER BY main_metric_sum DESC, value ASC",
},
{
name: "LBAC filter is added to WHERE",
args: tmplArgs{
MainMetric: "m_query_time_sum",
DimensionName: "service_name",
LbacFilter: "service_type = 'mysql'",
},
expected: "SELECT 'service_name' AS key, service_name AS value, " +
"sumIf(m_query_time_sum, 1 ) AS main_metric_sum " +
"FROM metrics WHERE (period_start >= ?) AND (period_start <= ?) " +
"AND (service_type = 'mysql') " +
"GROUP BY service_name WITH TOTALS ORDER BY main_metric_sum DESC, value ASC",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expected, render(t, tc.args))
})
}
}
53 changes: 34 additions & 19 deletions qan-api2/services/analytics/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"strings"

"golang.org/x/sync/errgroup"

qanpb "github.com/percona/pmm/api/qan/v1"
"github.com/percona/pmm/qan-api2/models"
)
Expand Down Expand Up @@ -138,6 +140,7 @@ func (s *Service) GetReport(ctx context.Context, in *qanpb.GetReportRequest) (*q
resp.Offset = in.Offset
resp.Limit = in.Limit

resp.Rows = make([]*qanpb.Row, len(results))
for i, res := range results {
numQueries := interfaceToFloat32(res["num_queries"])
//nolint:forcetypeassert
Expand All @@ -157,32 +160,44 @@ func (s *Service) GetReport(ctx context.Context, in *qanpb.GetReportRequest) (*q
row.Fingerprint = "TOTAL"
}

// The row with index 0 is total.
isTotal := i == 0

sparklines, err := s.rm.SelectSparklines(
ctx,
row.Dimension,
periodStartFromSec,
periodStartToSec,
dimensions,
labels,
group,
mainMetric,
isTotal,
)
if err != nil {
return nil, err
}
row.Sparkline = sparklines
for _, c := range columns {
stats := makeStats(c, total, res, numQueries, periodDurationSec)
row.Metrics[c] = &qanpb.Metric{
Stats: stats,
}
}
resp.Rows = append(resp.Rows, row)
resp.Rows[i] = row
}

// Fetch sparklines for all rows concurrently instead of one query per row in series.
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(models.MaxParallelQueries)
for i, row := range resp.Rows {
isTotal := i == 0 // the row with index 0 is total.
g.Go(func() error {
sparklines, err := s.rm.SelectSparklines(
gCtx,
row.Dimension,
periodStartFromSec,
periodStartToSec,
dimensions,
labels,
group,
mainMetric,
isTotal,
)
if err != nil {
return err
}
row.Sparkline = sparklines
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
}

return resp, nil
}

Expand Down
2 changes: 1 addition & 1 deletion vmproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package main

import (
"errors"
"net"
"net/http"
"net/url"
"strconv"

"github.com/alecthomas/kong"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/percona/pmm/version"
Expand Down
Loading