Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
231c454
Document what duct's resource statistics actually measure
asmacdo Apr 23, 2026
35a05a2
refactor: introduce PsSampler class in place of module-level _get_sample
asmacdo Apr 23, 2026
04cb61a
feat: add --sampler flag and sampler schema tag (POC)
asmacdo Apr 23, 2026
b8bb8fa
test: absorb resource-validation harness from PR #403
asmacdo Apr 23, 2026
5e5d537
test: add workload catalog with known-ground-truth scripts
asmacdo Apr 23, 2026
e3b422e
infra: add sampler-matrix harness (marker + JSONL hook + CSV generator)
asmacdo Apr 23, 2026
6b6fe0f
test: rework matrix harness + add ps-column matrix tests
asmacdo Apr 23, 2026
dbfad42
feat: implement CgroupSampler (cgroup v2 + ps hybrid, POC)
asmacdo Apr 23, 2026
81ef395
test: add cgroup-column matrix tests via systemd-run per duct invocation
asmacdo Apr 24, 2026
bd9307d
test: add scripts/regen_matrix.sh; unseed cgroup CSV for datalad-run
asmacdo Apr 24, 2026
9ec9d31
[DATALAD RUNCMD] scripts/regen_matrix.sh
asmacdo Apr 24, 2026
75800d3
test: expand matrix schema + add ephemeral/spikey CPU workloads
asmacdo Apr 24, 2026
52633ca
test: tune matrix thresholds after empirical calibration
asmacdo Apr 24, 2026
8d03142
[DATALAD RUNCMD] scripts/regen_matrix.sh
asmacdo Apr 24, 2026
dfd2702
docs: revise multiple-samplers.md to cgroup-only SPEC-as-implemented
asmacdo Apr 24, 2026
7de046a
docs: link sampler capability matrix from resource-statistics
asmacdo Apr 24, 2026
9e105c8
test: make ephemeral_cpu workload actually ephemeral via os.fork
asmacdo Apr 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ src/con_duct/_version.py
# AI
.serena/
.local*

# Sampler-matrix raw results; CSV at test/sampler_matrix.csv is committed
.sampler_matrix_results.jsonl
675 changes: 675 additions & 0 deletions docs/design/multiple-samplers.md

Large diffs are not rendered by default.

508 changes: 508 additions & 0 deletions docs/resource-statistics.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ filterwarnings = [
]
markers = [
"flaky: mark a test as being unreliable",
"sampler_matrix(sampler, workload, metric, direction, expected): sampler-matrix cell; direction is 'overreport' or 'underreport'; conftest hook records actual pass/fail for scripts/gen_sampler_matrix.py",
"cgroup_matrix: opt-in matrix tests that require systemd-run --user and a writable /sys/fs/cgroup v2; pass --cgroup-matrix to pytest to run",
]
norecursedirs = ["test/data"]

Expand Down
118 changes: 118 additions & 0 deletions scripts/gen_sampler_matrix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""Regenerate test/sampler_matrix_<sampler>.csv from matrix test results.

The conftest hook in test/conftest.py writes a JSONL file (one record
per sampler_matrix-marked test) during the pytest run. Each record:

{
"sampler": "ps",
"workload": "memory_children",
"metric": "rss",
"direction": "overreport",
"expected": "fail",
"actual": "fail",
"nodeid": "test/duct_main/test_sampler_matrix.py::..."
}

This script pivots the JSONL into one CSV per sampler --
rows=``<workload>/<metric>``, columns=``no_<direction>``,
cells=pass|fail|n/a -- and writes them into test/. The CSVs are
checked in so reviewers can see each sampler's capability profile
(does-not-under-report / does-not-over-report, per workload/metric
pair) without running tests.

Each cell records the *actual* outcome (what the sampler did),
independent of whether that matched our committed expectation. The
commit hash for a given matrix snapshot is the source of truth for
what we expected at that point; the ``expected`` metadata lives in
the test marker, not the CSV.

