Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions tools/wta/src/master/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,80 @@ async fn run_master_loop(cli: Cli, pipe_name: String) -> Result<()> {
});
}


// Periodic disk rescan. The initial scan above only fires once at
// master spawn — so when the user starts their FIRST ever agent CLI
// session in IT (no `~/.copilot/session-state/` / `~/.claude/projects/`
// dirs existed at master boot), the resulting on-disk session uuid
// dir is never picked up until master itself restarts. Hook-based
// live updates *should* cover this, but copilot's SessionStart hook
// observably fires with an empty `session_id` on the first run,
// which routes through `route_agent_event_to_registry_with_hook_sink`
// as a synthetic `pane:<guid>` key and is dropped before reaching
// master (see app.rs:704 — synthetic keys are local-only). Periodic
// rescan recovers from that and any future hook-payload regression:
// whatever the CLI writes to disk surfaces in the registry within
// RESCAN_PERIOD_SECS, and the 5s `session/list` tick on each helper
// then renders it in session management view.
//
// We only insert sessions that are NOT already in the registry —
// never clobber a live row with disk-inferred state (history_loader
// can't see status=Working / Attention / current_tool, which live
// hooks track).
let inner_for_periodic = Arc::clone(&inner);
tokio::task::spawn_local(async move {
const RESCAN_PERIOD_SECS: u64 = 30;
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(RESCAN_PERIOD_SECS));
// Skip the first tick — initial scan above already handled boot.
ticker.tick().await;
loop {
ticker.tick().await;
let scan_started = std::time::Instant::now();
let sessions = match tokio::task::spawn_blocking(|| {
crate::history_loader::load_all()
})
.await
{
Ok(s) => s,
Err(e) => {
tracing::warn!(
target: "master_history",
error = %e,
"periodic history rescan task panicked; will retry next tick"
);
continue;
}
};
let scanned = sessions.len();
let mut added: usize = 0;
for s in &sessions {
let info = crate::session_registry::agent_session_to_session_info(s);
if inner_for_periodic.registry.insert_if_absent(info).await {
added += 1;
}
}
tracing::debug!(
target: "master_history",
scanned,
added,
elapsed_ms = scan_started.elapsed().as_millis() as u64,
"periodic history rescan"
);
if added > 0 {
tracing::info!(
target: "master_history",
added,
"periodic rescan found new on-disk sessions; broadcasting sessions/changed"
);
broadcast_ext_to_helpers(
&inner_for_periodic,
crate::session_registry::build_sessions_changed_notification(),
)
.await;
}
}
});

