diff --git a/tools/wta/src/master/mod.rs b/tools/wta/src/master/mod.rs index 45d8ff630..78b3686b4 100644 --- a/tools/wta/src/master/mod.rs +++ b/tools/wta/src/master/mod.rs @@ -1348,18 +1348,29 @@ async fn run_master_loop(cli: Cli, pipe_name: String) -> Result<()> { // `None` and `handle_focus_session` returns a structured // "focus channel unavailable" error instead of crashing the // helper's ext_method call. - let wt: Option> = + // + // The concrete `Arc` is also kept around so the + // push-based pane-close drainer (`run_master_event_drainer`) + // can subscribe to WT events on the same channel. The trait + // object only exposes request/is_available — the event API + // lives on the concrete type, see CliChannel::subscribe_events + // and the rationale in run_master_event_drainer's doc-comment + // for why we don't extend the trait. + let wt_concrete: Option> = match crate::shell::wt_channel::CliChannel::connect().await { Ok(ch) => Some(Arc::new(ch)), Err(err) => { tracing::warn!( target: "master", error = %err, - "CliChannel unavailable; intellterm.wta/focus_session will error" + "CliChannel unavailable; intellterm.wta/focus_session will error and the pane-close drainer will not run" ); None } }; + let wt: Option> = wt_concrete + .as_ref() + .map(|c| Arc::clone(c) as Arc); let resolved_agent_id = crate::agent_registry::resolve_agent_id_from_cmd(&cli.agent); let cli_source = crate::agent_sessions::CliSource::from_agent_id(resolved_agent_id); tracing::info!( @@ -1379,6 +1390,17 @@ async fn run_master_loop(cli: Cli, pipe_name: String) -> Result<()> { cli_source, }); + // Push-based pane-close reconcile: spawn the WT event drainer on + // the same CliChannel we just constructed. Picks up + // `connection_state: closed` events and drops matching registry + // rows so the F2 session list doesn't strand entries when the + // helper pipe stays alive past pane destruction (Gemini-class + // bug). No-op when wt isn't available. 
+ if let Some(ch) = wt_concrete { + let state_for_drainer = Arc::clone(&inner); + tokio::task::spawn_local(run_master_event_drainer(state_for_drainer, ch)); + } + // Seed the registry with historical sessions scanned from // `~/.copilot/`, `~/.claude/`, `~/.gemini/` so `wta sessions list` // and helper session management viewers see the full set, not just live sessions @@ -1766,6 +1788,194 @@ async fn drop_sessions_for_helper(state: &MasterStateInner, helper_id: HelperId) victims.len() } +/// Drop every registry row bound to `pane_session_id`. Per-sid +/// counterpart to [`drop_sessions_for_helper`] — that one reaps by +/// helper-id (used when a helper pipe disconnects); this one reaps +/// by pane (used when the WT pane closes but the helper somehow +/// stays alive, e.g. Gemini doesn't reliably exit on stdin EOF). +/// +/// Returns the number of rows actually dropped. **Idempotent:** +/// `registry.remove(sid).is_some()` gates the per-sid broadcast so +/// a concurrent caller (e.g. an in-flight `drop_sessions_for_helper` +/// for the same sid) doesn't cause a duplicate `session_removed` to +/// fan out to helpers. The trailing `sessions/changed` only fires if +/// we actually dropped at least one row. +/// +/// Pane id comparison is case-insensitive — WT/wtcli can return +/// either casing in `pane_id`/`session_id`, and the registry's +/// stored `pane_session_id` isn't guaranteed lowercased (only the +/// `active_by_pane` index key is, see `pane_key` in session_registry). 
+pub(crate) async fn drop_sessions_for_pane(
+    state: &MasterStateInner,
+    pane_session_id: &str,
+) -> usize {
+    let snapshot = state.registry.snapshot().await;
+    let candidates: Vec<_> = snapshot
+        .iter()
+        .filter(|s| {
+            // Allocation-free ASCII-case-insensitive match; unbound rows never match.
+            s.pane_session_id
+                .as_ref()
+                .map_or(false, |p| p.eq_ignore_ascii_case(pane_session_id))
+        })
+        .map(|s| s.session_id.clone())
+        .collect();
+    if candidates.is_empty() {
+        return 0;
+    }
+    let mut dropped = 0_usize;
+    for sid in &candidates {
+        // Lock ordering matches `drop_sessions_for_helper`:
+        // session_to_helper first (sub-µs hashmap op), release, then
+        // touch the registry.
+        {
+            let mut map = state.session_to_helper.lock().await;
+            map.remove(sid);
+        }
+        if state.registry.remove(sid).await.is_some() {
+            broadcast_ext_to_helpers(
+                state,
+                crate::session_registry::build_session_removed_notification(sid),
+            )
+            .await;
+            dropped += 1;
+        }
+    }
+    if dropped > 0 {
+        broadcast_ext_to_helpers(
+            state,
+            crate::session_registry::build_sessions_changed_notification(),
+        )
+        .await;
+        let pane_lc = pane_session_id.to_ascii_lowercase();
+        tracing::info!(
+            target: "master",
+            op = "drop_sessions_for_pane",
+            pane_session_id = %pane_lc,
+            dropped,
+            "auto-dropped stale registry rows on WT pane close"
+        );
+    }
+    dropped
+}
+
+/// Parse a single WT event from the `wtcli listen` JSON stream and
+/// apply its side effects on master state. Returns the number of
+/// rows dropped (0 for everything except a successful
+/// `connection_state: closed` cleanup).
+///
+/// Extracted from [`run_master_event_drainer`] so the dispatch logic
+/// (JSON shape, field-name fallback, state filter) is unit-testable
+/// without spawning `wtcli`.
+///
+/// Wire format (see `src/cascadia/TerminalApp/TerminalPage.cpp:5076`):
+///
+/// ```json
+/// { "type": "event",
+///   "method": "connection_state",
+///   "params": { "state": "closed", "pane_id": "", "tab_id": "?"
} }
+/// ```
+///
+/// The pane is read from `pane_id`; `session_id` is consulted only as
+/// a fallback for older wtcli builds that predate the rename (the
+/// helper applies the same fallback, see `main.rs:2059`).
+///
+/// `state == "failed"` is intentionally left alone: the helper treats
+/// it as distinct from `closed` (Error vs Ended), and dropping rows on
+/// `failed` here would conflate two user-visible states.
+async fn handle_master_wt_event(
+    state: &MasterStateInner,
+    msg: &serde_json::Value,
+) -> usize {
+    // String-valued field lookup: absent or non-string fields read as `None`.
+    fn str_field<'a>(obj: &'a serde_json::Value, key: &str) -> Option<&'a str> {
+        obj.get(key).and_then(serde_json::Value::as_str)
+    }
+
+    // Gate 1: must be an event, and specifically a `connection_state` one.
+    let is_event = str_field(msg, "type") == Some("event");
+    if !is_event || str_field(msg, "method") != Some("connection_state") {
+        return 0;
+    }
+    // Gate 2: `params` must be present and its `state` exactly "closed".
+    let Some(params) = msg.get("params") else {
+        return 0;
+    };
+    if str_field(params, "state") != Some("closed") {
+        return 0;
+    }
+    // Canonical `pane_id` wins; `session_id` is consulted only when
+    // `pane_id` is missing, non-string, or empty.
+    let pane = match str_field(params, "pane_id") {
+        Some(id) if !id.is_empty() => id,
+        _ => str_field(params, "session_id").unwrap_or(""),
+    };
+    if pane.is_empty() {
+        tracing::debug!(
+            target: "master",
+            op = "wt_event",
+            "connection_state:closed with no pane_id or session_id; ignoring"
+        );
+        return 0;
+    }
+    drop_sessions_for_pane(state, pane).await
+}
+
+/// Master-side WT event drainer. Subscribes to the same `wtcli listen`
+/// stream the helpers consume and turns `connection_state: closed`
+/// events into per-pane registry cleanup. Provides an **automatic**
+/// counterpart to `drop_sessions_for_helper` for cases where the
+/// helper pipe stays connected even after the WT pane closes —
+/// notably Gemini, whose CLI doesn't reliably exit on stdin EOF
+/// when its pane is destroyed via Ctrl+Shift+W or tab close.
+///
+/// ## Single-subscriber assumption
+///
+/// `CliChannel::event_tx` is a singleton `Mutex>` and
+/// `start_reader` spawns exactly one `wtcli --json listen` subprocess.
+/// Master is the only subscriber on its own `CliChannel` — helpers
+/// each construct their own — so single-subscriber holds. If we ever
+/// add a second master-side subscriber, refactor `CliChannel` to
+/// broadcast first.
+///
+/// ## No missed-event window
+///
+/// `CliChannel::connect()` only resolves `wtcli` and validates
+/// `WT_COM_CLSID`; it does NOT subscribe. WT only starts emitting
+/// `connection_state` events for our subscription when the
+/// `wtcli listen` subprocess calls `Subscribe()` on the COM server
+/// (which itself triggers `_ensurePageEventsRegistered`). So spawning
+/// the drainer at master boot doesn't risk dropping events that fired
+/// before we subscribed — there were none.
+async fn run_master_event_drainer(
+    state: Arc<MasterStateInner>,
+    ch: Arc<crate::shell::wt_channel::CliChannel>,
+) {
+    let mut event_rx = ch.subscribe_events();
+    Arc::clone(&ch).start_reader().await;
+    // Receiver is taken before the reader starts; `ch` stays owned here while draining.
+    tracing::info!(
+        target: "master",
+        "started WT event drainer for push-based pane-close reconcile"
+    );
+    // Drain until `recv()` yields `None`; each event is handled to completion in turn.
+    while let Some(msg) = event_rx.recv().await {
+        let dropped = handle_master_wt_event(&state, &msg).await;
+        if dropped > 0 {
+            tracing::debug!(
+                target: "master",
+                op = "wt_event_drained",
+                dropped,
+                "applied WT event → registry cleanup"
+            );
+        }
+    }
+    tracing::warn!(
+        target: "master",
+        "WT event drainer exited — push-based pane-close reconcile no longer active"
+    );
+}
+
 /// Fan an ACP `ExtNotification` out to every currently-attached helper.
 ///
 /// Sends are non-blocking (`UnboundedSender::send` is a sync call that