Usage:
python -m pytest test/ # populate .sampler_matrix_results.jsonl
python scripts/gen_sampler_matrix.py
"""

from __future__ import annotations
import csv
import json
from pathlib import Path
import sys

REPO_ROOT = Path(__file__).resolve().parent.parent
JSONL_PATH = REPO_ROOT / ".sampler_matrix_results.jsonl"
CSV_DIR = REPO_ROOT / "test"

UNTESTED = "n/a"


def load_records() -> list[dict[str, str]]:
if not JSONL_PATH.exists():
return []
records = []
with JSONL_PATH.open() as f:
for line in f:
line = line.strip()
if line:
records.append(json.loads(line))
return records


def group_by_sampler(
records: list[dict[str, str]],
) -> dict[str, list[dict[str, str]]]:
grouped: dict[str, list[dict[str, str]]] = {}
for r in records:
sampler = r.get("sampler")
if not sampler:
continue
grouped.setdefault(sampler, []).append(r)
return grouped


def write_csv_for_sampler(sampler: str, records: list[dict[str, str]]) -> Path:
# row label -> column label -> actual
by_row: dict[str, dict[str, str]] = {}
columns: set[str] = set()
for r in records:
workload = r.get("workload")
metric = r.get("metric")
direction = r.get("direction")
actual = r.get("actual")
if not (workload and metric and direction and actual):
continue
row_label = f"{workload}/{metric}"
col_label = f"no_{direction}"
# Last write wins if the same cell appears twice in one run.
by_row.setdefault(row_label, {})[col_label] = actual
columns.add(col_label)

sorted_columns = sorted(columns)
out_path = CSV_DIR / f"sampler_matrix_{sampler}.csv"
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["workload/metric", *sorted_columns])
for row_label in sorted(by_row):
row = [row_label]
for col in sorted_columns:
row.append(by_row[row_label].get(col, UNTESTED))
writer.writerow(row)
return out_path


def main() -> int:
records = load_records()
if not records:
print(
f"no matrix results found at {JSONL_PATH}; " "run `pytest test/` first",
file=sys.stderr,
)
return 0
for sampler, sampler_records in sorted(group_by_sampler(records).items()):
path = write_csv_for_sampler(sampler, sampler_records)
print(f"wrote {path} ({len(sampler_records)} records)")
return 0


if __name__ == "__main__":
sys.exit(main())
30 changes: 30 additions & 0 deletions scripts/regen_matrix.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Regenerate sampler matrix CSVs from a fresh test run.
#
# Canonical invocation (records command + inputs/outputs as provenance):
#
# datalad run scripts/regen_matrix.sh
#
# Plain invocation also works; you lose the datalad-run metadata but
# the CSVs are identical.
#
# Requirements on the host:
# - .tox/py312 present (run `tox -e py312` once if absent)
# - systemd-run --user --scope working, for the opt-in cgroup_matrix
# tests (hosts without user systemd skip those tests, and the
# cgroup column stays at its prior values / (not yet tested))
#
# The script runs the matrix tests with --cgroup-matrix and invokes the
# CSV generator. Both are fast (<20 s combined) against an already-
# populated .tox/py312 env.

set -euo pipefail

cd "$(git rev-parse --show-toplevel)"

.tox/py312/bin/python -m pytest \
-o addopts= \
--cgroup-matrix \
test/duct_main/test_sampler_matrix.py

python3 scripts/gen_sampler_matrix.py
11 changes: 11 additions & 0 deletions src/con_duct/_duct_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import IO, TextIO
from con_duct._models import LogPaths, Outputs, RecordTypes, SessionMode
from con_duct._output import TailPipe, prepare_outputs, remove_files, safe_close_files
from con_duct._sampling import make_sampler
from con_duct._signals import SigIntHandler
from con_duct._tracker import Report, monitor_process

Expand Down Expand Up @@ -49,6 +50,7 @@ def execute(
colors: bool,
mode: SessionMode,
message: str = "",
sampler: str = "ps",
) -> int:
"""A wrapper to execute a command, monitor and log the process details.

Expand All @@ -59,6 +61,14 @@ def execute(
"--report-interval must be greater than or equal to --sample-interval."
)

sampler_instance = make_sampler(sampler)
# TODO(poc): the cgroup sampler reads duct's own cgroup via
# /proc/self/cgroup; a new-session spawn would stay in the same
# cgroup on Linux, but explicit current-session is cleaner and
# keeps the "duct measures the cgroup it lives in" invariant.
if sampler == "cgroup-ps-hybrid" and mode == SessionMode.NEW_SESSION:
raise ValueError("sampler 'cgroup-ps-hybrid' requires --mode=current-session")

log_paths = LogPaths.create(output_prefix, pid=os.getpid())
log_paths.prepare_paths(clobber, capture_outputs)
stdout, stderr = prepare_outputs(capture_outputs, outputs, log_paths)
Expand Down Expand Up @@ -86,6 +96,7 @@ def execute(
colors,
clobber,
message=message,
sampler=sampler_instance,
)
files_to_close.append(report.usage_file)

Expand Down
130 changes: 128 additions & 2 deletions src/con_duct/_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from datetime import datetime
import logging
import os
from pathlib import Path
import platform
import subprocess
import sys
from typing import Callable, Optional
import time
from typing import Optional, Union
from con_duct._models import Averages, ProcessStats, Sample

SYSTEM = platform.system()
Expand Down Expand Up @@ -143,4 +145,128 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]:
"Linux": _get_sample_linux,
"Darwin": _get_sample_mac,
}
_get_sample: Callable[[int], Optional[Sample]] = _get_sample_per_system[SYSTEM] # type: ignore[assignment]


class PsSampler:
"""Sampler that uses `ps` to collect per-process stats."""

name = "ps"

