Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 141 additions & 22 deletions src/core/tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,26 @@ impl Tracker {
if let Some(parent) = db_path.parent() {
std::fs::create_dir_all(parent)?;
}
Self::open_at_path(&db_path)
}

let conn = Connection::open(&db_path)?;
// WAL mode + busy_timeout for concurrent access (multiple Claude Code instances).
/// Open (or create) the history DB at an explicit path, applying the same
/// pragmas and schema as `new()`. Kept separate so tests can target an
/// isolated file without mutating the process-global RTK_DB_PATH env var.
fn open_at_path(db_path: &std::path::Path) -> Result<Self> {
let conn = Connection::open(db_path)?;
// Set busy_timeout first, separately from journal_mode=WAL.
// When the Claude Code hook rewrites a command it first spawns `rtk rewrite`,
// which starts a background telemetry thread that opens the DB. If that
// process exits before the thread finishes, the WAL shared-memory file
// (.db-shm) can be briefly inconsistent, causing `PRAGMA journal_mode=WAL`
// to return a transient error. Because execute_batch() stops on the first
// error, bundling both pragmas meant busy_timeout silently stayed at 0,
// making every lock-contention attempt fail immediately instead of waiting.
// Running them separately guarantees busy_timeout is always applied.
let _ = conn.execute_batch("PRAGMA busy_timeout=5000;");
// Non-fatal: NFS/read-only filesystems may not support WAL.
let _ = conn.execute_batch(
"PRAGMA journal_mode=WAL;
PRAGMA busy_timeout=5000;",
);
let _ = conn.execute_batch("PRAGMA journal_mode=WAL;");
conn.execute(
"CREATE TABLE IF NOT EXISTS commands (
id INTEGER PRIMARY KEY,
Expand Down Expand Up @@ -1358,14 +1370,27 @@ impl TimedExecution {
let input_tokens = estimate_tokens(input);
let output_tokens = estimate_tokens(output);

if let Ok(tracker) = Tracker::new() {
let _ = tracker.record(
original_cmd,
rtk_cmd,
input_tokens,
output_tokens,
elapsed_ms,
);
// Retry once on transient DB-open failures. When a Claude Code hook
// rewrites a command it first invokes `rtk rewrite`, which spawns a
// background telemetry thread that briefly holds a DB connection. If
// that process exits before the thread finishes, SQLite's WAL shared-
// memory file (.db-shm) may be inconsistent for a few milliseconds,
// causing Tracker::new() to fail. A single short sleep + retry is
// sufficient to outlast that window without blocking the user.
for attempt in 0..2u8 {
if let Ok(tracker) = Tracker::new() {
let _ = tracker.record(
original_cmd,
rtk_cmd,
input_tokens,
output_tokens,
elapsed_ms,
);
return;
}
if attempt == 0 {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}

Expand All @@ -1391,9 +1416,17 @@ impl TimedExecution {
/// ```
pub fn track_passthrough(&self, original_cmd: &str, rtk_cmd: &str) {
let elapsed_ms = self.start.elapsed().as_millis() as u64;
// input_tokens=0, output_tokens=0 won't dilute savings statistics
if let Ok(tracker) = Tracker::new() {
let _ = tracker.record(original_cmd, rtk_cmd, 0, 0, elapsed_ms);
// input_tokens=0, output_tokens=0 won't dilute savings statistics.
// Same retry logic as track() to handle transient DB-open failures in
// hook execution contexts.
for attempt in 0..2u8 {
if let Ok(tracker) = Tracker::new() {
let _ = tracker.record(original_cmd, rtk_cmd, 0, 0, elapsed_ms);
return;
}
if attempt == 0 {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
}
Expand Down Expand Up @@ -1423,6 +1456,10 @@ pub fn args_display(args: &[OsString]) -> String {
mod tests {
use super::*;

// Serializes tests that mutate the process-global RTK_DB_PATH env var so they
// don't race each other (set_var/remove_var is process-wide).
// Acquire it with `.lock().unwrap_or_else(|e| e.into_inner())`: a test that
// panics while holding the guard poisons the mutex, and recovering the guard
// keeps that single failure from cascading into every other test that locks it.
static DB_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

// 1. estimate_tokens — verify ~4 chars/token ratio
#[test]
fn test_estimate_tokens() {
Expand Down Expand Up @@ -1550,9 +1587,7 @@ mod tests {
#[test]
fn test_db_path_env_and_default() {
use std::env;
use std::sync::Mutex;
static ENV_LOCK: Mutex<()> = Mutex::new(());
let _guard = ENV_LOCK.lock().unwrap();
let _guard = DB_ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());

let custom_path = env::temp_dir().join("rtk_test_custom.db");
env::set_var("RTK_DB_PATH", &custom_path);
Expand Down Expand Up @@ -1606,7 +1641,91 @@ mod tests {
);
}

// 12. record_parse_failure + get_parse_failure_summary roundtrip
// 12. Regression test for #1082: a tracked write must WAIT for a competing
// writer's lock rather than fail on the spot. The two pragmas used to share one
// execute_batch() call, so a transient journal_mode=WAL error left busy_timeout
// at 0 and every contended write came back SQLITE_BUSY — the hook-spawned
// concurrent-rtk scenario in which gain stats went missing.
// Real contention is reproduced here: a second connection owns the write lock
// on a background thread while the tracker performs its INSERT.
#[test]
fn test_record_waits_for_concurrent_write_lock() {
    // How long the rival writer sits on the lock. Far below the 5s busy_timeout
    // (so the waiting INSERT succeeds once the pragma is applied), yet long
    // enough that a busy_timeout of 0 gives up immediately.
    const LOCK_HOLD: std::time::Duration = std::time::Duration::from_millis(300);

    // Private scratch directory: removed automatically when `scratch` drops and
    // requires no RTK_DB_PATH mutation, so it cannot collide with the tracking
    // tests that share the default DB under `cargo test --all`.
    let scratch = tempfile::tempdir().expect("create temp dir");
    let db_path = scratch.path().join("busy_timeout.db");

    // The connection under test — opened through the code path that sets
    // busy_timeout — pointed at the isolated file.
    let tracker = Tracker::open_at_path(&db_path).expect("tracker open must succeed");

    // Rival writer: takes the write lock, reports that it holds it, waits,
    // then commits to release it.
    let (held_tx, held_rx) = std::sync::mpsc::channel();
    let writer_path = db_path.clone();
    let writer = std::thread::spawn(move || {
        let rival = Connection::open(&writer_path).expect("blocker open must succeed");
        rival
            .execute_batch("BEGIN IMMEDIATE;")
            .expect("blocker must acquire the write lock");
        held_tx.send(()).expect("signal lock held");
        std::thread::sleep(LOCK_HOLD);
        rival
            .execute_batch("COMMIT;")
            .expect("blocker must release the write lock");
    });

    // Do not attempt the INSERT until the rival really owns the lock.
    held_rx
        .recv()
        .expect("blocker must signal the lock is held");

    // busy_timeout=5000 makes this INSERT wait the lock out; busy_timeout=0
    // (the bug) yields SQLITE_BUSY straight away and the stat is dropped.
    let tagged_cmd = format!("git status hook_{}", std::process::id());
    let outcome = tracker.record("git status", &tagged_cmd, 100, 20, 5);

    // Join before asserting so the background thread is always reaped.
    writer.join().expect("blocker thread must finish");

    // `scratch` drops at end of scope and deletes the temp DB; there is no
    // explicit fs::remove_file, which keeps the filesystem-deletion lint clean.
    outcome.expect("record must wait for the concurrent lock and succeed (busy_timeout)");
}

// 13. TimedExecution::track succeeds even when a concurrent reader holds the DB.
// Regression: hook-rewritten commands previously failed to record because a
// background telemetry thread (from `rtk rewrite`) held the DB open at the
// moment the tracking write was attempted, and busy_timeout was 0.
#[test]
fn test_track_records_with_concurrent_reader() {
    // This test opens the default-location DB three times (reader, the tracker
    // inside `track`, and the verifier) and only passes if all three resolve to
    // the same file. Another test rewrites RTK_DB_PATH while holding
    // DB_ENV_LOCK, so hold the same lock here to keep the path stable for the
    // whole test. Poisoning is tolerated so an unrelated panic does not fail
    // this test as well.
    let _guard = DB_ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());

    // Tag the command with the pid so the record is distinguishable from rows
    // written by other processes sharing the same DB.
    let pid = std::process::id();
    let hook_cmd = format!("rtk git status hook_rewritten_{}", pid);

    // Simulate concurrent reader (e.g., telemetry thread from `rtk rewrite`)
    let reader = Tracker::new().expect("reader must open");

    // Now track — must succeed despite the concurrent reader connection
    let timer = TimedExecution::start();
    timer.track("git status", &hook_cmd, "raw output", "filtered");

    // Verify the record was actually written (not silently dropped)
    let verify = Tracker::new().expect("verify tracker must open");
    let recent = verify.get_recent(20).expect("get_recent must work");
    assert!(
        recent.iter().any(|r| r.rtk_cmd == hook_cmd),
        "hook-rewritten tracking record must be present in DB (got {} records)",
        recent.len()
    );

    // Keep the reader connection alive until after verification so it really
    // is concurrent with the tracked write.
    drop(reader);
}

// 14. record_parse_failure + get_parse_failure_summary roundtrip
#[test]
fn test_parse_failure_roundtrip() {
let tracker = Tracker::new().expect("Failed to create tracker");
Expand All @@ -1624,7 +1743,7 @@ mod tests {
assert!(summary.recent.iter().any(|r| r.raw_command == test_cmd));
}

// 13. recovery_rate calculation
// 15. recovery_rate calculation
#[test]
fn test_parse_failure_recovery_rate() {
let tracker = Tracker::new().expect("Failed to create tracker");
Expand Down
Loading