@@ -2215,6 +2425,7 @@ async fn handle_session_focus(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::session_registry;
     use acp::{ContentChunk, SessionId, SessionNotification, SessionUpdate};
 
     fn make_state() -> Arc {
@@ -2708,6 +2919,330 @@ mod tests {
         assert_eq!(got, expected);
     }
 
+    // ─── drop_sessions_for_pane / handle_master_wt_event tests ─────────────
+
+    /// Build a registry row with an explicit pane binding for tests.
+    fn pane_bound_row(sid: &str, pane: Option<&str>) -> session_registry::SessionInfo {
+        let mut info =
+            session_registry::SessionInfo::new(SessionId::new(sid), PathBuf::from(format!("/r/{sid}")));
+        if let Some(p) = pane {
+            info = info.with_pane_session_id(p.to_string());
+        }
+        info
+    }
+
+    /// `drop_sessions_for_pane` drops every row sharing the pane,
+    /// broadcasts one `session_removed` per dropped sid plus a single
+    /// trailing `sessions/changed`. Rows without a pane binding or
+    /// bound to a different pane are untouched.
+    #[tokio::test]
+    async fn drop_sessions_for_pane_drops_all_matching_rows() {
+        let state = make_state();
+        let sid_match_1 = SessionId::new("sess-match-1");
+        let sid_match_2 = SessionId::new("sess-match-2");
+        let sid_other = SessionId::new("sess-other");
+        let sid_unbound = SessionId::new("sess-unbound");
+        state
+            .registry
+            .upsert(pane_bound_row("sess-match-1", Some("pane-target")))
+            .await;
+        state
+            .registry
+            .upsert(pane_bound_row("sess-match-2", Some("pane-target")))
+            .await;
+        state
+            .registry
+            .upsert(pane_bound_row("sess-other", Some("pane-other")))
+            .await;
+        state
+            .registry
+            .upsert(pane_bound_row("sess-unbound", None))
+            .await;
+        let (ext_tx, mut ext_rx) = mpsc::unbounded_channel();
+        {
+            let mut subs = state.helper_ext_subscribers.lock().await;
+            subs.insert(HelperId(42), ext_tx);
+        }
+
+        let dropped = drop_sessions_for_pane(&state, "pane-target").await;
+        assert_eq!(dropped, 2);
+
+        assert!(state.registry.lookup(&sid_match_1).await.is_none());
+        assert!(state.registry.lookup(&sid_match_2).await.is_none());
+        assert!(state.registry.lookup(&sid_other).await.is_some());
+        assert!(state.registry.lookup(&sid_unbound).await.is_some());
+
+        let mut removed: Vec<SessionId> = Vec::new();
+        let mut changed = 0;
+        while let Ok(ext) = ext_rx.try_recv() {
+            match session_registry::parse_ext_notification(&ext) {
+                session_registry::WtaExtNotification::SessionRemoved(sid) => removed.push(sid),
+                session_registry::WtaExtNotification::SessionsChanged => changed += 1,
+                other => panic!("unexpected ext: {other:?}"),
+            }
+        }
+        removed.sort_by(|a, b| a.0.cmp(&b.0));
+        let mut expected = vec![sid_match_1, sid_match_2];
+        expected.sort_by(|a, b| a.0.cmp(&b.0));
+        assert_eq!(removed, expected);
+        assert_eq!(changed, 1, "single trailing sessions/changed");
+    }
+
+    /// No rows match → no broadcasts, no registry mutation.
+    #[tokio::test]
+    async fn drop_sessions_for_pane_no_match_is_noop() {
+        let state = make_state();
+        state
+            .registry
+            .upsert(pane_bound_row("sess-x", Some("pane-x")))
+            .await;
+        let (ext_tx, mut ext_rx) = mpsc::unbounded_channel();
+        {
+            let mut subs = state.helper_ext_subscribers.lock().await;
+            subs.insert(HelperId(1), ext_tx);
+        }
+
+        let dropped = drop_sessions_for_pane(&state, "pane-not-here").await;
+
+        assert_eq!(dropped, 0);
+        assert!(state.registry.lookup(&SessionId::new("sess-x")).await.is_some());
+        assert!(ext_rx.try_recv().is_err());
+    }
+
+    /// Pane id stored in mixed case, event uses lowercase — match must
+    /// still succeed and the row must drop. Mirrors how WT/wtcli return
+    /// GUIDs in arbitrary casing.
+    #[tokio::test]
+    async fn drop_sessions_for_pane_matches_case_insensitively() {
+        let state = make_state();
+        let sid = SessionId::new("sess-mixed");
+        state
+            .registry
+            .upsert(pane_bound_row("sess-mixed", Some("AAA-BBB-CCC")))
+            .await;
+
+        let dropped = drop_sessions_for_pane(&state, "aaa-bbb-ccc").await;
+        assert_eq!(dropped, 1);
+        assert!(state.registry.lookup(&sid).await.is_none());
+    }
+
+    /// Historical / disk-only rows (no pane binding) are never matched
+    /// — `drop_sessions_for_pane` only acts on rows whose
+    /// `pane_session_id.is_some()`.
+    #[tokio::test]
+    async fn drop_sessions_for_pane_skips_rows_without_pane_binding() {
+        let state = make_state();
+        let sid = SessionId::new("hist-1");
+        state
+            .registry
+            .upsert(pane_bound_row("hist-1", None))
+            .await;
+
+        // Neither an empty pane id (filtered upstream by
+        // `handle_master_wt_event` anyway) nor a non-empty one may match.
+        for pane in ["", "any-pane"] {
+            assert_eq!(drop_sessions_for_pane(&state, pane).await, 0);
+        }
+        assert!(state.registry.lookup(&sid).await.is_some());
+    }
+
+    /// Idempotency: once a pane's rows have been dropped, a repeat
+    /// `drop_sessions_for_pane` for the same pane must NOT broadcast
+    /// again. The test calls the function twice back-to-back: the
+    /// second call finds no candidates in the snapshot and returns 0
+    /// before reaching the `registry.remove(sid).is_some()` gate, so
+    /// the true mid-call race (row removed between snapshot and
+    /// remove) is not exercised here.
+    #[tokio::test]
+    async fn drop_sessions_for_pane_is_idempotent_on_already_dropped_sids() {
+        let state = make_state();
+        let sid = SessionId::new("racy");
+        state
+            .registry
+            .upsert(pane_bound_row("racy", Some("pane-racy")))
+            .await;
+        let (ext_tx, mut ext_rx) = mpsc::unbounded_channel();
+        {
+            let mut subs = state.helper_ext_subscribers.lock().await;
+            subs.insert(HelperId(1), ext_tx);
+        }
+        // First call: drops normally.
+        let first = drop_sessions_for_pane(&state, "pane-racy").await;
+        assert_eq!(first, 1);
+        // Second call: the row is already gone, so the snapshot yields
+        // no candidates → 0 dropped, no extra broadcasts.
+        let second = drop_sessions_for_pane(&state, "pane-racy").await;
+        assert_eq!(second, 0);
+
+        let mut removed: Vec<SessionId> = Vec::new();
+        let mut changed = 0;
+        while let Ok(ext) = ext_rx.try_recv() {
+            match session_registry::parse_ext_notification(&ext) {
+                session_registry::WtaExtNotification::SessionRemoved(s) => removed.push(s),
+                session_registry::WtaExtNotification::SessionsChanged => changed += 1,
+                other => panic!("unexpected ext: {other:?}"),
+            }
+        }
+        assert_eq!(removed, vec![sid], "exactly one SessionRemoved across both calls");
+        assert_eq!(changed, 1, "exactly one sessions/changed");
+    }
+
+    /// Happy path: a `connection_state: closed` event with a matching
+    /// pane drops the row.
+    #[tokio::test]
+    async fn handle_master_wt_event_drops_on_connection_state_closed() {
+        let state = make_state();
+        let sid = SessionId::new("sess-close");
+        state
+            .registry
+            .upsert(pane_bound_row("sess-close", Some("pane-AAA")))
+            .await;
+
+        let msg = serde_json::json!({
+            "type": "event",
+            "method": "connection_state",
+            "params": { "state": "closed", "pane_id": "pane-AAA" }
+        });
+        let dropped = handle_master_wt_event(&state, &msg).await;
+        assert_eq!(dropped, 1);
+        assert!(state.registry.lookup(&sid).await.is_none());
+    }
+
+    /// Old wtcli builds emit `params["session_id"]` instead of
+    /// `params["pane_id"]`. Helper-side (`main.rs:2059`) falls back;
+    /// master must too.
+    #[tokio::test]
+    async fn handle_master_wt_event_falls_back_to_session_id_when_pane_id_empty() {
+        // Payload omits `pane_id` entirely, so only the legacy key is available.
+        let legacy_event = serde_json::json!({
+            "type": "event",
+            "method": "connection_state",
+            "params": { "state": "closed", "session_id": "pane-BBB" }
+        });
+        let state = make_state();
+        let row = pane_bound_row("sess-old-wtcli", Some("pane-BBB"));
+        state.registry.upsert(row).await;
+
+        let removed_rows = handle_master_wt_event(&state, &legacy_event).await;
+
+        assert_eq!(removed_rows, 1);
+        let gone = SessionId::new("sess-old-wtcli");
+        assert!(state.registry.lookup(&gone).await.is_none());
+    }
+
+    /// When an event carries both keys, `pane_id` decides which pane is
+    /// reaped; `session_id` matters only if `pane_id` is missing/empty.
+    #[tokio::test]
+    async fn handle_master_wt_event_prefers_pane_id_over_session_id() {
+        let state = make_state();
+        // Two rows on two different panes, so that each key of the
+        // event below points at exactly one of them.
+        for (sid, pane) in [("correct", "pane-CORRECT"), ("wrong", "pane-WRONG")] {
+            state.registry.upsert(pane_bound_row(sid, Some(pane))).await;
+        }
+        // Conflicting ids: the canonical key and the legacy key disagree.
+        let both_keys = serde_json::json!({
+            "type": "event",
+            "method": "connection_state",
+            "params": { "state": "closed", "pane_id": "pane-CORRECT", "session_id": "pane-WRONG" }
+        });
+
+        let removed_rows = handle_master_wt_event(&state, &both_keys).await;
+
+        assert_eq!(removed_rows, 1);
+        let targeted = SessionId::new("correct");
+        assert!(state.registry.lookup(&targeted).await.is_none());
+        let bystander = SessionId::new("wrong");
+        assert!(
+            state.registry.lookup(&bystander).await.is_some(),
+            "session_id fallback must NOT fire when pane_id is populated"
+        );
+    }
+
+    /// Non-event messages (responses, errors) must be ignored.
+    #[tokio::test]
+    async fn handle_master_wt_event_ignores_non_event_messages() {
+        // A reply-shaped message: its `type` is not "event".
+        let reply = serde_json::json!({
+            "type": "response",
+            "id": 1,
+            "result": { "ok": true }
+        });
+        let state = make_state();
+        let row = pane_bound_row("alive", Some("pane-X"));
+        state.registry.upsert(row).await;
+
+        let removed_rows = handle_master_wt_event(&state, &reply).await;
+
+        assert_eq!(removed_rows, 0);
+        assert!(state.registry.lookup(&SessionId::new("alive")).await.is_some());
+    }
+
+    /// Events whose method is not `connection_state` (e.g. vt_sequence)
+    /// never drop rows, even when they name a bound pane.
+    #[tokio::test]
+    async fn handle_master_wt_event_ignores_non_connection_state_methods() {
+        // Names a bound pane, but the method is not `connection_state`.
+        let vt_event = serde_json::json!({
+            "type": "event",
+            "method": "vt_sequence",
+            "params": { "sequence": "osc:133;A", "pane_id": "pane-Y" }
+        });
+        let state = make_state();
+        let row = pane_bound_row("alive", Some("pane-Y"));
+        state.registry.upsert(row).await;
+
+        let removed_rows = handle_master_wt_event(&state, &vt_event).await;
+
+        assert_eq!(removed_rows, 0);
+        assert!(state.registry.lookup(&SessionId::new("alive")).await.is_some());
+    }
+
+    /// `connection_state: failed` is preserved as Error/ConnectionFailed
+    /// on the helper side. Master must NOT drop the row — that would
+    /// collapse two distinct user-visible states into one (Ended).
+    #[tokio::test]
+    async fn handle_master_wt_event_ignores_non_closed_states() {
+        let state = make_state();
+        let row = pane_bound_row("err", Some("pane-Z"));
+        state.registry.upsert(row).await;
+
+        // Every state other than "closed" must leave the row in place,
+        // including values the handler has no special knowledge of.
+        for s in ["failed", "connected", "unknown", "weird"] {
+            let event = serde_json::json!({
+                "type": "event",
+                "method": "connection_state",
+                "params": { "state": s, "pane_id": "pane-Z" }
+            });
+            let removed_rows = handle_master_wt_event(&state, &event).await;
+            assert_eq!(removed_rows, 0, "state={s} must not drop");
+        }
+        assert!(state.registry.lookup(&SessionId::new("err")).await.is_some());
+    }
+
+    /// A `connection_state: closed` event that names no pane at all
+    /// (no `pane_id`, no `session_id`) is a no-op: there is nothing to
+    /// key the drop on. The handler logs it at debug level.
+    #[tokio::test]
+    async fn handle_master_wt_event_ignores_missing_pane_id_and_session_id() {
+        // `params` carries the right state but neither id key.
+        let anonymous_close = serde_json::json!({
+            "type": "event",
+            "method": "connection_state",
+            "params": { "state": "closed" }
+        });
+        let state = make_state();
+        let row = pane_bound_row("alive", Some("pane-W"));
+        state.registry.upsert(row).await;
+
+        let removed_rows = handle_master_wt_event(&state, &anonymous_close).await;
+
+        assert_eq!(removed_rows, 0);
+        assert!(state.registry.lookup(&SessionId::new("alive")).await.is_some());
+    }
+
     /// `route_for` (used by every `MasterClient::`
     /// forwarder) must return `internal_error` when the agent CLI
     /// sends a request for a session that no helper has registered