def sample(self, session_id: int) -> Optional[Sample]:
return _get_sample_per_system[SYSTEM](session_id)


_CGROUP_V2_ROOT = Path("/sys/fs/cgroup")


def _detect_cgroup_v2_path() -> Path:
"""Return the absolute path to duct's own cgroup v2 directory.

Raises NotImplementedError if cgroup v2 unified hierarchy is not
mounted, or if /proc/self/cgroup does not expose a v2 entry.
"""
controllers = _CGROUP_V2_ROOT / "cgroup.controllers"
if not controllers.exists():
raise NotImplementedError(
"cgroup-ps-hybrid requires cgroup v2 unified hierarchy at "
f"{_CGROUP_V2_ROOT}; this host does not appear to have v2 mounted"
)
# cgroup v2 entry in /proc/<pid>/cgroup has the shape "0::/<path>".
proc_cgroup = Path("/proc/self/cgroup").read_text()
for line in proc_cgroup.splitlines():
if line.startswith("0::"):
rel = line.split("::", 1)[1].strip().lstrip("/")
return _CGROUP_V2_ROOT / rel
raise NotImplementedError(
"cgroup-ps-hybrid could not find a cgroup v2 entry ('0::/...') in "
"/proc/self/cgroup"
)


def _read_cgroup_cpu_usage_usec(cgroup_root: Path) -> int:
"""Read cumulative CPU microseconds from cgroup v2 ``cpu.stat``."""
for line in (cgroup_root / "cpu.stat").read_text().splitlines():
if line.startswith("usage_usec "):
return int(line.split()[1])
raise RuntimeError(f"usage_usec not present in {cgroup_root / 'cpu.stat'}")


class CgroupSampler:
"""Hybrid cgroup v2 + ps sampler.

Session totals (``total_rss``, ``total_pcpu``) come from kernel
cgroup counters; per-pid stats come from a ``ps`` sub-sample. The
``sampler`` tag on each emitted record disambiguates the source.

Reader mode only: duct does NOT create a cgroup; it reads the one
it already lives in via ``/proc/self/cgroup``. This requires
``--mode=current-session`` so duct and the tracked command share a
cgroup (enforced in ``_duct_main.execute``).

TODO(poc): our polling shape (sample-at-interval + Sample.aggregate
max) is inherited from the ps model. cgroup could emit cumulative/
delta directly -- e.g. one ``memory.peak`` read at end-of-run
instead of max-of-currents -- but that would require reshaping the
Sample/Report pipeline. Deferred post-POC.
"""

name = "cgroup-ps-hybrid"

def __init__(self) -> None:
self._cgroup_root = _detect_cgroup_v2_path()
# Baseline for delta-based pcpu on the first sample.
self._last_cpu_usec = _read_cgroup_cpu_usage_usec(self._cgroup_root)
self._last_cpu_time = time.monotonic()

def sample(self, session_id: int) -> Optional[Sample]:
# Per-pid stats via the ps path so records still carry the pid
# breakdown users expect from duct.
sample = _get_sample_per_system[SYSTEM](session_id)
if sample is None:
return None
try:
# Memory: session total from cgroup (replaces the ps sum).
mem_current = int(
(self._cgroup_root / "memory.current").read_text().strip()
)
sample.total_rss = mem_current

# CPU: delta of cumulative usage_usec over the last interval.
now_usec = _read_cgroup_cpu_usage_usec(self._cgroup_root)
now_time = time.monotonic()
delta_sec = now_time - self._last_cpu_time
if delta_sec > 0:
delta_usec = now_usec - self._last_cpu_usec
# usage_usec / elapsed_usec * 100 = percent of one core.
sample.total_pcpu = delta_usec / delta_sec / 10_000
self._last_cpu_usec = now_usec
self._last_cpu_time = now_time
except (OSError, ValueError) as exc:
raise RuntimeError(
f"cgroup read failed at {self._cgroup_root}: {exc}"
) from exc

# TODO(poc): total_vsz and total_pmem remain ps-sourced; cgroup
# v2 has no direct analogs (memory.current is already physical).
# TODO(poc): overwrite full_run_stats.total_rss with memory.peak
# at end of run for a truer run-level peak than max-of-currents.
# TODO(poc): this assumes the tracked command stays in duct's
# cgroup; systemd-run or similar would migrate the child out
# and silently break the measurement.

# Recompute averages so they reflect the cgroup-sourced totals
# rather than the stale ps-sourced values set by _get_sample_*.
sample.averages = Averages.from_sample(sample=sample)
return sample


Sampler = Union[PsSampler, CgroupSampler]


def make_sampler(name: str) -> Sampler:
"""Factory: resolve a sampler name (as passed on the CLI) to an instance."""
if name == PsSampler.name:
return PsSampler()
if name == CgroupSampler.name:
return CgroupSampler()
raise ValueError(f"unknown sampler: {name!r}")
Loading
Loading