diff --git a/README.md b/README.md index 3a00912d..4c87f64b 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,10 @@ Try it out using either `duct` or `con-duct run`: `duct` is most useful when the report-interval is less than the duration of the script. +## Documentation + +- [Interpreting duct's resource statistics](docs/resource-statistics.md) — what `pcpu`, `rss`, and the `con-duct plot` chart actually measure, and where the numbers can mislead. + ## Reference ### con-duct @@ -207,7 +211,8 @@ options: running process. Sample interval must be less than or equal to report interval, and it achieves the best results when sample is significantly less than the - runtime of the process. (default: 1.0) + runtime of the process. Values below 1.0 behave + erratically. (default: 1.0) --report-interval REPORT_INTERVAL, --r-i REPORT_INTERVAL Interval in seconds at which to report aggregated data. (default: 60.0) diff --git a/docs/resource-statistics.md b/docs/resource-statistics.md new file mode 100644 index 00000000..3128d399 --- /dev/null +++ b/docs/resource-statistics.md @@ -0,0 +1,244 @@ +# Interpreting duct's resource statistics + +duct records resource usage in two places: + +- **`usage.jsonl`**: one JSON record per report interval (default: every 60 seconds), capturing per-process and session-total stats aggregated over that window. +- **`execution_summary`** (printed at exit and stored in `info.json`): a whole-run summary of peak and average values across the full execution. + +The numbers in both come from the same sampling pipeline. +This document explains what those numbers actually measure, how `con-duct plot` renders them, and where they're trustworthy vs misleading. + +## How duct samples and aggregates + +Duct polls the monitored process tree on two independent intervals: + +- **`--sample-interval` (default 1.0s)**: how often duct reads per-pid stats via `ps -s `. + Each read is a *sample*. 
+- **`--report-interval` (default 60.0s)**: how often duct writes an aggregated record to `usage.jsonl`. + Each record summarizes all the samples taken during that report window. + +Aggregation within a report window uses **max reduction**: + +- For each per-pid metric, the reported value is the maximum observed across all samples of that pid in the window. +- For each session-total metric (`totals.rss`, `totals.pcpu`, etc.), the reported value is the maximum observed across all samples' totals in the window. + +Consequences worth knowing: + +1. **Short spikes between samples are not recorded.** + A process that briefly allocates 10GB and frees it between two consecutive samples is invisible to duct. +2. **Per-pid and session-total peaks may come from different sample moments.** + Per-pid max-reduction and total max-reduction are independent. + The same record can have `stats[A].rss = X` (A's peak from one sub-sample) and `totals.rss = Y` (the peak simultaneous total from another sub-sample). + +--- + +## CPU — `pcpu` + +### What it measures + +On Linux, `ps -o pcpu` is computed per process as: + +``` +pcpu = ((utime + stime) / (now - process_start_time)) × 100 +``` + +- `utime + stime` is **cumulative CPU time consumed by the process since it started** (kernel ticks from `/proc/[pid]/stat`). +- `now - process_start_time` is **wall-clock time elapsed since the process started**. + +So `pcpu` is *the fraction of wall time the process has spent on CPU, averaged from birth until the moment of sampling*. +It is a **lifetime average**, not an instantaneous rate. +This differs from `top(1)`, which shows CPU usage averaged over its most recent refresh interval. + +### `etime` is integer seconds + +`ps -o etime` reports elapsed time as an integer count of seconds (formatted `[[DD-]HH:]MM:SS`). +This has consequences for short-lived and freshly-spawned pids: + +- During a pid's first second of life, `etime` reads as `00:00`.
+ ps's `pcpu` calculation divides by this `etime`, and the result during sub-second life is unstable. +- A pid sampled at sub-second age that has accumulated meaningful CPU work across multiple threads can yield extreme `pcpu` readings. + Issue [#399](https://github.com/con/duct/issues/399) included a single pid reporting 5347% `pcpu` at `etime=3` on a 20-core machine, which is physically impossible (20 cores cap out at 2000%): the reading came from a sub-sample taken when the pid was less than a second old, where ps's calculation was racy. + +This is why sample intervals shorter than `1.0s` behave erratically. +Consecutive samples of the same pid often see the same integer `etime`, so derived measurements (like the `--cpu ps-cpu-timepoint` view, below) discard those points because `Δetime = 0`. + +### Three scenarios to build intuition + +#### Scenario A: long-running steady-state process at 100% CPU + +``` +t = 1s: cumulative CPU = 1.0s, elapsed = 1s → pcpu = 100% +t = 10s: cumulative CPU = 10.0s, elapsed = 10s → pcpu = 100% +t = 60s: cumulative CPU = 60.0s, elapsed = 60s → pcpu = 100% +``` + +For steady-state workloads, the lifetime average converges to the instantaneous rate. +*This is why the mental model "pcpu = current CPU usage" works most of the time.* + +#### Scenario B: brief burst, then idle + +A process that does 1 second of 100% CPU work, then sits idle: + +``` +t = 1s: cumulative CPU = 1.0s, elapsed = 1s → pcpu = 100% +t = 2s: cumulative CPU = 1.0s, elapsed = 2s → pcpu = 50% +t = 10s: cumulative CPU = 1.0s, elapsed = 10s → pcpu = 10% +t = 100s: cumulative CPU = 1.0s, elapsed = 100s → pcpu = 1% +``` + +After the burst, `pcpu` decays toward 0. +The process "remembers" past CPU work and slowly forgets as its elapsed time grows. +This is counterintuitive if you expected a real-time number.
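
The arithmetic in Scenarios A and B can be reproduced with a few lines of Python. This is a minimal sketch under stated assumptions, not duct's or ps's implementation: it models an idealized process that uses exactly one core during its burst and none afterwards, and the helper names `lifetime_pcpu` and `burst_then_idle` are hypothetical, introduced only for illustration.

```python
def lifetime_pcpu(cpu_seconds: float, elapsed_seconds: float) -> float:
    """Lifetime-average %CPU: cumulative CPU time over wall time since start.

    Hypothetical helper mirroring the formula quoted above, not a ps API.
    """
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return 100.0 * cpu_seconds / elapsed_seconds


def burst_then_idle(burst_seconds: float, sample_times: list[float]) -> list[float]:
    """Readings for a process that uses one full core for `burst_seconds`
    and then sits idle (an idealized assumption for this sketch)."""
    readings = []
    for t in sample_times:
        # Cumulative CPU time stops growing once the burst is over.
        cpu_used = min(t, burst_seconds)
        readings.append(lifetime_pcpu(cpu_used, t))
    return readings


# Scenario A: the burst lasts the whole observation, so the average stays flat.
steady = burst_then_idle(burst_seconds=60.0, sample_times=[1.0, 10.0, 60.0])

# Scenario B: one second of work, then idle; the average decays as 1/elapsed.
bursty = burst_then_idle(burst_seconds=1.0, sample_times=[1.0, 2.0, 10.0, 100.0])

print("Scenario A:", steady)
print("Scenario B:", bursty)
```

In this model the Scenario B reading falls in proportion to `1 / elapsed` once the burst ends, which is the "slow forgetting" described above.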
+ +#### Scenario C: the pathological summation case + +Many short-lived, multi-threaded native child processes, as happens under tox when pip compiles C extensions: + +``` +Child 1 runs for 200ms on 4 cores, observed by sample at t=150ms: + cumulative CPU = 600ms, elapsed = 150ms + → pcpu reported = 400% + +…30 such children observed during a single sample… + +sum across children at sample time = 30 × 400% = 12,000% +system physical ceiling (20 cores) = 2,000% +``` + +Each individual child's number is correct for what `ps` is answering ("fraction of wall time spent on CPU, averaged from start"). +The problem is that *summing* lifetime-averages across processes that took turns on the CPU produces a total claiming work the system didn't have the cores to do. +The children ran sequentially, but the sum over the report window treats the spikes as simultaneous. + +### When `pcpu` is reliable + +| Workload shape | `pcpu` reliability | +|-----------------------------------------------|---------------------------------------------------------| +| Single long-running steady-state process | Accurate | +| Few long-running processes at steady state | Accurate | +| Bursty processes that are long-running | Accurate at the average; misses burst structure | +| Many short-lived (few second) child processes | **Unreliable: can inflate dramatically when summed** | +| Multi-threaded native code bursts | Per-process `pcpu` correct; summed totals may overshoot | + +--- + +## Memory — `rss` and `pmem` + +`ps -o rss` reports per-process **resident set size**: physical memory currently mapped into the process's address space, in kilobytes. +This counts: + +- Private pages the process has allocated and touched. +- **Shared pages** (libraries, copy-on-write memory after `fork()`) that the process has mapped, counted independently in **each** process that maps them. + +`ps -o pmem` is derived: `rss` divided by total system RAM, expressed as a percentage. 
+It inherits every property of `rss` and adds a host-dependent denominator. + +### The shared-page issue + +When multiple processes share the same physical page, that page appears in **each process's RSS**, but the physical page exists only once. + +Example: a Python parent process with 100MB RSS forks 10 child workers. +Immediately after fork: + +``` +Parent RSS: 100MB +Child 1 RSS: 100MB +… +Child 10 RSS: 100MB + +Sum of RSS across processes: 1100MB +Actual physical memory used: ~100MB (all shared with parent) +``` + +As children write to their copy of each page, copy-on-write triggers and the page becomes private. +At that point physical use genuinely grows. +So `sum(rss)` is a **loose upper bound** on actual usage: never less than true usage, often much more. + +For a duct-monitored Python test suite with `pytest-xdist` spawning 8 workers, expect `sum(rss)` to overstate physical memory by 3-5×. + +--- + +## What `con-duct plot` renders + +`con-duct plot ` renders, per report-interval record: + +- **Per-pid traces**: one faint dotted line per pid. + CPU is on the primary y-axis. + RSS is on a secondary axis (`twinx`) so the two scales don't fight. + Color encodes metric, not pid identity. +- **Envelope lines**: summarize the per-pid cloud at each timestamp (one solid lower bound + one dashed upper bound). +- **Optional host-memory annotation**: when `info.json` is alongside the usage file, the rss legend label includes total host RAM (e.g. `rss (host: 256.0GB)`). + Useful for SLURM contexts. + Without `info.json`, plain `rss`. + +### `--cpu` mode flag + +duct stores `pcpu` (lifetime average from ps) per pid in `usage.jsonl`. +The plot can render this two ways: + +- **`--cpu ps-pcpu` (default)**: plot the raw lifetime ratio untransformed. + "Lossless" view: every point on the chart is an unaltered ps reading. + Useful when you want to see exactly what the sampler captured. 
+- **`--cpu ps-cpu-timepoint`**: at plot time, derive a per-interval estimate from consecutive `(pcpu, etime)` pairs: `(curr_pcpu × curr_etime − prev_pcpu × prev_etime) / Δetime`. + This inverts ps's lifetime-average formula to extract an approximate instantaneous CPU rate. + Motivated by [Scenario C](#scenario-c-the-pathological-summation-case): lifetime averages of short-lived bursty processes overstate "current" usage by orders of magnitude. + +Both modes have caveats: + +- The raw `ps-pcpu` mode shows what ps reported, including lifetime-average inflation. + A pid that ran on 4 cores for 150ms and went idle peaks at 400% in the first report interval that observed it, then decays toward its true average as `etime` grows in subsequent intervals. +- The derived `ps-cpu-timepoint` mode is approximate (delta math on max-reduced samples). + It discards each pid's first observation (no prior point to delta against), so short-lived pids that appear in only one record drop out entirely. + CPU bursts from those pids are not visible in the timepoint view, but remain visible in the `ps-pcpu` view via `totals.pcpu`. + +### Envelope semantics + +The plot draws two envelopes over the per-pid trace cloud: + +- **Lower bound (solid)**: max-across-pids at each timestamp. + Reads as "at least this much was in use." +- **Upper bound (dashed)**: depends on what's being plotted. + - **RSS, and CPU in `ps-pcpu` mode**: `totals.*` from the record. + duct computes this as the peak simultaneous total observed across the report window's sub-samples. + - **CPU in `ps-cpu-timepoint` mode**: sum-across-pids of the derived (instantaneous) values at each timestamp. + Used here because `totals.pcpu` is a peak of *lifetime averages* and doesn't share units with the derived instantaneous values. + +## Common questions + +### Why is the raw `pcpu` line in `ps-pcpu` mode so much higher than `ncores × 100%`? + +Two compounding reasons, either of which can do it alone: + +1. 
**Single-pid extremes from ps.** + For pids sampled at sub-second age, ps's `cputime / etime` calculation is unstable (see [`etime` is integer seconds](#etime-is-integer-seconds)). + Individual pids can briefly report thousands of percent. +2. **Summed lifetime-averages across many short-lived pids.** + Even if each pid's `pcpu` is finite, summing lifetime averages across processes that took turns on the cores produces a total claiming work the cores couldn't have done. + See [Scenario C](#scenario-c-the-pathological-summation-case). + Most common in workloads that spawn many short-lived child processes involving native/multi-threaded code: pip install compiling C extensions, `make -j`, tox, any CI/build workflow. + +### Why is the `ps-cpu-timepoint` line lower than the `ps-pcpu` line? + +`ps-pcpu` plots the lifetime average from ps. +A burst captured early in a pid's life pulls the reported `pcpu` high, and that pid's trace decays slowly as `etime` grows. +`ps-cpu-timepoint` instead estimates an instantaneous rate per report interval, so a burst contributes only to the interval that contained it. + +Example: a pid that did 600ms of CPU on 4 cores in its first 150ms and was idle thereafter. +`ps-pcpu` shows ~400% in the first report interval and a decaying trace in subsequent intervals (until the pid dies or the trace falls off the chart). +`ps-cpu-timepoint` shows ~400% only in the burst interval and ~0% thereafter. + +The timepoint view is more "honest" about current usage but loses the cumulative-effort information that `ps-pcpu` carries. + +### Why does `totals.*` not equal `sum(per-pid max)` in a record? + +duct max-reduces per-pid stats and session totals independently within a report window. +A pid's reported `rss` is its max across sub-samples in the window; `totals.rss` is the max of the *simultaneous total* across those sub-samples. +The per-pid peaks may have happened at different moments, so summing them counts moments that never coexisted. 
+`totals.*` is the actual peak simultaneous footprint and is the right number for sizing. + +### My RSS chart grew a lot when I added more worker processes. Did memory usage really grow proportionally? + +Probably not. +If the workers are forked children of a common parent, each child's RSS counts the shared pages it inherited. +Per-pid traces and their max envelope grow roughly linearly with child count even when physical memory grows much less. +The dashed `totals.rss` upper bound is closer to actual physical use, but still over-counts shared libraries linked by independent processes. +See [The shared-page issue](#the-shared-page-issue). diff --git a/src/con_duct/_duct_main.py b/src/con_duct/_duct_main.py index 206167bb..e2954927 100644 --- a/src/con_duct/_duct_main.py +++ b/src/con_duct/_duct_main.py @@ -58,6 +58,12 @@ def execute( raise ValueError( "--report-interval must be greater than or equal to --sample-interval." ) + if sample_interval < 1.0: + lgr.warning( + "--sample-interval=%.3f is below 1.0s and may behave erratically. " + "See docs/resource-statistics.md for details.", + sample_interval, + ) log_paths = LogPaths.create(output_prefix, pid=os.getpid()) try: diff --git a/src/con_duct/_formatter.py b/src/con_duct/_formatter.py index 5ae89251..c804dc2b 100644 --- a/src/con_duct/_formatter.py +++ b/src/con_duct/_formatter.py @@ -9,6 +9,24 @@ lgr = logging.getLogger("con-duct") +# Decimal (SI) byte units, single source of truth for byte humanization in +# duct. Used by SummaryFormatter.naturalsize for run-summary output and by +# the plot axis formatter for tick labels, so a "kB" means the same thing +# in both places. A future opt-in could add a 1024-base + IEC-suffix +# variant (KiB/MiB/GiB/...) and let callers pick. 
+FILESIZE_UNITS: list[tuple[str, int]] = [ + ("B", 1), + ("kB", 1000**1), + ("MB", 1000**2), + ("GB", 1000**3), + ("TB", 1000**4), + ("PB", 1000**5), + ("EB", 1000**6), + ("ZB", 1000**7), + ("YB", 1000**8), +] + + class SummaryFormatter(string.Formatter): OK = "OK" NOK = "X" @@ -16,7 +34,6 @@ class SummaryFormatter(string.Formatter): BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38) RESET_SEQ = "\033[0m" COLOR_SEQ = "\033[1;%dm" - FILESIZE_SUFFIXES = (" kB", " MB", " GB", " TB", " PB", " EB", " ZB", " YB") def __init__(self, enable_colors: bool = False) -> None: self.enable_colors = enable_colors @@ -34,7 +51,7 @@ def naturalsize( >>> formatter.naturalsize(3000000) '3.0 MB' >>> formatter.naturalsize(3000, "%.3f") - '2.930 kB' + '3.000 kB' >>> formatter.naturalsize(10**28) '10000.0 YB' ``` @@ -46,24 +63,23 @@ def naturalsize( Returns: str: Human readable representation of a filesize. """ - base = 1000 bytes_ = float(value) abs_bytes = abs(bytes_) if abs_bytes == 1: return "%d Byte" % bytes_ - if abs_bytes < base: + if abs_bytes < 1000: return "%d Bytes" % bytes_ - for i, _s in enumerate(self.FILESIZE_SUFFIXES): - unit = base ** (i + 2) - - if abs_bytes < unit: - break - - ret: str = format % (base * bytes_ / unit) + _s - return ret + # Pick the largest unit where the value is at least 1 of that unit. + # FILESIZE_UNITS is ordered ascending and the loop never breaks early, + # so the last unit that passes the check is the one kept. + name, divisor = FILESIZE_UNITS[1] # fallback "kB"; abs_bytes >= 1000 here + for n, d in FILESIZE_UNITS[1:]: + if abs_bytes / d >= 1: + name, divisor = n, d + return f"{format % (bytes_ / divisor)} {name}" def color_word(self, s: str, color: int) -> str: """Color `s` with `color`.
diff --git a/src/con_duct/_utils.py b/src/con_duct/_utils.py index 6ccd29c0..80c57773 100644 --- a/src/con_duct/_utils.py +++ b/src/con_duct/_utils.py @@ -8,6 +8,115 @@ def assert_num(*values: Any) -> None: assert isinstance(value, (float, int)) +# TODO: consider asking ps for `etimes` (seconds) directly via +# `-o etimes` instead of parsing `etime`. Even if we switch, this +# parser is worth keeping for backwards compatibility with existing +# usage.jsonl logs that persist `etime` as a string. +def etime_to_etimes(etime: str) -> float: + """Parse a ps ``etime`` string into seconds. + + ps's ``etime`` format is ``[[DD-]HH:]MM:SS``: ``MM:SS`` always, + with ``HH:`` prepended after one hour and ``DD-`` prepended after + one day. + + :param etime: elapsed-time string from ``ps -o etime``. + :returns: elapsed time in seconds. + :raises ValueError: if ``etime`` does not match the expected shape. + """ + if "-" in etime: + days_str, rest = etime.split("-", 1) + days = int(days_str) + else: + days = 0 + rest = etime + parts = rest.split(":") + if len(parts) == 2: + hours, minutes, seconds = 0, int(parts[0]), int(parts[1]) + elif len(parts) == 3: + hours, minutes, seconds = int(parts[0]), int(parts[1]), int(parts[2]) + else: + raise ValueError(f"Unparsable etime: {etime!r}") + return float(days * 86400 + hours * 3600 + minutes * 60 + seconds) + + +def is_same_pid( + prev_etimes: float, + curr_etimes: float, + wall_delta: float, + *, + tolerance: float = 2.0, +) -> bool: + """Decide whether two consecutive observations of the same pid number + correspond to the same physical process (vs kernel pid reuse). + + A continuously-existing pid's ``etime`` grows by exactly the + wall-clock time between two samples (etime is wall-since-fork, + not cputime). ps reports ``etime`` at integer-second resolution, + so the only slack is rounding plus small clock drift. 
+ + A stricter ``etime_2 < etime_1`` check would miss reuses where + the new pid's etime crept above the old pid's last reading by + less than the sample interval. Comparing ``Δetime`` against + ``Δwall`` catches those too. + + :param prev_etimes: elapsed seconds at the earlier sample. + :param curr_etimes: elapsed seconds at the later sample. + :param wall_delta: wall-clock seconds between the two samples. + :param tolerance: slack (seconds) absorbed by ps's integer-second + ``etime`` plus clock drift. 2 seconds is plenty in practice. + :returns: ``True`` iff ``Δetime`` is close enough to ``Δwall`` + to be the same continuous process. + """ + return curr_etimes - prev_etimes >= wall_delta - tolerance + + +def pdcpu_from_pcpu( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, +) -> float | None: + """Delta-corrected %CPU between two ps samples of the same pid. + + Inverts the procps identity ``pcpu = cputime / etime * 100`` to + recover cputime at each sample, takes the cputime delta, and + divides by the elapsed interval. The ``/100`` and ``*100`` + cancel, so the result is in the same units as ``pcpu``. + + Linux-only: assumes ``pcpu`` is the cumulative ``cputime/etime`` + ratio. Invalid on Darwin (decayed EWMA). + + Identity is the caller's responsibility: invoke ``is_same_pid`` + first. If the inputs satisfy that precondition, ``Δetime`` is + positive and the math is well-defined. + + :param prev_pcpu: %CPU from the earlier sample. + :param prev_etimes: elapsed seconds at the earlier sample. + :param curr_pcpu: %CPU from the later sample. + :param curr_etimes: elapsed seconds at the later sample. + :returns: delta-corrected %CPU over the interval, or ``None`` + in two "no measurement" cases: + + - ``Δetime <= 0`` -- defensive guard for callers that + skipped ``is_same_pid``; covers sub-quantum and obvious + pid-reuse. + - Computed ``pdcpu < 0`` -- aggregation-timing artifact. 
+ When duct's per-pid ``pcpu`` is max-across-samples while + ``etime`` is from the last sample, a spike-then-idle + pattern earlier in the run inflates ``prev_pcpu * + prev_etimes`` enough that the cputime "delta" goes + negative. The pid is the same; the math is just noisy. + Such a negative result is reported as ``None`` (no + measurement) rather than as zero. + """ + interval = curr_etimes - prev_etimes + if interval <= 0: + return None + pdcpu = (curr_pcpu * curr_etimes - prev_pcpu * prev_etimes) / interval + if pdcpu < 0: + return None + return pdcpu + + def parse_version(version_str: str) -> tuple[int, int, int]: x_y_z = version_str.split(".") if len(x_y_z) != 3: diff --git a/src/con_duct/cli.py b/src/con_duct/cli.py index 258d00aa..32997efc 100644 --- a/src/con_duct/cli.py +++ b/src/con_duct/cli.py @@ -11,7 +11,7 @@ from con_duct._duct_main import execute as duct_execute from con_duct._models import Outputs, RecordTypes, SessionMode from con_duct.ls import LS_FIELD_CHOICES, ls -from con_duct.plot import matplotlib_plot +from con_duct.plot import CPU_MODE_PS_PCPU, CPU_MODES, matplotlib_plot from con_duct.pprint_json import pprint_json # Default .env file search paths (in precedence order) @@ -300,7 +300,8 @@ def _create_run_parser() -> argparse.ArgumentParser: default=float(os.getenv("DUCT_SAMPLE_INTERVAL", "1.0")), help="Interval in seconds between status checks of the running process. " "Sample interval must be less than or equal to report interval, and it achieves the " - "best results when sample is significantly less than the runtime of the process.", + "best results when sample is significantly less than the runtime of the process. " + "Values below 1.0 behave erratically.", ) parser.add_argument( "--report-interval", @@ -399,6 +400,15 @@ def _create_plot_parser() -> argparse.ArgumentParser: help="Minimum ratio for axis unit selection (default: 3.0). Lower values use larger units sooner.
" "Use -1 to always use base units (seconds, bytes).", ) + parser.add_argument( + "--cpu", + choices=CPU_MODES, + default=CPU_MODE_PS_PCPU, + help="Which CPU value to plot. 'ps-pcpu' uses the raw lifetime ratio " + "from ps with no transformation. 'ps-cpu-timepoint' computes a " + "delta-corrected time-point estimate from consecutive (pcpu, etime) " + "pairs.", + ) return parser diff --git a/src/con_duct/plot.py b/src/con_duct/plot.py index 2ac424f8..b3ebf756 100644 --- a/src/con_duct/plot.py +++ b/src/con_duct/plot.py @@ -1,11 +1,39 @@ +"""Resource-usage plotting for con-duct. + +Renders a per-pid CPU / rss cloud overlaid by envelopes: +max-across-pids as a lower bound, and either ``totals.*`` from the +record (RSS, and CPU in ``ps-pcpu`` mode) or sum-across-pids of the +derived values (CPU in ``ps-cpu-timepoint`` mode) as the upper +bound. CPU lives on the primary y-axis (percent), rss on a +secondary y-axis (bytes). + +The per-pid overlay is loosely modeled on brainlife's smon task viewer: +https://github.com/brainlife/warehouse/blob/b833b98e3518181eacef71cc04ae773a7592b1a8/ui/src/modals/taskinfo.vue +""" + import argparse from datetime import datetime import json import logging from pathlib import Path -from typing import Any, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple +from con_duct._constants import SUFFIXES +from con_duct._formatter import FILESIZE_UNITS, SummaryFormatter +from con_duct._utils import etime_to_etimes, is_same_pid, pdcpu_from_pcpu from con_duct.json_utils import is_info_file, load_info_file, load_usage_file +# Color per metric (all pid lines for that metric share this color). +PCPU_COLOR = "tab:orange" +RSS_COLOR = "tab:blue" + +# --cpu choices. +# ps-pcpu: raw lifetime ratio from `ps -o pcpu`, no transformation. +# ps-cpu-timepoint: delta-corrected pdcpu computed from consecutive +# (pcpu, etime) pairs -- our derived time-point estimate. 
+CPU_MODE_PS_PCPU = "ps-pcpu" +CPU_MODE_PS_CPU_TIMEPOINT = "ps-cpu-timepoint" +CPU_MODES = (CPU_MODE_PS_PCPU, CPU_MODE_PS_CPU_TIMEPOINT) + lgr = logging.getLogger(__name__) _TIME_UNITS = [ @@ -15,15 +43,6 @@ ("d", 86400), ] -_MEMORY_UNITS = [ - ("B", 1), - ("KB", 1024**1), - ("MB", 1024**2), - ("GB", 1024**3), - ("TB", 1024**4), - ("PB", 1024**5), -] - # Class in a Class to avoid importing matplotlib until we need it. class HumanizedAxisFormatter: @@ -66,6 +85,160 @@ def __call__(self, x: float, _pos: Optional[int] = 0) -> str: return _HumanizedAxisFormatter(min_ratio=min_ratio, units=units) +def _build_pid_series( + data: List[Dict[str, Any]], + cpu_mode: str = CPU_MODE_PS_PCPU, +) -> Dict[str, Dict[str, Any]]: + """Walk usage records once, return per-pid time series. + + For each pid present in any record, returns aligned lists of + ``elapsed`` (seconds since first record), ``cpu``, ``pmem``, + ``rss``. + + The ``cpu`` field is populated according to ``cpu_mode``: + + - ``ps-pcpu``: raw ``pcpu`` from each record, untransformed. Every + record contributes a value; no entry is dropped. + - ``ps-cpu-timepoint``: delta-corrected pdcpu computed from + consecutive (etime, pcpu) pairs. ``None`` for first observation + per pid, records with ``etime == "00:00"``, sub-quantum + intervals, pid reuse, and the negative-pdcpu clamp. 
+ """ + if not data: + return {} + base_ts = datetime.fromisoformat(data[0]["timestamp"]) + pid_state: Dict[str, Optional[Tuple[float, float, datetime]]] = {} + series: Dict[str, Dict[str, Any]] = {} + for entry in data: + entry_ts = datetime.fromisoformat(entry["timestamp"]) + elapsed = (entry_ts - base_ts).total_seconds() + for pid, p in entry.get("processes", {}).items(): + pcpu = float(p.get("pcpu", 0.0)) + cpu_value: Optional[float] + if cpu_mode == CPU_MODE_PS_PCPU: + cpu_value = pcpu + else: + try: + etime_sec = etime_to_etimes(p.get("etime", "")) + except ValueError: + continue + # Use the per-process timestamp (the moment this pid was last + # sampled) for the wall-delta in is_same_pid -- not the report + # timestamp. duct emits each report at the end of its interval, + # but a short-lived pid's last sample within the interval can + # be many seconds earlier. Comparing Δetime against Δreport + # timestamp would falsely flag a continuous-but-short pid as + # reused. + proc_ts = datetime.fromisoformat(p.get("timestamp", entry["timestamp"])) + prev = pid_state.get(pid) + cpu_value = None + if etime_sec != 0.0 and prev is not None: + prev_etime, prev_pcpu, prev_proc_ts = prev + wall_delta = (proc_ts - prev_proc_ts).total_seconds() + if is_same_pid(prev_etime, etime_sec, wall_delta): + cpu_value = pdcpu_from_pcpu( + prev_pcpu, prev_etime, pcpu, etime_sec + ) + # else: pid reuse -- cpu stays None, re-baseline below. + # Don't baseline from etime=0 -- next sample is a "first observation". 
+ pid_state[pid] = ( + None if etime_sec == 0.0 else (etime_sec, pcpu, proc_ts) + ) + entry_series = series.setdefault( + pid, + { + "cmd": p.get("cmd", ""), + "elapsed": [], + "cpu": [], + "pmem": [], + "rss": [], + }, + ) + entry_series["elapsed"].append(elapsed) + entry_series["cpu"].append(cpu_value) + entry_series["pmem"].append(float(p.get("pmem", 0.0))) + entry_series["rss"].append(float(p.get("rss", 0.0))) + return series + + +def _envelopes( + series: Dict[str, Dict[str, Any]], + metric: str, +) -> Tuple[List[float], List[float], List[float]]: + """Per-grid-timestamp max and sum across kept pids for ``metric``. + + Each pid contributes a value at every entry-elapsed timestamp where it + appeared *and* its value at that timestamp is not ``None`` (pdcpu can be + ``None`` for first-observation, etime=0, sub-quantum, pid-reuse, or the + negative-pdcpu clamp). Grid points where no kept pid had a value are + omitted entirely rather than reported as zero -- a missing measurement + is not the same as zero load. + + The grid is the union of (kept pids') elapsed values; the alternative + of "every entry timestamp regardless of who appeared" would only add + grid points that are missing measurements anyway. + + :returns: ``(grid_xs, max_ys, sum_ys)`` aligned, sorted by ``grid_xs``. + """ + grid: Dict[float, List[float]] = {} + for s in series.values(): + for x, v in zip(s["elapsed"], s[metric]): + if v is None: + continue + grid.setdefault(x, []).append(v) + xs = sorted(grid.keys()) + return xs, [max(grid[x]) for x in xs], [sum(grid[x]) for x in xs] + + +def _totals_series( + data: List[Dict[str, Any]], field: str +) -> Tuple[List[float], List[float]]: + """Return ``(elapsed, totals[field])`` per record. + + ``totals[field]`` is duct's max-of-(sum-per-sample) within each report + interval -- the highest concurrent value observed at any single sample + in that interval. 
Used as the upper-bound line on the chart, replacing + sum-of-per-pid-peaks (which over-counts pids whose peaks never + coexisted within the same sample -- "phantom coexistence"). + """ + if not data: + return [], [] + base = datetime.fromisoformat(data[0]["timestamp"]) + xs: List[float] = [] + ys: List[float] = [] + for entry in data: + xs.append((datetime.fromisoformat(entry["timestamp"]) - base).total_seconds()) + ys.append(float(entry["totals"][field])) + return xs, ys + + +def _load_host_memory_total(file_path: Path) -> Optional[int]: + """Best-effort lookup of ``system.memory_total`` (bytes) from info.json. + + Accepts either an info.json path or a usage path; for the latter, falls + back to a sibling info.json named by stripping the usage suffix and + appending ``info.json``. Returns ``None`` on any failure -- the caller + treats absence as "host RAM unknown" and renders a plain legend label. + """ + try: + if is_info_file(str(file_path)): + info_data = load_info_file(str(file_path)) + else: + usage_str = str(file_path) + sibling: Optional[Path] = None + for suffix in (SUFFIXES["usage"], SUFFIXES["usage_legacy"]): + if usage_str.endswith(suffix): + sibling = Path(usage_str[: -len(suffix)] + SUFFIXES["info"]) + break + if sibling is None or not sibling.exists(): + return None + info_data = load_info_file(str(sibling)) + value = info_data["system"]["memory_total"] + return int(value) + except (FileNotFoundError, KeyError, ValueError, TypeError, json.JSONDecodeError): + return None + + def matplotlib_plot(args: argparse.Namespace) -> int: try: import matplotlib @@ -74,8 +247,8 @@ def matplotlib_plot(args: argparse.Namespace) -> int: if args.output is not None: matplotlib.use("Agg") + from matplotlib.lines import Line2D import matplotlib.pyplot as plt - import numpy as np except ImportError as e: lgr.error("con-duct plot failed: missing dependency: %s", e) return 1 @@ -103,7 +276,8 @@ def matplotlib_plot(args: argparse.Namespace) -> int: ) # Handle info.json 
files by determining the path to usage file - file_path = Path(args.file_path) + arg_path = Path(args.file_path) + file_path = arg_path if is_info_file(str(file_path)): try: info_data = load_info_file(str(file_path)) @@ -112,6 +286,7 @@ def matplotlib_plot(args: argparse.Namespace) -> int: except (FileNotFoundError, KeyError, json.JSONDecodeError) as e: lgr.error("Error reading info file %s: %s", args.file_path, e) return 1 + host_memory_total = _load_host_memory_total(arg_path) try: data = load_usage_file(str(file_path)) @@ -123,19 +298,9 @@ def matplotlib_plot(args: argparse.Namespace) -> int: return 1 try: - # Convert timestamps to datetime objects - timestamps = [datetime.fromisoformat(entry["timestamp"]) for entry in data] - - # Calculate elapsed time in seconds - elapsed_time = np.array( - [(ts - timestamps[0]).total_seconds() for ts in timestamps] - ) - - # Extract other data - pmem = np.array([entry["totals"]["pmem"] for entry in data]) - pcpu = np.array([entry["totals"]["pcpu"] for entry in data]) - rss_kb = np.array([entry["totals"]["rss"] for entry in data]) - vsz_kb = np.array([entry["totals"]["vsz"] for entry in data]) + pid_series = _build_pid_series(data, cpu_mode=args.cpu) + totals_rss_xs, totals_rss_ys = _totals_series(data, "rss") + totals_pcpu_xs, totals_pcpu_ys = _totals_series(data, "pcpu") except KeyError as e: lgr.error("Usage file %s is missing required field: %s", file_path, e) return 1 @@ -146,35 +311,132 @@ def matplotlib_plot(args: argparse.Namespace) -> int: lgr.error("Error processing usage file %s: %s", file_path, e) return 1 - # Plotting - fig, ax1 = plt.subplots() + fig, ax = plt.subplots() + ax2 = ax.twinx() # type: ignore[attr-defined] + + # Per-pid traces: dotted, faint, single color per metric. The cloud of + # pid lines reads as background texture; the envelopes carry the signal. 
+ for s in pid_series.values(): + pdcpu_xs = [t for t, v in zip(s["elapsed"], s["cpu"]) if v is not None] + pdcpu_ys = [v for v in s["cpu"] if v is not None] + if pdcpu_xs: + ax.plot( # type: ignore[call-arg] + pdcpu_xs, + pdcpu_ys, + color=PCPU_COLOR, + linestyle=":", + linewidth=0.8, + alpha=0.4, + ) + ax2.plot( # type: ignore[call-arg] + s["elapsed"], + s["rss"], + color=RSS_COLOR, + linestyle=":", + linewidth=0.8, + alpha=0.4, + ) + + # Envelopes: max-across-pids (lower bound) solid, upper bound dashed. + # If some pid was at 50%, the total was at least 50% -- max-of-pids is + # a true lower bound on the concurrent total in both metrics. + # + # Upper bounds: for both rss and (in ps-pcpu mode) cpu, we use duct's + # per-record ``totals[field]`` -- the peak concurrent value observed + # at any single sub-sample within the report interval. This is a + # tight upper bound under "observed samples only" framing, and avoids + # the phantom-coexistence inflation of summing per-pid peaks (pids + # whose peaks fell in different sub-samples within the interval would + # otherwise both contribute their peak). + # + # In ps-cpu-timepoint mode there is no per-record ``totals.pdcpu`` -- + # pdcpu is computed at plot time -- so we fall back to summing per-pid + # pdcpu. The negative-pdcpu clamp filters the worst aggregation-timing + # artifacts; remaining looseness is a known, accepted caveat. 
+ pcpu_xs, pcpu_max, pcpu_sum = _envelopes(pid_series, "cpu") + if pcpu_xs: + ax.plot( # type: ignore[call-arg] + pcpu_xs, pcpu_max, color=PCPU_COLOR, linestyle="-", linewidth=2.0 + ) + if args.cpu == CPU_MODE_PS_PCPU: + if totals_pcpu_xs: + ax.plot( # type: ignore[call-arg] + totals_pcpu_xs, + totals_pcpu_ys, + color=PCPU_COLOR, + linestyle="--", + linewidth=1.5, + ) + else: + ax.plot( # type: ignore[call-arg] + pcpu_xs, pcpu_sum, color=PCPU_COLOR, linestyle="--", linewidth=1.5 + ) + rss_xs, rss_max, _ = _envelopes(pid_series, "rss") + if rss_xs: + ax2.plot( # type: ignore[call-arg] + rss_xs, rss_max, color=RSS_COLOR, linestyle="-", linewidth=2.0 + ) + if totals_rss_xs: + ax2.plot( # type: ignore[call-arg] + totals_rss_xs, + totals_rss_ys, + color=RSS_COLOR, + linestyle="--", + linewidth=1.5, + ) - # Plot pmem and pcpu on primary y-axis - ax1.plot(elapsed_time, pmem, label="pmem (%)", color="tab:blue") - ax1.plot(elapsed_time, pcpu, label="pcpu (%)", color="tab:orange") - ax1.set_xlabel("Elapsed Time") - ax1.set_ylabel("Percentage") - ax1.legend(loc="upper left") + ax.set_xlabel("Elapsed Time") + ax.set_ylabel(f"{args.cpu} (%)") + ax2.set_ylabel("rss") + if pid_series: + # Two legends, color-agnostic linestyle key on the right and metric + # color key on the left. Linestyle entries are listed in the order + # a viewer's eye scans the chart: upper bound (the high dashed line), + # lower bound (the solid line below it), per-pid (dotted cloud). 
+ style_handles = [ + Line2D( + [0], + [0], + color="black", + linestyle="--", + linewidth=1.5, + label="upper bound", + ), + Line2D( + [0], + [0], + color="black", + linestyle="-", + linewidth=2.0, + label="lower bound", + ), + Line2D( + [0], [0], color="black", linestyle=":", linewidth=0.8, label="per-pid" + ), + ] + style_legend = ax.legend( # type: ignore[call-arg] + handles=style_handles, loc="upper right", fontsize=9 + ) + ax.add_artist(style_legend) # type: ignore[attr-defined] + rss_label = "rss" + if host_memory_total is not None: + rss_label = ( + f"rss (host: {SummaryFormatter().naturalsize(host_memory_total)})" + ) + color_handles = [ + Line2D([0], [0], color=PCPU_COLOR, linewidth=2.0, label=args.cpu), + Line2D([0], [0], color=RSS_COLOR, linewidth=2.0, label=rss_label), + ] + ax.legend(handles=color_handles, loc="upper left", fontsize=9) # type: ignore[call-arg] - ax1.xaxis.set_major_formatter( # type: ignore[attr-defined] + ax.xaxis.set_major_formatter( # type: ignore[attr-defined] HumanizedAxisFormatter(min_ratio=args.min_ratio, units=_TIME_UNITS) ) - - # Create a second y-axis for rss and vsz - ax2 = ax1.twinx() # type: ignore[attr-defined] - ax2.plot(elapsed_time, rss_kb, label="rss", color="tab:green") - ax2.plot(elapsed_time, vsz_kb, label="vsz", color="tab:red") - ax2.set_ylabel("Memory") - ax2.legend(loc="upper right") - ax2.yaxis.set_major_formatter( # type: ignore[attr-defined] - HumanizedAxisFormatter(min_ratio=args.min_ratio, units=_MEMORY_UNITS) + HumanizedAxisFormatter(min_ratio=args.min_ratio, units=FILESIZE_UNITS) ) - plt.title("Resource Usage Over Time") - - # Adjust layout to prevent labels from being cut off - plt.tight_layout() # type: ignore[attr-defined] + plt.title("Resource Usage Over Time (per pid)") if args.output is not None: plt.savefig(args.output) diff --git a/test/duct_main/test_duct_utils.py b/test/duct_main/test_duct_utils.py index c25a4991..fefc2d52 100644 --- a/test/duct_main/test_duct_utils.py +++ 
b/test/duct_main/test_duct_utils.py @@ -1,7 +1,12 @@ """Tests for utility functions in _duct_main.py""" import pytest -from con_duct._utils import assert_num +from con_duct._utils import ( + assert_num, + etime_to_etimes, + is_same_pid, + pdcpu_from_pcpu, +) @pytest.mark.parametrize("input_value", [0, 1, 2, -1, 100, 0.001, -1.68]) @@ -13,3 +18,129 @@ def test_assert_num_green(input_value: int) -> None: def test_assert_num_red(input_value: int) -> None: with pytest.raises(AssertionError): assert_num(input_value) + + +@pytest.mark.parametrize( + "etime,expected", + [ + ("00:42", 42.0), + ("01:30", 90.0), + ("01:00:00", 3600.0), + ("12:34:56", 12 * 3600 + 34 * 60 + 56), + ("02-03:04:05", 2 * 86400 + 3 * 3600 + 4 * 60 + 5), + ("100-00:00:00", 100 * 86400.0), + ], +) +def test_etime_to_etimes_green(etime: str, expected: float) -> None: + assert etime_to_etimes(etime) == expected + + +@pytest.mark.parametrize("etime", ["", "garbage", "12", "1:2:3:4", "ab:cd"]) +def test_etime_to_etimes_red(etime: str) -> None: + with pytest.raises(ValueError): + etime_to_etimes(etime) + + +@pytest.mark.parametrize( + "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes,expected", + [ + # Motivating con/duct#399 case: 100% for 60s then idle for 60s. + # Lifetime pcpu still reads 50% at t=120 (60 cputime / 120 + # etime), but the corrected reading is 0%. + (100.0, 60.0, 50.0, 120.0, 0.0), + # Constant 84% load across a 10s interval. + (80.0, 10.0, 82.0, 20.0, 84.0), + # Pid that ramps from 50% lifetime to 75% lifetime over 100s + # of new wall time -> 100% during the new interval. + (50.0, 100.0, 75.0, 200.0, 100.0), + ], +) +def test_pdcpu_from_pcpu_green( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, + expected: float, +) -> None: + assert pdcpu_from_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) == expected + + +@pytest.mark.parametrize( + "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes", + [ + # etimes regressed -> suspected pid reuse; defensive guard. 
+ (80.0, 100.0, 10.0, 2.0), + # Same instant -> interval is zero, no rate definable. + (50.0, 100.0, 50.0, 100.0), + ], +) +def test_pdcpu_from_pcpu_returns_none_when_no_interval( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, +) -> None: + assert pdcpu_from_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) is None + + +@pytest.mark.parametrize( + "prev_pcpu,prev_etimes,curr_pcpu,curr_etimes", + [ + # Aggregation-timing skew: max pcpu in prev was captured early in + # the prev interval (pcpu_max=400 on a 4-core box, then idle). + # Curr's max happens late, so curr's pcpu*etime estimate of cputime + # is much smaller than prev's. Δcputime goes negative even though + # the pid is continuous (Δetime ≈ Δwall). + (400.0, 59.0, 100.0, 119.0), + # Smaller version of the same effect. + (50.0, 60.0, 5.0, 120.0), + ], +) +def test_pdcpu_from_pcpu_returns_none_on_negative_result( + prev_pcpu: float, + prev_etimes: float, + curr_pcpu: float, + curr_etimes: float, +) -> None: + """Negative pdcpu == aggregation-timing artifact (not pid reuse). + + See is_same_pid for identity; this clamp catches the residual case + where identity holds but the max-vs-end-etime mismatch in duct's + aggregated records produces a spurious negative delta. + """ + assert pdcpu_from_pcpu(prev_pcpu, prev_etimes, curr_pcpu, curr_etimes) is None + + +@pytest.mark.parametrize( + "prev_etimes,curr_etimes,wall_delta,expected", + [ + # Continuous pid: Δetime exactly matches Δwall. + (50.0, 110.0, 60.0, True), + # 1-second slack from ps integer rounding -- still same pid. + (50.0, 109.0, 60.0, True), + # 2-second slack at the tolerance boundary -- still same pid. + (50.0, 108.0, 60.0, True), + # Concrete con/duct#399 case: pid 3323259 went 49->54 over 60s of + # wall time. 5 << 58. Definitely a different physical process. + (49.0, 54.0, 60.0, False), + # etime regressed -- obvious reuse. 
+ (100.0, 5.0, 60.0, False), + # Sub-quantum: same instant, no time elapsed for either etime or + # wall -- treat as continuous (no reuse signal). + (50.0, 50.0, 0.0, True), + ], +) +def test_is_same_pid( + prev_etimes: float, + curr_etimes: float, + wall_delta: float, + expected: bool, +) -> None: + assert is_same_pid(prev_etimes, curr_etimes, wall_delta) is expected + + +def test_is_same_pid_tolerance_kwarg() -> None: + # 5s gap inside default tolerance? No (default is 2s). + assert not is_same_pid(50.0, 105.0, 60.0) + # ...but it is inside a 6s tolerance. + assert is_same_pid(50.0, 105.0, 60.0, tolerance=6.0) diff --git a/test/test_plot.py b/test/test_plot.py index eda2d569..fbd5fd14 100644 --- a/test/test_plot.py +++ b/test/test_plot.py @@ -1,13 +1,16 @@ """Tests for plot command.""" import argparse +import json import os +from pathlib import Path from typing import Any, List, Tuple from unittest.mock import MagicMock, Mock, mock_open, patch import pytest pytest.importorskip("matplotlib") from con_duct import cli, plot # noqa: E402 +from con_duct._formatter import FILESIZE_UNITS # noqa: E402 @pytest.mark.parametrize( @@ -52,12 +55,12 @@ def test_pick_unit_with_varying_ratios( (plot._TIME_UNITS, (0, 300), 2.3 * 60, "2.3min"), (plot._TIME_UNITS, (0, 11000), 7.8 * 60 * 60, "7.8h"), (plot._TIME_UNITS, (0, 260000), 3.2 * 60 * 60 * 24, "3.2d"), - # Memory formatting tests - (plot._MEMORY_UNITS, (0, 5 * 1024), 2.6 * 1024, "2.6KB"), - (plot._MEMORY_UNITS, (0, 4 * 1024**2), 1.5 * (1024**2), "1.5MB"), - (plot._MEMORY_UNITS, (0, 3 * 1024**3), 8.3 * 1024**3, "8.3GB"), - (plot._MEMORY_UNITS, (0, 3 * 1024**4), 1.3 * 1024**4, "1.3TB"), - (plot._MEMORY_UNITS, (0, 3.1 * 1024**5), 6.5 * 1024**5, "6.5PB"), + # Memory formatting tests (base 1000, kB/MB/GB/TB/PB). 
+ (FILESIZE_UNITS, (0, 5 * 1000), 2.6 * 1000, "2.6kB"), + (FILESIZE_UNITS, (0, 4 * 1000**2), 1.5 * (1000**2), "1.5MB"), + (FILESIZE_UNITS, (0, 3 * 1000**3), 8.3 * 1000**3, "8.3GB"), + (FILESIZE_UNITS, (0, 3 * 1000**4), 1.3 * 1000**4, "1.3TB"), + (FILESIZE_UNITS, (0, 3.1 * 1000**5), 6.5 * 1000**5, "6.5PB"), ], ) def test_formatter_output( @@ -85,6 +88,7 @@ def test_matplotlib_plot_sanity(self, mock_plot_save: MagicMock) -> None: func=plot.matplotlib_plot, log_level="INFO", min_ratio=3.0, + cpu="ps-pcpu", ) assert cli.execute(args) == 0 mock_plot_save.assert_called_once_with("outfile.png") @@ -102,6 +106,7 @@ def test_matplotlib_plot_uses_agg_backend_with_output( func=plot.matplotlib_plot, log_level="INFO", min_ratio=3.0, + cpu="ps-pcpu", ) assert cli.execute(args) == 0 mock_use.assert_called_once_with("Agg") @@ -144,6 +149,7 @@ def test_matplotlib_plot_info_json(self, mock_plot_save: MagicMock) -> None: func=plot.matplotlib_plot, log_level="INFO", min_ratio=3.0, + cpu="ps-pcpu", ) assert cli.execute(args) == 0 mock_plot_save.assert_called_once_with("outfile.png") @@ -167,6 +173,7 @@ def test_matplotlib_plot_info_json_absolute_path( func=plot.matplotlib_plot, log_level="INFO", min_ratio=3.0, + cpu="ps-pcpu", ) assert cli.execute(args) == 0 mock_plot_save.assert_called_once_with("outfile.png") @@ -211,6 +218,7 @@ def test_matplotlib_plot_non_interactive_backend( func=plot.matplotlib_plot, log_level="INFO", min_ratio=3.0, + cpu="ps-pcpu", ) result = cli.execute(args) assert result == 1 @@ -233,6 +241,7 @@ def test_matplotlib_plot_non_interactive_backend_with_get_backend( func=plot.matplotlib_plot, log_level="INFO", min_ratio=3.0, + cpu="ps-pcpu", ) result = cli.execute(args) assert result == 1 @@ -253,6 +262,7 @@ def test_matplotlib_plot_interactive_backend_with_get_backend( func=plot.matplotlib_plot, log_level="INFO", min_ratio=3.0, + cpu="ps-pcpu", ) result = cli.execute(args) assert result == 0 @@ -295,8 +305,112 @@ def test_matplotlib_plot_no_backend_registry( 
func=plot.matplotlib_plot, log_level="INFO", min_ratio=3.0, + cpu="ps-pcpu", ) result = cli.execute(args) assert result == 0 mock_show.assert_called_once() assert "matplotlib < 3.9" in caplog.text + + +class TestBuildPidSeriesCpuMode: + """``cpu`` series content varies by ``cpu_mode``.""" + + @staticmethod + def _record(ts: str, pid: str, pcpu: float, etime: str) -> dict: + return { + "timestamp": ts, + "processes": { + pid: { + "pcpu": pcpu, + "pmem": 0.0, + "rss": 0, + "etime": etime, + "timestamp": ts, + "cmd": "x", + } + }, + } + + def test_ps_pcpu_takes_raw_values(self) -> None: + data = [ + self._record("2026-05-06T00:00:00", "1", 1.5, "00:01"), + self._record("2026-05-06T00:01:00", "1", 2.5, "01:01"), + self._record("2026-05-06T00:02:00", "1", 3.5, "02:01"), + ] + series = plot._build_pid_series(data, cpu_mode=plot.CPU_MODE_PS_PCPU) + assert series["1"]["cpu"] == [1.5, 2.5, 3.5] + + def test_ps_pcpu_includes_etime_zero_record(self) -> None: + # In timepoint mode etime=="00:00" is dropped before delta math; in + # raw mode every record contributes its pcpu unchanged. + data = [ + self._record("2026-05-06T00:00:00", "1", 0.0, "00:00"), + self._record("2026-05-06T00:01:00", "1", 5.0, "01:00"), + ] + series = plot._build_pid_series(data, cpu_mode=plot.CPU_MODE_PS_PCPU) + assert series["1"]["cpu"] == [0.0, 5.0] + + def test_ps_cpu_timepoint_drops_first_and_etime_zero(self) -> None: + # Baseline: timepoint mode unchanged by the new flag -- first record + # is None (no prior baseline), etime=="00:00" record is None. 
+ data = [ + self._record("2026-05-06T00:00:00", "1", 10.0, "01:00"), + self._record("2026-05-06T00:01:00", "1", 0.0, "00:00"), + self._record("2026-05-06T00:02:00", "1", 50.0, "03:00"), + ] + series = plot._build_pid_series(data, cpu_mode=plot.CPU_MODE_PS_CPU_TIMEPOINT) + cpu = series["1"]["cpu"] + assert cpu[0] is None + assert cpu[1] is None + + def test_default_is_ps_pcpu(self) -> None: + data = [self._record("2026-05-06T00:00:00", "1", 7.0, "00:30")] + series = plot._build_pid_series(data) + assert series["1"]["cpu"] == [7.0] + + +class TestLoadHostMemoryTotal: + """Best-effort host memory_total lookup for the rss legend label.""" + + def _write_info(self, path: Path, data: dict) -> None: + path.write_text(json.dumps(data)) + + def test_info_json_input(self, tmp_path: Path) -> None: + info = tmp_path / "run_info.json" + self._write_info(info, {"system": {"memory_total": 12345}}) + assert plot._load_host_memory_total(info) == 12345 + + def test_usage_jsonl_with_sibling(self, tmp_path: Path) -> None: + info = tmp_path / "run_info.json" + usage = tmp_path / "run_usage.jsonl" + self._write_info(info, {"system": {"memory_total": 67890}}) + usage.write_text("") + assert plot._load_host_memory_total(usage) == 67890 + + def test_usage_legacy_with_sibling(self, tmp_path: Path) -> None: + info = tmp_path / "run_info.json" + usage = tmp_path / "run_usage.json" + self._write_info(info, {"system": {"memory_total": 42}}) + usage.write_text("") + assert plot._load_host_memory_total(usage) == 42 + + def test_no_sibling_returns_none(self, tmp_path: Path) -> None: + usage = tmp_path / "run_usage.jsonl" + usage.write_text("") + assert plot._load_host_memory_total(usage) is None + + def test_missing_key_returns_none(self, tmp_path: Path) -> None: + info = tmp_path / "run_info.json" + self._write_info(info, {"system": {}}) + assert plot._load_host_memory_total(info) is None + + def test_invalid_json_returns_none(self, tmp_path: Path) -> None: + info = tmp_path / "run_info.json" + 
info.write_text("{not json") + assert plot._load_host_memory_total(info) is None + + def test_unparseable_filename_returns_none(self, tmp_path: Path) -> None: + weird = tmp_path / "weird.txt" + weird.write_text("") + assert plot._load_host_memory_total(weird) is None
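
Note on the delta-CPU tests above: the bodies of `pdcpu_from_pcpu` and `is_same_pid` are not shown in this part of the diff, so the following is only a minimal sketch reconstructed from the parametrized cases in `test_duct_utils.py`. The `_sketch` names are hypothetical, and the real helpers in `con_duct._utils` may differ in detail. The idea is that `pcpu * etimes` approximates cumulative CPU time, so the difference of two such products over the etime delta gives the rate for that interval.

```python
from typing import Optional


def pdcpu_sketch(
    prev_pcpu: float,
    prev_etimes: float,
    curr_pcpu: float,
    curr_etimes: float,
) -> Optional[float]:
    """Interval CPU percentage from two lifetime-average pcpu readings.

    Hypothetical reconstruction from the test cases, not the project's code.
    """
    delta_etimes = curr_etimes - prev_etimes
    if delta_etimes <= 0:
        # Zero or regressed elapsed time: no interval to compute a rate over.
        return None
    # pcpu * etimes is proportional to cumulative CPU time at each reading.
    result = (curr_pcpu * curr_etimes - prev_pcpu * prev_etimes) / delta_etimes
    if result < 0:
        # Negative rate is treated as an aggregation-timing artifact.
        return None
    return result


def is_same_pid_sketch(
    prev_etimes: float,
    curr_etimes: float,
    wall_delta: float,
    tolerance: float = 2.0,
) -> bool:
    """Guess whether two records describe the same physical process.

    Hypothetical reconstruction: the etime delta must track the wall-clock
    delta to within ``tolerance`` seconds.
    """
    delta_etimes = curr_etimes - prev_etimes
    if delta_etimes < 0:
        # Elapsed time went backwards: the pid was reused.
        return False
    return abs(delta_etimes - wall_delta) <= tolerance


if __name__ == "__main__":
    # 100% for 60s, then idle for 60s: lifetime pcpu reads 50 at t=120.
    print(pdcpu_sketch(100.0, 60.0, 50.0, 120.0))
    print(is_same_pid_sketch(49.0, 54.0, 60.0))
```

The 2-second default tolerance mirrors the boundary case in `test_is_same_pid`; whether the real helper compares with `<=` or uses a different reuse heuristic is an assumption here.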