diff --git a/tools/wta/src/master/mod.rs b/tools/wta/src/master/mod.rs index 9483079e0..6ff817b7e 100644 --- a/tools/wta/src/master/mod.rs +++ b/tools/wta/src/master/mod.rs @@ -1583,6 +1583,80 @@ async fn run_master_loop(cli: Cli, pipe_name: String) -> Result<()> { }); } + + // Periodic disk rescan. The initial scan above only fires once at + // master spawn — so when the user starts their FIRST ever agent CLI + // session in IT (no `~/.copilot/session-state/` / `~/.claude/projects/` + // dirs existed at master boot), the resulting on-disk session uuid + // dir is never picked up until master itself restarts. Hook-based + // live updates *should* cover this, but copilot's SessionStart hook + // observably fires with an empty `session_id` on the first run, + // which routes through `route_agent_event_to_registry_with_hook_sink` + // as a synthetic `pane:` key and is dropped before reaching + // master (see app.rs:704 — synthetic keys are local-only). Periodic + // rescan recovers from that and any future hook-payload regression: + // whatever the CLI writes to disk surfaces in the registry within + // RESCAN_PERIOD_SECS, and the 5s `session/list` tick on each helper + // then renders it in session management view. + // + // We only insert sessions that are NOT already in the registry — + // never clobber a live row with disk-inferred state (history_loader + // can't see status=Working / Attention / current_tool, which live + // hooks track). + let inner_for_periodic = Arc::clone(&inner); + tokio::task::spawn_local(async move { + const RESCAN_PERIOD_SECS: u64 = 30; + let mut ticker = tokio::time::interval(std::time::Duration::from_secs(RESCAN_PERIOD_SECS)); + // Skip the first tick — initial scan above already handled boot. + ticker.tick().await; + loop { + ticker.tick().await; + let scan_started = std::time::Instant::now(); + let sessions = match tokio::task::spawn_blocking(|| { + crate::history_loader::load_all() + }) + .await + { + Ok(s) => s, + Err(e) => { + tracing::warn!( + target: "master_history", + error = %e, + "periodic history rescan task panicked; will retry next tick" + ); + continue; + } + }; + let scanned = sessions.len(); + let mut added: usize = 0; + for s in &sessions { + let info = crate::session_registry::agent_session_to_session_info(s); + if inner_for_periodic.registry.insert_if_absent(info).await { + added += 1; + } + } + tracing::debug!( + target: "master_history", + scanned, + added, + elapsed_ms = scan_started.elapsed().as_millis() as u64, + "periodic history rescan" + ); + if added > 0 { + tracing::info!( + target: "master_history", + added, + "periodic rescan found new on-disk sessions; broadcasting sessions/changed" + ); + broadcast_ext_to_helpers( + &inner_for_periodic, + crate::session_registry::build_sessions_changed_notification(), + ) + .await; + } + } + }); + let client = MasterClient { state: Arc::clone(&inner), }; diff --git a/tools/wta/src/session_registry.rs b/tools/wta/src/session_registry.rs index 03cb350eb..08067b4aa 100644 --- a/tools/wta/src/session_registry.rs +++ b/tools/wta/src/session_registry.rs @@ -765,6 +765,17 @@ pub trait SessionRegistry: Send + Sync { /// twice with the same `session_id` keeps only the latest copy. async fn upsert(&self, info: SessionInfo); + /// Insert `info` ONLY if no row exists for `info.session_id`. Returns + /// `true` if the insert happened. The check + insert run under a + /// single lock so a concurrent live `apply_event` for the same SID + /// cannot race in between (which would otherwise let an + /// `upsert`-after-`lookup` clobber freshly-set live state like + /// `status=Working` / `current_tool`). Used by the periodic + /// history rescan in master to surface newly-created on-disk + /// sessions without overwriting any live row a hook has already + /// installed in the same window. + async fn insert_if_absent(&self, info: SessionInfo) -> bool; + /// Remove the row for `sid`. Returns the prior value if any (the master /// uses this both for routing teardown and to know what to broadcast /// in `session_removed` ext-notifications). @@ -843,6 +854,15 @@ impl SessionRegistry for InMemoryRegistry { upsert_locked(&mut guard, info); } + async fn insert_if_absent(&self, info: SessionInfo) -> bool { + let mut guard = self.inner.lock().await; + if guard.sessions.contains_key(&info.session_id) { + return false; + } + upsert_locked(&mut guard, info); + true + } + async fn remove(&self, sid: &acp::SessionId) -> Option { let mut guard = self.inner.lock().await; remove_locked(&mut guard, sid) @@ -1346,6 +1366,44 @@ mod tests { assert_eq!(reg.snapshot().await.len(), 1, "no duplicate rows"); } + #[tokio::test] + async fn insert_if_absent_inserts_missing_row() { + let reg = InMemoryRegistry::new(); + let original = info("sess-1", Some("pane-A")); + assert!(reg.insert_if_absent(original.clone()).await); + assert_eq!(reg.lookup(&original.session_id).await, Some(original)); + } + + #[tokio::test] + async fn insert_if_absent_preserves_existing_row() { + let reg = InMemoryRegistry::new(); + let sid = acp::SessionId::new("sess-1".to_string()); + assert!(reg.insert_if_absent(info("sess-1", Some("pane-A"))).await); + assert!(!reg.insert_if_absent(info("sess-1", Some("pane-B"))).await); + assert_eq!( + reg.lookup(&sid).await.unwrap().pane_session_id.as_deref(), + Some("pane-A") + ); + } + + #[tokio::test] + async fn insert_if_absent_allows_only_one_concurrent_insert() { + let reg = Arc::new(InMemoryRegistry::new()); + let left = Arc::clone(®); + let right = Arc::clone(®); + let (left_inserted, right_inserted) = tokio::join!( + async move { left.insert_if_absent(info("sess-1", Some("pane-A"))).await }, + async move { right.insert_if_absent(info("sess-1", Some("pane-B"))).await } + ); + + let inserted = [left_inserted, right_inserted] + .into_iter() + .filter(|inserted| *inserted) + .count(); + assert_eq!(inserted, 1); + assert_eq!(reg.snapshot().await.len(), 1); + } + #[tokio::test] async fn remove_returns_prior_and_subsequent_lookup_is_none() { let reg = InMemoryRegistry::new(); diff --git a/tools/wta/src/shell/wt_channel/cli_channel.rs b/tools/wta/src/shell/wt_channel/cli_channel.rs index aa2916ce9..7a169c8c4 100644 --- a/tools/wta/src/shell/wt_channel/cli_channel.rs +++ b/tools/wta/src/shell/wt_channel/cli_channel.rs @@ -379,40 +379,36 @@ impl CliChannel { /// Start background event listener (wraps `wtcli listen --json`). /// wtcli inherits WT_COM_CLSID from this process's env. + /// + /// If the `wtcli listen` child exits unexpectedly (silent COM blip, + /// transient RPC error, WT restart), we retry **once**. If the retry + /// also exits, we give up — at that point something is fundamentally + /// broken and a tight respawn loop would just spam logs. + /// + /// Session-management freshness does not depend on this path: the + /// 30 s master disk rescan in `serve_master` covers new on-disk + /// sessions even when the listener is dead. Autofix and other push + /// events do go dark after a permanent failure, which is the + /// accepted tradeoff for keeping the recovery logic trivial. pub async fn start_reader(self: &std::sync::Arc) { + const MAX_ATTEMPTS: u32 = 2; let wtcli = self.wtcli_path.clone(); let weak = std::sync::Arc::downgrade(self); tokio::spawn(async move { - let Ok(mut child) = tokio::process::Command::new(&wtcli) - .args(["--json", "listen"]) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::null()) - .spawn() - else { - return; - }; - - let stdout = child.stdout.take().unwrap(); - let mut reader = tokio::io::BufReader::new(stdout); - let mut line = String::new(); - - loop { - line.clear(); - use tokio::io::AsyncBufReadExt; - match reader.read_line(&mut line).await { - Ok(0) => break, - Ok(_) => { - let Some(this) = weak.upgrade() else { break }; - if let Ok(val) = serde_json::from_str::(line.trim()) { - let tx = this.event_tx.lock().unwrap(); - if let Some(tx) = tx.as_ref() { - let _ = tx.send(val); - } - } - } - Err(_) => break, + for attempt in 1..=MAX_ATTEMPTS { + if weak.upgrade().is_none() { + return; } + run_wtcli_listen_once(&wtcli, &weak, attempt).await; } + tracing::error!( + target: "wt_channel", + wtcli = %wtcli, + attempts = MAX_ATTEMPTS, + "wtcli listen exited after retry; giving up. \ + Push events (autofix, agent_event hooks) are now dark for the lifetime of this helper. \ + Session management still recovers via master's 30s disk rescan." + ); }); } @@ -440,6 +436,90 @@ impl CliChannel { } } +/// Spawn `wtcli --json listen` once, forward each parsed JSON line to the +/// channel's `event_tx`, and return when the child exits (clean EOF or +/// pipe error). Returns control to the caller so it can decide whether to +/// retry. Lifecycle (start / exit reason / events seen) is logged. +async fn run_wtcli_listen_once( + wtcli: &str, + weak: &std::sync::Weak, + attempt: u32, +) { + use tokio::io::AsyncBufReadExt; + + let mut child = match tokio::process::Command::new(wtcli) + .args(["--json", "listen"]) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + // CliChannel-dropped-mid-read → Child dropped without explicit + // kill → kill_on_drop reaps wtcli instead of orphaning it. + .kill_on_drop(true) + .spawn() + { + Ok(c) => { + tracing::info!(target: "wt_channel", wtcli, attempt, "wtcli listen spawned"); + c + } + Err(err) => { + tracing::warn!(target: "wt_channel", wtcli, attempt, error = %err, "wtcli listen spawn failed"); + return; + } + }; + + let stdout = match child.stdout.take() { + Some(s) => s, + None => { + tracing::warn!(target: "wt_channel", attempt, "wtcli listen child missing stdout pipe"); + let _ = child.start_kill(); + let _ = child.wait().await; + return; + } + }; + + let mut reader = tokio::io::BufReader::new(stdout); + let mut line = String::new(); + let mut events_seen: u64 = 0; + let started = std::time::Instant::now(); + + let exit_reason = loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => break "stdout EOF", + Ok(_) => { + let Some(this) = weak.upgrade() else { + let _ = child.start_kill(); + let _ = child.wait().await; + return; + }; + if let Ok(val) = serde_json::from_str::(line.trim()) { + events_seen += 1; + let tx = this.event_tx.lock().unwrap(); + if let Some(tx) = tx.as_ref() { + let _ = tx.send(val); + } + } + } + Err(_) => break "stdout read error", + } + }; + + // Pipe error doesn't imply the child has exited yet — kill before + // wait so we don't block on a child that's otherwise healthy. + // Harmless on the "stdout EOF" path. + let _ = child.start_kill(); + let exit_status = child.wait().await.ok(); + + tracing::warn!( + target: "wt_channel", + attempt, + events_seen, + exit_reason, + exit_status = ?exit_status, + lived_ms = started.elapsed().as_millis() as u64, + "wtcli listen exited" + ); +} + #[async_trait::async_trait] impl WtChannel for CliChannel { async fn request(