diff --git a/.gitignore b/.gitignore index 670257ea..0fcd9ccd 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ src/con_duct/_version.py # AI .serena/ .local* + +# Sampler-matrix raw results; CSV at test/sampler_matrix.csv is committed +.sampler_matrix_results.jsonl diff --git a/docs/design/multiple-samplers.md b/docs/design/multiple-samplers.md new file mode 100644 index 00000000..7c0c257d --- /dev/null +++ b/docs/design/multiple-samplers.md @@ -0,0 +1,675 @@ +# Design: multiple samplers (cgroup-ps-hybrid) + +**Status:** POC implementation landed on branch `sampler-choice`. This +document describes the design *as implemented* for in-scope items and +lists deferred work in §10 (Future Directions). + +**Related:** [`docs/resource-statistics.md`](../resource-statistics.md) +documents the current `ps`-based semantics honestly; this document +describes how we give users a *different* sampler alongside `ps` with +cleaner semantics for the cases that matter. + +--- + +## 1. Summary + +Duct historically sampled resource usage exclusively via `ps(1)`. The +semantics `ps` provides are correct for what `ps` measures but mis-fit +for the HPC / job-accounting use case where duct's numbers are often +used to size follow-up SLURM allocations. Two failure modes in +particular surface in practice: + +- `ps -o pcpu` is a *lifetime average*, not an instantaneous rate. + Summed across a session with short-lived or bursty multi-threaded + children, totals can legitimately exceed the system's physical CPU + ceiling — issue [#399](https://github.com/con/duct/issues/399) + reported 5363% on a 20-core box. +- `ps -o rss` counts shared pages in every process that maps them. + Summed across forked workers, shared libraries and copy-on-write + memory get counted 3–10× on typical Python workloads. + +Neither is a duct bug; both are correct consequences of what `ps` +measures. Fixing them requires a different measurement source. + +This design ships **one** additional sampler: + +- **`cgroup-ps-hybrid`** — Linux-only, zero new dependency, reads + cgroup v2 counters (`memory.current`, `cpu.stat.usage_usec`) for + session totals, keeps `ps` as the per-pid sub-sampler. Matches the + counters SLURM's `slurmstepd` and Docker's `docker stats` already + report. + +`ps` remains the default — backwards-compatible, stdlib-only, works +on every platform duct currently supports. `cgroup-ps-hybrid` is +explicitly opt-in via `--sampler=cgroup-ps-hybrid`. + +> **Name note.** `cgroup-ps-hybrid` is a deliberately clunky +> *placeholder* for the POC. It is descriptive (it *is* a hybrid of +> cgroup totals and ps per-pid) but not a production name. Renaming +> is listed under §9 Schema Open Questions. + +Other candidate samplers (notably `psutil`) are interesting and are +enumerated in §10 Future Directions, but are not in the POC scope. + +--- + +## 2. Problem statement + +`docs/resource-statistics.md` has the user-facing explanation of what +the current `ps`-based sampler does and doesn't measure. Condensed +motivation for this design: + +### 2.1 CPU: lifetime-cumulative ratio × per-pid summing + +Linux `ps -o pcpu` reports `cputime / elapsed`, with both numbers +accumulated over the process's entire life. For a just-spawned +multi-threaded native workload, the elapsed denominator is small and +the ratio inflates — a worker that's been alive 10 ms and consumed +40 ms of multi-thread CPU reports 400%. Duct's per-pid sampling then +sums these per-pid values across the session, compounding the +inflation across N workers. Real-world examples on `pip install` +workloads under `tox` (short-lived C-extension compilers) have hit +over 1000% aggregated pcpu. + +Note: this is **Linux-specific**. BSD/Darwin `ps -o pcpu` is a +decaying ~1-minute average, not a lifetime cumulative ratio, so the +inflation mechanism doesn't apply there. + +### 2.2 Memory: shared pages per-pid × per-pid summing + +`ps -o rss` counts every page a process has mapped, including shared +pages like the Python interpreter's `.text` segment. Forking N +children that all share the interpreter causes duct's per-pid rss +sum to count those shared pages N times. On a typical Python workload +with a few children, the overcount is 3–10× actual physical memory. + +### 2.3 Why cgroup + +Linux cgroup v2 counters are kernel-accounted for the entire cgroup +at once: + +- `memory.current` is physical memory use of the cgroup. Shared pages + are counted once regardless of how many processes map them. +- `cpu.stat.usage_usec` is cumulative microseconds of CPU consumed + by the cgroup. Delta between two reads divided by wall-clock gives + instantaneous %CPU, physically bounded by cores in use. + +These are the same counters SLURM's `slurmstepd` and `docker stats` +read. Using them for duct's totals makes duct's numbers directly +comparable to the scheduler's own accounting — which is the killer +feature for HPC users sizing follow-up allocations from duct logs. + +The cost is that cgroup counters are cgroup-scoped, not pid-scoped. +Per-pid attribution still requires `ps` (or a future alternative). +That's why the POC shipped a *hybrid*: cgroup for session totals, ps +for the per-pid breakdown. + +--- + +## 3. Requirements + +Three axes, pulling against each other: + +| Requirement | What it means | +|-----------------------------|----------------------------------------------------------------------------------------------------| +| **Portable** | Linux primary (HPC), macOS secondary (dev). Must not regress any platform duct currently works on. | +| **Lightweight at runtime** | Sampling overhead scales reasonably with process count and polling cadence. | +| **Lightweight to install** | Stdlib-only path (`pip install con-duct`) keeps working without any new mandatory dep. | + +Secondary: + +- **Accurate** for what each sampler claims to measure. No sampler + should lie about its semantics. +- **Backwards compatible** with existing `usage.jsonl` / `info.json` + consumers. Additive schema changes only. + +The multi-sampler approach satisfies these by keeping `ps` as the +default (portable, stdlib, unchanged behavior) and adding +`cgroup-ps-hybrid` as opt-in for the platform and use case that needs +different semantics. + +--- + +## 4. Sampler comparison (in-scope) + +| Sampler | Portable | Runtime cost | Install cost | CPU semantics | Memory semantics | +|----------------------|--------------------------|------------------------------------------|--------------|----------------------------------------------------------------|---------------------------------------------------------------| +| `ps` | Linux + macOS | O(N) fork+exec per sample, 10–20 ms/poll | stdlib | Lifetime-average per-pid summed across session (macOS: decaying average) | Per-process RSS summed across session (counts shared pages N×) | +| `cgroup-ps-hybrid` | Linux (cgroup v2 only) | O(1) cgroup read per sample plus one ps call | stdlib | Delta of kernel cumulative `usage_usec`; physically bounded | Kernel-accounted `memory.current`; shared pages counted once | + +Alternatives considered but not implemented for the POC (`psutil`, +`pidstat`, `cgmemtime`, `memory_profiler`, a hand-rolled `/proc` +parser) are enumerated in §10 Future Directions. + +--- + +## 5. Architecture (as implemented) + +### 5.1 Sampler abstraction + +`src/con_duct/_sampling.py` moved from platform-dispatch to +sampler-dispatch. Each sampler is a concrete class that exposes a +`name` attribute and a `sample(session_id) -> Optional[Sample]` +method. A `Sampler` type alias (`Union[PsSampler, CgroupSampler]`) +stands in for a Protocol/ABC — introducing one is easy later if a +third concrete class joins; the POC didn't need it. + +`Report` (in `_tracker.py`) takes an optional `sampler` argument in +its constructor, defaulting to a fresh `PsSampler()` so every existing +caller is unaffected. `Report.collect_sample` delegates to +`self.sampler.sample(...)`. + +The `Sample` dataclass (in `_models.py`) is unchanged. `Sample.aggregate` +is unchanged — the peak-is-max, running-average pattern was kept +deliberately; the "aggregate totals mismatch" described in +`resource-statistics.md §Peak vs. average` is a property of what peaks +and averages respectively mean, not a bug. + +### 5.2 Backends + +#### `ps` sampler + +- Default. Preserves every byte of historical duct behavior. +- Only change: records carry `"sampler": "ps"` (see §5.4). +- `_get_sample_linux` / `_get_sample_mac` became private helpers + wrapped by `PsSampler.sample`. + +#### `cgroup-ps-hybrid` sampler + +Reader mode, hybrid. Implementation in `_sampling.py::CgroupSampler`: + +- **Availability.** `__init__` checks `/sys/fs/cgroup/cgroup.controllers` + exists (cgroup v2 unified hierarchy) and reads `/proc/self/cgroup` + to resolve duct's own cgroup path. v1 is refused with a clear + `NotImplementedError`. +- **Session totals from cgroup:** + - `memory.current` → `sample.total_rss` (overwrites the ps-sum; + the `"sampler"` tag disambiguates which source produced a given + record). + - `cpu.stat.usage_usec` delta over the last sample interval → + `sample.total_pcpu`. The sampler holds `(usage_usec, monotonic)` + state across calls; the first sample's delta is taken from a + baseline captured in `__init__`. +- **Per-pid data from ps.** The ps sub-sampler runs as before to + populate `sample.stats[pid]`, so the usage.jsonl records keep the + per-pid breakdown users expect. That's why the sampler is called + `-hybrid`. +- **`total_vsz` and `total_pmem` are still ps-sourced.** cgroup v2 + has no direct analogs (`memory.current` is already physical; vsz + is per-process by definition). +- **Catch-and-release** on cgroup read failures: OSError / ValueError + are caught and re-raised as a `RuntimeError` pointing at the cgroup + path for debuggability. + +The cgroup-ps-hybrid sampler additionally **requires +`--mode=current-session`**. Reader mode assumes duct and the tracked +command share a cgroup, which they do by default when duct does not +start a new session. `--mode=new-session` with `--sampler=cgroup-ps-hybrid` +errors out at startup before log paths are created. + +### 5.3 Selection UX + +**Explicit selection** (no auto-detection in the POC): + +- CLI: `--sampler={ps,cgroup-ps-hybrid}` +- Env: `DUCT_SAMPLER=…` +- Dotenv: loaded via the existing `DUCT_CONFIG_PATHS` mechanism + +Unknown sampler → `ValueError`. Known sampler unavailable in the +environment (cgroup v2 absent, Darwin asking for cgroup, etc.) → +clear error at startup; **no silent fallback**. The user explicitly +asked for something; if duct can't honor it, it refuses instead of +silently doing something different. + +Default is `ps` — preserves compatibility with every existing invocation. + +### 5.4 Schema + +The `usage.jsonl` and `info.json` shape changes minimally: + +- **New `sampler` field** on every usage.jsonl record and in + info.json. Value is the sampler's `name` (e.g., `"ps"` or + `"cgroup-ps-hybrid"`). Consumers that ignore unknown fields keep + working. Consumers that care can switch interpretation based on + the tag. +- **`total_*` fields are populated from cgroup counters** when the + cgroup sampler is in use. Field names are unchanged; only the + data source is different, and the `sampler` tag disambiguates. +- **`stats[pid]`** remains populated from the per-pid sub-sampler (ps + for this POC). + +No field renames. Consumers don't have to change. This is deliberately +the smallest possible schema delta; §9 lists the schema-level Open +Questions that were *not* resolved in the POC. + +--- + +## 6. Per-sampler detail + +### 6.1 `ps` sampler + +No substantive behavior change beyond the `sampler` tag on records. +`resource-statistics.md` carries the honest-labeling documentation of +what ps measures; the code doesn't need to change to make semantics +clearer. + +### 6.2 `cgroup-ps-hybrid` sampler + +**Availability detection.** On import of `_sampling.py`, no cgroup +action happens — the check runs when `CgroupSampler()` is instantiated +from `make_sampler`. Three steps, any failure → `NotImplementedError` +with an actionable message: + +1. Check `/sys/fs/cgroup/cgroup.controllers` exists (v2 filesystem). +2. Read `/proc/self/cgroup`, find the v2 line (`0::/…`), resolve it + to an absolute path under `/sys/fs/cgroup`. +3. (Implicit) Subsequent reads of `memory.current` and `cpu.stat` in + that cgroup either succeed or the sampler's catch-and-release turns + any OSError into a `RuntimeError` that names the failing path. + +**Counter reads per sample.** + +- `memory.current` → `sample.total_rss`. Kernel-accounted, shared + pages counted once. +- `cpu.stat` parsed for `usage_usec`. Delta from the previous + `(usage_usec, monotonic_clock)` stored on the sampler instance → + `sample.total_pcpu = delta_usec / delta_wall_seconds / 10_000` + (percent of one core). `__init__` captures the baseline so the + first call produces a meaningful delta. + +**Reader-mode scope caveat.** The cgroup duct reads is whatever +cgroup duct lives in. In HPC contexts this matches exactly what we +want (SLURM step cgroup; a container's cgroup namespace; a +`systemd-run --user --scope` transient unit). In a bare interactive +login session it's the user's slice, which contains every other +process the user has running — the numbers are then "all my stuff" +rather than "the command I asked about." This is documented, not +fixed; creator mode (see §10) is the proper remedy. + +**Explicit TODOs flagged in code** as `TODO(poc)`: + +- The ps-shaped polling cadence is a compromise. cgroup counters are + naturally *cumulative*; the sampler could emit deltas (or a single + end-of-run cumulative read) without the per-sample poll pattern. + The POC kept polling to reuse the existing `Sample`/`Report` + pipeline; reshaping that is §10 work. +- At end of run, `full_run_stats.total_rss` is the max across + per-sample `memory.current` reads. The kernel tracks a proper + high-water mark in `memory.peak` (available on Linux ≥5.13); + overwriting `full_run_stats.total_rss` from `memory.peak` at end of + run would give a more accurate peak than max-of-currents. +- The sampler assumes the tracked command stays in duct's cgroup. + `systemd-run` and other cgroup-migrating wrappers silently break + this. Creator mode (see §10) fixes it. + +--- + +## 7. Test strategy + +Test infrastructure lives in three layers: + +**Tier 0 — model unit tests.** Sampler-agnostic. `Sample`, +`aggregate`, etc. with handcrafted fixtures. Unchanged by this work. + +**Tier 1 — per-sampler behavior tests.** +`test/duct_main/test_resource_validation.py` (absorbed from PR #403) +exercises duct end-to-end against workload scripts with known ground +truth. These still run under the default `ps` sampler. + +**Tier 2 — sampler matrix.** `test/duct_main/test_sampler_matrix.py` +probes each `(sampler, workload, metric, direction)` cell of the +capability matrix. Each test: + +- Is marked with `@pytest.mark.sampler_matrix(sampler, workload, + metric, direction, expected)` carrying the cell metadata. +- Asserts one polarity (`peak >= floor` for `direction="underreport"`, + `peak <= ceiling` for `direction="overreport"`). +- Has an `expected` of `"pass"` or `"fail"`. A conftest hook converts + `expected="fail"` into `xfail(strict=True)`, so known sampler + limitations stay committed as expected-fail cells without making + the overall suite red; if a sampler improves, the xpass surfaces + noisily and we flip the expectation. + +The conftest also writes each test's actual outcome to +`.sampler_matrix_results.jsonl`. `scripts/gen_sampler_matrix.py` +pivots that into one CSV per sampler +(`test/sampler_matrix_.csv`, rows=`/`, +columns=`no_`, cells=`pass|fail|n/a`). Regeneration is +wrapped in `scripts/regen_matrix.sh` and canonically invoked via +`datalad run scripts/regen_matrix.sh --explicit --output …` so the +regeneration commit carries command provenance. + +**Opt-in cgroup matrix cells.** The `cgroup-ps-hybrid` matrix cells +are marked `@pytest.mark.cgroup_matrix` and skipped unless pytest is +invoked with `--cgroup-matrix`. Each such test spawns duct in a +transient systemd scope (`systemd-run --user --scope --collect`) so +the cgroup CgroupSampler reads is dedicated to just `duct + workload`, +not polluted by pytest or sibling host processes. This makes assertions +meaningful on a normal developer machine without requiring a SLURM +job. Hosts without a user systemd instance skip these tests. + +**Workload catalog** (`test/data/workloads/`). Small, deterministic, +standalone-runnable scripts with documented ground-truth contracts: + +- `steady_cpu.py` — single-core busy-loop. +- `alloc_memory.py` — known-size `bytearray` allocation. +- `ephemeral_cpu.py` — short-lived parallel workers that die between + duct samples. Anchors the "ps misses dead children's CPU" story. +- `spikey_cpu.py` — multi-threaded `hashlib.pbkdf2_hmac` bursts. + GIL-released, truly parallel. Anchors the "ps cputime/elapsed + inflation × per-pid summing" story. Linux-only (Darwin ps is a + decaying average, not cumulative). +- `test/data/memory_children.py` (absorbed from PR #403) — forked + children each holding a known-size allocation. Anchors the "ps + double-counts shared library pages" story. + +**CI.** A follow-up adds a GitHub Actions job that runs +`pytest --cgroup-matrix` on a runner with user systemd enabled (e.g. +via `loginctl enable-linger`) so the cgroup column of the matrix is +regenerated in CI and drift is caught. The POC commit does not wire +this up — CSVs are regenerated locally and committed via +`datalad run`. See §9 and §10. + +--- + +## 8. Results + +The POC's claim — that `cgroup-ps-hybrid` addresses real ps +pathologies — is captured in the committed CSVs: + +**`test/sampler_matrix_ps.csv`:** + +| workload/metric | no_overreport | no_underreport | +|----------------------|---------------|----------------| +| alloc_memory/rss | n/a | pass | +| ephemeral_cpu/pcpu | n/a | **fail** | +| memory_children/rss | **fail** | pass | +| spikey_cpu/pcpu | **fail** | n/a | +| steady_cpu/pcpu | n/a | pass | + +**`test/sampler_matrix_cgroup-ps-hybrid.csv`:** + +| workload/metric | no_overreport | no_underreport | +|----------------------|---------------|----------------| +| alloc_memory/rss | n/a | pass | +| ephemeral_cpu/pcpu | n/a | **pass** | +| memory_children/rss | **pass** | pass | +| spikey_cpu/pcpu | **pass** | n/a | +| steady_cpu/pcpu | n/a | pass | + +Three ps failure modes flip to pass under cgroup-ps-hybrid: + +- **`memory_children/rss/no_overreport`** — shared-library per-pid + double-counting (§2.2). +- **`spikey_cpu/pcpu/no_overreport`** — cputime/elapsed inflation on + young multi-threaded processes × per-pid summing (§2.1). +- **`ephemeral_cpu/pcpu/no_underreport`** — ps misses CPU consumed by + children that exit between samples; cgroup's cumulative + `cpu.stat.usage_usec` captures it regardless. + +`n/a` cells mean "no test currently probes that (workload, metric, +direction) combination." They are not claims; §10 lists matrix +completeness as future work. + +--- + +## 9. Schema open questions + +Schema changes in the POC were deliberately minimal: one additive +`"sampler"` field on records and in info.json. The following +questions were deferred rather than resolved: + +1. **Explicit `schema_version` bump?** The POC keeps + `__schema_version__` unchanged since the change is strictly + additive. A production release may want to bump anyway, so + consumers have a clean signal that the sampler tag might be + present. + +2. **Translation / compat layer for pre-tag consumers?** Current + consumers (`con-duct pprint`, `plot`, `ls`) are all in-repo and + handle the optional field transparently. External consumers that + parse `usage.jsonl` directly will see a new field; most will + ignore it. If any consumer key-collides or strictly-validates, + they'd need a reader that tolerates the new field. The POC does + not ship any compat shim. + +3. **Per-block source tagging.** Right now `sampler` is a single + top-level field per record. Under the hybrid sampler, session + totals come from cgroup but per-pid stats come from ps. A more + faithful record shape would tag each block with its source + (e.g., `totals.source = "cgroup"`, `stats[pid].source = "ps"`). + Not required for the in-repo consumers; worth considering if + external tools want to treat the two sources differently. + +4. **Should the placeholder `cgroup-ps-hybrid` name change for + production?** The current name is descriptive-but-clunky. A + production name should either commit to "this is the cgroup + sampler" (and the ps per-pid fallback becomes an internal + implementation detail) or expose the hybrid composition more + intentionally (`--sampler=cgroup --per-pid-sampler=ps`). See §10. + +--- + +## 10. Future directions + +A fair amount of the original proposal has been explicitly deferred. +Listed here roughly in order of "how likely this is the next thing +someone wants": + +### 10.1 `psutil` sampler — cross-platform instantaneous + +Optional pip dep (`pip install con-duct[psutil]`). Per-pid sampling +uses `psutil.Process.cpu_percent(interval=None)` for delta CPU +(maintains per-pid prior-cputime state in the sampler) and +`memory_full_info()` for PSS on Linux / USS on macOS / RSS fallback. +Session enumeration via `psutil.process_iter()` + `os.getsid(pid)` +filter. + +Platform coverage: Linux, macOS, Windows (bonus — a psutil-only +Windows path would be the first Windows support duct has ever had). + +Install footprint: one C-extension dep, ~500 KB wheel per platform, +wheels available for every Python duct supports. + +Pid lifecycle is the main wrinkle: first sample after pid birth +returns 0% pcpu (psutil's documented "discard first reading" caveat). +Subsequent samples return proper deltas. The sampler needs to retain +`Process` objects across duct samples, keyed by pid, and prune on +disappearance. + +This was a first-class sampler in the original proposal; the POC +deliberately scoped it out to get the cgroup story shipped first. + +### 10.2 Cgroup creator mode (scoping robustness) + +The POC's reader mode trusts that duct and its command stay in the +same cgroup. That holds in SLURM / Docker / `systemd-run --user` +invocations but breaks on a bare interactive shell where the +enclosing cgroup is the user's session slice. + +Creator mode places the child in a new sub-cgroup — either via +`systemd-run --user --scope` (easy; needs user systemd) or by writing +directly to cgroupfs (harder; needs an explicitly delegated cgroup +subtree, à la BenchExec). Either way the measurement becomes exact +regardless of invocation context. + +### 10.3 `memory.peak` end-of-run overwrite + +`CgroupSampler` currently reports `total_rss` = `memory.current` +per sample, and `full_run_stats.total_rss` is the max across samples +— which misses any peak that fell between samples. The kernel +already tracks a proper high-water mark in `memory.peak` (Linux +≥5.13). At end of run, overwriting `full_run_stats.total_rss` from +`memory.peak` would give a truer peak than max-of-currents. (On +kernels without `memory.peak`, document the fallback and move on.) + +### 10.4 Reshape Sample / Report pipeline for cgroup's cumulative nature + +The POC forces cgroup counters into the ps-shaped sample-at-interval +pipeline. cgroup is fundamentally *cumulative*: a single +`cpu.stat.usage_usec` read at end of run answers "how much CPU did +this cgroup use?" exactly, no polling required. Reshaping the +pipeline so each sampler can emit its natural shape (ps: point-in-time +snapshots; cgroup: cumulative + delta; both: final) would make the +samplers less awkward and allow cheaper measurement. + +This is a bigger refactor (affects `Sample`, `Report.collect_sample`, +the JSONL schema, and consumers) and is explicitly out of POC scope. + +### 10.5 Auto-detection of sampler + +Heuristics that safely default to `cgroup-ps-hybrid` when we can tell +the invocation context has a tight scope: + +- `SLURM_JOB_ID` set → step cgroup. +- `container=docker` / `INVOCATION_ID` (systemd-run) → scoped cgroup. +- Fall back to `ps`. + +Not done in POC because explicit selection is easier to reason about +for reviewers. Default-change is a breaking change to `usage.jsonl` +interpretation and warrants a real deprecation cycle anyway. + +### 10.6 Matrix completeness + +Each test in `test_sampler_matrix.py` probes one +`(workload, metric, direction)` cell. We wrote cells for the +directions that tell a story (where ps fails and cgroup doesn't, or +vice versa). Cells where both samplers would pass — e.g., +`alloc_memory/rss/no_overreport`, `steady_cpu/pcpu/no_overreport` — +are currently `n/a`. A future pass could fill every cell, giving +reviewers a complete capability card at the cost of ~8 more +"both-pass" tests. + +### 10.7 cgroup v1 support + +The POC refuses cleanly on v1. RHEL 8 and older HPC systems ship v1, +so if that's blocking any real user, a v1 fallback path would read +the equivalent counters from the v1 memory / cpuacct controllers. +Added complexity; worth it only if there's demand. + +### 10.8 Windows support + +Neither `ps` nor `cgroup` are relevant. `psutil` (see §10.1) is the +natural route. Would require a session-scoping model since Windows +doesn't have POSIX sessions. + +### 10.9 Renaming the `cgroup-ps-hybrid` placeholder + +The name is a POC marker. Production should pick one of: + +- **`cgroup`** — commits to "this is the cgroup sampler" and treats + the ps per-pid fallback as an implementation detail. Easier to + explain to users. +- **`cgroup+ps`** / **`cgroup:ps`** / similar — exposes the hybrid + composition and leaves room for `cgroup+psutil` later. +- **Decouple entirely:** `--sampler=cgroup --per-pid-sampler=ps` + (two knobs). Most flexible, most complex. + +Tied to §9.4. + +### 10.10 Schema translation layer for pre-tag consumers + +If external tools consume `usage.jsonl` and the `sampler` field +interacts with their parsing (e.g., JSONSchema validation with +`additionalProperties: false`), duct could offer a legacy-format +writer that drops the tag. Not needed for in-repo consumers; listed +here in case demand surfaces. + +### 10.11 CI coverage for the cgroup matrix + +Default GitHub Actions Ubuntu runners don't have a user systemd +instance active, so `pytest --cgroup-matrix` skips everything. A +separate CI job could `sudo loginctl enable-linger` the runner user +or use a container with cgroup-namespace isolation to exercise the +cgroup cells. Without it, the committed cgroup CSV is a local-run +snapshot; drift is caught by developer re-run before PR. + +--- + +## 11. Non-goals + +Explicitly out of scope for *this POC*, so they don't get rolled in +accidentally. (Several appear in §10 as future directions; these are +the things the POC says "not now, full stop" about.) + +- **New metrics.** Duct's metric set (pcpu, rss, vsz, pmem) stays the + same; what changed is where some of those numbers come from. +- **Field renames in `usage.jsonl`.** Only the additive `sampler` + tag. Consumers don't have to change. +- **Reshaping `--sample-interval` / `--report-interval` semantics.** + Every sampler honors both; no interaction beyond what already + exists. +- **Default-sampler change.** `ps` stays the default. Changing the + default is a breaking change to `usage.jsonl` interpretation; not + something the POC does under cover. + +--- + +## 12. Resolved / still-open from the original proposal + +The original proposal (`pcpu-overshoot-investigation` branch) listed +eight open questions. Status after the POC: + +1. **cgroup v1 support** — *resolved: no.* v2 only; v1 deferred to + §10.7. +2. **Auto-detection timeline** — *deferred to §10.5.* POC ships + explicit selection only. +3. **Will the default sampler ever change?** — *still open.* §10.5 + discusses the deprecation cycle that'd be required. +4. **Hybrid mode UX — implicit or explicit knobs?** — *resolved: + implicit.* `--sampler=cgroup-ps-hybrid` is the hybrid; no + separate per-pid-sampler knob. +5. **Creator mode scope** — *deferred to §10.2.* Reader-only in POC. +6. **Sampler tag discoverability** — *resolved.* `sampler` appears on + every record and in info.json. +7. **Error UX when requested sampler unavailable** — *resolved.* + cgroup-ps-hybrid errors with a message naming the missing + `cgroup.controllers` file or the unavailable `/proc/self/cgroup` + v2 entry. +8. **cgroup v2 features not available on older kernels** — *deferred + to §10.3.* `memory.peak` is Linux ≥5.13; POC uses + `memory.current` + max-across-samples as the fallback. + +--- + +## 13. Implementation sequencing (as landed) + +The POC landed on branch `sampler-choice` as a sequence of +topic-focused commits: + +1. Sampler abstraction refactor — no behavior change. +2. `--sampler` flag, `DUCT_SAMPLER` env, `sampler` schema tag. +3. Absorb resource-validation harness from PR #403. +4. Workload catalog (initial). +5. Sampler-matrix harness (marker + JSONL hook + CSV generator). +6. Rework the harness + add ps-column cells. +7. `CgroupSampler` implementation (cgroup v2 + ps hybrid). +8. cgroup-column matrix cells (via systemd-run subprocess per test). +9. `scripts/regen_matrix.sh` + `datalad run` regeneration. +10. Matrix expansion (schema, ephemeral + spikey workloads). +11. This document's revision. + +The commits are small enough that, if the direction is accepted, this +can split into three follow-up PRs — docs, tests, impl — or be +polished and merged in place. + +--- + +## 14. References + +- [`docs/resource-statistics.md`](../resource-statistics.md) — the + semantics of the ps-based sampler. Required background. +- [Issue #399](https://github.com/con/duct/issues/399) — peak pcpu + overshoot report that prompted this work. +- [Linux cgroup v2 admin guide](https://docs.kernel.org/admin-guide/cgroup-v2.html) + — canonical reference for the counters we read. +- [psutil documentation](https://psutil.readthedocs.io/) — the + library `§10.1` would depend on. +- [BenchExec](https://github.com/sosy-lab/benchexec) — reference + implementation of cgroup-based reproducible resource measurement; + worth cribbing from for creator-mode work (§10.2). +- `RESEARCH.md` on branch `pcpu-overshoot-investigation` — deeper + analysis of alternatives considered (pidstat, cgmemtime, + memory_profiler, a hand-rolled `/proc` parser). +- `DEEP_DIVE_PROGRESS.md` on the same branch — investigation + confirmation for the ps pathologies described in §2. diff --git a/docs/resource-statistics.md b/docs/resource-statistics.md new file mode 100644 index 00000000..d454b825 --- /dev/null +++ b/docs/resource-statistics.md @@ -0,0 +1,508 @@ +# Interpreting duct's resource statistics + +duct records resource usage in two places: + +- **`usage.jsonl`** — one JSON record per report interval (default: + every 60 seconds), capturing per-process and session-total stats + aggregated over that window. +- **`execution_summary`** (printed at exit and stored in `info.json`) + — a whole-run summary: peak and average values across the full + execution. + +The numbers in both come from the same sampling pipeline. This +document explains what those numbers actually measure, where they're +accurate, where they're not, and what questions they can and can't +answer. + +> **Who this is for.** End users reading `usage.jsonl` or +> `execution_summary` to interpret resource usage — for example, +> sizing a SLURM allocation for a follow-up job, debugging a memory +> issue, or comparing runs. If you're looking at a number and +> wondering "what does that actually mean" or "should I trust it," +> this is the right document. + +> **Scope.** This describes duct's current behavior, which uses +> `ps(1)` as the underlying sampler. + +--- + +## Quick reference + +| Field | Answers | Does **not** answer | Key caveat | +|-------------|---------------------------------------------------------------------------------------------------------------------|---------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `pcpu` | fraction of wall time a process has spent on CPU, **averaged from its birth until each measurement** | current CPU usage at the moment of sampling | short-lived or recently-bursty processes can report much higher numbers than their current activity; summed across many such processes, totals can exceed the system's physical CPU ceiling | +| `rss` | physical memory mapped into each process's address space, counting **shared pages in every process that maps them** | unique physical memory used by the session | summing `rss` across forked children double-counts shared libraries and copy-on-write pages | +| `pmem` | `rss` as a percentage of total system RAM | anything `rss` doesn't answer already | inherits RSS caveats; comparing `pmem` across hosts with different RAM is misleading | +| `vsz` | total **virtual** address space the process has reserved (not necessarily using) | how much physical RAM the process is using | includes mapped files (even unread), library mappings, thread stacks, and allocator reservations; often orders of magnitude larger than `rss` | +| `peak_*` | max observed value during the run | anything happening *between* samples | sub-sample-interval spikes are not captured | +| `average_*` | time-weighted mean of samples | "average while the process was active" | averaged over the whole run, including idle time at start/end | + +--- + +## How duct samples and aggregates + +Duct polls the monitored process tree on two independent intervals: + +- **`--sample-interval` (default 1.0s)** — how often duct reads + per-pid stats via `ps -s `. Each read is a *sample*. +- **`--report-interval` (default 60.0s)** — how often duct writes an + aggregated record to `usage.jsonl`. Each record summarizes all the + samples taken during that report window. + +Aggregation within a report window uses **max reduction**: + +- For each per-pid metric, the reported value is the maximum observed + across all samples of that pid in the window. +- For each session-total metric (`total_rss`, `total_pcpu`, etc.), + the reported value is the maximum observed across all samples' + totals in the window. + +Two consequences worth knowing: + +1. **Short spikes between samples are not recorded.** If a process + briefly allocates 10GB and frees it within a single sample + interval, duct doesn't see it. Lowering `--sample-interval` + catches shorter spikes at the cost of higher polling overhead. + +2. **Per-pid and session-total peaks may come from different sample + moments.** The reported peak for pid A is when *A* was at its + max, which may not be when *the session as a whole* was at its max. + See [Peak vs. average](#peak-vs-average) for a worked example. + +--- + +## CPU — `pcpu` + +### What it measures + +On Linux, `ps -o pcpu` is computed per process as: + +``` +pcpu = ((utime + stime) / (now - process_start_time)) × 100 +``` + +Where: + +- `utime + stime` is **cumulative CPU time consumed by the process + since it started** — a kernel counter, user-mode plus system-mode + ticks from `/proc/[pid]/stat`. +- `now - process_start_time` is **wall-clock time elapsed since the + process started**. + +So `pcpu` is *the fraction of wall time the process has spent on CPU, +averaged from birth until the moment of sampling*. It is a **lifetime +average**, not an instantaneous rate. + +This differs from what many users expect, and from what `top(1)` +shows (which is instantaneous-over-refresh-interval). A process's +`pcpu` in `ps` and in `top` can differ substantially. + +### Four scenarios to build intuition + +#### Scenario A: long-running steady-state process at 100% CPU + +A compute-bound single-threaded process running at 100% CPU +continuously: + +``` +t = 1s: cumulative CPU = 1.0s, elapsed = 1s → pcpu = 100% +t = 10s: cumulative CPU = 10.0s, elapsed = 10s → pcpu = 100% +t = 60s: cumulative CPU = 60.0s, elapsed = 60s → pcpu = 100% +``` + +For steady-state workloads, lifetime-average converges to +instantaneous. *This is why the mental model "pcpu = current CPU +usage" works most of the time* — long-running daemons and big compute +jobs pinning a core report numbers that match intuition. + +#### Scenario B: brief burst, then idle + +A process that does 1 second of 100% CPU work, then sits idle: + +``` +t = 1s: cumulative CPU = 1.0s, elapsed = 1s → pcpu = 100% +t = 2s: cumulative CPU = 1.0s, elapsed = 2s → pcpu = 50% +t = 10s: cumulative CPU = 1.0s, elapsed = 10s → pcpu = 10% +t = 100s: cumulative CPU = 1.0s, elapsed = 100s → pcpu = 1% +``` + +After the burst ends, `pcpu` decays toward 0. The process +"remembers" past CPU work and slowly forgets as its elapsed time +grows. Counterintuitive if you were expecting a real-time number. + +#### Scenario C: multi-threaded native process + +A process with 4 native threads, each pinning a separate core, +running for 1 second of wall time: + +``` +cumulative CPU = 4.0s (each thread contributed 1.0s) +elapsed time = 1s +pcpu = 400% +``` + +A single process can legitimately report `pcpu > 100%` when it uses +multiple cores simultaneously. The per-process ceiling is +`Ncores × 100`. + +(Pure-Python multi-threaded code cannot exceed ~100% because of the +GIL — each process gets one core's worth of CPU regardless of thread +count. C extensions and native code can break out of this and report +multi-hundred-percent legitimately.) + +#### Scenario D: the pathological summation case + +This is the mechanism behind issue +[#399](https://github.com/con/duct/issues/399). Many short-lived, +multi-threaded native child processes, as happens under tox when pip +compiles C extensions for many dependencies: + +``` +Child 1 runs for 200ms on 4 cores, observed by sample at t=150ms: + cumulative CPU = 600ms, elapsed = 150ms + → pcpu reported = 400% + +Child 2 runs for 200ms on 4 cores: + → pcpu reported = 400% + +…30 such children observed during a single sample… + +sum across children at sample time = 30 × 400% = 12,000% +system physical ceiling (20 cores) = 2,000% +``` + +Each individual child's number is correct for what `ps` is answering +("fraction of wall time spent on CPU, averaged from start"). The +problem is that *summing* those lifetime-averages across processes +that took turns on the CPU produces a total claiming work the system +didn't have the cores to do. The children ran sequentially; each +reports a high number individually; the sum treats them as +simultaneous. + +This is why duct can report `peak_pcpu` numbers that exceed the +system's physical CPU ceiling. It's not wrong per se — it's an +accurate sum of individually-correct lifetime averages — but it +doesn't answer the question most users are asking. + +### When `pcpu` is reliable + +| Workload shape | `pcpu` reliability | +|-----------------------------------------------|---------------------------------------------------------| +| Single long-running steady-state process | Accurate | +| Few long-running processes at steady state | Accurate | +| Bursty processes that are long-running | Accurate at the average; misses burst structure | +| Many short-lived (few second) child processes | **Unreliable — can inflate dramatically when summed** | +| Multi-threaded native code bursts | Per-process `pcpu` correct; summed totals may overshoot | + +### Linux vs. macOS + +macOS's `ps` uses a different formula: an **exponentially decaying +average** with a roughly 1-minute time constant. After a CPU burst +ends, the reported `pcpu` drops visibly within a minute rather than +decaying over the full process lifetime. + +Practical implications: + +- On macOS, `pcpu` for a long-idle-now-previously-busy process + converges back to 0 much faster than on Linux. +- On macOS, summation across short-lived processes is less prone to + overshoot than on Linux, because each process's contribution decays + out faster. +- **Numbers are not directly comparable across platforms.** The same + workload can report different `peak_pcpu` and `average_pcpu` on + Linux vs. macOS. If you're comparing runs, compare within a + platform. + +--- + +## Memory — `rss` and `pmem` + +### What they measure + +`ps -o rss` reports per-process **resident set size**: the amount of +physical memory currently mapped into the process's address space, in +kilobytes. This counts: + +- Private (anonymous) pages the process has allocated and touched. +- Shared pages (libraries, copy-on-write memory after `fork()`) **that + the process has mapped**, with each process counted independently. + +`ps -o pmem` is derived: `rss` divided by total system RAM, expressed +as a percentage. It inherits every property of `rss` and adds a +host-dependent denominator. + +### The shared-page issue + +The critical subtlety: when multiple processes share the same physical +page (because of `fork()`, or because they link the same library), +that page appears in **each process's RSS** — but the physical page +exists only once. + +Example: a Python parent process with 100MB RSS (libpython, libc, +site-packages) forks 10 child workers. Each child inherits the +parent's address space via copy-on-write. Immediately after fork: + +``` +Parent RSS: 100MB +Child 1 RSS: 100MB +Child 2 RSS: 100MB +… +Child 10 RSS: 100MB + +Sum of RSS across processes: 1100MB +Actual physical memory used: ~100MB (all shared with parent) +``` + +Summing RSS reports 1100MB. The system is using ~100MB. The sum is +off by a factor of 11. + +As children write to their copy of each page, copy-on-write triggers +and the page becomes private to that child — at which point physical +memory use genuinely grows. So over time, `sum(rss)` becomes a looser +upper bound on actual usage: it's never less than true usage, but +can be much more. + +### Typical inflation ranges + +| Workload | Sum-of-RSS vs. actual | +|---------------------------------------------------------|----------------------------------------------------| +| Single process | 1:1 (accurate) | +| Parent + few forked children, children mostly read-only | 3-5× inflated | +| Parent + many forked children running similar code | 10× or more inflated | +| Independent processes (not forked from a common parent) | Closer to 1:1, but still double-counts shared libs | + +For a duct-monitored Python test suite with `pytest-xdist` spawning 8 +workers, expect `sum(rss)` to overstate actual physical memory by +3-5×. For a parallel native compile (`make -j16`), each compile child +loads its own copy of libstdc++ and similar, so the overstating is +smaller (1.5-2×) but still present. + +### When RSS is safe to read + +- Single-process workloads: RSS is the right number. +- Per-process RSS in the per-pid data: correct for that process's + perspective ("how much of its address space is resident"), even if + much of that is shared. +- `sum(rss)` across forked children: treat as an **upper bound** on + physical memory, not the actual footprint. + +--- + +## Memory — `vsz` + +### What it measures + +`ps -o vsz` reports per-process **virtual set size**: the total +amount of virtual address space the process has reserved, in +kilobytes. + +This counts much more than physical memory: + +- **Memory-mapped files, even if never read.** `mmap(100GB_file)` + adds 100GB to `vsz` without using any physical RAM until pages are + touched. +- **Shared libraries.** Every `.so` mapped into the process — libc, + libpython, libssl, libcuda, all the native parts of + numpy/scipy/pandas/etc — contributes to `vsz`. These are shared + with other processes and use far less physical memory than `vsz` + implies. +- **Thread stacks.** Each thread gets an 8MB stack *reservation* by + default on Linux. A process with 1000 threads has 8GB of `vsz` from + stacks alone; physical use is kilobytes per thread. +- **Allocator reservations.** glibc malloc reserves arenas per thread + (~64MB each). Go's runtime reserves a very large heap upfront + (trivial Go programs can show 100GB+ `vsz` on some versions). The + JVM reserves its configured max heap (`-Xmx`) in `vsz` whether or + not it's used. +- **Sparse reservations.** `mmap(PROT_NONE, big_region)` reserves + address space without committing anything. + +`vsz` is "the largest amount of memory this process *could* touch if +it tried" — a theoretical upper bound, not a usage number. On a +64-bit system, a process can have terabytes of `vsz` with only +megabytes of RSS. This is normal. + +### Summing `vsz` across processes isn't meaningful as a memory metric + +Summing `vsz` has all the problems of summing RSS (shared libs counted +N times) plus the bigger-magnitude inflation from reservations and +mapped files. On a tox session that clones datalad datasets, pulls +containers, and scaffolds jobs, cumulative `vsz` can reach hundreds +of GB or multiple TB on a machine with 16GB of RAM. That number +doesn't correspond to anything physical. + +### Where `vsz` is still useful — anomaly detection + +`vsz` is not a sizing metric, but it is a useful **canary**. If your +workload's `peak_vsz` is much larger than you'd expect, something +did a lot of memory mapping. This could be: + +- **Benign:** mmap'd container layers, git-annex objects, mmap'd data + files, many threads, a Go or JVM runtime reserving heap. +- **Worth investigating:** a thread leak, a runaway `mmap` loop, a + container tool misbehaving. + +The value is: an order-of-magnitude change in `peak_vsz` between +runs of the same workload is a signal that something changed about +memory mapping behavior. It doesn't tell you *what*, but it tells +you *look*. + +--- + +## Peak vs. average + +### Peak + +`execution_summary.peak_*` and per-sample-window maxes come from the +same reduction: the maximum observed value across samples. + +Key properties: + +- **Misses sub-sample-interval spikes.** With default + `--sample-interval=1.0`, a 200ms spike to 10GB between samples is + invisible. Lowering the interval catches shorter spikes, at the + cost of more polling work. + +- **Per-pid peak and session-total peak may come from different + moments.** Example: + + ``` + sample 1: pid A = 1GB, pid B = 0 → total = 1GB + sample 2: pid A = 0, pid B = 1GB → total = 1GB + + aggregated report: + stats[A].rss = 1GB (A's peak, from sample 1) + stats[B].rss = 1GB (B's peak, from sample 2) + total_rss = 1GB (max observed simultaneous total) + ``` + + `total_rss` is **not** the sum of per-pid peaks — that would be + 2GB, implying both processes were simultaneously at their peaks, + which didn't happen. `total_rss` is the peak *simultaneous* + usage. Both numbers are correct answers to different questions: + + - *"What's the most any individual process used?"* → per-pid peak. + - *"What's the most the whole job used at any one moment?"* → + `total_*`. + + **For SLURM allocation sizing**, you want `total_*` — it's the + peak simultaneous footprint, and therefore the minimum + allocation that would have fit this workload. + +### Average + +`average_*` is the time-weighted mean of samples over the run. +Sampling artifacts: + +- **Averaged over the whole run, including idle time at start and + end.** A process that used 2 cores for 10 seconds during a 60-second + run has `average_pcpu` ≈ 33%, not 200%. +- **Sensitive to how the run started and ended.** A long startup or + long teardown (both low-CPU) drags the average down. + +Under `ps`-based sampling, `average_pcpu` is also affected by the +lifetime-average semantics described above — it's an average of +samples of lifetime-averages, which is a compound measurement. Treat +it as a rough "overall intensity" number, not a precise +utilization figure. + +--- + +## Common questions + +### Why is my `peak_pcpu` greater than `Ncores × 100%`? + +Because `ps -o pcpu` reports a lifetime-average per process, and +summing lifetime-averages across many short-lived processes produces +totals that exceed what the system's cores could have done +simultaneously. See +[Scenario D](#scenario-d-the-pathological-summation-case). + +Most common in workloads that spawn many short-lived child processes, +especially involving native/multi-threaded code: pip install +compiling C extensions, `make -j`, tox, any CI/build workflow. + +### Why is my `peak_vsz` many times larger than total RAM? + +`vsz` is virtual address space, not physical memory. It includes +reservations (thread stacks, allocator arenas, JVM or Go heap), +memory-mapped files (even unread), and shared library mappings. On a +64-bit system, `vsz` can legitimately exceed physical RAM by orders +of magnitude. See [Memory — `vsz`](#memory--vsz). + +### My RSS number changed a lot when I added more worker processes. Did memory usage actually increase proportionally? + +Probably not. If the workers are forked children of a common parent, +each child's RSS counts the shared pages (libraries, copy-on-write +memory) it inherited. `sum(rss)` grows roughly linearly with child +count even when physical memory usage grows much less. See +[The shared-page issue](#the-shared-page-issue). + +### Can I compare numbers across Linux and macOS? + +For the general shape, yes, but results will differ, with Linux over-reporting +in comparison to macOS. macOS `pcpu` uses decaying-exponential semantics; +Linux uses lifetime-average. The same workload can report different +`peak_pcpu` and `average_pcpu` on the two platforms. Memory numbers +(`rss`, `vsz`) have similar per-process semantics but differ in how +shared memory is reported. Compare runs within a single platform. + +### How do I catch memory spikes shorter than my sample interval? + +Lower `--sample-interval`. The default is 1.0s; dropping to 0.1s +catches spikes longer than ~100ms. Log size does not grow with +faster sampling, because the report window aggregates samples into +fixed-size records regardless of how many are taken. + +The tradeoff is polling overhead — `ps` forks a subprocess each +sample. At very fast sampling (tens of ms), the polling cost becomes +a non-trivial fraction of a core. + +### What should I use to size a SLURM allocation? + +- **Memory allocation**: `execution_summary.peak_rss` as an upper + bound. Remember it over-counts shared pages across forked + children, so it's a safe-but-loose bound. For workloads with many + forks, the real physical peak is often 1.5-5× smaller. +- **CPU allocation**: `execution_summary.peak_pcpu / 100` rounded up + as a core-count estimate **only if your workload is + steady-state**. For workloads with many short-lived native + children, `peak_pcpu` can inflate past the system ceiling — in + those cases, `Ncores` on the system that ran the job is a + better upper bound than `peak_pcpu / 100`. +- **Wall time**: `execution_summary.wall_clock_time` plus headroom + for variation. + +### Why does `total_rss` disagree with `sum(per-pid rss)` in an aggregated record? + +They answer different questions. `total_rss` is the peak +*simultaneous* total — the most the session used at any one moment. +`sum(per-pid rss)` over an aggregated record sums each process's +individual peak, which may have occurred at different moments. For +sizing and budgeting, `total_rss` is the one you want. See +[Peak vs. average](#peak-vs-average). + +--- + +## Alternative sampler: `cgroup-ps-hybrid` + +Most of this document describes behavior specific to `ps`-based +sampling. Duct also ships an opt-in alternative, +`--sampler=cgroup-ps-hybrid` (Linux cgroup v2 only), which reads +kernel cgroup counters for session totals while keeping `ps` for +the per-pid breakdown. Three of the `ps` pathologies above go away +for the session-wide numbers: + +- Shared pages counted once (not per forked child). +- `total_pcpu` is physically bounded by the cores in use. +- CPU from children that exit between samples is still captured. + +Per-pid values (`stats[pid].*`) are still `ps`-sourced. The +committed capability matrix shows which cells each sampler gets +right: + +- [`test/sampler_matrix_ps.csv`](../test/sampler_matrix_ps.csv) +- [`test/sampler_matrix_cgroup-ps-hybrid.csv`](../test/sampler_matrix_cgroup-ps-hybrid.csv) + +See [`design/multiple-samplers.md`](design/multiple-samplers.md) +for the full design. diff --git a/pyproject.toml b/pyproject.toml index 16377161..3ddb5601 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,6 +79,8 @@ filterwarnings = [ ] markers = [ "flaky: mark a test as being unreliable", + "sampler_matrix(sampler, workload, metric, direction, expected): sampler-matrix cell; direction is 'overreport' or 'underreport'; conftest hook records actual pass/fail for scripts/gen_sampler_matrix.py", + "cgroup_matrix: opt-in matrix tests that require systemd-run --user and a writable /sys/fs/cgroup v2; pass --cgroup-matrix to pytest to run", ] norecursedirs = ["test/data"] diff --git a/scripts/gen_sampler_matrix.py b/scripts/gen_sampler_matrix.py new file mode 100644 index 00000000..9bd9a7c8 --- /dev/null +++ b/scripts/gen_sampler_matrix.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +"""Regenerate test/sampler_matrix_.csv from matrix test results. + +The conftest hook in test/conftest.py writes a JSONL file (one record +per sampler_matrix-marked test) during the pytest run. Each record: + + { + "sampler": "ps", + "workload": "memory_children", + "metric": "rss", + "direction": "overreport", + "expected": "fail", + "actual": "fail", + "nodeid": "test/duct_main/test_sampler_matrix.py::..." + } + +This script pivots the JSONL into one CSV per sampler -- +rows=``/``, columns=``no_``, +cells=pass|fail|n/a -- and writes them into test/. The CSVs are +checked in so reviewers can see each sampler's capability profile +(does-not-under-report / does-not-over-report, per workload/metric +pair) without running tests. + +Each cell records the *actual* outcome (what the sampler did), +independent of whether that matched our committed expectation. The +commit hash for a given matrix snapshot is the source of truth for +what we expected at that point; the ``expected`` metadata lives in +the test marker, not the CSV. + +Usage: + python -m pytest test/ # populate .sampler_matrix_results.jsonl + python scripts/gen_sampler_matrix.py +""" + +from __future__ import annotations +import csv +import json +from pathlib import Path +import sys + +REPO_ROOT = Path(__file__).resolve().parent.parent +JSONL_PATH = REPO_ROOT / ".sampler_matrix_results.jsonl" +CSV_DIR = REPO_ROOT / "test" + +UNTESTED = "n/a" + + +def load_records() -> list[dict[str, str]]: + if not JSONL_PATH.exists(): + return [] + records = [] + with JSONL_PATH.open() as f: + for line in f: + line = line.strip() + if line: + records.append(json.loads(line)) + return records + + +def group_by_sampler( + records: list[dict[str, str]], +) -> dict[str, list[dict[str, str]]]: + grouped: dict[str, list[dict[str, str]]] = {} + for r in records: + sampler = r.get("sampler") + if not sampler: + continue + grouped.setdefault(sampler, []).append(r) + return grouped + + +def write_csv_for_sampler(sampler: str, records: list[dict[str, str]]) -> Path: + # row label -> column label -> actual + by_row: dict[str, dict[str, str]] = {} + columns: set[str] = set() + for r in records: + workload = r.get("workload") + metric = r.get("metric") + direction = r.get("direction") + actual = r.get("actual") + if not (workload and metric and direction and actual): + continue + row_label = f"{workload}/{metric}" + col_label = f"no_{direction}" + # Last write wins if the same cell appears twice in one run. + by_row.setdefault(row_label, {})[col_label] = actual + columns.add(col_label) + + sorted_columns = sorted(columns) + out_path = CSV_DIR / f"sampler_matrix_{sampler}.csv" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["workload/metric", *sorted_columns]) + for row_label in sorted(by_row): + row = [row_label] + for col in sorted_columns: + row.append(by_row[row_label].get(col, UNTESTED)) + writer.writerow(row) + return out_path + + +def main() -> int: + records = load_records() + if not records: + print( + f"no matrix results found at {JSONL_PATH}; " "run `pytest test/` first", + file=sys.stderr, + ) + return 0 + for sampler, sampler_records in sorted(group_by_sampler(records).items()): + path = write_csv_for_sampler(sampler, sampler_records) + print(f"wrote {path} ({len(sampler_records)} records)") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/regen_matrix.sh b/scripts/regen_matrix.sh new file mode 100755 index 00000000..fdce9292 --- /dev/null +++ b/scripts/regen_matrix.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# Regenerate sampler matrix CSVs from a fresh test run. +# +# Canonical invocation (records command + inputs/outputs as provenance): +# +# datalad run scripts/regen_matrix.sh +# +# Plain invocation also works; you lose the datalad-run metadata but +# the CSVs are identical. +# +# Requirements on the host: +# - .tox/py312 present (run `tox -e py312` once if absent) +# - systemd-run --user --scope working, for the opt-in cgroup_matrix +# tests (hosts without user systemd skip those tests, and the +# cgroup column stays at its prior values / (not yet tested)) +# +# The script runs the matrix tests with --cgroup-matrix and invokes the +# CSV generator. Both are fast (<20 s combined) against an already- +# populated .tox/py312 env. + +set -euo pipefail + +cd "$(git rev-parse --show-toplevel)" + +.tox/py312/bin/python -m pytest \ + -o addopts= \ + --cgroup-matrix \ + test/duct_main/test_sampler_matrix.py + +python3 scripts/gen_sampler_matrix.py diff --git a/src/con_duct/_duct_main.py b/src/con_duct/_duct_main.py index f83f7db1..62e4efa2 100644 --- a/src/con_duct/_duct_main.py +++ b/src/con_duct/_duct_main.py @@ -9,6 +9,7 @@ from typing import IO, TextIO from con_duct._models import LogPaths, Outputs, RecordTypes, SessionMode from con_duct._output import TailPipe, prepare_outputs, remove_files, safe_close_files +from con_duct._sampling import make_sampler from con_duct._signals import SigIntHandler from con_duct._tracker import Report, monitor_process @@ -49,6 +50,7 @@ def execute( colors: bool, mode: SessionMode, message: str = "", + sampler: str = "ps", ) -> int: """A wrapper to execute a command, monitor and log the process details. @@ -59,6 +61,14 @@ def execute( "--report-interval must be greater than or equal to --sample-interval." ) + sampler_instance = make_sampler(sampler) + # TODO(poc): the cgroup sampler reads duct's own cgroup via + # /proc/self/cgroup; a new-session spawn would stay in the same + # cgroup on Linux, but explicit current-session is cleaner and + # keeps the "duct measures the cgroup it lives in" invariant. + if sampler == "cgroup-ps-hybrid" and mode == SessionMode.NEW_SESSION: + raise ValueError("sampler 'cgroup-ps-hybrid' requires --mode=current-session") + log_paths = LogPaths.create(output_prefix, pid=os.getpid()) log_paths.prepare_paths(clobber, capture_outputs) stdout, stderr = prepare_outputs(capture_outputs, outputs, log_paths) @@ -86,6 +96,7 @@ def execute( colors, clobber, message=message, + sampler=sampler_instance, ) files_to_close.append(report.usage_file) diff --git a/src/con_duct/_sampling.py b/src/con_duct/_sampling.py index caa90517..b18a3752 100644 --- a/src/con_duct/_sampling.py +++ b/src/con_duct/_sampling.py @@ -5,10 +5,12 @@ from datetime import datetime import logging import os +from pathlib import Path import platform import subprocess import sys -from typing import Callable, Optional +import time +from typing import Optional, Union from con_duct._models import Averages, ProcessStats, Sample SYSTEM = platform.system() @@ -143,4 +145,128 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]: "Linux": _get_sample_linux, "Darwin": _get_sample_mac, } -_get_sample: Callable[[int], Optional[Sample]] = _get_sample_per_system[SYSTEM] # type: ignore[assignment] + + +class PsSampler: + """Sampler that uses `ps` to collect per-process stats.""" + + name = "ps" + + def sample(self, session_id: int) -> Optional[Sample]: + return _get_sample_per_system[SYSTEM](session_id) + + +_CGROUP_V2_ROOT = Path("/sys/fs/cgroup") + + +def _detect_cgroup_v2_path() -> Path: + """Return the absolute path to duct's own cgroup v2 directory. + + Raises NotImplementedError if cgroup v2 unified hierarchy is not + mounted, or if /proc/self/cgroup does not expose a v2 entry. + """ + controllers = _CGROUP_V2_ROOT / "cgroup.controllers" + if not controllers.exists(): + raise NotImplementedError( + "cgroup-ps-hybrid requires cgroup v2 unified hierarchy at " + f"{_CGROUP_V2_ROOT}; this host does not appear to have v2 mounted" + ) + # cgroup v2 entry in /proc//cgroup has the shape "0::/". + proc_cgroup = Path("/proc/self/cgroup").read_text() + for line in proc_cgroup.splitlines(): + if line.startswith("0::"): + rel = line.split("::", 1)[1].strip().lstrip("/") + return _CGROUP_V2_ROOT / rel + raise NotImplementedError( + "cgroup-ps-hybrid could not find a cgroup v2 entry ('0::/...') in " + "/proc/self/cgroup" + ) + + +def _read_cgroup_cpu_usage_usec(cgroup_root: Path) -> int: + """Read cumulative CPU microseconds from cgroup v2 ``cpu.stat``.""" + for line in (cgroup_root / "cpu.stat").read_text().splitlines(): + if line.startswith("usage_usec "): + return int(line.split()[1]) + raise RuntimeError(f"usage_usec not present in {cgroup_root / 'cpu.stat'}") + + +class CgroupSampler: + """Hybrid cgroup v2 + ps sampler. + + Session totals (``total_rss``, ``total_pcpu``) come from kernel + cgroup counters; per-pid stats come from a ``ps`` sub-sample. The + ``sampler`` tag on each emitted record disambiguates the source. + + Reader mode only: duct does NOT create a cgroup; it reads the one + it already lives in via ``/proc/self/cgroup``. This requires + ``--mode=current-session`` so duct and the tracked command share a + cgroup (enforced in ``_duct_main.execute``). + + TODO(poc): our polling shape (sample-at-interval + Sample.aggregate + max) is inherited from the ps model. cgroup could emit cumulative/ + delta directly -- e.g. one ``memory.peak`` read at end-of-run + instead of max-of-currents -- but that would require reshaping the + Sample/Report pipeline. Deferred post-POC. + """ + + name = "cgroup-ps-hybrid" + + def __init__(self) -> None: + self._cgroup_root = _detect_cgroup_v2_path() + # Baseline for delta-based pcpu on the first sample. + self._last_cpu_usec = _read_cgroup_cpu_usage_usec(self._cgroup_root) + self._last_cpu_time = time.monotonic() + + def sample(self, session_id: int) -> Optional[Sample]: + # Per-pid stats via the ps path so records still carry the pid + # breakdown users expect from duct. + sample = _get_sample_per_system[SYSTEM](session_id) + if sample is None: + return None + try: + # Memory: session total from cgroup (replaces the ps sum). + mem_current = int( + (self._cgroup_root / "memory.current").read_text().strip() + ) + sample.total_rss = mem_current + + # CPU: delta of cumulative usage_usec over the last interval. + now_usec = _read_cgroup_cpu_usage_usec(self._cgroup_root) + now_time = time.monotonic() + delta_sec = now_time - self._last_cpu_time + if delta_sec > 0: + delta_usec = now_usec - self._last_cpu_usec + # usage_usec / elapsed_usec * 100 = percent of one core. + sample.total_pcpu = delta_usec / delta_sec / 10_000 + self._last_cpu_usec = now_usec + self._last_cpu_time = now_time + except (OSError, ValueError) as exc: + raise RuntimeError( + f"cgroup read failed at {self._cgroup_root}: {exc}" + ) from exc + + # TODO(poc): total_vsz and total_pmem remain ps-sourced; cgroup + # v2 has no direct analogs (memory.current is already physical). + # TODO(poc): overwrite full_run_stats.total_rss with memory.peak + # at end of run for a truer run-level peak than max-of-currents. + # TODO(poc): this assumes the tracked command stays in duct's + # cgroup; systemd-run or similar would migrate the child out + # and silently break the measurement. + + # Recompute averages so they reflect the cgroup-sourced totals + # rather than the stale ps-sourced values set by _get_sample_*. + sample.averages = Averages.from_sample(sample=sample) + return sample + + +Sampler = Union[PsSampler, CgroupSampler] + + +def make_sampler(name: str) -> Sampler: + """Factory: resolve a sampler name (as passed on the CLI) to an instance.""" + if name == PsSampler.name: + return PsSampler() + if name == CgroupSampler.name: + return CgroupSampler() + raise ValueError(f"unknown sampler: {name!r}") diff --git a/src/con_duct/_tracker.py b/src/con_duct/_tracker.py index 1d563b1a..e0cefa6a 100644 --- a/src/con_duct/_tracker.py +++ b/src/con_duct/_tracker.py @@ -17,7 +17,7 @@ from con_duct._formatter import SummaryFormatter from con_duct._models import LogPaths, Sample, SystemInfo from con_duct._output import safe_close_files -from con_duct._sampling import _get_sample +from con_duct._sampling import PsSampler, Sampler __version__ = version("con-duct") @@ -38,6 +38,7 @@ def __init__( clobber: bool = False, process: subprocess.Popen | None = None, message: str = "", + sampler: Optional[Sampler] = None, ) -> None: self._command = command self.arguments = arguments @@ -46,6 +47,7 @@ def __init__( self.clobber = clobber self.colors = colors self.message = message + self.sampler = sampler if sampler is not None else PsSampler() # Defaults to be set later self.start_time: float | None = None self.process = process @@ -135,7 +137,7 @@ def get_system_info(self) -> None: def collect_sample(self) -> Optional[Sample]: assert self.session_id is not None try: - sample = _get_sample(self.session_id) + sample = self.sampler.sample(self.session_id) return sample except subprocess.CalledProcessError as exc: # when session_id has no processes lgr.debug("Error collecting sample: %s", str(exc)) @@ -154,7 +156,9 @@ def write_subreport(self) -> None: assert self.current_sample is not None if self.usage_file is None: self.usage_file = open(self.log_paths.usage, "w") - self.usage_file.write(json.dumps(self.current_sample.for_json()) + "\n") + record = self.current_sample.for_json() + record["sampler"] = self.sampler.name + self.usage_file.write(json.dumps(record) + "\n") self.usage_file.flush() # Force flush immediately @property @@ -199,6 +203,7 @@ def dump_json(self) -> str: "output_paths": asdict(self.log_paths), "working_directory": self.working_directory, "message": self.message, + "sampler": self.sampler.name, } ) diff --git a/src/con_duct/cli.py b/src/con_duct/cli.py index 258d00aa..4444767b 100644 --- a/src/con_duct/cli.py +++ b/src/con_duct/cli.py @@ -141,6 +141,7 @@ def _replay_early_logs(log_buffer: List[tuple[str, str]]) -> None: DUCT_REPORT_INTERVAL: see --report-interval DUCT_CAPTURE_OUTPUTS: see --capture-outputs DUCT_MESSAGE: see --message + DUCT_SAMPLER: see --sampler DUCT_CONFIG_PATHS: paths to .env files separated by platform path separator (':' on Unix) (see below) @@ -361,6 +362,16 @@ def _create_run_parser() -> argparse.ArgumentParser: "'current-session' tracks the current session instead of starting a new one. " "Useful for tracking slurm jobs or other commands that should run in the current session.", ) + parser.add_argument( + "--sampler", + default=os.getenv("DUCT_SAMPLER", "ps"), + choices=("ps", "cgroup-ps-hybrid"), + type=str, + help="Resource sampler to use. 'ps' (default) polls `ps` for per-process " + "stats; 'cgroup-ps-hybrid' combines cgroup v2 session totals with `ps` " + "per-process stats (Linux cgroup v2 only; POC, not yet implemented). " + "You can also provide value via DUCT_SAMPLER env variable.", + ) return parser diff --git a/src/con_duct/ls.py b/src/con_duct/ls.py index 403eed6a..2348f17e 100644 --- a/src/con_duct/ls.py +++ b/src/con_duct/ls.py @@ -53,6 +53,7 @@ "num_samples", "num_reports", "prefix", + "sampler", "schema_version", "stderr", "stdout", diff --git a/test/conftest.py b/test/conftest.py index 04765bb3..5640501f 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,9 +1,92 @@ +import json import logging import os from pathlib import Path -from typing import Generator +from typing import Any, Generator import pytest +SAMPLER_MATRIX_RESULTS = Path(__file__).parent.parent / ".sampler_matrix_results.jsonl" + + +def pytest_sessionstart() -> None: + """Clear stale sampler-matrix results from a previous run.""" + SAMPLER_MATRIX_RESULTS.unlink(missing_ok=True) + + +def pytest_addoption(parser: pytest.Parser) -> None: + parser.addoption( + "--cgroup-matrix", + action="store_true", + default=False, + help="Run opt-in cgroup_matrix tests (systemd-run --user required).", + ) + + +def pytest_collection_modifyitems( + config: pytest.Config, items: list[pytest.Item] +) -> None: + """Two jobs: + + 1. Auto-apply xfail(strict=True) to matrix tests marked expected='fail' + (known sampler/workload limitations; CI stays green, xpass surfaces + improvements). + 2. Skip cgroup_matrix-marked tests unless --cgroup-matrix was passed. + These tests spawn subprocesses in transient systemd scopes, which + isn't appropriate for default runs. + """ + run_cgroup_matrix = config.getoption("--cgroup-matrix") + skip_cgroup_matrix = pytest.mark.skip( + reason="opt-in; pass --cgroup-matrix to pytest (or `tox -- --cgroup-matrix`)" + ) + for item in items: + if "cgroup_matrix" in item.keywords and not run_cgroup_matrix: + item.add_marker(skip_cgroup_matrix) + marker = item.get_closest_marker("sampler_matrix") + if marker is None: + continue + if marker.kwargs.get("expected") == "fail": + item.add_marker( + pytest.mark.xfail( + strict=True, + reason="expected sampler/workload limitation", + ) + ) + + +@pytest.hookimpl(hookwrapper=True) +def pytest_runtest_makereport( + item: pytest.Item, call: pytest.CallInfo[Any] +) -> Generator[None, Any, None]: + """Record each sampler_matrix-marked test's actual outcome to JSONL. + + The *actual* cell value is derived from the raw assertion outcome + (call.excinfo), not from pytest's post-xfail interpretation: we want + the CSV to reflect what the sampler actually did, independent of + whether that matched our committed expectation. + + scripts/gen_sampler_matrix.py pivots the JSONL into one CSV per + sampler (rows=workload/metric, columns=no_, + cells=pass|fail|n/a). + """ + yield + if call.when != "call": + return + marker = item.get_closest_marker("sampler_matrix") + if marker is None: + return + actual = "pass" if call.excinfo is None else "fail" + record = { + "sampler": marker.kwargs.get("sampler"), + "workload": marker.kwargs.get("workload"), + "metric": marker.kwargs.get("metric"), + "direction": marker.kwargs.get("direction"), + "expected": marker.kwargs.get("expected"), + "actual": actual, + "nodeid": item.nodeid, + } + with SAMPLER_MATRIX_RESULTS.open("a") as f: + f.write(json.dumps(record) + "\n") + @pytest.fixture(scope="session", autouse=True) def set_test_config() -> Generator: diff --git a/test/data/memory_children.py b/test/data/memory_children.py new file mode 100644 index 00000000..33b512be --- /dev/null +++ b/test/data/memory_children.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +"""Spawn child processes that each allocate a known amount of memory. + +Usage: memory_children.py + +Each child allocates mb_per_child MB and holds it for hold_seconds. +The parent waits for all children to finish. +""" + +from __future__ import annotations +import multiprocessing +import sys +import time + + +def _allocate_and_hold(mb: int, seconds: float) -> None: + """Allocate mb megabytes and hold for seconds.""" + data = bytearray(mb * 1024 * 1024) + assert data # prevent optimization + time.sleep(seconds) + + +def main() -> None: + num_children = int(sys.argv[1]) + mb_per_child = int(sys.argv[2]) + hold_seconds = float(sys.argv[3]) + + processes = [] + for _ in range(num_children): + p = multiprocessing.Process( + target=_allocate_and_hold, args=(mb_per_child, hold_seconds) + ) + p.start() + processes.append(p) + + for p in processes: + p.join() + + +if __name__ == "__main__": + main() diff --git a/test/data/workloads/README.md b/test/data/workloads/README.md new file mode 100644 index 00000000..f8bb71de --- /dev/null +++ b/test/data/workloads/README.md @@ -0,0 +1,67 @@ +# Workload catalog + +Small, deterministic scripts with known ground-truth resource-usage +patterns. The sampler matrix tests (landing in a follow-up commit) +invoke duct against these scripts under each sampler and assert +expected behavior per cell (accurate / known-limitation). + +Each script is **standalone-runnable** — you can invoke it directly +(without duct) to verify it does what it claims. Each script's +docstring states its ground-truth contract. + +## Scripts in this directory + +- `steady_cpu.py ` — pin to one core, busy-loop. + Ground truth: single-process `peak_pcpu ~= 100%` (one core + saturated), wall clock `~= duration`. Any sampler should be + accurate. + +- `alloc_memory.py ` — allocate a contiguous + bytearray and hold it. Ground truth: `peak_rss >= size_mb * 1024 * + 1024` bytes. + +- `ephemeral_cpu.py ` — fork N + short-lived parallel children doing `work_ms` of CPU each, parent + holds for `hold_ms`. Ground truth: cgroup used ~`N * work_ms` of + CPU in ~`work_ms` wall time. **ps misses this**: children die + between samples, so `ps -s ` returns an empty session and + reported `peak_pcpu` is near zero. **cgroup catches this**: + `cpu.stat.usage_usec` is cumulative even for exited processes. + +- `spikey_cpu.py ` — + fork N workers, each spawning M threads running + `hashlib.pbkdf2_hmac` (GIL-released, truly parallel) for + `duration` seconds. Ground truth: peak instantaneous %CPU ≤ + `cpu_count * 100%`. **Linux ps dramatically OVER-reports** + (Bug 1, #399): `ps -o pcpu` is lifetime cumulative + `cputime / elapsed`, and a young multi-threaded worker's ratio is + arbitrarily inflated; duct's per-pid sum across the session then + compounds across workers. Real-world cases hit >1000%. **cgroup is + bounded**: `cpu.stat.usage_usec` delta over a sample interval + measures actual CPU time consumed, capped by cores in use. + Principled stdlib equivalent of the real #399 trigger (pip + compiling C extensions under tox). + +## Additional workload scripts (elsewhere) + +- `test/data/memory_children.py + ` — fork N child processes, each holding M MB. Ground + truth diverges by sampler: under `ps`, each PID's RSS counts shared + library pages separately, so `sum(rss)` overcounts actual physical + memory; cgroup v2 session totals reflect real physical usage. Kept + at its original location to avoid history churn; the TODO below + covers longer-term consolidation. + +## TODO: consolidate workload scripts + +Workload scripts are currently split across `test/data/` (legacy: +`test_script.py`, `memory_children.py`) and this directory. A future +cleanup could either: + +- Migrate all workload scripts into this directory so the catalog has + a single location; or +- Collapse the pre-POC `test_script.py` (`--memory-size` + + `--cpu-load` multi-purpose) into the one-phenomenon-per-script + style used here. + +Decision deferred pending POC acceptance. diff --git a/test/data/workloads/alloc_memory.py b/test/data/workloads/alloc_memory.py new file mode 100644 index 00000000..5a908a31 --- /dev/null +++ b/test/data/workloads/alloc_memory.py @@ -0,0 +1,39 @@ +"""Single-process memory allocation workload. + +Allocates a contiguous bytearray of known size and holds it for the +requested duration. In CPython, ``bytearray(N)`` is eagerly allocated, +so RSS grows by ~N bytes immediately (plus a small Python overhead). + +Ground truth: ``peak_rss >= size_mb * 1024 * 1024`` for any sampler +that reports RSS (ps sums per-process rss; cgroup-v2 reports +``memory.peak`` for the session). + +Standalone usage (without duct): + + python test/data/workloads/alloc_memory.py +""" + +from __future__ import annotations +import sys +import time + + +def main() -> None: + if len(sys.argv) != 3: + print( + "usage: alloc_memory.py ", + file=sys.stderr, + ) + sys.exit(2) + size_mb = int(sys.argv[1]) + hold_seconds = float(sys.argv[2]) + + buffer = bytearray(size_mb * 1024 * 1024) + # Touch the buffer so pages are guaranteed resident (defensive - + # CPython bytearray is already eagerly allocated). + assert buffer[0] == 0 + time.sleep(hold_seconds) + + +if __name__ == "__main__": + main() diff --git a/test/data/workloads/ephemeral_cpu.py b/test/data/workloads/ephemeral_cpu.py new file mode 100644 index 00000000..17f7db19 --- /dev/null +++ b/test/data/workloads/ephemeral_cpu.py @@ -0,0 +1,73 @@ +"""Ephemeral CPU workload: short-lived parallel children that die fast. + +Forks N child processes in parallel via ``os.fork()``, each busy-loops +on CPU for ``work_ms`` milliseconds, then ``os._exit()``s. The parent +then sleeps for ``hold_ms`` milliseconds so duct gets at least one +sample window *after* all children have died. + +Uses bare ``os.fork`` (not ``multiprocessing.Process``) so child +lifetime is determined by ``work_ms``, not by interpreter re-init / +spawn-method overhead. Under ``multiprocessing`` on PyPy or Python +3.14+ (forkserver default), child startup bloats well past ``work_ms`` +and children get caught by ``ps`` sampling -- defeating the "children +die between samples" story this workload anchors. + +Ground truth: the cgroup used approximately ``N * work_ms`` total CPU +milliseconds in a span of ``work_ms`` wall milliseconds -- i.e. a +burst at roughly ``N * 100%`` peak pcpu. A sampler that reads +cumulative cgroup counters (``cpu.stat.usage_usec``) captures this +even if the children have already exited by sample time. A sampler +that relies on per-pid snapshots at sample time (``ps -s ``) +sees an empty session and misses the burst. + +POSIX-only (``os.fork``); duct's session sampling requires POSIX +anyway. + +Standalone usage (without duct): + + python test/data/workloads/ephemeral_cpu.py \\ + +""" + +from __future__ import annotations +import os +import sys +import time + + +def _busy(duration_s: float) -> None: + end = time.monotonic() + duration_s + while time.monotonic() < end: + pass + + +def main() -> None: + if len(sys.argv) != 4: + print( + "usage: ephemeral_cpu.py ", + file=sys.stderr, + ) + sys.exit(2) + num_workers = int(sys.argv[1]) + work_ms = int(sys.argv[2]) + hold_ms = int(sys.argv[3]) + + work_s = work_ms / 1000 + pids = [] + for _ in range(num_workers): + pid = os.fork() + if pid == 0: + _busy(work_s) + os._exit(0) + pids.append(pid) + for pid in pids: + os.waitpid(pid, 0) + + # Keep the parent alive so duct's monitor thread gets at least one + # sample window after all children have exited. Without this, the + # workload may end before any sampling happens at all. + time.sleep(hold_ms / 1000) + + +if __name__ == "__main__": + main() diff --git a/test/data/workloads/spikey_cpu.py b/test/data/workloads/spikey_cpu.py new file mode 100644 index 00000000..5909993b --- /dev/null +++ b/test/data/workloads/spikey_cpu.py @@ -0,0 +1,75 @@ +"""Spikey CPU workload: multi-threaded native bursts (pcpu-overshoot trigger). + +Forks N parallel worker processes. Each worker spawns M threads that +call ``hashlib.pbkdf2_hmac`` in a tight loop for ``duration`` seconds. +``pbkdf2_hmac`` releases the GIL inside its C implementation, so the +threads actually parallelize across cores -- this is a principled +stdlib-only equivalent of the real #399 trigger (pip compiling C +extensions under tox: short-lived multi-core native work). + +Ground truth: across ``duration`` wall-seconds the cgroup uses up to +``min(cpu_count, N*M) * duration`` CPU-seconds. Real peak instantaneous +%CPU is bounded by ``cpu_count * 100%``. + +Pathology on Linux (Bug 1, RESEARCH.md section 1.1): ``ps -o pcpu`` +is ``cputime / elapsed`` accumulated over each process's lifetime. +For a young multi-threaded worker, the small elapsed denominator +inflates the ratio arbitrarily. Duct's per-pid summing across the +session then compounds this across workers. Real-world cases hit +>1000% reported pcpu. + +Standalone usage (without duct): + + python test/data/workloads/spikey_cpu.py \\ + +""" + +from __future__ import annotations +import hashlib +import multiprocessing +import sys +import threading +import time + +_PBKDF2_ITERATIONS = 10_000 + + +def _worker(num_threads: int, duration_s: float) -> None: + """Spawn num_threads threads doing pbkdf2_hmac until duration elapses.""" + end = time.monotonic() + duration_s + + def burst() -> None: + while time.monotonic() < end: + # Fresh bytes each round so the interpreter can't cache. + hashlib.pbkdf2_hmac("sha256", b"password", b"salt", _PBKDF2_ITERATIONS) + + threads = [threading.Thread(target=burst) for _ in range(num_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + +def main() -> None: + if len(sys.argv) != 4: + print( + "usage: spikey_cpu.py " "", + file=sys.stderr, + ) + sys.exit(2) + num_workers = int(sys.argv[1]) + num_threads = int(sys.argv[2]) + duration_s = float(sys.argv[3]) + + procs = [ + multiprocessing.Process(target=_worker, args=(num_threads, duration_s)) + for _ in range(num_workers) + ] + for p in procs: + p.start() + for p in procs: + p.join() + + +if __name__ == "__main__": + main() diff --git a/test/data/workloads/steady_cpu.py b/test/data/workloads/steady_cpu.py new file mode 100644 index 00000000..dbc2a488 --- /dev/null +++ b/test/data/workloads/steady_cpu.py @@ -0,0 +1,35 @@ +"""Steady-state CPU workload: pin to one core, busy-loop for N seconds. + +Ground truth: a single-process workload pinned to one core saturates +that core, so any accurate sampler should report ``peak_pcpu ~= 100%`` +and wall-clock time roughly equal to the requested duration. + +Standalone usage (without duct): + + python test/data/workloads/steady_cpu.py +""" + +from __future__ import annotations +import os +import sys +import time + + +def main() -> None: + if len(sys.argv) != 2: + print("usage: steady_cpu.py ", file=sys.stderr) + sys.exit(2) + duration = float(sys.argv[1]) + + # Pin to one core so the pcpu ceiling is deterministic across machines. + # Linux-only; macOS lacks a stdlib equivalent, so we run unpinned there. + if hasattr(os, "sched_setaffinity"): + os.sched_setaffinity(0, {0}) + + end = time.monotonic() + duration + while time.monotonic() < end: + pass + + +if __name__ == "__main__": + main() diff --git a/test/duct_main/test_resource_validation.py b/test/duct_main/test_resource_validation.py new file mode 100644 index 00000000..67622aeb --- /dev/null +++ b/test/duct_main/test_resource_validation.py @@ -0,0 +1,331 @@ +"""Validate that duct-reported resource stats match actual resource usage. + +These tests run programs with known, predictable resource consumption +(memory allocation, CPU usage) and verify that duct's measurements +fall within expected bounds. This bridges the gap between unit tests +(which verify aggregation math) and real-world accuracy. +""" + +from __future__ import annotations +import json +import os +from pathlib import Path +import sys +from typing import Any +import pytest +from utils import run_duct_command +from con_duct._constants import SUFFIXES + +TEST_DATA_DIR = Path(__file__).parent.parent / "data" +TEST_SCRIPT = str(TEST_DATA_DIR / "test_script.py") +MEMORY_CHILDREN_SCRIPT = str(TEST_DATA_DIR / "memory_children.py") + + +def _read_info(temp_output_dir: str) -> Any: + with open(os.path.join(temp_output_dir, SUFFIXES["info"])) as f: + return json.loads(f.read()) + + +def _read_usage(temp_output_dir: str) -> list[Any]: + lines = [] + with open(os.path.join(temp_output_dir, SUFFIXES["usage"])) as f: + for line in f: + line = line.strip() + if line: + lines.append(json.loads(line)) + return lines + + +@pytest.mark.flaky(reruns=3) +def test_memory_allocation_detected(temp_output_dir: str) -> None: + """Allocate a known amount of memory and verify duct detects it. + + Runs test_script.py which allocates --memory-size MB via bytearray. + Duct should report peak RSS at least that large (plus Python overhead). + """ + alloc_mb = 50 + alloc_bytes = alloc_mb * 1024 * 1024 + + assert ( + run_duct_command( + [ + sys.executable, + TEST_SCRIPT, + "--duration", + "2", + "--memory-size", + str(alloc_mb), + "--cpu-load", + "1", + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + summary = info["execution_summary"] + + peak_rss = summary["peak_rss"] + # RSS must be at least the allocated amount (bytearray is contiguous in memory) + assert peak_rss >= alloc_bytes, ( + f"peak_rss ({peak_rss / 1024 / 1024:.1f} MB) should be >= " + f"allocated ({alloc_mb} MB)" + ) + # Sanity upper bound: shouldn't report more than allocation + 200MB overhead + overhead_limit = alloc_bytes + 200 * 1024 * 1024 + assert peak_rss < overhead_limit, ( + f"peak_rss ({peak_rss / 1024 / 1024:.1f} MB) unreasonably high " + f"for {alloc_mb} MB allocation" + ) + + +@pytest.mark.flaky(reruns=3) +def test_wall_clock_time_accurate(temp_output_dir: str) -> None: + """Verify wall clock time matches actual sleep duration.""" + duration = 1.0 + + assert ( + run_duct_command( + ["sleep", str(duration)], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + wall_clock = info["execution_summary"]["wall_clock_time"] + + # Should be close to the requested duration + assert ( + wall_clock >= duration + ), f"wall_clock_time ({wall_clock:.2f}s) < requested sleep ({duration}s)" + # Allow generous overhead for slow CI environments + assert wall_clock < duration + 2.0, ( + f"wall_clock_time ({wall_clock:.2f}s) unreasonably high " + f"for {duration}s sleep" + ) + + +@pytest.mark.flaky(reruns=3) +def test_idle_process_low_cpu(temp_output_dir: str) -> None: + """A sleeping process should report near-zero CPU usage.""" + assert ( + run_duct_command( + ["sleep", "1"], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + summary = info["execution_summary"] + + assert ( + summary["peak_pcpu"] < 5.0 + ), f"peak_pcpu ({summary['peak_pcpu']}) should be near-zero for sleep" + assert ( + summary["average_pcpu"] < 5.0 + ), f"average_pcpu ({summary['average_pcpu']}) should be near-zero for sleep" + + +@pytest.mark.flaky(reruns=3) +def test_cpu_intensive_detected(temp_output_dir: str) -> None: + """A busy-loop process should report significant CPU usage.""" + assert ( + run_duct_command( + [ + sys.executable, + TEST_SCRIPT, + "--duration", + "2", + "--memory-size", + "1", + "--cpu-load", + "100000", + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + summary = info["execution_summary"] + + # A busy-loop should show meaningful CPU usage + assert ( + summary["peak_pcpu"] > 10.0 + ), f"peak_pcpu ({summary['peak_pcpu']}) should be significant for busy-loop" + + +@pytest.mark.flaky(reruns=3) +def test_usage_samples_recorded(temp_output_dir: str) -> None: + """Verify that usage.jsonl contains samples with expected structure.""" + assert ( + run_duct_command( + ["sleep", "1"], + sample_interval=0.1, + report_interval=0.3, + output_prefix=temp_output_dir, + ) + == 0 + ) + + samples = _read_usage(temp_output_dir) + + # With 1s sleep and 0.3s report interval, expect at least 2 reports + assert len(samples) >= 2, f"Expected at least 2 usage samples, got {len(samples)}" + + for i, sample in enumerate(samples): + assert "timestamp" in sample, f"Sample {i} missing timestamp" + assert "totals" in sample, f"Sample {i} missing totals" + totals = sample["totals"] + assert "rss" in totals, f"Sample {i} totals missing rss" + assert "pcpu" in totals, f"Sample {i} totals missing pcpu" + # RSS should be non-negative + assert totals["rss"] >= 0, f"Sample {i} has negative rss: {totals['rss']}" + + +@pytest.mark.flaky(reruns=3) +def test_multiple_samples_show_consistent_memory(temp_output_dir: str) -> None: + """Memory held for the full duration should appear consistently across samples.""" + alloc_mb = 30 + alloc_bytes = alloc_mb * 1024 * 1024 + + assert ( + run_duct_command( + [ + sys.executable, + TEST_SCRIPT, + "--duration", + "2", + "--memory-size", + str(alloc_mb), + "--cpu-load", + "1", + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + samples = _read_usage(temp_output_dir) + assert len(samples) >= 2, f"Expected multiple samples, got {len(samples)}" + + # At least some samples should show the allocated memory + samples_above_threshold = [s for s in samples if s["totals"]["rss"] >= alloc_bytes] + assert len(samples_above_threshold) >= 1, ( + f"No usage samples showed RSS >= {alloc_mb} MB. " + f"Sample RSS values: {[s['totals']['rss'] / 1024 / 1024 for s in samples]}" + ) + + +# --- Child/forked process resource validation --- + + +@pytest.mark.flaky(reruns=3) +def test_child_processes_memory_aggregated(temp_output_dir: str) -> None: + """Spawn children that each allocate memory; verify total RSS reflects the sum. + + Uses multiprocessing to fork N children each holding M MB. + The total RSS across all processes should be at least N * M MB. + """ + num_children = 3 + mb_per_child = 20 + hold_seconds = 3.0 + total_alloc_bytes = num_children * mb_per_child * 1024 * 1024 + + assert ( + run_duct_command( + [ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(num_children), + str(mb_per_child), + str(hold_seconds), + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + summary = info["execution_summary"] + + # peak_rss is total across all tracked processes + peak_rss = summary["peak_rss"] + assert peak_rss >= total_alloc_bytes, ( + f"peak_rss ({peak_rss / 1024 / 1024:.1f} MB) should be >= " + f"total allocation ({num_children} x {mb_per_child} = " + f"{num_children * mb_per_child} MB)" + ) + + # Also check usage.jsonl samples show multiple processes + samples = _read_usage(temp_output_dir) + max_pids_seen = max(len(s["processes"]) for s in samples) + # Should see parent + N children (at least N+1 processes) + assert max_pids_seen >= num_children + 1, ( + f"Expected at least {num_children + 1} processes in samples, " + f"but max PIDs in any sample was {max_pids_seen}" + ) + + +@pytest.mark.flaky(reruns=3) +def test_child_processes_individually_tracked(temp_output_dir: str) -> None: + """Verify per-process stats in usage.jsonl track individual children.""" + num_children = 2 + mb_per_child = 25 + hold_seconds = 3.0 + child_alloc_bytes = mb_per_child * 1024 * 1024 + + assert ( + run_duct_command( + [ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(num_children), + str(mb_per_child), + str(hold_seconds), + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + samples = _read_usage(temp_output_dir) + + # Find samples where children are running (multiple processes visible) + multi_proc_samples = [s for s in samples if len(s["processes"]) > 1] + assert len(multi_proc_samples) >= 1, "No samples captured multiple processes" + + # In at least one sample, individual child processes should show their allocation + # (each child holds mb_per_child MB) + children_with_expected_rss = set() + for sample in multi_proc_samples: + for pid, proc in sample["processes"].items(): + if proc["rss"] >= child_alloc_bytes: + children_with_expected_rss.add(pid) + + rss_debug = [ + {pid: p["rss"] / 1024 / 1024 for pid, p in s["processes"].items()} + for s in multi_proc_samples[:3] + ] + assert len(children_with_expected_rss) >= num_children, ( + f"Expected at least {num_children} child processes with RSS >= " + f"{mb_per_child} MB, found {len(children_with_expected_rss)}. " + f"Per-process RSS values: {rss_debug}" + ) diff --git a/test/duct_main/test_sampler_matrix.py b/test/duct_main/test_sampler_matrix.py new file mode 100644 index 00000000..569d6773 --- /dev/null +++ b/test/duct_main/test_sampler_matrix.py @@ -0,0 +1,639 @@ +"""Sampler matrix: per-(sampler, workload, metric, direction) cells. + +Each test is marked with ``@pytest.mark.sampler_matrix`` carrying the +``(sampler, workload, metric, direction, expected)`` metadata. The +conftest hook converts ``expected="fail"`` into ``xfail(strict=True)``, +so the suite stays green both when a sampler meets an expected-pass +claim AND when a known limitation keeps on holding. Cells are emitted +to ``.sampler_matrix_results.jsonl`` and pivoted into +``test/sampler_matrix_.csv`` by +``scripts/gen_sampler_matrix.py`` (rows=``/``, +columns=``no_``). + +``direction`` is either ``"underreport"`` or ``"overreport"`` and +names what the test asserts the sampler DOES NOT do. A test marked +``direction="overreport"`` asserts ``measured <= ceiling``; failing +means the sampler over-reported. A test marked +``direction="underreport"`` asserts ``measured >= floor``; failing +means the sampler under-reported. + +Conventions: + +- Each test exercises exactly one cell of the matrix. +- Test name follows + ``test____no_`` so the matrix + story is readable in pytest output too. +- Short durations + small allocations so the matrix is cheap to run. + +TODO: consolidate with ``test/duct_main/test_resource_validation.py`` +once the POC direction lands. Both files exercise the same workloads +with similar bounds; the matrix version adds per-cell metadata and +xfail dispatch. Either merge or retire the duplicate after acceptance. +""" + +from __future__ import annotations +import json +import os +from pathlib import Path +import platform +import shutil +import subprocess +import sys +from typing import Any +import pytest +from utils import run_duct_command +from con_duct._constants import SUFFIXES + +DATA_DIR = Path(__file__).parent.parent / "data" +WORKLOADS_DIR = DATA_DIR / "workloads" +ALLOC_MEMORY_SCRIPT = str(WORKLOADS_DIR / "alloc_memory.py") +STEADY_CPU_SCRIPT = str(WORKLOADS_DIR / "steady_cpu.py") +EPHEMERAL_CPU_SCRIPT = str(WORKLOADS_DIR / "ephemeral_cpu.py") +SPIKEY_CPU_SCRIPT = str(WORKLOADS_DIR / "spikey_cpu.py") +MEMORY_CHILDREN_SCRIPT = str(DATA_DIR / "memory_children.py") + +# Path to duct in the current venv -- resolves to .tox/py312/bin/duct +# under tox, .venv-austin/bin/duct in a direct venv invocation, etc. +DUCT_BIN = str(Path(sys.executable).parent / "duct") + + +def _read_info(temp_output_dir: str) -> Any: + with open(os.path.join(temp_output_dir, SUFFIXES["info"])) as f: + return json.loads(f.read()) + + +def _skip_unless_systemd_run_scope() -> None: + """Skip the calling test unless ``systemd-run --user --scope`` is usable. + + Cgroup-matrix tests spawn each duct invocation in a transient + systemd scope so the cgroup counters CgroupSampler reads are + dedicated to just duct + its workload, not polluted by the test + runner or sibling processes in the caller's cgroup. + """ + if shutil.which("systemd-run") is None: + pytest.skip("systemd-run not on PATH") + probe = subprocess.run( + ["systemctl", "--user", "is-system-running"], + capture_output=True, + text=True, + ) + if probe.returncode != 0 and not probe.stdout.strip(): + pytest.skip(f"user systemd not running: {probe.stderr.strip() or 'unknown'}") + if not Path(DUCT_BIN).exists(): + pytest.skip(f"duct binary not found at {DUCT_BIN}") + + +def _skip_unless_linux() -> None: + """Skip on non-Linux. Bug 1 (ps pcpu overshoot) is Linux-specific. + + BSD/Darwin ``ps -o pcpu`` is a decaying ~1min average, not a + lifetime cumulative ratio, so the overshoot mechanism doesn't + apply. See RESEARCH.md section 1.1. + """ + if platform.system() != "Linux": + pytest.skip( + f"Bug 1 is Linux-specific; {platform.system()} ps uses a " + f"decaying average, not lifetime cumulative" + ) + + +def _skip_if_no_thread_oversubscription(demand: int) -> None: + """Skip when cpu_count >= workload thread demand. + + Bug 1's ps-pcpu-overshoot amplifies when threads oversubscribe + cores (young processes + scheduler contention). On hosts with + far more cores than our workload demands, ps reports stay close + to legitimate physical peak and the ``no_overreport`` ceiling + can't cleanly discriminate ps from cgroup. Workload + ceiling + are tuned for 4-12 core hosts (typical laptops / small runners). + """ + have = os.cpu_count() or 1 + if have >= demand: + pytest.skip( + f"host has {have} cores and workload demands {demand} threads; " + f"no oversubscription, so Bug 1 cannot reliably inflate ps " + f"beyond physical. Run on a host with cpu_count < {demand}." + ) + + +def _run_duct_in_scope( + out_prefix: str, + workload_args: list[str], + sample_interval: float = 0.1, + report_interval: float = 0.5, +) -> None: + """Run duct in a transient systemd scope so its cgroup is dedicated. + + The scope's cgroup contains exactly ``duct + workload`` -- pytest + and anything else running on the host stay in their own cgroups. + CgroupSampler therefore reads clean ``memory.current`` / ``cpu.stat`` + values, and matrix ceiling/floor assertions can actually + discriminate ps-with-bug-1 vs. cgroup. + """ + subprocess.run( + [ + "systemd-run", + "--user", + "--scope", + "--collect", + "--quiet", + "--", + DUCT_BIN, + "--sampler=cgroup-ps-hybrid", + "--mode=current-session", + "--log-level=ERROR", + f"--sample-interval={sample_interval}", + f"--report-interval={report_interval}", + "-p", + out_prefix, + *workload_args, + ], + check=True, + ) + + +# N and M chosen so ps's per-pid shared-lib double-counting is large +# enough to reliably exceed a ceiling that cgroup totals would respect. +# Empirically: alloc=80 MB, ps reports ~135 MB, cgroup reports ~110 MB. +_MEM_CHILDREN_N = 4 +_MEM_CHILDREN_M = 20 + +# Ephemeral CPU: short-lived parallel workers that die between duct +# samples (sample_interval=0.1s, each worker runs ~30ms of CPU then +# exits). Parent holds 500ms so duct gets at least one sample after +# children are gone. +_EPHEMERAL_N_WORKERS = 4 +_EPHEMERAL_WORK_MS = 30 +_EPHEMERAL_HOLD_MS = 500 + +# Spikey CPU: multi-threaded native work (pbkdf2_hmac releases GIL) +# via N parallel workers, each with M threads, for D seconds. Fast +# sampling (0.01s) catches workers at young lifetimes where ps's +# cputime/elapsed ratio is inflated per hub's Bug 1 analysis. +_SPIKEY_N_WORKERS = 4 +_SPIKEY_N_THREADS = 8 +_SPIKEY_DURATION_S = 0.3 +# Physical ceiling on instantaneous %CPU. Bounded by cores in use; +# cgroup respects this, ps (Bug 1) can exceed by an order of magnitude. +_SPIKEY_PCPU_CEILING = (os.cpu_count() or 1) * 100 + 100 + + +# ---- ps column ---- + + +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="ps", + workload="alloc_memory", + metric="rss", + direction="underreport", + expected="pass", +) +def test_ps_alloc_memory_rss_no_underreport(temp_output_dir: str) -> None: + alloc_mb = 50 + assert ( + run_duct_command( + [sys.executable, ALLOC_MEMORY_SCRIPT, str(alloc_mb), "1.5"], + sampler="ps", + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + peak_rss = _read_info(temp_output_dir)["execution_summary"]["peak_rss"] + assert peak_rss >= alloc_mb * 1024 * 1024, ( + f"peak_rss ({peak_rss / 1024 / 1024:.1f} MB) should be >= " + f"allocated ({alloc_mb} MB)" + ) + + +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="ps", + workload="memory_children", + metric="rss", + direction="underreport", + expected="pass", +) +def test_ps_memory_children_rss_no_underreport( + temp_output_dir: str, +) -> None: + assert ( + run_duct_command( + [ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(_MEM_CHILDREN_N), + str(_MEM_CHILDREN_M), + "1.5", + ], + sampler="ps", + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + peak_rss = _read_info(temp_output_dir)["execution_summary"]["peak_rss"] + alloc_bytes = _MEM_CHILDREN_N * _MEM_CHILDREN_M * 1024 * 1024 + assert peak_rss >= alloc_bytes, ( + f"peak_rss ({peak_rss / 1024 / 1024:.1f} MB) should be >= " + f"total allocation " + f"({_MEM_CHILDREN_N * _MEM_CHILDREN_M} MB)" + ) + + +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="ps", + workload="memory_children", + metric="rss", + direction="overreport", + expected="fail", +) +def test_ps_memory_children_rss_no_overreport( + temp_output_dir: str, +) -> None: + """ps sums RSS per PID, double-counting shared library pages. + + Expected to fail: the sum of per-PID RSS reported by ps exceeds a + realistic ceiling on actual physical memory used by the session. + Under cgroup-ps-hybrid, session totals come from the kernel and + this assertion holds. + """ + assert ( + run_duct_command( + [ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(_MEM_CHILDREN_N), + str(_MEM_CHILDREN_M), + "1.5", + ], + sampler="ps", + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + peak_rss = _read_info(temp_output_dir)["execution_summary"]["peak_rss"] + alloc_bytes = _MEM_CHILDREN_N * _MEM_CHILDREN_M * 1024 * 1024 + ceiling = alloc_bytes + 40 * 1024 * 1024 + assert peak_rss <= ceiling, ( + f"ps reported peak_rss {peak_rss / 1024 / 1024:.1f} MB > " + f"physical ceiling {ceiling / 1024 / 1024:.1f} MB " + f"({_MEM_CHILDREN_N} children x {_MEM_CHILDREN_M} MB)" + ) + + +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="ps", + workload="steady_cpu", + metric="pcpu", + direction="underreport", + expected="pass", +) +def test_ps_steady_cpu_pcpu_no_underreport( + temp_output_dir: str, +) -> None: + duration = 2.0 + assert ( + run_duct_command( + [sys.executable, STEADY_CPU_SCRIPT, str(duration)], + sampler="ps", + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + peak_pcpu = _read_info(temp_output_dir)["execution_summary"]["peak_pcpu"] + assert peak_pcpu >= 80.0, ( + f"peak_pcpu ({peak_pcpu}) should be >= 80% for one-core-pinned " f"busy-loop" + ) + + +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="ps", + workload="ephemeral_cpu", + metric="pcpu", + direction="underreport", + expected="fail", +) +def test_ps_ephemeral_cpu_pcpu_no_underreport( + temp_output_dir: str, +) -> None: + """ps misses CPU consumed by children that exit between samples. + + Expected to fail: short-lived parallel workers die before ps + samples them, so ``ps -s `` shows an empty session at sample + time and reported peak_pcpu stays near zero -- even though the + cgroup actually burned ~N-cores worth of CPU. Under + cgroup-ps-hybrid, ``cpu.stat.usage_usec`` is cumulative and + captures work regardless of process lifetime. + """ + assert ( + run_duct_command( + [ + sys.executable, + EPHEMERAL_CPU_SCRIPT, + str(_EPHEMERAL_N_WORKERS), + str(_EPHEMERAL_WORK_MS), + str(_EPHEMERAL_HOLD_MS), + ], + sampler="ps", + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + peak_pcpu = _read_info(temp_output_dir)["execution_summary"]["peak_pcpu"] or 0.0 + # Floor chosen to comfortably discriminate ps (reports ~0% because + # children die between samples) from cgroup (reports > 100% for + # 4 parallel bursts over the sample window). Generous slack for + # Python startup + sample-window dilution. + floor = 80.0 + assert peak_pcpu >= floor, ( + f"peak_pcpu ({peak_pcpu}) should be >= {floor}% for " + f"{_EPHEMERAL_N_WORKERS} parallel {_EPHEMERAL_WORK_MS}ms workers" + ) + + +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="ps", + workload="spikey_cpu", + metric="pcpu", + direction="overreport", + expected="fail", +) +def test_ps_spikey_cpu_pcpu_no_overreport( + temp_output_dir: str, +) -> None: + """ps Bug 1: multi-threaded young processes inflate cputime/elapsed. + + Expected to fail (Linux only): ``ps -o pcpu`` is cumulative over + a process's lifetime, so a worker that just started a multi-core + pbkdf2 burst reports ``cputime / elapsed`` with a small elapsed + denominator -- easily several hundred percent per worker. Duct's + per-pid sum across the session then compounds this across N + parallel workers. Real-world #399 (pip compiling C extensions + under tox) hits >1000%. + + Cite: RESEARCH.md section 1.1 (pcpu fully broken); + DEEP_DIVE_PROGRESS.md section 2 (Bug 1 confirmed in + src/con_duct/_sampling.py). Distinct from Bug 2 (aggregation + inconsistency, xfailed in c017800) -- do not conflate. + """ + _skip_unless_linux() + _skip_if_no_thread_oversubscription(_SPIKEY_N_WORKERS * _SPIKEY_N_THREADS) + assert ( + run_duct_command( + [ + sys.executable, + SPIKEY_CPU_SCRIPT, + str(_SPIKEY_N_WORKERS), + str(_SPIKEY_N_THREADS), + str(_SPIKEY_DURATION_S), + ], + sampler="ps", + sample_interval=0.01, + report_interval=0.1, + output_prefix=temp_output_dir, + ) + == 0 + ) + peak_pcpu = _read_info(temp_output_dir)["execution_summary"]["peak_pcpu"] or 0.0 + # Ceiling: the workload demands N*M threads worth of parallel + # work, but the host only has cpu_count cores; physical peak can't + # exceed that. Plus a generous 100% slack. + assert peak_pcpu <= _SPIKEY_PCPU_CEILING, ( + f"ps reported peak_pcpu {peak_pcpu:.0f}% > " + f"physical ceiling {_SPIKEY_PCPU_CEILING:.0f}% " + f"(Bug 1: cputime/elapsed inflation x per-pid sum)" + ) + + +# ---- cgroup-ps-hybrid column ---- +# +# Opt-in: marked cgroup_matrix (see conftest.py --cgroup-matrix flag). +# Each test spawns duct in a transient systemd --user --scope so the +# cgroup CgroupSampler reads is dedicated to just duct + the workload. +# Without the scope, duct's ambient cgroup (a login user slice, a +# non-ns container, etc.) contains far more memory than our workload +# and matrix assertions become meaningless. + + +@pytest.mark.cgroup_matrix +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="cgroup-ps-hybrid", + workload="alloc_memory", + metric="rss", + direction="underreport", + expected="pass", +) +def test_cgroup_alloc_memory_rss_no_underreport( + temp_output_dir: str, +) -> None: + _skip_unless_systemd_run_scope() + alloc_mb = 50 + _run_duct_in_scope( + out_prefix=temp_output_dir, + workload_args=[ + sys.executable, + ALLOC_MEMORY_SCRIPT, + str(alloc_mb), + "1.5", + ], + ) + peak_rss = _read_info(temp_output_dir)["execution_summary"]["peak_rss"] + assert peak_rss >= alloc_mb * 1024 * 1024, ( + f"cgroup peak_rss ({peak_rss / 1024 / 1024:.1f} MB) should be >= " + f"allocated ({alloc_mb} MB)" + ) + + +@pytest.mark.cgroup_matrix +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="cgroup-ps-hybrid", + workload="memory_children", + metric="rss", + direction="underreport", + expected="pass", +) +def test_cgroup_memory_children_rss_no_underreport( + temp_output_dir: str, +) -> None: + _skip_unless_systemd_run_scope() + _run_duct_in_scope( + out_prefix=temp_output_dir, + workload_args=[ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(_MEM_CHILDREN_N), + str(_MEM_CHILDREN_M), + "1.5", + ], + ) + peak_rss = _read_info(temp_output_dir)["execution_summary"]["peak_rss"] + alloc_bytes = _MEM_CHILDREN_N * _MEM_CHILDREN_M * 1024 * 1024 + assert peak_rss >= alloc_bytes, ( + f"cgroup peak_rss ({peak_rss / 1024 / 1024:.1f} MB) should be >= " + f"total allocation " + f"({_MEM_CHILDREN_N * _MEM_CHILDREN_M} MB)" + ) + + +@pytest.mark.cgroup_matrix +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="cgroup-ps-hybrid", + workload="memory_children", + metric="rss", + direction="overreport", + expected="pass", +) +def test_cgroup_memory_children_rss_no_overreport( + temp_output_dir: str, +) -> None: + """cgroup memory.current reports physical memory, not per-pid sums. + + Expected to pass: this is the flip of the ps-column cell. cgroup + counts each physical page once regardless of how many PIDs have + it mapped, so shared library pages don't inflate the total. + """ + _skip_unless_systemd_run_scope() + _run_duct_in_scope( + out_prefix=temp_output_dir, + workload_args=[ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(_MEM_CHILDREN_N), + str(_MEM_CHILDREN_M), + "1.5", + ], + ) + peak_rss = _read_info(temp_output_dir)["execution_summary"]["peak_rss"] + # Ceiling: alloc + 60 MB (interpreter + shared-libs-once + headroom). + # ps-column reports ~180 MB for this workload (double-counts shared + # libs per child) so the 140 MB ceiling discriminates. + alloc_bytes = _MEM_CHILDREN_N * _MEM_CHILDREN_M * 1024 * 1024 + ceiling = alloc_bytes + 60 * 1024 * 1024 + assert peak_rss <= ceiling, ( + f"cgroup reported peak_rss {peak_rss / 1024 / 1024:.1f} MB > " + f"physical ceiling {ceiling / 1024 / 1024:.1f} MB " + f"({_MEM_CHILDREN_N} children x {_MEM_CHILDREN_M} MB)" + ) + + +@pytest.mark.cgroup_matrix +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="cgroup-ps-hybrid", + workload="steady_cpu", + metric="pcpu", + direction="underreport", + expected="pass", +) +def test_cgroup_steady_cpu_pcpu_no_underreport( + temp_output_dir: str, +) -> None: + _skip_unless_systemd_run_scope() + duration = 2.0 + _run_duct_in_scope( + out_prefix=temp_output_dir, + workload_args=[sys.executable, STEADY_CPU_SCRIPT, str(duration)], + ) + peak_pcpu = _read_info(temp_output_dir)["execution_summary"]["peak_pcpu"] + assert peak_pcpu >= 80.0, ( + f"cgroup peak_pcpu ({peak_pcpu}) should be >= 80% for " + f"one-core-pinned busy-loop" + ) + + +@pytest.mark.cgroup_matrix +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="cgroup-ps-hybrid", + workload="ephemeral_cpu", + metric="pcpu", + direction="underreport", + expected="pass", +) +def test_cgroup_ephemeral_cpu_pcpu_no_underreport( + temp_output_dir: str, +) -> None: + """cgroup cpu.stat is cumulative: dead children's CPU is counted. + + Expected to pass: short-lived workers that finish between duct + samples still contribute to ``cpu.stat.usage_usec``. The delta + between two samples captures the full burst. + """ + _skip_unless_systemd_run_scope() + _run_duct_in_scope( + out_prefix=temp_output_dir, + workload_args=[ + sys.executable, + EPHEMERAL_CPU_SCRIPT, + str(_EPHEMERAL_N_WORKERS), + str(_EPHEMERAL_WORK_MS), + str(_EPHEMERAL_HOLD_MS), + ], + ) + peak_pcpu = _read_info(temp_output_dir)["execution_summary"]["peak_pcpu"] or 0.0 + # Same floor as ps version: discriminates "captured non-zero CPU" + # from "missed entirely." + floor = 80.0 + assert peak_pcpu >= floor, ( + f"cgroup peak_pcpu ({peak_pcpu}) should be >= {floor}% for " + f"{_EPHEMERAL_N_WORKERS} parallel {_EPHEMERAL_WORK_MS}ms workers" + ) + + +@pytest.mark.cgroup_matrix +@pytest.mark.flaky(reruns=3) +@pytest.mark.sampler_matrix( + sampler="cgroup-ps-hybrid", + workload="spikey_cpu", + metric="pcpu", + direction="overreport", + expected="pass", +) +def test_cgroup_spikey_cpu_pcpu_no_overreport( + temp_output_dir: str, +) -> None: + """cgroup cpu.stat delta is bounded by cores in use. + + Expected to pass: unlike ps's lifetime-cumulative per-pid ratio, + ``usage_usec`` delta over a sample interval measures actual CPU + time consumed during that window. Bounded by + ``cpu_count * sample_interval``. + """ + _skip_unless_systemd_run_scope() + _skip_unless_linux() + _skip_if_no_thread_oversubscription(_SPIKEY_N_WORKERS * _SPIKEY_N_THREADS) + _run_duct_in_scope( + out_prefix=temp_output_dir, + workload_args=[ + sys.executable, + SPIKEY_CPU_SCRIPT, + str(_SPIKEY_N_WORKERS), + str(_SPIKEY_N_THREADS), + str(_SPIKEY_DURATION_S), + ], + sample_interval=0.01, + report_interval=0.1, + ) + peak_pcpu = _read_info(temp_output_dir)["execution_summary"]["peak_pcpu"] or 0.0 + assert peak_pcpu <= _SPIKEY_PCPU_CEILING, ( + f"cgroup reported peak_pcpu {peak_pcpu:.0f}% > " + f"physical ceiling {_SPIKEY_PCPU_CEILING:.0f}% " + f"(cgroup cpu.stat should be bounded by cpu_count)" + ) diff --git a/test/sampler_matrix_cgroup-ps-hybrid.csv b/test/sampler_matrix_cgroup-ps-hybrid.csv new file mode 100644 index 00000000..aea8a028 --- /dev/null +++ b/test/sampler_matrix_cgroup-ps-hybrid.csv @@ -0,0 +1,6 @@ +workload/metric,no_overreport,no_underreport +alloc_memory/rss,n/a,pass +ephemeral_cpu/pcpu,n/a,pass +memory_children/rss,pass,pass +spikey_cpu/pcpu,pass,n/a +steady_cpu/pcpu,n/a,pass diff --git a/test/sampler_matrix_ps.csv b/test/sampler_matrix_ps.csv new file mode 100644 index 00000000..a771ae60 --- /dev/null +++ b/test/sampler_matrix_ps.csv @@ -0,0 +1,6 @@ +workload/metric,no_overreport,no_underreport +alloc_memory/rss,n/a,pass +ephemeral_cpu/pcpu,n/a,fail +memory_children/rss,fail,pass +spikey_cpu/pcpu,fail,n/a +steady_cpu/pcpu,n/a,pass