let client = MasterClient {
state: Arc::clone(&inner),
};
Expand Down
58 changes: 58 additions & 0 deletions tools/wta/src/session_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,17 @@ pub trait SessionRegistry: Send + Sync {
/// twice with the same `session_id` keeps only the latest copy.
async fn upsert(&self, info: SessionInfo);

/// Insert `info` ONLY if no row exists for `info.session_id`. Returns
/// `true` if the insert happened. The check + insert run under a
/// single lock so a concurrent live `apply_event` for the same SID
/// cannot race in between (which would otherwise let an
/// `upsert`-after-`lookup` clobber freshly-set live state like
/// `status=Working` / `current_tool`). Used by the periodic
/// history rescan in master to surface newly-created on-disk
/// sessions without overwriting any live row a hook has already
/// installed in the same window.
async fn insert_if_absent(&self, info: SessionInfo) -> bool;

/// Remove the row for `sid`. Returns the prior value if any (the master
/// uses this both for routing teardown and to know what to broadcast
/// in `session_removed` ext-notifications).
Expand Down Expand Up @@ -843,6 +854,15 @@ impl SessionRegistry for InMemoryRegistry {
upsert_locked(&mut guard, info);
}

async fn insert_if_absent(&self, info: SessionInfo) -> bool {
let mut guard = self.inner.lock().await;
if guard.sessions.contains_key(&info.session_id) {
return false;
}
upsert_locked(&mut guard, info);
true
}

async fn remove(&self, sid: &acp::SessionId) -> Option<SessionInfo> {
let mut guard = self.inner.lock().await;
remove_locked(&mut guard, sid)
Expand Down Expand Up @@ -1346,6 +1366,44 @@ mod tests {
assert_eq!(reg.snapshot().await.len(), 1, "no duplicate rows");
}

#[tokio::test]
async fn insert_if_absent_inserts_missing_row() {
let reg = InMemoryRegistry::new();
let original = info("sess-1", Some("pane-A"));
assert!(reg.insert_if_absent(original.clone()).await);
assert_eq!(reg.lookup(&original.session_id).await, Some(original));
}

#[tokio::test]
async fn insert_if_absent_preserves_existing_row() {
let reg = InMemoryRegistry::new();
let sid = acp::SessionId::new("sess-1".to_string());
assert!(reg.insert_if_absent(info("sess-1", Some("pane-A"))).await);
assert!(!reg.insert_if_absent(info("sess-1", Some("pane-B"))).await);
assert_eq!(
reg.lookup(&sid).await.unwrap().pane_session_id.as_deref(),
Some("pane-A")
);
}

#[tokio::test]
async fn insert_if_absent_allows_only_one_concurrent_insert() {
let reg = Arc::new(InMemoryRegistry::new());
let left = Arc::clone(&reg);
let right = Arc::clone(&reg);
let (left_inserted, right_inserted) = tokio::join!(
async move { left.insert_if_absent(info("sess-1", Some("pane-A"))).await },
async move { right.insert_if_absent(info("sess-1", Some("pane-B"))).await }
);

let inserted = [left_inserted, right_inserted]
.into_iter()
.filter(|inserted| *inserted)
.count();
assert_eq!(inserted, 1);
assert_eq!(reg.snapshot().await.len(), 1);
}

#[tokio::test]
async fn remove_returns_prior_and_subsequent_lookup_is_none() {
let reg = InMemoryRegistry::new();
Expand Down
136 changes: 108 additions & 28 deletions tools/wta/src/shell/wt_channel/cli_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,40 +379,36 @@ impl CliChannel {

/// Start background event listener (wraps `wtcli listen --json`).
/// wtcli inherits WT_COM_CLSID from this process's env.
///
/// If the `wtcli listen` child exits unexpectedly (silent COM blip,
/// transient RPC error, WT restart), we retry **once**. If the retry
/// also exits, we give up — at that point something is fundamentally
/// broken and a tight respawn loop would just spam logs.
///
/// Session-management freshness does not depend on this path: the
/// 30 s master disk rescan in `serve_master` covers new on-disk
/// sessions even when the listener is dead. Autofix and other push
/// events do go dark after a permanent failure, which is the
/// accepted tradeoff for keeping the recovery logic trivial.
pub async fn start_reader(self: &std::sync::Arc<Self>) {
const MAX_ATTEMPTS: u32 = 2;
let wtcli = self.wtcli_path.clone();
let weak = std::sync::Arc::downgrade(self);
tokio::spawn(async move {
let Ok(mut child) = tokio::process::Command::new(&wtcli)
.args(["--json", "listen"])
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
.spawn()
else {
return;
};

let stdout = child.stdout.take().unwrap();
let mut reader = tokio::io::BufReader::new(stdout);
let mut line = String::new();

loop {
line.clear();
use tokio::io::AsyncBufReadExt;
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
let Some(this) = weak.upgrade() else { break };
if let Ok(val) = serde_json::from_str::<serde_json::Value>(line.trim()) {
let tx = this.event_tx.lock().unwrap();
if let Some(tx) = tx.as_ref() {
let _ = tx.send(val);
}
}
}
Err(_) => break,
for attempt in 1..=MAX_ATTEMPTS {
if weak.upgrade().is_none() {
return;
}
run_wtcli_listen_once(&wtcli, &weak, attempt).await;
}
tracing::error!(
target: "wt_channel",
wtcli = %wtcli,
attempts = MAX_ATTEMPTS,
"wtcli listen exited after retry; giving up. \
Push events (autofix, agent_event hooks) are now dark for the lifetime of this helper. \
Session management still recovers via master's 30s disk rescan."
);
});
}

Expand Down Expand Up @@ -440,6 +436,90 @@ impl CliChannel {
}
}

/// Spawn `wtcli --json listen` once, forward each parsed JSON line to the
/// channel's `event_tx`, and return when the child exits (clean EOF or
/// pipe error). Returns control to the caller so it can decide whether to
/// retry. Lifecycle (start / exit reason / events seen) is logged.
async fn run_wtcli_listen_once(
wtcli: &str,
weak: &std::sync::Weak<CliChannel>,
attempt: u32,
) {
use tokio::io::AsyncBufReadExt;

let mut child = match tokio::process::Command::new(wtcli)
.args(["--json", "listen"])
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
// CliChannel-dropped-mid-read → Child dropped without explicit
// kill → kill_on_drop reaps wtcli instead of orphaning it.
.kill_on_drop(true)
.spawn()
{
Ok(c) => {
tracing::info!(target: "wt_channel", wtcli, attempt, "wtcli listen spawned");
c
}
Err(err) => {
tracing::warn!(target: "wt_channel", wtcli, attempt, error = %err, "wtcli listen spawn failed");
return;
}
};

let stdout = match child.stdout.take() {
Some(s) => s,
None => {
tracing::warn!(target: "wt_channel", attempt, "wtcli listen child missing stdout pipe");
let _ = child.start_kill();
let _ = child.wait().await;
return;
}
};

let mut reader = tokio::io::BufReader::new(stdout);
let mut line = String::new();
let mut events_seen: u64 = 0;
let started = std::time::Instant::now();

let exit_reason = loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break "stdout EOF",
Ok(_) => {
let Some(this) = weak.upgrade() else {
let _ = child.start_kill();
let _ = child.wait().await;
return;
};
if let Ok(val) = serde_json::from_str::<serde_json::Value>(line.trim()) {
events_seen += 1;
let tx = this.event_tx.lock().unwrap();
if let Some(tx) = tx.as_ref() {
let _ = tx.send(val);
}
}
}
Err(_) => break "stdout read error",
}
};

// Pipe error doesn't imply the child has exited yet — kill before
// wait so we don't block on a child that's otherwise healthy.
// Harmless on the "stdout EOF" path.
let _ = child.start_kill();
let exit_status = child.wait().await.ok();

tracing::warn!(
target: "wt_channel",
attempt,
events_seen,
exit_reason,
exit_status = ?exit_status,
lived_ms = started.elapsed().as_millis() as u64,
"wtcli listen exited"
);
}

#[async_trait::async_trait]
impl WtChannel for CliChannel {
async fn request(
Expand Down
Loading