diff --git a/CHANGELOG.md b/CHANGELOG.md index f48ebd4..8d5d08a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ For upstream changes, see [UPSTREAM-README.md](UPSTREAM-README.md). ### Added +- Retroactive secret sanitizer for already-indexed data (SESF-42). New `cleanup.py sanitize` CLI subcommand and `sanitize_index` MCP tool remove secrets already persisted in the Milvus `document` field, the FTS5 `content` column, and the embedding vector. Default `--dry-run` reports per-rule counts + affected turns and writes a value-free `0600` audit JSONL (`~/.sessionflow/audit/`) without touching any store; `--apply --yes` redacts in place and re-embeds the redacted text (throttled through the embedding budget, checkpointed/resumable); `--apply --yes --drop` deletes affected turns instead. New primitives: `secret_redaction.scan_spans` (per-occurrence, value-free audit spans, reusing the SESF-41 detector), `rag_engine.upsert_document` (Milvus upsert-by-PK + FTS metadata-preserving rewrite) and `delete_by_doc_id`. Both surfaces refuse to apply without explicit confirmation and never emit a secret value. **Redaction is irreversible and is not a substitute for rotation — rotate any key that was ever indexed.** - Issue-ID extraction at ingestion: every turn is scanned for issue references (`[A-Z][A-Z0-9]+-\d+`, with a technical-standard prefix denylist) and tagged into a new `issue_ids` Milvus `VARCHAR(4096)` field + FTS5 metadata column (SESF-25) - Optional `issue_id` filter on the `search_all_sessions` and `search_session` MCP tools — structured exact-token pre-filter, combinable with `provider` / `project_root` / `date_from` / `date_to` (SESF-25) - `get_issue_timeline` MCP tool and `GET /timeline` HTTP endpoint — cross-harness chronological feed of all turns referencing an issue, merged from the structured field + FTS fallback, deduped by `doc_id`, sorted oldest-first, with `limit` / `provider` / `date_from` / `date_to` (SESF-25, SESF-26) diff --git a/CLAUDE.md b/CLAUDE.md index 8028708..6c5b257 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -37,6 +37,7 @@ Semantic search over Claude Code session transcripts. Independent project, origi - **FTS5 thread affinity (SESF-13)** — `FTSIndex` keeps per-thread persistent connections (`threading.local`). Server-mode connections opened on the embed executor and request threads are isolated, and cross-thread `close_all()` is a no-op rather than a noisy WARN. - **OpenCode timestamps (SESF-14)** — `provider_adapters.normalize_timestamp()` coerces int-ms epochs (and any other numeric/datetime input) to ISO-8601 strings before they hit Milvus's `VARCHAR(64)` timestamp field. All four provider adapters route timestamps through it. - **Secret redaction guard (SESF-41)** — ingestion-time redaction hooked once in `add_turns` (covers the embedding, Milvus `document`, and FTS `content` sinks plus `add_turns_async`, and runs before `_extract_issue_ids`). Pure engine in `secret_redaction.py`: `redact(text, *, mode, allowlist) -> (redacted_text, hits)`. Config: `SESSIONFLOW_REDACT` (on/off, default **on**); `SESSIONFLOW_REDACT_MODE` (`enforce`|`report`, default **report** when unset — detect + count, store raw so operators can size the false-positive rate before enforcing); `SESSIONFLOW_REDACT_ALLOWLIST` (path to an operator regex allowlist, one pattern per line). Per-rule detection counts surface via `get_stats` under the `redaction` key. Tier-3 entropy is a length-gated Shannon scanner (detect-secrets' entropy plugins are unused — `scan_line` ignores their limit); GitHub/GitLab use custom regexes (detect-secrets reports a truncated prefix). +- **Retroactive sanitizer (SESF-42)** — removes secrets *already* indexed (the cleanup half of SESF-35; SESF-41 is the forward guard). Orchestrator in `sanitize.py` (`scan` dry-run / `apply` redact|drop), driven by `cleanup.py sanitize` (CLI) and the `sanitize_index` MCP tool. **Dry-run is the default and writes nothing**; `--apply`/`apply` **refuse without `--yes`/`confirm`**. Reuses the SESF-41 detector via `secret_redaction.scan_spans` (per-occurrence, value-free audit spans — snippets mask every detected span in-window, including advisory Tier-3 whose forcing keyword sits outside the ±24 window). New Milvus primitives `rag_engine.upsert_document` (upsert-by-PK + **metadata-preserving** FTS delete-then-insert; `new_vector=None` keeps the stored vector on FTS-only re-converge) and `delete_by_doc_id` (`DeleteResult(deleted, fts_ok)`). **FTS rewrite/delete failure keeps a turn unfinished/retryable** — FTS healing only hydrates *missing* doc_ids, so a stale row left in place would never be re-redacted. Per-run audit JSONL at `~/.sessionflow/audit/redaction-.jsonl` + checkpoint `~/.sessionflow/sanitize_state.json` (both `0700` dir / `0600` file, value-free). Sanitization is irreversible — **redaction ≠ rotation; rotate any indexed key**. ## Code Style diff --git a/README.md b/README.md index 6624f69..326f395 100644 --- a/README.md +++ b/README.md @@ -165,6 +165,45 @@ python cleanup.py backfill enqueue --provider antigravity_cli --mode recent Pause state and queued jobs persist on disk, so a restart (or LaunchAgent re-launch) resumes the same plan. +## Retroactive secret sanitizer + +If a secret was indexed before the ingestion-time redaction guard caught it, +`cleanup.py sanitize` finds and removes it from the Milvus `document` field, the +FTS5 `content` column, and the embedding vector derived from them. Detection +reuses the same engine as the ingestion guard, so what the sanitizer flags is +exactly what live ingestion would now redact. + +**Dry-run is the default** — it reports per-rule counts, the affected-turn count, +and an audit path, and writes nothing to the index: + +```bash +python cleanup.py sanitize # dry-run over the whole index +python cleanup.py sanitize --provider claude_code_cli # scope by provider +python cleanup.py sanitize --project /path/to/repo --since 2026-05-01 +``` + +`--apply` rewrites the affected turns (redact the text, re-embed, overwrite the +row). With `--drop` it deletes the affected turns instead. Both require an +explicit `--yes` — there is **no interactive prompt**. `--apply` without `--yes` +refuses before any read or write and exits non-zero: + +```bash +python cleanup.py sanitize --apply --yes # redact + re-embed in place +python cleanup.py sanitize --apply --yes --drop # delete affected turns +``` + +Scope flags (`--project`, `--provider`, `--session`, `--since`) apply to both +dry-run and apply. `--drop` is only valid with `--apply`. + +Every run writes a **value-free** JSONL audit trail under `~/.sessionflow/audit/` +(0600) — rule names, tiers, integer offsets, and pre-masked snippets only, never +a raw secret value. Output to stdout is likewise counts-only. + +> **Redaction is not safety — rotate the key.** Removing a secret from the index +> does not un-expose it. Once a credential has been written anywhere, treat it as +> compromised and rotate it at the source; the sanitizer warns about this on every +> apply but cannot perform the rotation for you. + ## Hosted embeddings — deferred Hosted/OpenAI embeddings are **deferred and not implemented in SESF-6**. diff --git a/cleanup.py b/cleanup.py index 021cf57..bbf88b9 100644 --- a/cleanup.py +++ b/cleanup.py @@ -381,6 +381,90 @@ def get_manager() -> BackfillManager: return 0 +def cmd_sanitize(args) -> int: + """Retroactively scan or remove secrets already persisted in the index. + + Thin CLI adapter over :mod:`sanitize` (SESF-42 Component 4). Default posture + is **dry-run**: report per-rule counts, the affected-turn count, and the audit + path, writing nothing. ``--apply`` rewrites (redact + re-embed) or, with + ``--drop``, deletes the affected turns — but only after an explicit ``--yes``. + + The confirmation gate is **refuse-before-writes**: ``--apply`` without ``--yes`` + prints that confirmation is required, makes no read or write (``sanitize.apply`` + is never called), and returns a non-zero exit code. This intentionally avoids + the interactive ``[y/N]`` prompt that ``reset``/``migrate-schema`` use, matching + the MCP ``apply && !confirm`` behavior (Requirement 2.4). + + Args: + args: Parsed argparse namespace with ``apply``, ``drop``, ``yes``, and the + scope flags (``project``, ``provider``, ``session``, ``since``). + + Returns: + A process exit code: ``0`` on success, non-zero when the confirmation gate + refuses an ``--apply`` run. + """ + import sanitize + + if args.drop and not args.apply: + print("--drop is only valid with --apply.", file=sys.stderr) + return 2 + + if args.apply and not args.yes: + print( + "Refusing to apply: confirmation required. Re-run with --yes to " + "rewrite or drop the affected turns.", + file=sys.stderr, + ) + return 1 + + scope = sanitize.Scope( + project_root=getattr(args, "project", None), + provider=getattr(args, "provider", None), + session_id=getattr(args, "session", None), + since=getattr(args, "since", None), + ) + + if not args.apply: + report = sanitize.scan(scope) + _print_sanitize_report(report) + return 0 + + report = sanitize.apply(scope, drop=args.drop, confirmed=True) + _print_sanitize_report(report) + print( + "\nWARNING: redaction is not rotation. The detected secret was already " + "exposed — rotate the affected key/credential at its source.", + ) + return 0 + + +def _print_sanitize_report(report) -> None: + """Print a value-free summary of a sanitize ``report`` (counts only, no values). + + Emits the mode, per-rule detection counts, affected/processed/incomplete-FTS + tallies, and the audit-file path. It deliberately reads only the report's + aggregate fields, never any document text or raw secret value. + """ + print(f"Mode: {report.mode}") + print(f"Affected turns: {report.affected_count}") + if report.mode != "dry-run": + print(f"Processed: {report.processed_count}") + if report.incomplete_fts: + print(f"Incomplete FTS: {report.incomplete_fts} (re-run to converge)") + print(f"Status: {report.status}") + + counts = report.counts or {} + if counts: + print("\nDetected secrets by rule:") + for rule, count in sorted(counts.items(), key=lambda kv: kv[1], reverse=True): + print(f" {rule}: {count}") + else: + print("\nNo secrets found in scope.") + + if report.audit_path: + print(f"\nAudit log: {report.audit_path}") + + def build_parser(): """Build the argparse parser for the cleanup CLI subcommands.""" parser = argparse.ArgumentParser( @@ -418,6 +502,38 @@ def build_parser(): ) p_migrate.add_argument("--yes", "-y", action="store_true", help="Skip confirmation") + # sanitize + p_sanitize = subparsers.add_parser( + "sanitize", + help="Retroactively scan/remove secrets already indexed (dry-run by default)", + ) + sanitize_mode = p_sanitize.add_mutually_exclusive_group() + sanitize_mode.add_argument( + "--dry-run", + action="store_true", + help="Report findings without writing (default)", + ) + sanitize_mode.add_argument( + "--apply", + action="store_true", + help="Rewrite (redact + re-embed) or drop affected turns; requires --yes", + ) + p_sanitize.add_argument( + "--drop", + action="store_true", + help="With --apply, delete affected turns instead of redacting them", + ) + p_sanitize.add_argument( + "--yes", + "-y", + action="store_true", + help="Confirm an --apply run (required; no interactive prompt)", + ) + p_sanitize.add_argument("--project", help="Restrict to a project root") + p_sanitize.add_argument("--provider", help="Restrict to a provider") + p_sanitize.add_argument("--session", help="Restrict to a session id") + p_sanitize.add_argument("--since", help="Restrict to turns at/after an ISO date") + # status p_status = subparsers.add_parser("status", help="Show provider/backfill/embedding status") p_status.add_argument("--project", help="Filter to a specific project root") @@ -462,6 +578,7 @@ def main(): "reset": cmd_reset, "stats": cmd_stats, "migrate-schema": cmd_migrate_schema, + "sanitize": cmd_sanitize, "status": cmd_status, "backfill": cmd_backfill, } diff --git a/rag_engine.py b/rag_engine.py index 28d5ebe..8a50283 100644 --- a/rag_engine.py +++ b/rag_engine.py @@ -31,7 +31,7 @@ from pymilvus.exceptions import MilvusException from contextlib import contextmanager from concurrent.futures import ThreadPoolExecutor -from typing import Iterator, List, Dict, Optional +from typing import Iterator, List, Dict, NamedTuple, Optional import asyncio import logging import sys @@ -890,7 +890,7 @@ def add_turns(turns: List[Dict], db_path: Optional[str] = None) -> int: # Stable hash: SHA-256 truncated to int64. Python's hash() is # randomized per process, so the same doc_id would get different # primary keys across server restarts. - int_id = int(hashlib.sha256(turn["doc_id"].encode()).hexdigest()[:15], 16) + int_id = _pk_from_doc_id(turn["doc_id"]) data.append({ "id": int_id, "vector": emb, @@ -1956,6 +1956,245 @@ def delete_by_session(session_id: str, db_path: Optional[str] = None) -> int: return len(results) +def _pk_from_doc_id(doc_id: str) -> int: + """Derive the stable INT64 Milvus primary key from a doc_id (SHA-256, 60-bit).""" + return int(hashlib.sha256(doc_id.encode()).hexdigest()[:15], 16) + + +class UpsertResult(NamedTuple): + """Outcome of an in-place document overwrite across both stores (SESF-42). + + Attributes: + milvus_ok: True when the Milvus upsert (atomic by primary key) succeeded. + fts_ok: True when the FTS delete-then-insert of the redacted content + succeeded. Reported distinctly from ``milvus_ok``: a Milvus-clean but + FTS-failed row still satisfies a keyword search for the old secret, so + the caller must treat ``fts_ok is False`` as unfinished/retryable and + NOT mark the row done. FTS background healing only hydrates *missing* + doc_ids, so a stale row left in place is never auto-redacted. + """ + + milvus_ok: bool + fts_ok: bool + + +class DeleteResult(NamedTuple): + """Outcome of a doc-id-scoped dual delete across both stores (SESF-42). + + Attributes: + deleted: Number of Milvus rows deleted (0 or 1). + fts_ok: True when the FTS delete of ``doc_id`` succeeded. Surfaced + distinctly so the drop path can mirror the redact contract: a turn is + only marked sanitized when its FTS row is gone too. A swallowed FTS + failure would otherwise leave the secret-bearing keyword row alive + while the caller marks the doc done. + """ + + deleted: int + fts_ok: bool + + +def get_row_by_doc_id(doc_id: str, + db_path: Optional[str] = None) -> Optional[dict]: + """Fetch a single Milvus row by ``doc_id``, or ``None`` when absent. + + Used by the SESF-42 sanitizer to read an affected row's full payload + (document + metadata) just-in-time at apply time, so a large scan never has + to cache every row's text in memory. Looks the row up by its integer primary + key (``_pk_from_doc_id``) for an O(1) point lookup rather than a VARCHAR scan; + the numeric filter needs no escaping. The returned row still carries every + field, including the original ``id`` and ``doc_id``. + + Args: + doc_id: Stable document id of the turn to read. + db_path: Optional Milvus DB path. + + Returns: + The row as a plain dict (every field, ``output_fields=["*"]``), or + ``None`` when the collection is missing or no row matches. + """ + with milvus_client(db_path) as client: + if not client.has_collection(COLLECTION_NAME): + return None + rows = client.query( + collection_name=COLLECTION_NAME, + filter=f"id == {_pk_from_doc_id(doc_id)}", + output_fields=["*"], + ) + if not rows: + return None + return dict(rows[0]) + + +def upsert_document(doc_id: str, *, new_document: str, + new_vector: Optional[list] = None, + db_path: Optional[str] = None) -> UpsertResult: + """Overwrite a single turn's document (and optionally vector) in place, then rewrite FTS. + + Reads the existing Milvus row for ``doc_id`` (preserving every metadata + field), swaps in the new ``document`` (UTF-8 truncated to the VARCHAR cap), + and upserts it by primary key (atomic — the row is either fully old or fully + new). Then rewrites the FTS sidecar as a delete-then-insert of the new + content (``FTSIndex.insert`` skips existing doc_ids, so the prior row must be + deleted first). + + When ``new_vector`` is None, the existing embedding is left untouched and only + the ``document`` field + FTS row are converged. This is the resume/no-spans + path: a row already redacted in Milvus must not have its fixed-dim HNSW vector + clobbered by a zero-length list — so callers that only need to retry the FTS + rewrite pass ``new_vector=None``. + + Unlike ``delete_by_session``, an FTS failure here is surfaced rather than + swallowed: a failed rewrite leaves the old secret-bearing row searchable, and + the FTS heal path only hydrates *missing* doc_ids, so it would never be + re-redacted by background healing. + + Args: + doc_id: Stable document id of the turn to overwrite. + new_document: Replacement document text (the redacted content). + new_vector: Replacement embedding vector for ``new_document``. When None, + the stored vector is preserved (document + FTS converge only). + db_path: Optional Milvus DB path; also derives the FTS sidecar path. + + Returns: + UpsertResult: ``milvus_ok`` and ``fts_ok`` reported independently. The + row is fully sanitized only when both are True. + """ + milvus_ok = False + row: dict = {} + try: + with milvus_client(db_path) as client: + if not client.has_collection(COLLECTION_NAME): + return UpsertResult(False, False) + # Point lookup by integer PK (O(1)) rather than a VARCHAR doc_id + # scan; kept inline so the fetched row stays inside this client/ + # upsert context for the in-place rewrite. + rows = client.query( + collection_name=COLLECTION_NAME, + filter=f"id == {_pk_from_doc_id(doc_id)}", + output_fields=["*"], + ) + if not rows: + return UpsertResult(False, False) + row = dict(rows[0]) + row["document"] = _truncate_utf8(new_document, 65535) + # Leave the existing fixed-dim vector intact when no replacement is + # supplied — never write a 0-length vector into an HNSW/COSINE row. + if new_vector is not None: + row["vector"] = new_vector + client.upsert(collection_name=COLLECTION_NAME, data=[row]) + milvus_ok = True + except Exception as e: + logger.warning("Milvus upsert failed for doc_id: %s", _scrub_exception(e)) + return UpsertResult(False, False) + + # FTS rewrite: delete-then-insert (insert dedups by doc_id, so the stale + # row must go first). FTS failure is surfaced, never swallowed. The record is + # built from the already-fetched row so every metadata column the normal + # ingest FTS record carries is preserved — content is the redacted document. + fts_ok = False + if db_path: + conn = None + try: + conn = _fts.connection(db_path) + _fts.delete(conn, "doc_id", doc_id) + _fts.insert(conn, [{ + "doc_id": doc_id, + # Use the UTF-8-truncated text stored in Milvus (row["document"]), + # not new_document — a redacted payload can expand past the 65535 + # cap, and the two stores must index identical content (SESF-42). + "content": row["document"], + "session_id": row.get("session_id", ""), + "logical_session_id": row.get( + "logical_session_id", row.get("session_id", "")), + "provider": row.get("provider", ""), + "source_kind": row.get("source_kind", ""), + "source_class": row.get("source_class", ""), + "source_id": row.get("source_id", ""), + "source_path": row.get("source_path", ""), + "git_branch": row.get("git_branch", ""), + "turn_index": row.get("turn_index", 0), + "timestamp": row.get("timestamp", ""), + "chunk_type": row.get("chunk_type", "turn"), + "project_root": row.get("project_root", ""), + "issue_ids": row.get("issue_ids", ""), + }]) + fts_ok = True + except Exception as e: + logger.warning("FTS rewrite failed for doc_id: %s", _scrub_exception(e)) + fts_ok = False + finally: + # Always release the ephemeral connection, even when the rewrite + # raised partway through — otherwise a failed sanitize leaks a + # SQLite handle per affected row. + if conn is not None: + _fts.close_ephemeral(conn) + else: + fts_ok = True + + return UpsertResult(milvus_ok, fts_ok) + + +def delete_by_doc_id(doc_id: str, db_path: Optional[str] = None) -> DeleteResult: + """Delete a single turn from both Milvus (by PK) and FTS (by doc_id). + + Mirrors ``delete_by_session``'s symmetric dual-write: the Milvus delete keys + on the stable primary key derived from ``doc_id`` (the same + ``int(sha256(doc_id)[:15], 16)`` derivation used at insert time). + + Unlike ``delete_by_session``, the FTS delete outcome is surfaced rather than + swallowed: a turn that is gone from Milvus but still keyword-searchable in FTS + has not been sanitized, and FTS healing only hydrates *missing* doc_ids, so a + surviving stale row is never auto-redacted. Callers must treat + ``fts_ok is False`` as unfinished and not mark the doc done. + + Args: + doc_id: Stable document id of the turn to delete. + db_path: Optional Milvus DB path; also derives the FTS sidecar path. + + Returns: + DeleteResult: ``deleted`` (Milvus rows removed, 0 or 1) and ``fts_ok`` + reported independently. The turn is fully removed only when the FTS + delete also succeeded. + """ + pk = _pk_from_doc_id(doc_id) + deleted = 0 + with milvus_client(db_path) as client: + if not client.has_collection(COLLECTION_NAME): + return DeleteResult(0, True) + results = client.query( + collection_name=COLLECTION_NAME, + filter=f"id == {pk}", + output_fields=["id"], + ) + if results: + client.delete(collection_name=COLLECTION_NAME, filter=f"id == {pk}") + deleted = len(results) + + # Also delete from FTS. The outcome is surfaced (not swallowed) so the caller + # can keep an FTS-failed doc_id on the worklist instead of marking it done. + fts_ok = False + if db_path: + conn = None + try: + conn = _fts.connection(db_path) + _fts.delete(conn, "doc_id", doc_id) + fts_ok = True + except Exception as e: + logger.warning( + "FTS delete by doc_id failed: %s", _scrub_exception(e)) + fts_ok = False + finally: + # Release the ephemeral connection on every path so a failed FTS + # delete doesn't leak a SQLite handle. + if conn is not None: + _fts.close_ephemeral(conn) + else: + fts_ok = True + + return DeleteResult(deleted, fts_ok) + + def delete_by_branch(git_branch: str, db_path: Optional[str] = None) -> int: """Delete all turns for a given git branch.""" escaped_branch = _escape_filter_scalar(git_branch) diff --git a/sanitize.py b/sanitize.py new file mode 100644 index 0000000..eca157b --- /dev/null +++ b/sanitize.py @@ -0,0 +1,683 @@ +"""Retroactive secret sanitizer orchestration for SESF-42. + +This module drives the dry-run/apply flow that retroactively removes secrets +already embedded into the SessionFlow stores. It holds *policy only*: detection +routes through :func:`secret_redaction.scan_spans`, and every store mutation goes +through the two SESF-42 :mod:`rag_engine` primitives +(:func:`rag_engine.upsert_document` / :func:`rag_engine.delete_by_doc_id`). + +The flow has two entry points: + +* :func:`scan` — a dry-run that iterates the in-scope turns, detects secrets, and + reports per-rule counts plus the affected ``doc_id`` set without writing to any + store. It still persists a value-free audit trail and a worklist checkpoint. +* :func:`apply` — the destructive pass. It refuses to read or write unless + ``confirmed`` is set, then for each affected turn either re-embeds the redacted + text and overwrites the row (``drop=False``) or deletes the row outright + (``drop=True``). Progress is checkpointed after each batch so an interrupted run + resumes from the durable worklist + done-set. + +Hard invariants (design.md "Security Considerations", AC-18): + +* No raw secret value ever reaches a log line, the audit file, stdout, or a return + value — only rule names, tiers, integer offsets, and pre-masked snippets. +* Re-embedding always flows through the shared embedding budget, never below the + 200ms MLX cooldown floor. +* Milvus filters are built via :func:`rag_engine._escape_filter_scalar`, never raw + f-string interpolation of operator/scope data. +""" + +from __future__ import annotations + +import json +import os +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from threading import Event +from typing import Iterator, Optional + +import rag_engine +import secret_redaction + +# Output fields pulled from each Milvus scan row — everything needed to rebuild the +# row on upsert and to emit a value-free audit record. ``document`` is the only +# field that may carry a secret; it never leaves this module un-redacted. +_SCAN_FIELDS = [ + "doc_id", + "document", + "provider", + "source_path", + "turn_index", + "timestamp", + "session_id", + "project_root", +] + +# State-file layout under ``~/.sessionflow`` (mirrors the existing convention so a +# redirected HOME isolates them per test/run). +_STATE_DIRNAME = ".sessionflow" +_CHECKPOINT_NAME = "sanitize_state.json" +_AUDIT_DIRNAME = "audit" + +# 0600 — owner read/write only. The audit + checkpoint may reference doc ids and +# masked snippets, so they are never world-readable. +_FILE_MODE = 0o600 + + +@dataclass +class Scope: + """A selection of turns to sanitize, expressed as Milvus filter dimensions. + + Each populated dimension contributes one escaped equality clause to the Milvus + filter; ``since`` contributes a timestamp lower bound. An empty scope matches + every turn in the collection. + + Attributes: + project_root: Restrict to turns whose ``project_root`` equals this value. + provider: Restrict to a single harness provider (e.g. ``claude_code_cli``). + session_id: Restrict to a single session. + since: ISO date/timestamp lower bound; maps to ``timestamp >= ""``. + """ + + project_root: Optional[str] = None + provider: Optional[str] = None + session_id: Optional[str] = None + since: Optional[str] = None + + def to_filter(self) -> str: + """Build an escaped Milvus boolean filter for this scope. + + Returns: + A conjunction of escaped clauses, or an empty string when no dimension + is set (match-all). Every string scalar is escaped via + :func:`rag_engine._escape_filter_scalar` so a quote or backslash in + operator/scope data cannot break out of the filter literal. + """ + clauses: list[str] = [] + if self.project_root: + esc = rag_engine._escape_filter_scalar(self.project_root) + clauses.append(f'project_root == "{esc}"') + if self.provider: + esc = rag_engine._escape_filter_scalar(self.provider) + clauses.append(f'provider == "{esc}"') + if self.session_id: + esc = rag_engine._escape_filter_scalar(self.session_id) + clauses.append(f'session_id == "{esc}"') + if self.since: + esc = rag_engine._escape_filter_scalar(self.since) + clauses.append(f'timestamp >= "{esc}"') + return " and ".join(clauses) + + def as_dict(self) -> dict: + """Return the populated scope dimensions as a JSON-serializable dict.""" + out: dict = {} + if self.project_root: + out["project_root"] = self.project_root + if self.provider: + out["provider"] = self.provider + if self.session_id: + out["session_id"] = self.session_id + if self.since: + out["since"] = self.since + return out + + +@dataclass +class SanitizeReport: + """Outcome of a :func:`scan` or :func:`apply` run (value-free). + + Attributes: + mode: ``"dry-run"`` for a scan, or ``"redact"`` / ``"drop"`` for an apply. + counts: Per-rule detection histogram (e.g. ``{"AWS": 2}``); rule names only. + affected_count: Number of distinct secret-bearing turns in scope. + processed_count: Number of turns this run actually wrote (or deleted). + incomplete_fts: Count of turns whose Milvus row was cleaned but whose FTS + rewrite failed — these stay unfinished/retryable. + audit_path: Filesystem path to the JSONL audit trail for this run. + status: ``"complete"`` | ``"incomplete"`` | ``"paused"``. + rotate_warning: True on any apply run — redaction is not rotation, so the + operator must still rotate the exposed credential. + """ + + mode: str + counts: dict = field(default_factory=dict) + affected_count: int = 0 + processed_count: int = 0 + incomplete_fts: int = 0 + audit_path: Optional[str] = None + status: str = "complete" + rotate_warning: bool = False + + +def _ensure_private_dir(path: Path) -> Path: + """Create ``path`` (and parents) and force owner-only 0700 permissions. + + ``mkdir(mode=...)`` is masked by the process umask and is a no-op on an + already-existing directory, so an explicit ``chmod`` is what actually + guarantees the operator-private mode required by SESF-42 (Req 5.1) — audit + and checkpoint state may enumerate which turns held secrets. + """ + path.mkdir(parents=True, exist_ok=True, mode=0o700) + os.chmod(path, 0o700) + return path + + +def _state_dir() -> Path: + """Return ``~/.sessionflow`` (resolved against the live HOME), owner-only 0700.""" + return _ensure_private_dir(Path.home() / _STATE_DIRNAME) + + +def _checkpoint_path() -> Path: + """Return the durable checkpoint path (``~/.sessionflow/sanitize_state.json``).""" + return _state_dir() / _CHECKPOINT_NAME + + +def _audit_path(run_id: str) -> Path: + """Return the audit JSONL path for ``run_id``, creating the audit dir 0700.""" + audit_dir = _ensure_private_dir(_state_dir() / _AUDIT_DIRNAME) + return audit_dir / f"redaction-{run_id}.jsonl" + + +def _new_run_id() -> str: + """Return a short, filesystem-safe run identifier.""" + return uuid.uuid4().hex[:12] + + +def _load_checkpoint() -> Optional[dict]: + """Load the durable checkpoint, or ``None`` when absent/unreadable.""" + path = _checkpoint_path() + if not path.exists(): + return None + try: + return json.loads(path.read_text()) + except (OSError, ValueError): + return None + + +def _save_checkpoint(state: dict) -> None: + """Persist ``state`` to the checkpoint with 0600 permissions.""" + path = _checkpoint_path() + path.write_text(json.dumps(state)) + try: + os.chmod(path, _FILE_MODE) + except OSError: + pass + + +def _allowlist() -> list: + """Load the operator allowlist via rag_engine settings (impure by design).""" + _enabled, _mode, allowlist_path = rag_engine._redaction_settings() + return rag_engine.load_allowlist(allowlist_path) + + +def _db_path() -> Optional[str]: + """Return the configured Milvus DB path (used to derive the FTS sidecar).""" + try: + import cleanup + + return cleanup.get_db_path() + except Exception: # pragma: no cover - cleanup import/lookup never fails in tests + return None + + +def _iter_rows(scope: Scope, db_path: Optional[str]) -> Iterator[dict]: + """Yield each in-scope Milvus row, one at a time, via the keyset scan.""" + filter_expr = scope.to_filter() + for batch in rag_engine._query_batches( + _SCAN_FIELDS, filter_expr=filter_expr or None, db_path=db_path + ): + for row in batch: + yield dict(row) + + +class _AuditWriter: + """Append-only JSONL audit writer; emits one value-free record per span. + + Records carry only rule/tier/offset/length/masked_snippet (from the Span) plus + non-sensitive row metadata and the action. A raw secret value never reaches a + record because the snippet is pre-masked and the value itself is never copied. + """ + + def __init__(self, run_id: str): + """Open the audit file for ``run_id`` in append mode, 0600 at creation. + + Opened with ``O_CREAT | O_APPEND`` at mode 0600 so (1) a resumed run that + reuses the same ``run_id`` appends to its existing trail rather than + truncating it, and (2) the file is owner-only from the instant it is + created — closing the perms-after-create race a separate ``chmod`` left + open. + """ + self.run_id = run_id + self.path = _audit_path(run_id) + fd = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, _FILE_MODE) + self._handle = os.fdopen(fd, "a", encoding="utf-8") + + def write(self, *, row: dict, spans: list, action: str) -> None: + """Append one audit line per span for ``row`` under ``action``.""" + for span in spans: + record = { + "run_id": self.run_id, + "doc_id": row.get("doc_id"), + "provider": row.get("provider"), + "source_path": row.get("source_path"), + "turn_index": row.get("turn_index"), + "timestamp": row.get("timestamp"), + "rule": span.rule_name, + "tier": span.tier, + "offset": span.start, + "length": span.end - span.start, + "masked_snippet": span.masked_snippet, + "action": action, + } + self._handle.write(json.dumps(record) + "\n") + self._handle.flush() + + def close(self) -> None: + """Close the underlying file handle.""" + try: + self._handle.close() + except OSError: + pass + + +def _accumulate_counts(counts: dict, spans: list) -> None: + """Fold ``spans`` rule names into the ``counts`` histogram in place.""" + for span in spans: + counts[span.rule_name] = counts.get(span.rule_name, 0) + 1 + + +def scan(scope: Scope) -> SanitizeReport: + """Dry-run the sanitizer over ``scope`` and report findings without writing. + + Iterates every in-scope turn, detects secrets via + :func:`secret_redaction.scan_spans` in ``report`` mode, and accumulates per-rule + counts plus the affected ``doc_id`` set. A value-free audit JSONL and a worklist + checkpoint are written, but no Milvus/FTS/embedding write occurs. + + Args: + scope: The selection of turns to inspect. + + Returns: + A :class:`SanitizeReport` with ``mode="dry-run"`` summarizing the findings. + """ + run_id = _new_run_id() + allowlist = _allowlist() + db_path = _db_path() + + counts: dict = {} + worklist: list[str] = [] + audit = _AuditWriter(run_id) + try: + for row in _iter_rows(scope, db_path): + _scanned, spans = secret_redaction.scan_spans( + row.get("document", ""), mode="report", allowlist=allowlist + ) + if not spans: + continue + doc_id = row.get("doc_id") + if doc_id not in worklist: + worklist.append(doc_id) + _accumulate_counts(counts, spans) + audit.write(row=row, spans=spans, action="dry-run") + finally: + audit.close() + + _save_checkpoint( + { + "run_id": run_id, + "created_at": _now_iso(), + "mode": "scan", + "scope": scope.as_dict(), + "worklist": list(worklist), + "done": [], + "counts": counts, + "status": "scanning", + } + ) + + return SanitizeReport( + mode="dry-run", + counts=counts, + affected_count=len(worklist), + processed_count=0, + incomplete_fts=0, + audit_path=str(audit.path), + status="complete", + rotate_warning=False, + ) + + +def _now_iso() -> str: + """Return the current UTC time as an ISO-8601 string.""" + from datetime import datetime, timezone + + return datetime.now(timezone.utc).isoformat() + + +def _throttled_embed(redacted: str) -> Optional[list]: + """Embed ``redacted`` through the shared budget, honoring the cooldown floor. + + Routes the single-text re-embed through the process-wide embedding budget's + ``split_batches`` / ``before_batch`` / ``after_batch`` so the MLX cooldown floor + is never undercut. Mirrors :func:`rag_engine.add_turns`' deny handling: + + * Denied **with** a ``retry_after_seconds`` delay → sleep that delay and retry + once (the budget is asking us to wait out a cooldown). + * Denied **without** a delay → a hard pause/cap. Do **not** embed; return + ``None`` so the caller aborts the run cleanly (checkpoint + ``paused``). + + Returns: + The embedding vector for ``redacted``, or ``None`` when the budget + hard-denied the batch (the gate must not be bypassed). + """ + budget = rag_engine.get_embedding_budget() + batches = budget.split_batches([redacted]) + vector: Optional[list] = None + for batch in batches: + decision = budget.before_batch(len(batch), 0) + if not getattr(decision, "allowed", True): + delay = getattr(decision, "retry_after_seconds", 0.0) or 0.0 + if delay > 0: + # Soft deny: wait out the cooldown, then retry once. + time.sleep(delay) + decision = budget.before_batch(len(batch), 0) + if not getattr(decision, "allowed", True): + # Hard deny (pause/cap, no delay) — never embed past the gate. + return None + started = time.monotonic() + error: Optional[BaseException] = None + try: + vectors = rag_engine.embed_texts(batch, is_query=False) + vector = vectors[0] + except BaseException as exc: # noqa: BLE001 - scrub before re-raise + error = exc + rag_engine._scrub_exception_args(exc) + raise + finally: + budget.after_batch(time.monotonic() - started, len(batch), error) + return vector if vector is not None else [] + + +def _audit_meta(row: dict) -> dict: + """Extract only the value-free audit metadata from a scan row. + + Never carries ``document`` — the one field that may hold a secret. The + document is fetched just-in-time at apply time via + :func:`rag_engine.get_row_by_doc_id`, so the worklist's per-turn metadata + stays bounded regardless of collection size. + """ + return { + "provider": row.get("provider"), + "source_path": row.get("source_path"), + "turn_index": row.get("turn_index"), + "timestamp": row.get("timestamp"), + } + + +def _build_worklist( + scope: Scope, + allowlist: list, + db_path: Optional[str], +) -> tuple[list[str], dict[str, dict], dict]: + """Scan ``scope`` to build the affected worklist + audit metadata + counts. + + Streams the scope one row at a time and keeps **only** the doc_ids of + secret-bearing turns plus their tiny value-free audit metadata + (provider/source_path/turn_index/timestamp). The document text is never + cached — :func:`apply` fetches each affected row's ``document`` + just-in-time by doc_id, so memory stays bounded on large indices. + + Returns: + ``(worklist, meta_by_id, counts)`` where ``worklist`` is the ordered + list of affected ``doc_id``s, ``meta_by_id`` maps each affected doc_id + to its value-free audit metadata, and ``counts`` is the per-rule + histogram. + """ + worklist: list[str] = [] + seen: set[str] = set() + meta_by_id: dict[str, dict] = {} + counts: dict = {} + for row in _iter_rows(scope, db_path): + doc_id = row.get("doc_id") + _scanned, spans = secret_redaction.scan_spans( + row.get("document", ""), mode="report", allowlist=allowlist + ) + if not spans: + continue + if doc_id not in seen: + seen.add(doc_id) + worklist.append(doc_id) + meta_by_id[doc_id] = _audit_meta(row) + _accumulate_counts(counts, spans) + return worklist, meta_by_id, counts + + +def apply( + scope: Scope, + *, + drop: bool, + confirmed: bool, + resume: bool = False, + pause_event: Optional[Event] = None, +) -> SanitizeReport: + """Apply the sanitizer destructively over ``scope`` (guarded by ``confirmed``). + + Refuses to perform any read or write when ``confirmed`` is False, returning an + empty paused report. Otherwise builds (or resumes) the affected worklist and, per + turn, either redacts + re-embeds + upserts the row (``drop=False``) or deletes it + (``drop=True``). Progress is checkpointed after each turn so an interrupted run + resumes from the durable worklist + done-set. + + A turn is marked ``done`` only when both stores succeed. When Milvus succeeds but + the FTS rewrite fails (``fts_ok is False``), the ``doc_id`` stays in the worklist, + ``incomplete_fts`` increments, and the run reports ``status="incomplete"``. + + Args: + scope: The selection of turns to sanitize. + drop: When True, delete affected turns instead of redacting them. + confirmed: Must be True for any read or write to occur. + resume: When True, resume from the durable checkpoint's worklist + done-set. + pause_event: Optional ``threading.Event``; when set, the run pauses (status + ``"paused"``) and checkpoints before processing further turns. + + Returns: + A :class:`SanitizeReport` describing the run; ``rotate_warning`` is always + True (redaction is not rotation). + """ + mode = "drop" if drop else "redact" + + # Confirmation gate (Requirement 2.4): no read, no write, no audit file. + if not confirmed: + return SanitizeReport( + mode=mode, + counts={}, + affected_count=0, + processed_count=0, + incomplete_fts=0, + audit_path=None, + status="paused", + rotate_warning=True, + ) + + allowlist = _allowlist() + db_path = _db_path() + + checkpoint = _load_checkpoint() if resume else None + meta_by_id: dict[str, dict] = {} + if checkpoint is not None and checkpoint.get("worklist") is not None: + # Resume from the durable checkpoint without a re-scan (Fix B): the + # worklist + done-set load straight from disk, and each affected row's + # document is fetched just-in-time by doc_id at apply time (Fix A). The + # run_id is reused so the audit appends to the same redaction-.jsonl. + run_id = checkpoint.get("run_id") or _new_run_id() + done: set[str] = set(checkpoint.get("done", [])) + counts: dict = dict(checkpoint.get("counts", {})) + worklist: list[str] = list(checkpoint.get("worklist", [])) + else: + run_id = _new_run_id() + done = set() + worklist, meta_by_id, counts = _build_worklist(scope, allowlist, db_path) + + audit = _AuditWriter(run_id) + processed = 0 + incomplete_fts = 0 + paused = False + + def _checkpoint(status: str) -> None: + _save_checkpoint( + { + "run_id": run_id, + "created_at": _now_iso(), + "mode": mode, + "scope": scope.as_dict(), + "worklist": [d for d in worklist if d not in done], + "done": sorted(done), + "counts": counts, + "status": status, + } + ) + + try: + for doc_id in list(worklist): + if doc_id in done: + continue + if pause_event is not None and pause_event.is_set(): + paused = True + break + + # Just-in-time fetch (Fix A): the document text is never cached + # across the whole worklist, so memory stays bounded on large indices. + row = rag_engine.get_row_by_doc_id(doc_id, db_path=db_path) + if row is None: + # Affected id no longer present (already deleted/moved) — leave it + # off the done set so a re-run can reconcile. + continue + + meta = meta_by_id.get(doc_id) + if drop: + fts_ok: Optional[bool] = _apply_drop( + doc_id, row, meta, allowlist, audit, db_path + ) + else: + fts_ok = _apply_redact( + doc_id, row, meta, allowlist, audit, db_path + ) + if fts_ok is None: + # Budget hard-denied the re-embed (pause/cap) — abort cleanly, + # leaving this turn on the worklist for the next run (Fix C). + paused = True + break + if fts_ok: + done.add(doc_id) + processed += 1 + else: + incomplete_fts += 1 + _checkpoint("applying") + finally: + audit.close() + + remaining = [d for d in worklist if d not in done] + if paused: + status = "paused" + elif incomplete_fts or remaining: + status = "incomplete" + else: + status = "complete" + _checkpoint(status) + + return SanitizeReport( + mode=mode, + counts=counts, + affected_count=len(worklist), + processed_count=processed, + incomplete_fts=incomplete_fts, + audit_path=str(audit.path), + status=status, + rotate_warning=True, + ) + + +def _audit_row(doc_id: str, row: dict, meta: Optional[dict]) -> dict: + """Build the value-free audit metadata dict for one turn. + + Prefers the just-in-time-fetched ``row`` (which carries the live metadata), + falling back to the worklist ``meta`` captured at scan time. Never includes + ``document``. + """ + base = dict(meta) if meta else {} + base["doc_id"] = doc_id + for key in ("provider", "source_path", "turn_index", "timestamp"): + if row.get(key) is not None: + base[key] = row.get(key) + return base + + +def _apply_redact( + doc_id: str, + row: dict, + meta: Optional[dict], + allowlist: list, + audit: "_AuditWriter", + db_path: Optional[str], +) -> Optional[bool]: + """Redact + re-embed + upsert one turn; return whether the FTS rewrite succeeded. + + On a resumed row already clean in Milvus (``scan_spans`` yields no spans) the + re-embed + upsert are skipped and only the FTS delete-then-insert of the already + redacted content is retried, keeping the operation idempotent. + + Returns: + ``True`` when the row is fully sanitized (Milvus + FTS both ok), ``False`` + when Milvus is clean but the FTS rewrite still needs a retry, or ``None`` + when the embedding budget hard-denied the re-embed (a pause/cap) and the + run must abort without writing this turn. + """ + document = row.get("document", "") + redacted, spans = secret_redaction.scan_spans( + document, mode="enforce", allowlist=allowlist + ) + + if not spans: + # Already redacted in Milvus (resume case): converge FTS only, no re-embed. + # Pass new_vector=None so the existing fixed-dim vector is preserved — a + # 0-length vector would corrupt the HNSW/COSINE row. + result = rag_engine.upsert_document( + doc_id, new_document=redacted, new_vector=None, db_path=db_path + ) + return bool(result.milvus_ok and result.fts_ok) + + vector = _throttled_embed(redacted) + if vector is None: + # Budget hard-denied the re-embed (pause/cap, no retry delay). Do not + # write this turn — signal the caller to abort and checkpoint. + return None + audit.write(row=_audit_row(doc_id, row, meta), spans=spans, action="redact") + result = rag_engine.upsert_document( + doc_id, new_document=redacted, new_vector=vector, db_path=db_path + ) + return bool(result.milvus_ok and result.fts_ok) + + +def _apply_drop( + doc_id: str, + row: dict, + meta: Optional[dict], + allowlist: list, + audit: "_AuditWriter", + db_path: Optional[str], +) -> bool: + """Delete one turn from both stores and audit it (``action="drop"``, no re-embed). + + Returns: + ``True`` when the turn is fully removed (Milvus + FTS both ok), ``False`` + when the FTS delete failed and the row is still keyword-searchable — the + caller must keep the doc_id on the worklist, mirroring the redact path. + """ + _scanned, spans = secret_redaction.scan_spans( + row.get("document", ""), mode="report", allowlist=allowlist + ) + audit.write(row=_audit_row(doc_id, row, meta), spans=spans, action="drop") + result = rag_engine.delete_by_doc_id(doc_id, db_path=db_path) + return bool(result.fts_ok) diff --git a/secret_redaction.py b/secret_redaction.py index abc6527..f0ebd5a 100644 --- a/secret_redaction.py +++ b/secret_redaction.py @@ -145,6 +145,42 @@ class Hit(NamedTuple): tier: int +class Span(NamedTuple): + """A single detected secret occurrence with value-free audit metadata (SESF-42). + + Unlike :class:`Hit` (which dedups by value), one ``Span`` is emitted per + *occurrence*: the same secret appearing twice yields two spans. The offsets + index into the original scanned text and are integers (not sensitive); the + snippet is pre-masked so no raw value ever leaves the module (AC-18). + + Attributes: + rule_name: The detector class name used to build the placeholder, e.g. + ``"OPENAI"`` → ``[REDACTED:OPENAI]``. + tier: The detection tier that produced the hit (1 = structured provider + tokens / PEM / JWT, 2 = contextual assignment, 3 = entropy). + start: Offset of the occurrence in the original text (inclusive). + end: Offset of the occurrence in the original text (exclusive), so + ``text[start:end]`` is the matched value. + masked_snippet: A fixed-width window of the original text around the match + (±:data:`_SNIPPET_PAD` chars) in which every detected occurrence — the + target and any reported neighbour intersecting the window — is replaced + by its typed ``[REDACTED:…]`` placeholder, masked deterministically by + offset (not by re-running detection over the slice). It never contains a + raw secret value. + """ + + rule_name: str + tier: int + start: int + end: int + masked_snippet: str + + +# Half-width of the value-free context window stored on each Span (SESF-42). The +# window is ``text[start - _SNIPPET_PAD : end + _SNIPPET_PAD]``, then masked. +_SNIPPET_PAD = 24 + + def _placeholder(rule_name: str) -> str: """Render the typed redaction placeholder for ``rule_name`` (AC-4 fallback).""" return f"[REDACTED:{rule_name or _FALLBACK_RULE}]" @@ -294,6 +330,45 @@ def _collect_candidates(text: str) -> list[tuple[str, str, int, bool]]: return candidates +def _aggregate_maskable_candidates( + text: str, + allowlist: list[Pattern[str]], +) -> dict[str, dict]: + """Aggregate detection candidates into one entry per distinct maskable value. + + Runs :func:`_collect_candidates`, applies the operator allowlist to every tier + and the structural-shape exemption to Tier-3 only, then dedups by value: when + one value is matched by several tiers (e.g. a structured token the entropy + scanner also flags) the lowest tier wins and ``force`` is OR-ed across the + occurrences. This is the single source of truth shared by :func:`redact` (live + redaction, SESF-41) and :func:`_maskable_values` (retroactive span scanning, + SESF-42), so the two paths can never drift in which values they mask. + + Args: + text: The text to scan. + allowlist: Operator-supplied compiled regex patterns whose matches are + exempt from detection on all tiers (AC-9). + + Returns: + A mapping of ``value`` to an entry dict ``{"tier": int, "rule": str, + "force": bool}``. Allowlisted and structurally-exempt values are absent. + """ + agg: dict[str, dict] = {} + for value, rule_name, tier, force in _collect_candidates(text): + if _is_operator_allowlisted(value, allowlist): + continue + if tier == 3 and _is_structurally_allowlisted(value): + continue + entry = agg.get(value) + if entry is None: + agg[value] = {"tier": tier, "rule": rule_name, "force": force} + else: + if tier < entry["tier"]: + entry["tier"], entry["rule"] = tier, rule_name + entry["force"] = entry["force"] or force + return agg + + def redact( text: str, *, @@ -322,25 +397,12 @@ def redact( """ allowlist = allowlist or [] - # De-duplicate by value so one secret yields one classification. When the same - # value is matched by several tiers (e.g. a structured token the entropy scanner - # also flags), the lowest tier wins and ``force`` is OR-ed across occurrences. - # Allowlist (AC-9): operator patterns apply to ALL tiers; the structural shape - # exemption (UUID/hex/issue) applies to Tier-3 entropy ONLY, so a high-confidence - # Tier-1/Tier-2 hit on an all-hex secret is still redacted. - agg: dict[str, dict] = {} - for value, rule_name, tier, force in _collect_candidates(text): - if _is_operator_allowlisted(value, allowlist): - continue - if tier == 3 and _is_structurally_allowlisted(value): - continue - entry = agg.get(value) - if entry is None: - agg[value] = {"tier": tier, "rule": rule_name, "force": force} - else: - if tier < entry["tier"]: - entry["tier"], entry["rule"] = tier, rule_name - entry["force"] = entry["force"] or force + # De-duplicate by value so one secret yields one classification (lowest tier + # wins, ``force`` OR-ed), with the operator allowlist on every tier and the + # structural-shape exemption on Tier-3 only. Shared verbatim with + # :func:`_maskable_values` via the common aggregator so the two paths cannot + # drift in which values they mask. + agg = _aggregate_maskable_candidates(text, allowlist) hits = [Hit(rule_name=e["rule"], tier=e["tier"]) for e in agg.values()] @@ -362,3 +424,144 @@ def redact( for value, rule_name in tier_items: out = _replace_outside_placeholders(out, value, _placeholder(rule_name)) return out, hits + + +def _maskable_values( + text: str, + allowlist: list[Pattern[str]], +) -> list[tuple[str, str, int]]: + """Return the ``(value, rule_name, tier)`` triples that enforce mode would mask. + + Reuses :func:`redact`'s aggregation via the shared + :func:`_aggregate_maskable_candidates` helper — operator allowlist on all tiers, + structural shape exemption on Tier-3 only, dedup-by-value with the lowest tier + winning and ``force`` OR-ed — but returns one triple per distinct value rather + than a :class:`Hit`. A Tier-3 value is included only when it is forced (keyword + adjacent or fenced), matching the values enforce actually replaces. Ordered by + tier then descending value length so longer/structured values are placed first, + the same precedence the masker uses. + """ + agg = _aggregate_maskable_candidates(text, allowlist) + + maskable = [ + (value, entry["rule"], entry["tier"]) + for value, entry in agg.items() + if entry["tier"] in (1, 2) or entry["force"] + ] + maskable.sort(key=lambda triple: (triple[2], -len(triple[0]))) + return maskable + + +def _collect_spans(text: str, allowlist: list[Pattern[str]]) -> list[Span]: + """Collect per-occurrence :class:`Span` metadata for ``text`` (SESF-42). + + Locates every occurrence of each maskable value in the *original* ``text``, + skipping occurrences inside an existing ``[REDACTED:…]`` placeholder (AC-15) + and occurrences overlapping an already-claimed higher-precedence span, so an + entropy superset never double-counts a structured token it contains. Each + surviving occurrence becomes its own span (no dedup-by-value). The snippet is + produced last, once the full span set is known, so neighbours can be masked. + """ + protected = [(m.start(), m.end()) for m in _PLACEHOLDER_SPAN_RE.finditer(text)] + claimed: list[tuple[int, int]] = [] + raw: list[tuple[str, int, int, int]] = [] # (rule_name, tier, start, end) + + def _overlaps(spans: list[tuple[int, int]], start: int, end: int) -> bool: + return any(start < c_end and c_start < end for c_start, c_end in spans) + + for value, rule_name, tier in _maskable_values(text, allowlist): + if not value: + continue + cursor = 0 + while True: + idx = text.find(value, cursor) + if idx == -1: + break + end = idx + len(value) + cursor = end + if _overlaps(protected, idx, end): + continue + if _overlaps(claimed, idx, end): + continue + claimed.append((idx, end)) + raw.append((rule_name, tier, idx, end)) + + raw.sort(key=lambda item: item[2]) + + # Build each snippet deterministically by masking EVERY detected occurrence in + # ``raw`` that intersects the window — the target plus any reported neighbour — + # rather than re-running ``redact()`` on a truncated window (SESF-42). The old + # window-redact path re-detected against the slice, so a forced Tier-3 entropy + # token whose forcing keyword fell outside the ±_SNIPPET_PAD window was + # re-evaluated as un-forced and left RAW. Masking by offset guarantees every + # reported span's snippet is value-free regardless of tier or keyword distance. + # Occurrences overlapping an existing ``[REDACTED:…]`` span are absent from + # ``raw`` (skipped above), so emitting the window text verbatim between + # occurrences preserves those placeholders without double-wrapping (AC-15). + spans: list[Span] = [] + for rule_name, tier, start, end in raw: + w_start = max(0, start - _SNIPPET_PAD) + w_end = end + _SNIPPET_PAD + parts: list[str] = [] + cursor = w_start + for occ_rule, _occ_tier, occ_start, occ_end in raw: + if occ_end <= w_start or occ_start >= w_end: + continue # occurrence does not intersect the window + clamped_start = max(occ_start, w_start) + clamped_end = min(occ_end, w_end) + if clamped_start > cursor: + parts.append(text[cursor:clamped_start]) + parts.append(_placeholder(occ_rule)) + cursor = max(cursor, clamped_end) + parts.append(text[cursor:w_end]) + masked_snippet = "".join(parts) + spans.append( + Span( + rule_name=rule_name, + tier=tier, + start=start, + end=end, + masked_snippet=masked_snippet, + ) + ) + return spans + + +def scan_spans( + text: str, + *, + mode: str, + allowlist: Optional[list[Pattern[str]]] = None, +) -> tuple[str, list[Span]]: + """Detect secrets in ``text`` and return per-occurrence, value-free spans (SESF-42). + + Companion to :func:`redact` for retroactive auditing: where ``redact`` returns + one :class:`Hit` per distinct value, this returns one :class:`Span` per + occurrence, with offsets into the original ``text`` and a pre-masked context + snippet. Pure, deterministic, and idempotent — it shares ``redact``'s detection + tiers, allowlist handling, and placeholder-aware skipping, and leaves + ``redact``'s own public contract unchanged. + + Args: + text: The text to scan. + mode: ``"enforce"`` to also return the fully masked text (identical to + ``redact(text, mode="enforce")``), or ``"report"`` to return ``text`` + unchanged. The span list is identical in both modes. + allowlist: Operator-supplied compiled regex patterns whose matches are + exempt from detection — applied exactly as in :func:`redact` (all tiers + for operator patterns; the structural shape exemption is Tier-3 only). + ``None`` means no operator patterns. + + Returns: + A ``(scanned_text, spans)`` tuple. ``scanned_text`` is the masked text in + ``enforce`` mode and the unchanged ``text`` in ``report`` mode; ``spans`` + lists every detected occurrence as a :class:`Span` (rule/tier/offsets plus + a value-free snippet), the same in both modes. + """ + allowlist = allowlist or [] + spans = _collect_spans(text, allowlist) + if mode == "enforce": + scanned_text, _ = redact(text, mode="enforce", allowlist=allowlist) + else: + scanned_text = text + return scanned_text, spans diff --git a/tests/test_cleanup_sanitize.py b/tests/test_cleanup_sanitize.py new file mode 100644 index 0000000..8016e5e --- /dev/null +++ b/tests/test_cleanup_sanitize.py @@ -0,0 +1,302 @@ +"""Tests for the ``cleanup.py sanitize`` CLI adapter (SESF-42 Component 4). + +These exercise the thin CLI over ``sanitize.py`` only: the argparse wiring, the +refuse-before-writes confirmation gate, the scope-flag → ``Scope`` mapping, the +dry-run/apply/drop dispatch, and the value-free output contract. The ``sanitize`` +module is monkeypatched so no real Milvus, FTS, or embedding work is touched — +matching the conftest stubbing style used elsewhere in the suite. + +Every token here is **synthetic and non-functional** (AC-18 / Requirement 5.3), +assembled from string parts so no contiguous secret literal appears in this +source, and no raw value is ever fed into an assertion sink. +""" + +from __future__ import annotations + +import importlib +import types + +import pytest + +# Synthetic, non-functional secret value — assembled from parts so no contiguous +# secret literal sits in this source. It is only ever placed in a *stubbed* +# report's fields to prove the CLI never echoes it; the CLI emits counts only. +SYNTHETIC_SECRET = "AKIA" + "IOSFODNN7EXAMPLE" + +PROJECT = "/Volumes/DATA/GitHub/SessionFlow" +PROVIDER = "claude_code_cli" +SESSION = "synthetic-session-id" +SINCE = "2026-05-01" + + +@pytest.fixture +def cleanup_mod(): + """Import the CLI module under test.""" + return importlib.import_module("cleanup") + + +@pytest.fixture +def stub_sanitize(monkeypatch): + """Replace the ``sanitize`` module with a recording stub. + + Records every ``scan``/``apply`` call (and the ``Scope`` built for it) so a + test can assert which path the CLI took and what scope it passed, without any + real store access. ``scan``/``apply`` return a canned :class:`SanitizeReport` + whose counts are deliberately the only secret-adjacent surface — the report + fields never contain the raw value. + """ + calls: dict[str, list] = {"scan": [], "apply": []} + + class _Scope: + """Mirror of ``sanitize.Scope`` — records the dimensions it was built with.""" + + def __init__( + self, + project_root=None, + provider=None, + session_id=None, + since=None, + ): + self.project_root = project_root + self.provider = provider + self.session_id = session_id + self.since = since + + class _Report: + """Minimal stand-in for ``sanitize.SanitizeReport`` (value-free).""" + + def __init__(self, *, mode, status): + self.mode = mode + self.counts = {"AWS": 2} + self.affected_count = 1 + self.processed_count = 1 + self.incomplete_fts = 0 + self.audit_path = "/synthetic/audit/redaction-deadbeef.jsonl" + self.status = status + self.rotate_warning = mode != "dry-run" + + def fake_scan(scope): + calls["scan"].append(scope) + return _Report(mode="dry-run", status="complete") + + def fake_apply(scope, *, drop, confirmed): + calls["apply"].append({"scope": scope, "drop": drop, "confirmed": confirmed}) + return _Report(mode="drop" if drop else "redact", status="complete") + + module = types.SimpleNamespace( + Scope=_Scope, + SanitizeReport=_Report, + scan=fake_scan, + apply=fake_apply, + calls=calls, + ) + import sys + + monkeypatch.setitem(sys.modules, "sanitize", module) + return module + + +def _args(cleanup_mod, argv): + """Parse ``argv`` through the real parser and return the namespace.""" + return cleanup_mod.build_parser().parse_args(argv) + + +# --- Argparse wiring -------------------------------------------------------- + + +def test_parser_registers_sanitize_subcommand(cleanup_mod): + args = _args(cleanup_mod, ["sanitize"]) + assert args.command == "sanitize" + # Dry-run is the default posture. + assert args.apply is False + assert args.drop is False + assert args.yes is False + + +def test_parser_accepts_all_scope_and_action_flags(cleanup_mod): + args = _args( + cleanup_mod, + [ + "sanitize", + "--apply", + "--drop", + "--yes", + "--project", + PROJECT, + "--provider", + PROVIDER, + "--session", + SESSION, + "--since", + SINCE, + ], + ) + assert args.apply is True + assert args.drop is True + assert args.yes is True + assert args.project == PROJECT + assert args.provider == PROVIDER + assert args.session == SESSION + assert args.since == SINCE + + +# --- Confirmation gate (Requirement 2.4) ------------------------------------ + + +def test_apply_without_yes_refuses_before_any_apply(cleanup_mod, stub_sanitize, capsys): + """``--apply`` sans ``--yes`` must refuse with a non-zero code and never call apply.""" + args = _args(cleanup_mod, ["sanitize", "--apply"]) + rc = cleanup_mod.cmd_sanitize(args) + + out = capsys.readouterr() + assert rc != 0 + # The gate fires before any read or write: apply is NEVER invoked. + assert stub_sanitize.calls["apply"] == [] + assert stub_sanitize.calls["scan"] == [] + # Clear instruction to re-run with confirmation. + combined = out.out + out.err + assert "--yes" in combined + assert "confirmation" in combined.lower() + + +def test_apply_drop_without_yes_also_refuses(cleanup_mod, stub_sanitize, capsys): + args = _args(cleanup_mod, ["sanitize", "--apply", "--drop"]) + rc = cleanup_mod.cmd_sanitize(args) + + assert rc != 0 + assert stub_sanitize.calls["apply"] == [] + + +# --- Dry-run default -------------------------------------------------------- + + +def test_default_runs_scan_not_apply(cleanup_mod, stub_sanitize, capsys): + args = _args(cleanup_mod, ["sanitize"]) + rc = cleanup_mod.cmd_sanitize(args) + + out = capsys.readouterr().out + assert rc in (0, None) + assert len(stub_sanitize.calls["scan"]) == 1 + assert stub_sanitize.calls["apply"] == [] + # Dry-run reports per-rule counts, affected count, and the audit path. + assert "AWS" in out + assert "redaction-deadbeef.jsonl" in out + + +def test_dry_run_flag_is_equivalent_to_default(cleanup_mod, stub_sanitize): + args = _args(cleanup_mod, ["sanitize", "--dry-run"]) + cleanup_mod.cmd_sanitize(args) + assert len(stub_sanitize.calls["scan"]) == 1 + assert stub_sanitize.calls["apply"] == [] + + +# --- Apply paths ------------------------------------------------------------ + + +def test_apply_yes_calls_apply_confirmed_redact(cleanup_mod, stub_sanitize, capsys): + args = _args(cleanup_mod, ["sanitize", "--apply", "--yes"]) + rc = cleanup_mod.cmd_sanitize(args) + + out = capsys.readouterr().out + assert rc in (0, None) + assert len(stub_sanitize.calls["apply"]) == 1 + call = stub_sanitize.calls["apply"][0] + assert call["confirmed"] is True + assert call["drop"] is False + # Apply output carries the rotate-the-key warning. + assert "rotate" in out.lower() + + +def test_apply_yes_drop_passes_drop_true(cleanup_mod, stub_sanitize): + args = _args(cleanup_mod, ["sanitize", "--apply", "--yes", "--drop"]) + cleanup_mod.cmd_sanitize(args) + + assert len(stub_sanitize.calls["apply"]) == 1 + call = stub_sanitize.calls["apply"][0] + assert call["confirmed"] is True + assert call["drop"] is True + + +# --- Scope mapping ---------------------------------------------------------- + + +def test_scope_flags_map_into_scope(cleanup_mod, stub_sanitize): + args = _args( + cleanup_mod, + [ + "sanitize", + "--project", + PROJECT, + "--provider", + PROVIDER, + "--session", + SESSION, + "--since", + SINCE, + ], + ) + cleanup_mod.cmd_sanitize(args) + + scope = stub_sanitize.calls["scan"][0] + assert scope.project_root == PROJECT + assert scope.provider == PROVIDER + assert scope.session_id == SESSION + assert scope.since == SINCE + + +def test_apply_scope_flags_map_into_scope(cleanup_mod, stub_sanitize): + args = _args( + cleanup_mod, + ["sanitize", "--apply", "--yes", "--provider", PROVIDER], + ) + cleanup_mod.cmd_sanitize(args) + + scope = stub_sanitize.calls["apply"][0]["scope"] + assert scope.provider == PROVIDER + assert scope.project_root is None + + +# --- No-leak contract (cross-cutting) --------------------------------------- + + +def test_output_never_contains_a_secret_value(cleanup_mod, monkeypatch, capsys): + """Even if a report somehow carried a raw value, the CLI emits counts only. + + The stub report here deliberately seeds the synthetic secret into fields the + CLI must NOT print (it should print rule names, counts, and the audit path — + never document text or a raw value). + """ + import sys + + leaky_report = types.SimpleNamespace( + mode="dry-run", + counts={"AWS": 1}, + affected_count=1, + processed_count=0, + incomplete_fts=0, + audit_path="/synthetic/audit/redaction-cafef00d.jsonl", + status="complete", + rotate_warning=False, + # Fields the CLI must never echo: + secret_value=SYNTHETIC_SECRET, + document=f"here is a key {SYNTHETIC_SECRET} in text", + ) + + class _Scope: + def __init__(self, **kw): + for k, v in kw.items(): + setattr(self, k, v) + + module = types.SimpleNamespace( + Scope=_Scope, + scan=lambda scope: leaky_report, + apply=lambda scope, **kw: leaky_report, + ) + monkeypatch.setitem(sys.modules, "sanitize", module) + + args = _args(cleanup_mod, ["sanitize"]) + cleanup_mod.cmd_sanitize(args) + + captured = capsys.readouterr() + combined = captured.out + captured.err + assert SYNTHETIC_SECRET not in combined diff --git a/tests/test_sanitize.py b/tests/test_sanitize.py new file mode 100644 index 0000000..2ed9ab6 --- /dev/null +++ b/tests/test_sanitize.py @@ -0,0 +1,996 @@ +"""Tests for the SESF-42 retroactive secret sanitizer. + +Two cohorts live here: + +* **Orchestrator** — ``sanitize.Scope`` / ``sanitize.scan`` / ``sanitize.apply`` + and the ``SanitizeReport`` it returns. The orchestrator drives the dry-run/apply + flow, throttling, checkpointing, and the secret-free audit trail (design.md + Component 3). It owns policy only — detection routes through + ``secret_redaction.scan_spans`` and data access through the + ``rag_engine`` primitives (``upsert_document`` / ``delete_by_doc_id`` / + ``get_row_by_doc_id``). The worklist holds only doc_ids + value-free audit + metadata; affected rows are fetched just-in-time at apply time so memory stays + bounded on large indices, and a resumed run loads the worklist + run_id from + the checkpoint without a re-scan. +* **Primitives** — ``rag_engine.upsert_document`` / ``rag_engine.delete_by_doc_id`` + (design.md Component 2): the in-place Milvus overwrite + FTS rewrite, and the + doc-id-scoped dual delete. + +The implementation is present; this suite is the green contract that guards it. + +Every token in this file is **synthetic and non-functional** (AC-18 / Requirement +5.3) — assembled from string parts so no contiguous secret literal appears, and no +raw value ever reaches stdout, the audit file, or a Milvus/FTS sink in the +assertions below. +""" + +from __future__ import annotations + +import contextlib +import importlib +import json +import re +import types +from pathlib import Path + +import pytest + +secret_redaction = importlib.import_module("secret_redaction") + + +# --- Synthetic, non-functional fixtures (AC-18) ----------------------------- +# Assembled from parts so no contiguous secret literal sits in this source file. +AWS_KEY = "AKIA" + "IOSFODNN7EXAMPLE" +AWS_KEY2 = "AKIA" + "1234567890ABCDEF" +GH_KEY = "ghp" + "_" + "0123456789abcdefABCDEF0123456789abcd" + +PROJECT = "/Volumes/DATA/GitHub/SessionFlow" +PROVIDER = "claude_code_cli" +SESSION = "synthetic-session-id" +SINCE = "2026-05-01" + + +def _require(module_name: str): + """Import a target module (sanitize / rag_engine); RED until it exists.""" + return importlib.import_module(module_name) + + +def _milvus_row(doc_id: str, text: str) -> dict: + """One synthetic Milvus scan row carrying all fields needed to rebuild + audit.""" + return { + "id": int.from_bytes(doc_id.encode()[:7].ljust(7, b"0"), "big"), + "doc_id": doc_id, + "document": text, + "provider": PROVIDER, + "source_path": "/synthetic/transcript.jsonl", + "turn_index": 1, + "timestamp": "2026-05-21T10:00:00Z", + "session_id": SESSION, + "project_root": PROJECT, + } + + +@pytest.fixture +def sanitize_env(monkeypatch, tmp_path): + """Point the sanitizer's audit dir + checkpoint at a tmp HOME (no real writes). + + The audit dir (``~/.sessionflow/audit/``) and checkpoint + (``~/.sessionflow/sanitize_state.json``) follow the existing state-file + convention, so redirecting HOME isolates them per test. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + # Keep detection config deterministic (enforce so masking is observable). + monkeypatch.setenv("SESSIONFLOW_REDACT", "on") + monkeypatch.setenv("SESSIONFLOW_REDACT_MODE", "enforce") + monkeypatch.delenv("SESSIONFLOW_REDACT_ALLOWLIST", raising=False) + return tmp_path + + +@pytest.fixture +def stubbed_engine(monkeypatch): + """Stub the rag_engine seams the orchestrator calls; record every interaction. + + Records scan batches it serves, embed inputs, and upsert/delete/insert calls so + a test can assert a dry-run writes nothing and an apply writes the redacted text. + """ + import rag_engine + + cap = { + "rows": [], # rows the scan iterator yields + "embed_inputs": [], # texts passed to embed_texts (re-embed path) + "upserts": [], # (doc_id, new_document, new_vector) + "deletes": [], # doc_ids passed to delete_by_doc_id + "fts_calls": [], # ordered ("delete"|"insert", doc_id) for ordering checks + "budget_calls": [], # ("before"|"after", ...) budget interactions + "upsert_result": None, # override the UpsertResult the stub returns + "delete_result": None, # override the DeleteResult the stub returns + "before_decisions": None, # optional list of _Decision to serve in order + } + + def fake_query_batches(output_fields, *args, **kwargs): + if cap["rows"]: + yield list(cap["rows"]) + + monkeypatch.setattr(rag_engine, "_query_batches", fake_query_batches, raising=False) + + def fake_get_row_by_doc_id(doc_id, db_path=None): + # JIT fetch (SESF-42 Fix A): apply pulls each affected row's full payload + # by doc_id at write time instead of from an all-rows cache. Serve from + # the same synthetic rows the scan iterator yields. + for row in cap["rows"]: + if row.get("doc_id") == doc_id: + return dict(row) + return None + + monkeypatch.setattr( + rag_engine, "get_row_by_doc_id", fake_get_row_by_doc_id, raising=False + ) + + def fake_embed_texts(texts, is_query=False): + cap["embed_inputs"].append(list(texts)) + return [[0.0] * 8 for _ in texts] + + monkeypatch.setattr(rag_engine, "embed_texts", fake_embed_texts) + + # UpsertResult is a NamedTuple introduced by SESF-42; build via the module so + # the test binds to the real type once it exists. + def fake_upsert_document(doc_id, *, new_document, new_vector=None, db_path=None): + # Record the vector as-passed (None for the resume/FTS-only path) so a test + # can assert the resume branch never sends a zero-length vector. + cap["upserts"].append( + (doc_id, new_document, None if new_vector is None else list(new_vector)) + ) + cap["fts_calls"].append(("delete", doc_id)) + cap["fts_calls"].append(("insert", doc_id)) + if cap["upsert_result"] is not None: + return cap["upsert_result"] + return rag_engine.UpsertResult(milvus_ok=True, fts_ok=True) + + monkeypatch.setattr( + rag_engine, "upsert_document", fake_upsert_document, raising=False + ) + + def fake_delete_by_doc_id(doc_id, db_path=None): + cap["deletes"].append(doc_id) + if cap["delete_result"] is not None: + return cap["delete_result"] + return rag_engine.DeleteResult(deleted=1, fts_ok=True) + + monkeypatch.setattr( + rag_engine, "delete_by_doc_id", fake_delete_by_doc_id, raising=False + ) + + class _Decision: + allowed = True + retry_after_seconds = 0.0 + reason = "" + + class _Budget: + def split_batches(self, turns): + return [list(turns)] if turns else [] + + def before_batch(self, *a, **k): + cap["budget_calls"].append(("before", a, k)) + # When a test seeds a deny decision queue, serve those in order; the + # last one is reused once the queue drains. + queue = cap["before_decisions"] + if queue: + return queue.pop(0) if len(queue) > 1 else queue[0] + return _Decision() + + def after_batch(self, *a, **k): + cap["budget_calls"].append(("after", a, k)) + + monkeypatch.setattr(rag_engine, "get_embedding_budget", lambda: _Budget()) + + return rag_engine, cap + + +# === Scope.to_filter() ====================================================== + + +def test_scope_to_filter_project_provider_session_since(): + """Scope.to_filter() builds escaped Milvus clauses for each scope dimension.""" + sanitize = _require("sanitize") + flt = sanitize.Scope( + project_root=PROJECT, provider=PROVIDER, session_id=SESSION, since=SINCE + ).to_filter() + assert flt is not None + assert f'project_root == "{PROJECT}"' in flt + assert f'provider == "{PROVIDER}"' in flt + assert f'session_id == "{SESSION}"' in flt + # `since` maps to a timestamp lower bound. + assert "timestamp >=" in flt and SINCE in flt + + +def test_scope_to_filter_escapes_injection(): + """A scope value with a quote/backslash is escaped, not concatenated raw.""" + sanitize = _require("sanitize") + flt = sanitize.Scope(project_root='/a"b\\').to_filter() + # The closing quote of the literal is not escaped away by a trailing backslash: + # the escaper doubles backslashes and C-escapes the quote. + assert flt.endswith('"') + assert '/a\\"b\\\\' in flt + + +def test_scope_empty_is_match_all(): + """An empty scope produces no filter (match-all).""" + sanitize = _require("sanitize") + flt = sanitize.Scope().to_filter() + assert flt in (None, "") + + +# === Dry-run does NO writes ================================================= + + +def test_scan_dry_run_lists_affected_and_writes_nothing( + sanitize_env, stubbed_engine +): + """scan() reports affected doc_ids + per-rule counts and performs no writes.""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [ + _milvus_row("d1", f"cred {AWS_KEY} end"), + _milvus_row("d2", f"token {GH_KEY} end"), + _milvus_row("d3", "totally benign text"), + ] + report = sanitize.scan(sanitize.Scope(project_root=PROJECT)) + + # Affected = the two secret-bearing turns; the benign one is not listed. + assert report.affected_count == 2 + assert report.counts.get("AWS", 0) >= 1 + assert report.counts.get("GITHUB", 0) >= 1 + assert report.mode in ("dry-run", "report") + # No destructive calls of any kind. + assert cap["upserts"] == [] + assert cap["deletes"] == [] + assert cap["embed_inputs"] == [] + assert cap["fts_calls"] == [] + + +# === Apply (redact + re-embed) ============================================== + + +def test_apply_redact_upserts_redacted_doc_and_reembeds_redacted_text( + sanitize_env, stubbed_engine +): + """apply() upserts a redacted document + a vector from re-embedding the redacted text.""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + report = sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=False, confirmed=True + ) + + assert cap["upserts"], "apply(redact) must call upsert_document" + doc_id, new_document, _vector = cap["upserts"][0] + assert doc_id == "d1" + # The stored document is redacted, not the original secret. + assert AWS_KEY not in new_document + assert "[REDACTED:AWS]" in new_document + # The re-embed ran over the REDACTED text (not the original). + assert cap["embed_inputs"], "redact path must re-embed" + embedded = cap["embed_inputs"][0][0] + assert AWS_KEY not in embedded + assert "[REDACTED:AWS]" in embedded + # FTS rewrite is delete-then-insert for the doc. + assert ("delete", "d1") in cap["fts_calls"] + assert ("insert", "d1") in cap["fts_calls"] + assert cap["fts_calls"].index(("delete", "d1")) < cap["fts_calls"].index( + ("insert", "d1") + ) + assert report.status == "complete" + + +def test_apply_redact_audit_action_is_redact_and_value_free( + sanitize_env, stubbed_engine +): + """The audit file records action='redact' and never the raw secret value.""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + report = sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=False, confirmed=True + ) + + audit_text = Path(report.audit_path).read_text() + assert AWS_KEY not in audit_text + actions = {json.loads(line)["action"] for line in audit_text.splitlines() if line} + assert actions == {"redact"} + + +# === Apply (drop fast path) ================================================= + + +def test_apply_drop_deletes_without_reembed(sanitize_env, stubbed_engine): + """drop mode calls delete_by_doc_id, performs NO re-embed, audit action='drop'.""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + report = sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=True, confirmed=True + ) + + assert cap["deletes"] == ["d1"] + assert cap["upserts"] == [] + assert cap["embed_inputs"] == [] # no re-embed on the drop path + audit_text = Path(report.audit_path).read_text() + assert AWS_KEY not in audit_text + actions = {json.loads(line)["action"] for line in audit_text.splitlines() if line} + assert actions == {"drop"} + + +# === Confirmation gate ====================================================== + + +def test_apply_without_confirmation_makes_no_calls( + sanitize_env, stubbed_engine, monkeypatch +): + """apply(confirmed=False) performs no reads or writes. + + The confirmation gate must short-circuit *before* any read path runs, not just + before the writes — so spy both the scan read (``_query_batches``) and the + just-in-time row fetch (``get_row_by_doc_id``) and assert neither fired. + """ + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + reads = {"query_batches": 0, "get_row": 0} + real_query_batches = rag_engine._query_batches + real_get_row = rag_engine.get_row_by_doc_id + + def spy_query_batches(*args, **kwargs): + reads["query_batches"] += 1 + yield from real_query_batches(*args, **kwargs) + + def spy_get_row(doc_id, db_path=None): + reads["get_row"] += 1 + return real_get_row(doc_id, db_path=db_path) + + monkeypatch.setattr(rag_engine, "_query_batches", spy_query_batches, raising=False) + monkeypatch.setattr(rag_engine, "get_row_by_doc_id", spy_get_row, raising=False) + + sanitize.apply(sanitize.Scope(project_root=PROJECT), drop=False, confirmed=False) + + # No writes. + assert cap["upserts"] == [] + assert cap["deletes"] == [] + assert cap["embed_inputs"] == [] + # No reads either: the scan iterator and the JIT row fetch never fired. + assert reads["query_batches"] == 0 + assert reads["get_row"] == 0 + + +# === FTS-failure incompletion =============================================== + + +@pytest.mark.parametrize("drop", [False, True], ids=["redact", "drop"]) +def test_apply_fts_failure_leaves_row_incomplete(sanitize_env, stubbed_engine, drop): + """fts_ok=False (either path) -> doc not 'done', incomplete_fts, status='incomplete'. + + Parameterized over redact (FTS rewrite fails) and drop (FTS delete fails): in + both, the Milvus side succeeded but the FTS row may still carry the secret, so + the doc_id must stay on the worklist, off the ``done`` set. + """ + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + if drop: + cap["delete_result"] = rag_engine.DeleteResult(deleted=1, fts_ok=False) + else: + cap["upsert_result"] = rag_engine.UpsertResult(milvus_ok=True, fts_ok=False) + + report = sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=drop, confirmed=True + ) + + assert report.incomplete_fts >= 1 + assert report.status == "incomplete" # NOT complete while FTS-failed rows remain + + # The doc_id stays in the checkpoint worklist, off the `done` set. + state_path = Path(sanitize_env) / ".sessionflow" / "sanitize_state.json" + state = json.loads(state_path.read_text()) + assert "d1" not in state.get("done", []) + assert "d1" in state.get("worklist", []) + + +# === Checkpoint / resume ==================================================== + + +def test_apply_resume_skips_already_done_doc_ids( + sanitize_env, stubbed_engine, monkeypatch +): + """A run with a pre-seeded `done` set skips those doc_ids (no re-process).""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [ + _milvus_row("d1", f"cred {AWS_KEY} end"), + _milvus_row("d2", f"token {GH_KEY} end"), + ] + + # Pre-seed the durable checkpoint with d1 already done. + state_dir = Path(sanitize_env) / ".sessionflow" + state_dir.mkdir(parents=True, exist_ok=True) + (state_dir / "sanitize_state.json").write_text( + json.dumps( + { + "run_id": "prior", + "mode": "redact", + "scope": {"project_root": PROJECT}, + "worklist": ["d1", "d2"], + "done": ["d1"], + "counts": {}, + "status": "applying", + } + ) + ) + + sanitize.apply( + sanitize.Scope(project_root=PROJECT), + drop=False, + confirmed=True, + resume=True, + ) + + # Only d2 is (re)processed; d1 was already done. + upserted_ids = [u[0] for u in cap["upserts"]] + assert "d1" not in upserted_ids + assert "d2" in upserted_ids + + +def test_apply_resume_no_spans_passes_none_vector( + sanitize_env, stubbed_engine +): + """A resumed row already clean in Milvus (no spans) upserts with new_vector=None. + + The resume/FTS-only converge path must never send a 0-length vector — that + would corrupt the fixed-dim HNSW row. It also performs no re-embed. + """ + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + # The row's Milvus document is already redacted (scan_spans yields no spans), + # but the doc_id is still on the durable worklist from a prior crash — so the + # resume pass must converge FTS only. + cap["rows"] = [_milvus_row("d1", "totally benign already redacted text")] + + state_dir = Path(sanitize_env) / ".sessionflow" + state_dir.mkdir(parents=True, exist_ok=True) + (state_dir / "sanitize_state.json").write_text( + json.dumps( + { + "run_id": "prior", + "mode": "redact", + "scope": {"project_root": PROJECT}, + "worklist": ["d1"], + "done": [], + "counts": {}, + "status": "applying", + } + ) + ) + + sanitize.apply( + sanitize.Scope(project_root=PROJECT), + drop=False, + confirmed=True, + resume=True, + ) + + assert cap["upserts"], "the FTS-only converge still calls upsert_document" + doc_id, _new_doc, vector = cap["upserts"][0] + assert doc_id == "d1" + # None — not [] — so the stored vector is preserved, not zeroed. + assert vector is None + # No re-embed on the clean/resume path. + assert cap["embed_inputs"] == [] + + +# === Budget throttle ======================================================== + + +def test_apply_reembed_invokes_embedding_budget(sanitize_env, stubbed_engine): + """The re-embed path goes through the budget's before_batch/after_batch.""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + sanitize.apply(sanitize.Scope(project_root=PROJECT), drop=False, confirmed=True) + + kinds = [c[0] for c in cap["budget_calls"]] + assert "before" in kinds, "re-embed must request budget via before_batch" + assert "after" in kinds, "re-embed must report completion via after_batch" + + +def test_apply_budget_hard_deny_aborts_without_embed(sanitize_env, stubbed_engine): + """A budget hard-deny (allowed=False, no retry) -> no embed, status='paused'. + + Mirrors rag_engine.add_turns: when the budget denies with no retry delay (a + pause/cap), the run must NOT bypass the gate and embed — it aborts cleanly, + checkpoints the un-processed turn, and reports status='paused'. + """ + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + class _HardDeny: + allowed = False + retry_after_seconds = 0.0 + reason = "paused" + + cap["before_decisions"] = [_HardDeny()] + + report = sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=False, confirmed=True + ) + + # The gate was respected: no embed, no upsert for the denied turn. + assert cap["embed_inputs"] == [], "hard-deny must not embed past the gate" + assert cap["upserts"] == [] + assert report.status == "paused" + # The un-processed turn stays on the worklist for the next run. + state_path = Path(sanitize_env) / ".sessionflow" / "sanitize_state.json" + state = json.loads(state_path.read_text()) + assert "d1" in state.get("worklist", []) + assert "d1" not in state.get("done", []) + + +def test_apply_budget_soft_deny_then_retry_embeds(sanitize_env, stubbed_engine): + """A soft-deny (retry_after>0) sleeps then retries; the second decision allows.""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + class _SoftDeny: + allowed = False + retry_after_seconds = 0.001 + reason = "cooldown" + + class _Allow: + allowed = True + retry_after_seconds = 0.0 + reason = "" + + cap["before_decisions"] = [_SoftDeny(), _Allow()] + + report = sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=False, confirmed=True + ) + + # After the cooldown the embed runs and the turn completes. + assert cap["embed_inputs"], "soft-deny must retry and then embed" + assert report.status == "complete" + + +# === Worklist is bounded (doc_ids only, JIT fetch) ========================== + + +def test_build_worklist_holds_doc_ids_not_full_rows(sanitize_env, stubbed_engine): + """_build_worklist returns doc_ids + value-free metadata, never document text. + + Guards SESF-42 Fix A: the worklist must not cache every scanned row's payload + (which OOMs on large indices). The per-turn metadata it does keep must never + carry the ``document`` field. + """ + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [ + _milvus_row("d1", f"cred {AWS_KEY} end"), + _milvus_row("d2", "totally benign text"), + ] + + worklist, meta_by_id, _counts = sanitize._build_worklist( + sanitize.Scope(project_root=PROJECT), [], None + ) + + # Only the secret-bearing turn is listed; the benign one is dropped. + assert worklist == ["d1"] + # Metadata is value-free: no document text is cached for any doc_id. + for meta in meta_by_id.values(): + assert "document" not in meta + assert set(meta_by_id) == {"d1"} + + +def test_apply_jit_fetches_document_via_get_row_by_doc_id( + sanitize_env, stubbed_engine +): + """apply() reads each affected row's document just-in-time, not from a cache.""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + seen = [] + real = rag_engine.get_row_by_doc_id + + def spy(doc_id, db_path=None): + seen.append(doc_id) + return real(doc_id, db_path=db_path) + + import unittest.mock as _mock + + with _mock.patch.object(rag_engine, "get_row_by_doc_id", spy): + sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=False, confirmed=True + ) + + assert seen == ["d1"], "apply must JIT-fetch the row by doc_id" + + +def test_apply_resume_loads_worklist_and_run_id_without_rescan( + sanitize_env, stubbed_engine, monkeypatch +): + """resume=True loads worklist + run_id from the checkpoint, skipping _build_worklist. + + Guards SESF-42 Fix B/D: a resumed run must not re-scan (worklist comes from the + checkpoint) and must reuse the prior run_id so the audit appends to the same + redaction-.jsonl rather than starting a fresh trail. + """ + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d2", f"token {GH_KEY} end")] + + # Fail loudly if apply re-scans instead of trusting the checkpoint worklist. + def _no_rescan(*a, **k): + raise AssertionError("resume must not call _build_worklist") + + monkeypatch.setattr(sanitize, "_build_worklist", _no_rescan) + + state_dir = Path(sanitize_env) / ".sessionflow" + state_dir.mkdir(parents=True, exist_ok=True) + (state_dir / "sanitize_state.json").write_text( + json.dumps( + { + "run_id": "priorrun01", + "mode": "redact", + "scope": {"project_root": PROJECT}, + "worklist": ["d2"], + "done": [], + "counts": {"GITHUB": 1}, + "status": "applying", + } + ) + ) + + report = sanitize.apply( + sanitize.Scope(project_root=PROJECT), + drop=False, + confirmed=True, + resume=True, + ) + + # The worklist came from the checkpoint (no re-scan), d2 was processed. + assert [u[0] for u in cap["upserts"]] == ["d2"] + # Same run_id reused -> audit appends to redaction-priorrun01.jsonl. + assert report.audit_path.endswith("redaction-priorrun01.jsonl") + + +def test_audit_writer_appends_on_resume(sanitize_env): + """A second _AuditWriter for the same run_id appends, not truncates (Fix D).""" + sanitize = _require("sanitize") + + class _Span: + rule_name = "AWS" + tier = 2 + start = 0 + end = 4 + masked_snippet = "AK..." + + row = {"doc_id": "d1", "provider": PROVIDER} + + w1 = sanitize._AuditWriter("sharedrun") + w1.write(row=row, spans=[_Span()], action="redact") + w1.close() + + # A resumed run reuses the same run_id; the prior line must survive. + w2 = sanitize._AuditWriter("sharedrun") + w2.write(row=row, spans=[_Span()], action="redact") + w2.close() + + text = Path(w2.path).read_text() + assert len(text.splitlines()) == 2, "append mode must preserve the prior line" + # File is owner-only (0600) from creation. + import stat + + mode = stat.S_IMODE(Path(w2.path).stat().st_mode) + assert mode == 0o600 + + +# === No-leak (cross-cutting) ================================================ + + +def test_no_secret_leaks_to_stdout_or_audit_across_all_modes( + sanitize_env, stubbed_engine, capsys +): + """Across dry-run, redact-apply, and drop-apply: no synthetic secret anywhere.""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + + audit_paths = [] + + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + audit_paths.append(sanitize.scan(sanitize.Scope(project_root=PROJECT)).audit_path) + + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + audit_paths.append( + sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=False, confirmed=True + ).audit_path + ) + + cap["rows"] = [_milvus_row("d2", f"cred {AWS_KEY2} end")] + audit_paths.append( + sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=True, confirmed=True + ).audit_path + ) + + captured = capsys.readouterr() + assert AWS_KEY not in captured.out and AWS_KEY not in captured.err + assert AWS_KEY2 not in captured.out and AWS_KEY2 not in captured.err + for path in audit_paths: + text = Path(path).read_text() + assert AWS_KEY not in text + assert AWS_KEY2 not in text + + +# === rotate-the-key warning ================================================= + + +def test_apply_report_carries_rotate_warning(sanitize_env, stubbed_engine): + """An apply run flags rotate_warning (redaction != rotation; Requirement 7.1).""" + sanitize = _require("sanitize") + rag_engine, cap = stubbed_engine + cap["rows"] = [_milvus_row("d1", f"cred {AWS_KEY} end")] + + report = sanitize.apply( + sanitize.Scope(project_root=PROJECT), drop=False, confirmed=True + ) + assert report.rotate_warning is True + + +# === rag_engine.upsert_document primitive =================================== + + +@pytest.fixture +def primitive_harness(monkeypatch): + """Stub the Milvus client + FTS for the new write primitives (no real stores).""" + import rag_engine + + cap = { + "milvus_upserts": None, + "milvus_deletes": [], + "fts_ops": [], # ordered ("delete"|"insert", payload) + "existing_row": None, + } + + class _Client: + def has_collection(self, name): + return True + + def query(self, **kw): + return [cap["existing_row"]] if cap["existing_row"] is not None else [] + + def upsert(self, collection_name, data): + cap["milvus_upserts"] = data + + def delete(self, collection_name, filter): + cap["milvus_deletes"].append(filter) + + @contextlib.contextmanager + def _fake_milvus_client(db_path=None): + yield _Client() + + monkeypatch.setattr(rag_engine, "milvus_client", _fake_milvus_client) + + def _fts_delete(conn, column, value): + cap["fts_ops"].append(("delete", (column, value))) + + def _fts_insert(conn, records): + cap["fts_ops"].append(("insert", records)) + + fake_fts = types.SimpleNamespace( + connection=lambda db_path: object(), + delete=_fts_delete, + insert=_fts_insert, + close_ephemeral=lambda conn: None, + ) + monkeypatch.setattr(rag_engine, "_fts", fake_fts) + return rag_engine, cap + + +def test_upsert_document_milvus_upsert_and_fts_rewrite(primitive_harness): + """upsert_document does a Milvus upsert + FTS delete-then-insert, returns ok/ok.""" + rag_engine, cap = primitive_harness + cap["existing_row"] = { + "id": 123, + "doc_id": "d1", + "document": f"old {AWS_KEY}", + "vector": [0.5] * 8, + "provider": PROVIDER, + "session_id": SESSION, + "project_root": PROJECT, + } + result = rag_engine.upsert_document( + "d1", + new_document="redacted [REDACTED:AWS]", + new_vector=[0.0] * 8, + db_path="/tmp/sf-test.db", + ) + + assert result.milvus_ok is True + assert result.fts_ok is True + # Milvus row swapped: document + vector overwritten, metadata preserved. + assert cap["milvus_upserts"] is not None + row = cap["milvus_upserts"][0] + assert row["document"] == "redacted [REDACTED:AWS]" + assert row["vector"] == [0.0] * 8 + assert row["doc_id"] == "d1" + assert AWS_KEY not in row["document"] + # FTS rewrite is delete-then-insert (ordering matters). + kinds = [op[0] for op in cap["fts_ops"]] + assert kinds == ["delete", "insert"] + inserted = cap["fts_ops"][1][1][0] + assert inserted["content"] == "redacted [REDACTED:AWS]" + + +def test_upsert_document_none_vector_preserves_stored_vector(primitive_harness): + """new_vector=None keeps the existing fixed-dim vector and still rewrites FTS. + + The resume/no-spans path passes None so a 0-length list can never clobber the + HNSW/COSINE row; the document + FTS sidecar still converge. + """ + rag_engine, cap = primitive_harness + original_vector = [0.5] * 8 + cap["existing_row"] = { + "id": 123, + "doc_id": "d1", + "document": "already redacted [REDACTED:AWS]", + "vector": list(original_vector), + "provider": PROVIDER, + "session_id": SESSION, + "project_root": PROJECT, + } + result = rag_engine.upsert_document( + "d1", + new_document="already redacted [REDACTED:AWS]", + new_vector=None, + db_path="/tmp/sf-test.db", + ) + + assert result.milvus_ok is True + assert result.fts_ok is True + row = cap["milvus_upserts"][0] + # The stored vector is untouched — not zero-length, not replaced. + assert row["vector"] == original_vector + assert len(row["vector"]) == 8 + # FTS still converges (delete-then-insert of the redacted content). + kinds = [op[0] for op in cap["fts_ops"]] + assert kinds == ["delete", "insert"] + assert cap["fts_ops"][1][1][0]["content"] == "already redacted [REDACTED:AWS]" + + +def test_upsert_document_fts_record_carries_source_metadata(primitive_harness): + """The FTS rewrite copies every metadata column from the fetched row. + + Matches the normal-ingest FTS record shape so filtered/BM25 search survives a + sanitize — content is the redacted document, metadata is the source row's. + """ + rag_engine, cap = primitive_harness + cap["existing_row"] = { + "id": 7, + "doc_id": "d1", + "document": f"old {AWS_KEY}", + "vector": [0.1] * 8, + "session_id": SESSION, + "logical_session_id": "logical-xyz", + "provider": PROVIDER, + "source_kind": "transcript", + "source_class": "cli", + "source_id": "src-42", + "source_path": "/synthetic/transcript.jsonl", + "git_branch": "main", + "turn_index": 5, + "timestamp": "2026-05-21T10:00:00Z", + "chunk_type": "turn", + "project_root": PROJECT, + "issue_ids": "SESF-42", + } + rag_engine.upsert_document( + "d1", + new_document="redacted [REDACTED:AWS]", + new_vector=[0.0] * 8, + db_path="/tmp/sf-test.db", + ) + + inserted = cap["fts_ops"][1][1][0] + assert inserted["content"] == "redacted [REDACTED:AWS]" + assert inserted["session_id"] == SESSION + assert inserted["logical_session_id"] == "logical-xyz" + assert inserted["provider"] == PROVIDER + assert inserted["source_kind"] == "transcript" + assert inserted["source_class"] == "cli" + assert inserted["source_id"] == "src-42" + assert inserted["source_path"] == "/synthetic/transcript.jsonl" + assert inserted["git_branch"] == "main" + assert inserted["turn_index"] == 5 + assert inserted["timestamp"] == "2026-05-21T10:00:00Z" + assert inserted["chunk_type"] == "turn" + assert inserted["project_root"] == PROJECT + assert inserted["issue_ids"] == "SESF-42" + + +def test_upsert_document_fts_content_matches_truncated_milvus_document(primitive_harness): + """FTS content uses the UTF-8-truncated row['document'], not raw new_document. + + A redacted payload can expand past Milvus's 65535-byte VARCHAR cap; both stores + must index identical content or the dual-write contract breaks (CodeRabbit). + """ + rag_engine, cap = primitive_harness + cap["existing_row"] = { + "id": 9, + "doc_id": "d1", + "document": "old", + "vector": [0.1] * 8, + "session_id": SESSION, + "provider": PROVIDER, + } + oversized = "x" * 70000 # > 65535 bytes -> Milvus truncates + rag_engine.upsert_document( + "d1", new_document=oversized, new_vector=[0.0] * 8, db_path="/tmp/sf-test.db" + ) + + milvus_doc = cap["milvus_upserts"][0]["document"] + fts_content = cap["fts_ops"][1][1][0]["content"] + assert fts_content == milvus_doc # the two stores agree + assert fts_content != oversized # truncated, not the raw payload + assert len(fts_content.encode("utf-8")) <= 65535 + + +def test_upsert_document_fts_failure_reported_distinctly(primitive_harness): + """An FTS-insert failure yields fts_ok=False (not swallowed) with milvus_ok=True.""" + rag_engine, cap = primitive_harness + cap["existing_row"] = {"id": 1, "doc_id": "d1", "document": "x"} + + def boom(conn, records): + raise RuntimeError("fts insert failed") + + rag_engine._fts.insert = boom + result = rag_engine.upsert_document( + "d1", new_document="clean", new_vector=[0.0] * 8, db_path="/tmp/sf-test.db" + ) + assert result.milvus_ok is True + assert result.fts_ok is False + + +def test_delete_by_doc_id_dual_delete(primitive_harness): + """delete_by_doc_id deletes from BOTH stores and reports the outcome distinctly.""" + rag_engine, cap = primitive_harness + cap["existing_row"] = {"id": 999, "doc_id": "d1", "document": "x"} + + result = rag_engine.delete_by_doc_id("d1", db_path="/tmp/sf-test.db") + + # Contract: returns a DeleteResult carrying the Milvus deleted count + fts_ok. + assert isinstance(result, rag_engine.DeleteResult) + assert result.deleted == 1 + assert result.fts_ok is True + assert cap["milvus_deletes"], "Milvus delete must run" + assert any(("delete", ("doc_id", "d1")) == op for op in cap["fts_ops"]) + + +def test_delete_by_doc_id_fts_failure_reported_distinctly(primitive_harness): + """An FTS-delete failure yields fts_ok=False even when the Milvus delete ran.""" + rag_engine, cap = primitive_harness + cap["existing_row"] = {"id": 999, "doc_id": "d1", "document": "x"} + + def boom(conn, column, value): + raise RuntimeError("fts delete failed") + + rag_engine._fts.delete = boom + result = rag_engine.delete_by_doc_id("d1", db_path="/tmp/sf-test.db") + + assert result.deleted == 1 # Milvus delete still happened + assert result.fts_ok is False # but the FTS row may survive -> not done + assert cap["milvus_deletes"], "Milvus delete must run" diff --git a/tests/test_secret_redaction.py b/tests/test_secret_redaction.py index 8b252dd..64bd324 100644 --- a/tests/test_secret_redaction.py +++ b/tests/test_secret_redaction.py @@ -76,6 +76,12 @@ # A high-entropy token matching NO provider format (Tier-3 entropy only). ENTROPY_TOKEN = "Zk9wYmFyQmF6UXV4OTk4MjM3NDYxMjkwTm9wZQ" +# A 32-char high-entropy token (entropy >= 4.0, not UUID / 40hex / 64hex) used by +# the SESF-42 snippet-leak regression. Assembled from parts so no contiguous secret +# literal appears in this source. It is forced (keyword-adjacent within 40 chars) in +# the full line but sits OUTSIDE the +-24 snippet window of its forcing keyword. +ENTROPY_FORCED_TOKEN = "Xk7Qm2Wp9Lr4Bn8" + "Zt5Cv3Hd6Fj1Gs0Yo" + # Allowlist shapes that MUST NOT be flagged (AC-9). UUID_VALUE = "550e8400-e29b-41d4-a716-446655440000" GIT_SHA40 = "5e150f177902427c6f79e1713b608de8f4c5964a" @@ -602,3 +608,218 @@ def _no_collection_client(db_path=None): stats = rag_engine.get_stats(db_path=DB) assert "redaction" in stats # never KeyErrors (AC-10 contract on every path) assert stats["redaction"]["enabled"] is True + + +# === SESF-42 — scan_spans (span-aware, value-free audit scan) =============== +# +# TDD red phase for the new `secret_redaction.scan_spans` companion to `redact`. +# Contract (design.md, Component 1 / Data Models): +# scan_spans(text, *, mode, allowlist=None) -> tuple[str, list[Span]] +# Span = NamedTuple(rule_name: str, tier: int, start: int, end: int, +# masked_snippet: str) +# Invariants vs redact(): +# * per-OCCURRENCE spans (NO dedup-by-value) with offsets into the ORIGINAL text; +# * masked_snippet is value-free (the window is redact(..., enforce)-masked); +# * spans are identical in "report" and "enforce"; only the returned text differs; +# * deterministic; redact()'s own public contract stays unchanged. +# +# Every token here reuses the part-assembled synthetic fixtures above (AC-18). + + +def _has_scan_spans(): + """Return True once `secret_redaction.scan_spans` exists (else skip-on-stub).""" + return hasattr(secret_redaction, "scan_spans") + + +def test_scan_spans_per_occurrence_no_value_dedup(): + """Two occurrences of the SAME synthetic secret yield TWO spans (no dedup).""" + scan_spans = secret_redaction.scan_spans + prefix = "first key " + middle = " and second key " + text = f"{prefix}{AWS_KEY}{middle}{AWS_KEY} end" + _, spans = scan_spans(text, mode="report") + aws_spans = [s for s in spans if s.rule_name == "AWS"] + # Two distinct occurrences -> two spans (redact() would dedup to one Hit). + assert len(aws_spans) == 2 + # Offsets index into the ORIGINAL text and recover the exact value. + for s in aws_spans: + assert s.tier == 1 + assert text[s.start:s.end] == AWS_KEY + # The two spans are at the two real positions, not duplicates of one offset. + starts = sorted(s.start for s in aws_spans) + assert starts == [text.find(AWS_KEY), text.find(AWS_KEY, text.find(AWS_KEY) + 1)] + + +def test_scan_spans_one_span_per_distinct_tier1_token(): + """Each distinct Tier-1 provider token in the text yields its own span.""" + scan_spans = secret_redaction.scan_spans + text = ( + f"aws {AWS_KEY} gh {GH_KEY} gitlab {GITLAB_KEY} " + f"openai {OPENAI_KEY} anthropic {ANTHROPIC_KEY}" + ) + _, spans = scan_spans(text, mode="report") + rules = sorted(s.rule_name for s in spans) + for expected in ("AWS", "GITHUB", "GITLAB", "OPENAI", "ANTHROPIC"): + assert expected in rules + # Each is tier 1 and its offsets recover the right token. + by_rule = {s.rule_name: s for s in spans} + assert by_rule["AWS"].tier == 1 + assert text[by_rule["AWS"].start:by_rule["AWS"].end] == AWS_KEY + assert text[by_rule["GITHUB"].start:by_rule["GITHUB"].end] == GH_KEY + + +def test_scan_spans_tier2_assignment_span(): + """A Tier-2 keyword=value assignment yields an ASSIGNMENT / tier-2 span.""" + scan_spans = secret_redaction.scan_spans + text = f"api_key: {ASSIGN_SECRET_VALUE}" + _, spans = scan_spans(text, mode="report") + assign = [s for s in spans if s.rule_name == "ASSIGNMENT"] + assert assign, "expected an ASSIGNMENT span for the Tier-2 value" + s = assign[0] + assert s.tier == 2 + # The span covers the value (not the key); offsets recover the secret value. + assert text[s.start:s.end] == ASSIGN_SECRET_VALUE + + +def test_scan_spans_masked_snippet_is_value_free(): + """masked_snippet never contains the synthetic secret substring.""" + scan_spans = secret_redaction.scan_spans + text = f"my credential is {AWS_KEY} ok" + _, spans = scan_spans(text, mode="report") + assert spans + for s in spans: + assert AWS_KEY not in s.masked_snippet + + +def test_scan_spans_masked_snippet_masks_neighbor_secret_in_window(): + """Two synthetic secrets within +-24 chars: BOTH masked in each snippet. + + The ±_SNIPPET_PAD window around one secret may overlap a neighbor; the snippet + is produced via redact(window, enforce) so the neighbor is masked too. + """ + scan_spans = secret_redaction.scan_spans + # GH_KEY immediately after AWS_KEY (separated by a single space) -> well within + # the ±24-char window, so each secret sits in the other's snippet window. + text = f"creds {AWS_KEY} {GH_KEY} done" + _, spans = scan_spans(text, mode="report") + assert spans + for s in spans: + assert AWS_KEY not in s.masked_snippet + assert GH_KEY not in s.masked_snippet + + +def test_scan_spans_forced_entropy_snippet_value_free_when_keyword_outside_window(): + """A forced Tier-3 entropy token must not leak RAW in its own snippet (SESF-42). + + Regression: ``_collect_spans`` used to rebuild each snippet by re-running + ``redact(window, enforce)`` over the +-24-char window. A HIGH_ENTROPY hit is + only force-masked when a secret keyword is within 40 chars (``_keyword_adjacent``) + or inside a fence. When the forcing keyword sits >24 but <40 chars from the token, + the +-24 window EXCLUDES it, so ``redact(window)`` re-evaluated the token as + un-forced and left it RAW in the snippet -- even though the span was still + reported. The snippet must be value-free regardless of keyword distance. + """ + scan_spans = secret_redaction.scan_spans + token = ENTROPY_FORCED_TOKEN + # keyword, then ~28 chars of filler, then the token: keyword is within 40 chars + # (so the token is forced/reported) but outside the token's +-24 snippet window. + text = "api_key " + "x" * 28 + " " + token + _, spans = scan_spans(text, mode="report") + # (1) the token is reported as a forced HIGH_ENTROPY span. + entropy_spans = [ + s for s in spans + if s.rule_name == "HIGH_ENTROPY" and text[s.start:s.end] == token + ] + assert entropy_spans, "expected a forced HIGH_ENTROPY span for the token" + # (2) the raw token never appears in ANY span's snippet. + for s in spans: + assert token not in s.masked_snippet + # The target span's own snippet carries the typed placeholder instead. + assert any("[REDACTED:HIGH_ENTROPY]" in s.masked_snippet for s in entropy_spans) + + +def test_scan_spans_identical_in_report_and_enforce(): + """Spans are identical across modes; only the returned text differs.""" + scan_spans = secret_redaction.scan_spans + text = f"cred {AWS_KEY} and {GH_KEY} end" + report_text, report_spans = scan_spans(text, mode="report") + enforce_text, enforce_spans = scan_spans(text, mode="enforce") + # Same audit metadata in both modes. + assert report_spans == enforce_spans + # report leaves text unchanged; enforce masks it. + assert report_text == text + assert AWS_KEY not in enforce_text + assert GH_KEY not in enforce_text + assert "[REDACTED:AWS]" in enforce_text + + +def test_scan_spans_determinism(): + """Identical input -> identical spans and identical returned text.""" + scan_spans = secret_redaction.scan_spans + text = f"key {AWS_KEY} then {OPENAI_KEY} done" + first = scan_spans(text, mode="enforce") + second = scan_spans(text, mode="enforce") + assert first == second + + +def test_scan_spans_span_namedtuple_shape(): + """Span carries exactly rule_name, tier, start, end, masked_snippet.""" + scan_spans = secret_redaction.scan_spans + _, spans = scan_spans(f"key {AWS_KEY} done", mode="report") + assert spans + assert tuple(type(spans[0])._fields) == ( + "rule_name", "tier", "start", "end", "masked_snippet", + ) + + +def test_scan_spans_allowlist_excludes_value(): + """An operator-allowlisted value yields no span (parity with redact).""" + import re + + scan_spans = secret_redaction.scan_spans + text = f"internal {AWS_KEY} ok" + _, spans = scan_spans( + text, mode="report", allowlist=[re.compile(re.escape(AWS_KEY))] + ) + assert not any(s.rule_name == "AWS" for s in spans) + # Positive control: without the allowlist the AWS span IS present. + _, spans2 = scan_spans(text, mode="report") + assert any(s.rule_name == "AWS" for s in spans2) + + +def test_scan_spans_and_redact_agree_on_masked_values(): + """redact() and scan_spans() agree on every value redact() actually masks. + + Both paths route through `_aggregate_maskable_candidates`, so each value + redact() removes from enforce-mode output must be reported by scan_spans() at + the same rule/tier — no behavioral drift between the SESF-41 live guard and + the SESF-42 retroactive scanner. (redact()'s Hit list additionally carries + non-forced advisory entropy hits, which scan_spans rightly omits; this test + compares against the maskable set, the values enforce truly replaces.) + """ + text = ( + f"aws {AWS_KEY} gh {GH_KEY} api_key: {ASSIGN_SECRET_VALUE} " + f"issue {ISSUE_ID}" + ) + redacted, _ = redact(text, mode="enforce") + _, spans = secret_redaction.scan_spans(text, mode="report") + # The (value, rule, tier) the masker would replace == what spans report. + maskable = { + (text[s.start:s.end], s.rule_name, s.tier) for s in spans + } + assert maskable == set(secret_redaction._maskable_values(text, [])) + # Every maskable value is actually gone from redact()'s enforce output. + for value, _rule, _tier in maskable: + assert value not in redacted + # The structurally-exempt issue ID survives and is never reported. + assert ISSUE_ID in redacted + assert ISSUE_ID not in {v for v, _r, _t in maskable} + + +def test_redact_public_contract_unchanged(): + """redact() still returns (str, list[Hit]) with rule_name/tier only.""" + out, hits = redact(f"key {AWS_KEY} done", mode="report") + assert isinstance(out, str) + assert isinstance(hits, list) + assert hits + assert tuple(type(hits[0])._fields) == ("rule_name", "tier") diff --git a/tests/test_tools_sanitize.py b/tests/test_tools_sanitize.py new file mode 100644 index 0000000..67421f7 --- /dev/null +++ b/tests/test_tools_sanitize.py @@ -0,0 +1,222 @@ +"""Tests for the SESF-42 ``sanitize_index`` MCP tool adapter (Component 5). + +These exercise the thin MCP layer in :mod:`tools` only — the ``sanitize`` module +is monkeypatched so no real Milvus, FTS, or embedding backend is touched. The +focus is the adapter contract from design.md "Component 5": + +* the tool is registered with the documented input schema; +* a default invocation calls :func:`sanitize.scan` (never :func:`sanitize.apply`); +* ``apply=true`` without ``confirm`` refuses and never calls :func:`sanitize.apply`; +* ``apply=true, confirm=true`` calls :func:`sanitize.apply` with ``confirmed=True`` + and threads ``drop`` through; +* the rendered response never contains a raw secret value. +""" + +import asyncio +import importlib + +tools = importlib.import_module("tools") + +# A synthetic secret assembled from parts — must never appear in any response. +SECRET = "AKIA" + "IOSFODNN7EXAMPLE" + + +def _tool_registry(): + """Register tools against a capturing fake server; return ({name: schema}, call_tool).""" + captured: dict = {} + + class _Server: + def list_tools(self): + def deco(fn): + captured["list"] = fn + return fn + return deco + + def call_tool(self): + def deco(fn): + captured["call"] = fn + return fn + return deco + + tools.register_tools(_Server()) + listed = asyncio.run(captured["list"]()) + return {t.name: t.inputSchema for t in listed}, captured["call"] + + +def _report(mode, **kwargs): + """Build a value-free :class:`sanitize.SanitizeReport` stub for assertions.""" + defaults = dict( + counts={"AWS": 2}, + affected_count=1, + processed_count=0, + incomplete_fts=0, + audit_path="/tmp/audit/redaction-deadbeef.jsonl", + status="complete", + rotate_warning=(mode != "dry-run"), + ) + defaults.update(kwargs) + return tools.sanitize.SanitizeReport(mode=mode, **defaults) + + +# --------------------------------------------------------------------------- +# Registration +# --------------------------------------------------------------------------- + + +def test_sanitize_index_tool_registered_with_schema(): + # The tool is listed with all documented optional properties and none required. + schemas, _ = _tool_registry() + assert "sanitize_index" in schemas + props = schemas["sanitize_index"]["properties"] + for key in ("apply", "drop", "confirm", "project_root", "provider", "session_id", "since"): + assert key in props, f"missing property: {key}" + assert props["apply"]["type"] == "boolean" + assert props["drop"]["type"] == "boolean" + assert props["confirm"]["type"] == "boolean" + assert schemas["sanitize_index"]["required"] == [] + + +# --------------------------------------------------------------------------- +# Default (dry-run) path +# --------------------------------------------------------------------------- + + +def test_default_invocation_calls_scan_not_apply(monkeypatch): + # No apply flag → scan() is called, apply() is NOT. + calls = {"scan": 0, "apply": 0} + captured_scope = {} + + def fake_scan(scope): + calls["scan"] += 1 + captured_scope["scope"] = scope + return _report("dry-run") + + def fake_apply(*a, **kw): # pragma: no cover - must never run here + calls["apply"] += 1 + raise AssertionError("apply must not be called on a dry-run") + + monkeypatch.setattr(tools.sanitize, "scan", fake_scan) + monkeypatch.setattr(tools.sanitize, "apply", fake_apply) + + _, call_tool = _tool_registry() + result = asyncio.run(call_tool("sanitize_index", { + "project_root": "/repo", "provider": "codex", "since": "2026-01-01", + })) + + assert calls == {"scan": 1, "apply": 0} + # Scope was built from the args. + scope = captured_scope["scope"] + assert scope.project_root == "/repo" + assert scope.provider == "codex" + assert scope.since == "2026-01-01" + # Response carries counts + audit path and is not an error. + text = result[0].text + assert result[0].type == "text" + assert not text.startswith("Error") + assert "AWS: 2" in text + assert "redaction-deadbeef.jsonl" in text + + +# --------------------------------------------------------------------------- +# Confirmation gate +# --------------------------------------------------------------------------- + + +def test_apply_without_confirm_refuses_and_never_calls_apply(monkeypatch): + # apply=true, confirm=false → refuse; apply() must NOT be called. + calls = {"scan": 0, "apply": 0} + + def fake_scan(scope): # pragma: no cover - apply path must not scan either + calls["scan"] += 1 + return _report("dry-run") + + def fake_apply(*a, **kw): # pragma: no cover - must never run + calls["apply"] += 1 + raise AssertionError("apply must not be called without confirmation") + + monkeypatch.setattr(tools.sanitize, "scan", fake_scan) + monkeypatch.setattr(tools.sanitize, "apply", fake_apply) + + _, call_tool = _tool_registry() + result = asyncio.run(call_tool("sanitize_index", {"apply": True})) + + assert calls["apply"] == 0 + text = result[0].text.lower() + assert "confirm" in text + assert "no changes" in text + + +# --------------------------------------------------------------------------- +# Apply path +# --------------------------------------------------------------------------- + + +def test_apply_with_confirm_calls_apply_confirmed(monkeypatch): + # apply=true, confirm=true → apply(confirmed=True, drop=False). + seen = {} + + def fake_apply(scope, *, drop, confirmed): + seen["scope"] = scope + seen["drop"] = drop + seen["confirmed"] = confirmed + return _report("redact", processed_count=1) + + monkeypatch.setattr(tools.sanitize, "apply", fake_apply) + + _, call_tool = _tool_registry() + result = asyncio.run(call_tool("sanitize_index", { + "apply": True, "confirm": True, "session_id": "sess-1", + })) + + assert seen["confirmed"] is True + assert seen["drop"] is False + assert seen["scope"].session_id == "sess-1" + text = result[0].text + assert not text.startswith("Error") + # Apply surfaces the rotate-the-key warning. + assert "rotate" in text.lower() + + +def test_apply_with_drop_threads_drop_true(monkeypatch): + # apply=true, confirm=true, drop=true → apply(drop=True, confirmed=True). + seen = {} + + def fake_apply(scope, *, drop, confirmed): + seen["drop"] = drop + seen["confirmed"] = confirmed + return _report("drop", processed_count=1) + + monkeypatch.setattr(tools.sanitize, "apply", fake_apply) + + _, call_tool = _tool_registry() + result = asyncio.run(call_tool("sanitize_index", { + "apply": True, "confirm": True, "drop": True, + })) + + assert seen["drop"] is True + assert seen["confirmed"] is True + assert not result[0].text.startswith("Error") + + +# --------------------------------------------------------------------------- +# No-leak invariant +# --------------------------------------------------------------------------- + + +def test_response_never_contains_secret_value(monkeypatch): + # Across dry-run and apply, the synthetic secret must never reach the response. + monkeypatch.setattr(tools.sanitize, "scan", lambda scope: _report("dry-run")) + monkeypatch.setattr( + tools.sanitize, + "apply", + lambda scope, *, drop, confirmed: _report("redact", processed_count=1), + ) + + _, call_tool = _tool_registry() + + dry = asyncio.run(call_tool("sanitize_index", {})) + applied = asyncio.run(call_tool("sanitize_index", {"apply": True, "confirm": True})) + refused = asyncio.run(call_tool("sanitize_index", {"apply": True})) + + for result in (dry, applied, refused): + assert SECRET not in result[0].text diff --git a/tools.py b/tools.py index 88e4fad..e963add 100644 --- a/tools.py +++ b/tools.py @@ -10,6 +10,7 @@ from mcp import types import rag_engine +import sanitize from provider_adapters import ( LEGAL_PROVIDERS, LEGAL_SORT_BY, @@ -157,6 +158,50 @@ def format_stats(stats: dict, db_path: str) -> str: return "\n".join(lines) +def format_sanitize_report(report: "sanitize.SanitizeReport") -> str: + """Format a sanitize :class:`SanitizeReport` as value-free markdown. + + Renders the run mode, per-rule detection counts, affected/processed turn + counts, the FTS-incomplete count, run status, and the audit-file path. On an + apply run the rotate-the-key warning is appended. The report carries only rule + names, integer counts, and paths — never a secret value — so the rendered text + is safe to surface to the operator. + + Args: + report: The outcome of :func:`sanitize.scan` or :func:`sanitize.apply`. + + Returns: + A markdown string summarizing the run. + """ + lines = [f"**Sanitize ({report.mode})** — status: {report.status}"] + + if report.counts: + lines.append("\n### Detections by rule") + for rule, count in sorted(report.counts.items(), key=lambda x: (-x[1], x[0])): + lines.append(f"- {rule}: {count}") + else: + lines.append("\nNo secrets detected in scope.") + + lines.append(f"\n**Affected turns:** {report.affected_count}") + if report.mode != "dry-run": + lines.append(f"**Processed turns:** {report.processed_count}") + if report.incomplete_fts: + lines.append( + f"**FTS-incomplete turns (retry needed):** {report.incomplete_fts}" + ) + + if report.audit_path: + lines.append(f"**Audit file:** {report.audit_path}") + + if report.rotate_warning: + lines.append( + "\nWARNING: redaction is not key rotation. Rotate any exposed " + "credential now — removing it from the index does not invalidate it." + ) + + return "\n".join(lines) + + def format_timeline(entries: list[dict]) -> str: """Format an issue timeline feed as markdown (oldest first). @@ -416,6 +461,57 @@ async def list_tools() -> list[types.Tool]: "required": [], }, ), + types.Tool( + name="sanitize_index", + description=( + "Retroactively find and remove secrets already indexed in the " + "session store (Milvus document field, FTS keyword content, and the " + "derived embedding). Defaults to a dry-run that reports per-rule " + "counts, the number of affected turns, and an audit-file path " + "WITHOUT writing. Pass apply=true together with confirm=true to " + "redact-and-re-embed the affected turns (or drop=true to delete " + "them); apply without confirm refuses and makes no changes. Never " + "returns secret values — counts and offsets only. Redaction is not " + "key rotation: rotate any exposed credential after an apply." + ), + inputSchema={ + "type": "object", + "properties": { + "apply": { + "type": "boolean", + "description": "Perform the destructive pass. Default false (dry-run report only).", + "default": False, + }, + "drop": { + "type": "boolean", + "description": "When applying, delete affected turns instead of redacting them. Default false.", + "default": False, + }, + "confirm": { + "type": "boolean", + "description": "Required confirmation token for apply. Without it, apply refuses and makes no changes.", + "default": False, + }, + "project_root": { + "type": "string", + "description": "Restrict the scope to a single project path.", + }, + "provider": { + "type": "string", + "description": "Restrict the scope to a single harness provider (e.g. claude_code_cli, codex).", + }, + "session_id": { + "type": "string", + "description": "Restrict the scope to a single session id.", + }, + "since": { + "type": "string", + "description": "ISO date/timestamp lower bound (e.g. '2026-04-02'); only turns at or after this are scanned.", + }, + }, + "required": [], + }, + ), ] @server.call_tool() @@ -556,6 +652,45 @@ async def call_tool(name: str, arguments: dict) -> list[types.TextContent]: parts.append(f"\nRemaining: {stats['total_turns']} turns across {stats['sessions']} sessions") return [types.TextContent(type="text", text="\n".join(parts))] + elif name == "sanitize_index": + do_apply = bool(arguments.get("apply", False)) + do_drop = bool(arguments.get("drop", False)) + do_confirm = bool(arguments.get("confirm", False)) + + scope = sanitize.Scope( + project_root=arguments.get("project_root"), + provider=arguments.get("provider"), + session_id=arguments.get("session_id"), + since=arguments.get("since"), + ) + + if not do_apply: + # Offload to a thread: scan reads/iterates many turns and + # would otherwise block the asyncio event loop. + report = await asyncio.to_thread(sanitize.scan, scope) + return [types.TextContent( + type="text", text=format_sanitize_report(report))] + + if not do_confirm: + # Refuse before any read or write: apply requires explicit confirm. + action = "delete" if do_drop else "redact" + alt = "" if do_drop else " (drop=true to delete instead)" + return [types.TextContent( + type="text", + text=( + "Refusing to apply: confirmation required. Re-run with " + f"confirm=true to {action}{alt} the affected turns. " + "No changes were made." + ), + )] + + # Offload to a thread: apply re-embeds many turns and would + # otherwise block the asyncio event loop for the whole run. + report = await asyncio.to_thread( + sanitize.apply, scope, drop=do_drop, confirmed=True) + return [types.TextContent( + type="text", text=format_sanitize_report(report))] + else: raise ValueError(f"Unknown tool: {name}")