diff --git a/src/browser/stores/WorkspaceStore.test.ts b/src/browser/stores/WorkspaceStore.test.ts index af4febbf15..ce6ca6b067 100644 --- a/src/browser/stores/WorkspaceStore.test.ts +++ b/src/browser/stores/WorkspaceStore.test.ts @@ -2720,6 +2720,122 @@ describe("WorkspaceStore", () => { }); }); + it("preserves internal resume metadata across background handoffs", async () => { + const activeWorkspaceId = "active-workspace-internal-resume-background"; + const backgroundWorkspaceId = "background-workspace-internal-resume-background"; + const initialRecency = new Date("2024-01-06T00:00:00.000Z").getTime(); + + const backgroundStreamingSnapshot: WorkspaceActivitySnapshot = { + recency: initialRecency, + streaming: true, + streamingGeneration: 1, + lastModel: "claude-sonnet-4", + lastThinkingLevel: null, + }; + + let releaseBackgroundCompletion!: () => void; + const backgroundCompletionReady = new Promise((resolve) => { + releaseBackgroundCompletion = resolve; + }); + + mockActivityList.mockResolvedValue({ + [backgroundWorkspaceId]: backgroundStreamingSnapshot, + }); + + mockActivitySubscribe.mockImplementation(async function* ( + _input?: void, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + await backgroundCompletionReady; + if (options?.signal?.aborted) { + return; + } + + yield { + type: "activity" as const, + workspaceId: backgroundWorkspaceId, + activity: { + ...backgroundStreamingSnapshot, + streamingGeneration: 2, + }, + }; + + yield { + type: "activity" as const, + workspaceId: backgroundWorkspaceId, + activity: { + ...backgroundStreamingSnapshot, + recency: initialRecency + 1, + streaming: false, + streamingGeneration: 2, + }, + }; + + await waitForAbortSignal(options?.signal); + }); + + mockChatStreamFor(backgroundWorkspaceId, function* () { + yield { + type: "message", + id: "internal-resume-compaction-request", + role: "user", + parts: [{ type: "text", text: "/compact" }], + metadata: { + historySequence: 1, + timestamp: Date.now(), + muxMetadata: { + type: "compaction-request", + rawCommand: "/compact", + parsed: { + model: "claude-sonnet-4", + followUpContent: { + text: "Continue", + model: "claude-sonnet-4", + agentId: "exec", + dispatchOptions: { source: "internal-resume" }, + }, + }, + }, + }, + }; + + yield { + type: "stream-start", + workspaceId: backgroundWorkspaceId, + messageId: "compaction-stream", + historySequence: 2, + model: "claude-sonnet-4", + startTime: Date.now(), + mode: "exec", + }; + + yield { type: "caught-up", hasOlderHistory: false }; + }); + + const onResponseComplete = createResponseCompleteSpy(); + + recreateStore(onResponseComplete); + + createAndAddWorkspace(store, backgroundWorkspaceId); + + const sawCompactingStream = await waitUntil( + () => store.getWorkspaceState(backgroundWorkspaceId).isCompacting + ); + expect(sawCompactingStream).toBe(true); + + createAndAddWorkspace(store, activeWorkspaceId); + + releaseBackgroundCompletion(); + await tick(0); + + expectResponseComplete(onResponseComplete, { + workspaceId: backgroundWorkspaceId, + isFinal: true, + completion: { kind: "compaction", hasAutoFollowUp: true, suppressNotification: true }, + completedAt: initialRecency + 1, + }); + }); + it("preserves queued auto-follow-up metadata for background completion callbacks", async () => { const activeWorkspaceId = "active-workspace-queued-follow-up-background"; const backgroundWorkspaceId = "background-workspace-queued-follow-up-background"; diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 6822897317..7b2b5f5fdc 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -2474,9 +2474,9 @@ export class WorkspaceStore { previous.streamingGeneration !== snapshot.streamingGeneration; if (didBackgroundStreamingGenerationAdvance) { // Background activity snapshots continue across handoffs even after onChat unsubscribes. - // Once a newer stream generation appears, any cached live stream contexts are stale and - // must not suppress the terminal notification for the new background turn. - this.aggregators.get(workspaceId)?.clearActiveStreams(); + // Let the aggregator decide whether any stream-scoped completion metadata should + // survive the handoff before clearing stale live stream contexts. + this.aggregators.get(workspaceId)?.handleBackgroundStreamingGenerationAdvance(); } const stoppedStreamingSnapshot = @@ -2512,12 +2512,13 @@ export class WorkspaceStore { // Activity snapshots don't include message/content metadata. Reuse any // still-active stream context captured before this workspace was backgrounded // so queued follow-up handoffs remain suppressible in App notifications. + const completion = wasIdleCompaction + ? createIdleCompactionCompletion(backgroundCompletion?.hasAutoFollowUp ?? false) + : backgroundCompletion; this.emitResponseComplete({ workspaceId, isFinal: true, - completion: wasIdleCompaction - ? createIdleCompactionCompletion(backgroundCompletion?.hasAutoFollowUp ?? false) - : backgroundCompletion, + completion, completedAt: stoppedStreamingSnapshot.recency, }); } @@ -2526,6 +2527,7 @@ export class WorkspaceStore { // Inactive workspaces do not receive stream-end events via onChat. Once // activity confirms streaming stopped, clear stale stream contexts so they // cannot leak compaction metadata into future completion callbacks. + this.aggregators.get(workspaceId)?.clearBackgroundHandoffCompletion(); this.aggregators.get(workspaceId)?.clearActiveStreams(); this.aggregators.get(workspaceId)?.clearPendingStreamStart(); } diff --git a/src/browser/utils/messages/StreamingMessageAggregator.test.ts b/src/browser/utils/messages/StreamingMessageAggregator.test.ts index df0cfabbd6..e6976beb9d 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.test.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.test.ts @@ -1983,6 +1983,129 @@ describe("StreamingMessageAggregator", () => { expect(aggregator.isCompacting()).toBe(false); }); + + test("suppresses response notifications for default post-compaction Continue resumes", () => { + const workspaceId = "test-workspace-default-continue-notify"; + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT, workspaceId); + let completion: Parameters[0]; + aggregator.onResponseComplete = (event) => { + completion = event.completion; + }; + + const summaryMessage = createMuxMessage("summary-default-continue", "assistant", "Summary", { + historySequence: 1, + timestamp: Date.now(), + compactionBoundary: true, + muxMetadata: { + type: "compaction-summary", + pendingFollowUp: { + text: "Continue", + model: "anthropic:claude-3-5-haiku-20241022", + dispatchOptions: { source: "internal-resume" }, + agentId: "exec", + }, + }, + }); + + aggregator.loadHistoricalMessages([summaryMessage], true); + aggregator.handleMessage({ + ...createMuxMessage("synthetic-default-continue", "user", "Continue", { + historySequence: 2, + timestamp: Date.now(), + synthetic: true, + uiVisible: true, + }), + type: "message", + }); + + aggregator.handleStreamStart({ + type: "stream-start", + workspaceId, + messageId: "continue-stream", + historySequence: 3, + model: "anthropic:claude-3-5-haiku-20241022", + startTime: Date.now(), + agentId: "exec", + mode: "exec", + }); + aggregator.handleStreamEnd({ + type: "stream-end", + workspaceId, + messageId: "continue-stream", + metadata: { + historySequence: 3, + timestamp: Date.now(), + model: "anthropic:claude-3-5-haiku-20241022", + }, + parts: [{ type: "text", text: "Done" }], + }); + + expect(completion).toEqual({ + kind: "response", + hasAutoFollowUp: false, + suppressNotification: true, + }); + expect(shouldNotifyOnResponseComplete(completion)).toBe(false); + }); + + test("does not suppress response notifications for user-authored Continue follow-ups", () => { + const workspaceId = "test-workspace-user-follow-up-notify"; + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT, workspaceId); + let completion: Parameters[0]; + aggregator.onResponseComplete = (event) => { + completion = event.completion; + }; + + const summaryMessage = createMuxMessage("summary-user-follow-up", "assistant", "Summary", { + historySequence: 1, + timestamp: Date.now(), + compactionBoundary: true, + muxMetadata: { + type: "compaction-summary", + pendingFollowUp: { + text: "Continue", + model: "anthropic:claude-3-5-haiku-20241022", + agentId: "exec", + }, + }, + }); + + aggregator.loadHistoricalMessages([summaryMessage], true); + aggregator.handleMessage({ + ...createMuxMessage("synthetic-user-follow-up", "user", "Continue", { + historySequence: 2, + timestamp: Date.now(), + synthetic: true, + uiVisible: true, + }), + type: "message", + }); + + aggregator.handleStreamStart({ + type: "stream-start", + workspaceId, + messageId: "follow-up-stream", + historySequence: 3, + model: "anthropic:claude-3-5-haiku-20241022", + startTime: Date.now(), + agentId: "exec", + mode: "exec", + }); + aggregator.handleStreamEnd({ + type: "stream-end", + workspaceId, + messageId: "follow-up-stream", + metadata: { + historySequence: 3, + timestamp: Date.now(), + model: "anthropic:claude-3-5-haiku-20241022", + }, + parts: [{ type: "text", text: "Done" }], + }); + + expect(completion).toBeUndefined(); + expect(shouldNotifyOnResponseComplete(completion)).toBe(true); + }); }); describe("pending stream model", () => { diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts index 76987e5bb6..de99ff4c00 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.ts @@ -7,7 +7,11 @@ import type { InlineSkillSnapshotMap, AgentSkillReference, } from "@/common/types/message"; -import { createMuxMessage, getCompactionFollowUpContent } from "@/common/types/message"; +import { + createMuxMessage, + getCompactionFollowUpContent, + isCompactionSummaryMetadata, +} from "@/common/types/message"; import { copyStreamLifecycleSnapshot, @@ -131,6 +135,7 @@ interface StreamingContext { // background activity completion can still suppress intermediate notifications // after the workspace loses its live queued-message subscription. hasQueuedFollowUp: boolean; + suppressNotification: boolean; isReplay: boolean; model: string; routedThroughGateway?: boolean; @@ -410,6 +415,9 @@ export class StreamingMessageAggregator { private messages = new Map(); private activeStreams = new Map(); + private backgroundHandoffCompletion: ReturnType = + undefined; + // Derived value cache - invalidated as a unit on every mutation. // Adding a new cached value? Add it here and it will auto-invalidate. private displayedMessageCache = new Map< @@ -1412,6 +1420,29 @@ export class StreamingMessageAggregator { }; } + private isDefaultPostCompactionContinueTurn(): boolean { + const messages = this.getAllMessages(); + const latestMessage = messages.at(-1); + const previousMessage = messages.at(-2); + if (latestMessage?.role !== "user" || previousMessage?.role !== "assistant") { + return false; + } + + if (latestMessage.metadata?.synthetic !== true) { + return false; + } + + const summaryMetadata = previousMessage.metadata?.muxMetadata; + if (!isCompactionSummaryMetadata(summaryMetadata)) { + return false; + } + + // The backend marks internal post-compaction resumes at the compaction follow-up + // source. This frontend check preserves the policy for replay/tests where only + // the synthetic user row and compaction summary are available. + return summaryMetadata.pendingFollowUp?.dispatchOptions?.source === "internal-resume"; + } + private setPendingStreamStartTime(time: number | null): void { this.pendingStreamStartTime = time; if (time === null) { @@ -1600,7 +1631,21 @@ export class StreamingMessageAggregator { } getActiveResponseCompleteMetadata() { - return buildAggregateResponseCompleteMetadata(this.activeStreams.values()); + return ( + buildAggregateResponseCompleteMetadata(this.activeStreams.values()) ?? + this.backgroundHandoffCompletion + ); + } + + handleBackgroundStreamingGenerationAdvance(): void { + const completion = buildAggregateResponseCompleteMetadata(this.activeStreams.values()); + this.backgroundHandoffCompletion = + completion?.suppressNotification === true ? completion : undefined; + this.clearActiveStreams(); + } + + clearBackgroundHandoffCompletion(): void { + this.backgroundHandoffCompletion = undefined; } setActiveQueuedFollowUp(hasQueuedFollowUp: boolean): void { @@ -1807,8 +1852,13 @@ export class StreamingMessageAggregator { // stream starts, older contexts must not keep suppressing later completions. activeStream.hasQueuedFollowUp = false; } + this.backgroundHandoffCompletion = undefined; const routeProvider = resolveRouteProvider(data.routeProvider, data.routedThroughGateway); + const suppressNotification = + this.isDefaultPostCompactionContinueTurn() || + this.getLatestUnresolvedCompactionRequest()?.parsed.followUpContent?.dispatchOptions + ?.source === "internal-resume"; const now = Date.now(); const context: StreamingContext = { serverStartTime: data.startTime, @@ -1819,6 +1869,7 @@ export class StreamingMessageAggregator { isIdleCompaction, hasCompactionContinue, hasQueuedFollowUp: false, + suppressNotification, isReplay: data.replay === true, model: data.model, routedThroughGateway: data.routedThroughGateway, diff --git a/src/browser/utils/messages/responseCompletionMetadata.ts b/src/browser/utils/messages/responseCompletionMetadata.ts index a20782ecee..d47c75416f 100644 --- a/src/browser/utils/messages/responseCompletionMetadata.ts +++ b/src/browser/utils/messages/responseCompletionMetadata.ts @@ -1,16 +1,21 @@ +interface ResponseNotificationPolicy { + /** Suppress notify-on-response for synthetic implementation-detail turns. */ + suppressNotification?: boolean; +} + export type ResponseCompleteMetadata = - | { + | ({ kind: "response"; // Notification policy should follow the user-visible terminal turn rather than every // intermediate stream boundary. Another queued/auto-dispatched follow-up means this // completion is only a handoff, so it should not notify on its own. hasAutoFollowUp: boolean; - } - | { + } & ResponseNotificationPolicy) + | ({ kind: "compaction"; hasAutoFollowUp: boolean; isIdle?: boolean; - }; + } & ResponseNotificationPolicy); export interface ResponseCompleteEvent { workspaceId: string; @@ -27,6 +32,8 @@ export interface ResponseCompletionState { isCompacting: boolean; hasCompactionContinue: boolean; hasQueuedFollowUp: boolean; + /** This stream is a synthetic implementation-detail turn and should not alert. */ + suppressNotification?: boolean; // Idle compaction is maintenance work, so downstream notification policy must // be able to suppress the final completion even when the workspace is selected. isIdleCompaction?: boolean; @@ -36,7 +43,8 @@ export function buildResponseCompleteMetadata( state: ResponseCompletionState ): ResponseCompleteMetadata | undefined { const hasAutoFollowUp = state.hasCompactionContinue || state.hasQueuedFollowUp; - if (!state.isCompacting && !hasAutoFollowUp) { + const suppressNotification = state.suppressNotification === true; + if (!state.isCompacting && !hasAutoFollowUp && !suppressNotification) { return undefined; } @@ -45,12 +53,14 @@ export function buildResponseCompleteMetadata( kind: "compaction", hasAutoFollowUp, ...(state.isIdleCompaction ? { isIdle: true } : {}), + ...(suppressNotification ? { suppressNotification: true } : {}), }; } return { kind: "response", hasAutoFollowUp, + ...(suppressNotification ? { suppressNotification: true } : {}), }; } @@ -60,12 +70,14 @@ export function buildAggregateResponseCompleteMetadata( let isCompacting = false; let hasCompactionContinue = false; let hasQueuedFollowUp = false; + let suppressNotification = false; let isIdleCompaction = false; for (const state of states) { isCompacting ||= state.isCompacting; hasCompactionContinue ||= state.hasCompactionContinue; hasQueuedFollowUp ||= state.hasQueuedFollowUp; + suppressNotification ||= state.suppressNotification === true; isIdleCompaction ||= state.isIdleCompaction === true; } @@ -73,6 +85,7 @@ export function buildAggregateResponseCompleteMetadata( isCompacting, hasCompactionContinue, hasQueuedFollowUp, + suppressNotification, isIdleCompaction, }); } @@ -88,6 +101,10 @@ export function createIdleCompactionCompletion(hasAutoFollowUp: boolean): Respon export function shouldNotifyOnResponseComplete( completion: ResponseCompleteMetadata | undefined ): boolean { + if (completion?.suppressNotification === true) { + return false; + } + if (completion?.kind === "compaction" && completion.isIdle) { return false; } diff --git a/src/common/types/message.ts b/src/common/types/message.ts index 3f60426877..36ee1fc269 100644 --- a/src/common/types/message.ts +++ b/src/common/types/message.ts @@ -108,6 +108,8 @@ export function pickStartupRetrySendOptions( } export interface CompactionFollowUpDispatchOptions { + /** Source marker for internal resume follow-ups, not user-authored prompts. */ + source?: "internal-resume"; /** Skip the queued follow-up instead of replaying it later if the workspace stopped being idle. */ requireIdle?: boolean; } diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index b70719448c..4d856324d6 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -2859,7 +2859,20 @@ export class AgentSession { toolPolicy: [{ regex_match: ".*", action: "disable" }], }; - const messageText = buildCompactionMessageText({ followUpContent: params.followUpContent }); + const followUpContent: CompactionFollowUpRequest = + params.reason === "mid-stream" + ? { + ...params.followUpContent, + dispatchOptions: { + ...params.followUpContent.dispatchOptions, + // Mid-stream compaction resumes with a generated "Continue" sentinel; unlike + // on-send compaction, it is not the user's original prompt completing. + source: "internal-resume", + }, + } + : params.followUpContent; + + const messageText = buildCompactionMessageText({ followUpContent }); const metadata: MuxMessageMetadata = { type: "compaction-request", @@ -2867,7 +2880,7 @@ export class AgentSession { commandPrefix: "/compact", parsed: { model: sendOptions.model, - followUpContent: params.followUpContent, + followUpContent, }, requestedModel: sendOptions.model, source: "auto-compaction",