Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions src/browser/stores/WorkspaceStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2720,6 +2720,122 @@ describe("WorkspaceStore", () => {
});
});

it("preserves internal resume metadata across background handoffs", async () => {
const activeWorkspaceId = "active-workspace-internal-resume-background";
const backgroundWorkspaceId = "background-workspace-internal-resume-background";
const initialRecency = new Date("2024-01-06T00:00:00.000Z").getTime();

const backgroundStreamingSnapshot: WorkspaceActivitySnapshot = {
recency: initialRecency,
streaming: true,
streamingGeneration: 1,
lastModel: "claude-sonnet-4",
lastThinkingLevel: null,
};

let releaseBackgroundCompletion!: () => void;
const backgroundCompletionReady = new Promise<void>((resolve) => {
releaseBackgroundCompletion = resolve;
});

mockActivityList.mockResolvedValue({
[backgroundWorkspaceId]: backgroundStreamingSnapshot,
});

mockActivitySubscribe.mockImplementation(async function* (
_input?: void,
options?: { signal?: AbortSignal }
): AsyncGenerator<WorkspaceActivityEvent, void, unknown> {
await backgroundCompletionReady;
if (options?.signal?.aborted) {
return;
}

yield {
type: "activity" as const,
workspaceId: backgroundWorkspaceId,
activity: {
...backgroundStreamingSnapshot,
streamingGeneration: 2,
},
};

yield {
type: "activity" as const,
workspaceId: backgroundWorkspaceId,
activity: {
...backgroundStreamingSnapshot,
recency: initialRecency + 1,
streaming: false,
streamingGeneration: 2,
},
};

await waitForAbortSignal(options?.signal);
});

mockChatStreamFor(backgroundWorkspaceId, function* () {
yield {
type: "message",
id: "internal-resume-compaction-request",
role: "user",
parts: [{ type: "text", text: "/compact" }],
metadata: {
historySequence: 1,
timestamp: Date.now(),
muxMetadata: {
type: "compaction-request",
rawCommand: "/compact",
parsed: {
model: "claude-sonnet-4",
followUpContent: {
text: "Continue",
model: "claude-sonnet-4",
agentId: "exec",
dispatchOptions: { source: "internal-resume" },
},
},
},
},
};

yield {
type: "stream-start",
workspaceId: backgroundWorkspaceId,
messageId: "compaction-stream",
historySequence: 2,
model: "claude-sonnet-4",
startTime: Date.now(),
mode: "exec",
};

yield { type: "caught-up", hasOlderHistory: false };
});

const onResponseComplete = createResponseCompleteSpy();

recreateStore(onResponseComplete);

createAndAddWorkspace(store, backgroundWorkspaceId);

const sawCompactingStream = await waitUntil(
() => store.getWorkspaceState(backgroundWorkspaceId).isCompacting
);
expect(sawCompactingStream).toBe(true);

createAndAddWorkspace(store, activeWorkspaceId);

releaseBackgroundCompletion();
await tick(0);

expectResponseComplete(onResponseComplete, {
workspaceId: backgroundWorkspaceId,
isFinal: true,
completion: { kind: "compaction", hasAutoFollowUp: true, suppressNotification: true },
completedAt: initialRecency + 1,
});
});

it("preserves queued auto-follow-up metadata for background completion callbacks", async () => {
const activeWorkspaceId = "active-workspace-queued-follow-up-background";
const backgroundWorkspaceId = "background-workspace-queued-follow-up-background";
Expand Down
14 changes: 8 additions & 6 deletions src/browser/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2474,9 +2474,9 @@ export class WorkspaceStore {
previous.streamingGeneration !== snapshot.streamingGeneration;
if (didBackgroundStreamingGenerationAdvance) {
// Background activity snapshots continue across handoffs even after onChat unsubscribes.
// Once a newer stream generation appears, any cached live stream contexts are stale and
// must not suppress the terminal notification for the new background turn.
this.aggregators.get(workspaceId)?.clearActiveStreams();
// Let the aggregator decide whether any stream-scoped completion metadata should
// survive the handoff before clearing stale live stream contexts.
this.aggregators.get(workspaceId)?.handleBackgroundStreamingGenerationAdvance();
}

const stoppedStreamingSnapshot =
Expand Down Expand Up @@ -2512,12 +2512,13 @@ export class WorkspaceStore {
// Activity snapshots don't include message/content metadata. Reuse any
// still-active stream context captured before this workspace was backgrounded
// so queued follow-up handoffs remain suppressible in App notifications.
const completion = wasIdleCompaction
? createIdleCompactionCompletion(backgroundCompletion?.hasAutoFollowUp ?? false)
: backgroundCompletion;
this.emitResponseComplete({
workspaceId,
isFinal: true,
completion: wasIdleCompaction
? createIdleCompactionCompletion(backgroundCompletion?.hasAutoFollowUp ?? false)
: backgroundCompletion,
completion,
completedAt: stoppedStreamingSnapshot.recency,
});
}
Expand All @@ -2526,6 +2527,7 @@ export class WorkspaceStore {
// Inactive workspaces do not receive stream-end events via onChat. Once
// activity confirms streaming stopped, clear stale stream contexts so they
// cannot leak compaction metadata into future completion callbacks.
this.aggregators.get(workspaceId)?.clearBackgroundHandoffCompletion();
this.aggregators.get(workspaceId)?.clearActiveStreams();
this.aggregators.get(workspaceId)?.clearPendingStreamStart();
}
Expand Down
123 changes: 123 additions & 0 deletions src/browser/utils/messages/StreamingMessageAggregator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1983,6 +1983,129 @@ describe("StreamingMessageAggregator", () => {

expect(aggregator.isCompacting()).toBe(false);
});

test("suppresses response notifications for default post-compaction Continue resumes", () => {
const workspaceId = "test-workspace-default-continue-notify";
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT, workspaceId);
let completion: Parameters<typeof shouldNotifyOnResponseComplete>[0];
aggregator.onResponseComplete = (event) => {
completion = event.completion;
};

const summaryMessage = createMuxMessage("summary-default-continue", "assistant", "Summary", {
historySequence: 1,
timestamp: Date.now(),
compactionBoundary: true,
muxMetadata: {
type: "compaction-summary",
pendingFollowUp: {
text: "Continue",
model: "anthropic:claude-3-5-haiku-20241022",
dispatchOptions: { source: "internal-resume" },
agentId: "exec",
},
},
});

aggregator.loadHistoricalMessages([summaryMessage], true);
aggregator.handleMessage({
...createMuxMessage("synthetic-default-continue", "user", "Continue", {
historySequence: 2,
timestamp: Date.now(),
synthetic: true,
uiVisible: true,
}),
type: "message",
});

aggregator.handleStreamStart({
type: "stream-start",
workspaceId,
messageId: "continue-stream",
historySequence: 3,
model: "anthropic:claude-3-5-haiku-20241022",
startTime: Date.now(),
agentId: "exec",
mode: "exec",
});
aggregator.handleStreamEnd({
type: "stream-end",
workspaceId,
messageId: "continue-stream",
metadata: {
historySequence: 3,
timestamp: Date.now(),
model: "anthropic:claude-3-5-haiku-20241022",
},
parts: [{ type: "text", text: "Done" }],
});

expect(completion).toEqual({
kind: "response",
hasAutoFollowUp: false,
suppressNotification: true,
});
expect(shouldNotifyOnResponseComplete(completion)).toBe(false);
});

test("does not suppress response notifications for user-authored Continue follow-ups", () => {
const workspaceId = "test-workspace-user-follow-up-notify";
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT, workspaceId);
let completion: Parameters<typeof shouldNotifyOnResponseComplete>[0];
aggregator.onResponseComplete = (event) => {
completion = event.completion;
};

const summaryMessage = createMuxMessage("summary-user-follow-up", "assistant", "Summary", {
historySequence: 1,
timestamp: Date.now(),
compactionBoundary: true,
muxMetadata: {
type: "compaction-summary",
pendingFollowUp: {
text: "Continue",
model: "anthropic:claude-3-5-haiku-20241022",
agentId: "exec",
},
},
});

aggregator.loadHistoricalMessages([summaryMessage], true);
aggregator.handleMessage({
...createMuxMessage("synthetic-user-follow-up", "user", "Continue", {
historySequence: 2,
timestamp: Date.now(),
synthetic: true,
uiVisible: true,
}),
type: "message",
});

aggregator.handleStreamStart({
type: "stream-start",
workspaceId,
messageId: "follow-up-stream",
historySequence: 3,
model: "anthropic:claude-3-5-haiku-20241022",
startTime: Date.now(),
agentId: "exec",
mode: "exec",
});
aggregator.handleStreamEnd({
type: "stream-end",
workspaceId,
messageId: "follow-up-stream",
metadata: {
historySequence: 3,
timestamp: Date.now(),
model: "anthropic:claude-3-5-haiku-20241022",
},
parts: [{ type: "text", text: "Done" }],
});

expect(completion).toBeUndefined();
expect(shouldNotifyOnResponseComplete(completion)).toBe(true);
});
});

describe("pending stream model", () => {
Expand Down
55 changes: 53 additions & 2 deletions src/browser/utils/messages/StreamingMessageAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import type {
InlineSkillSnapshotMap,
AgentSkillReference,
} from "@/common/types/message";
import { createMuxMessage, getCompactionFollowUpContent } from "@/common/types/message";
import {
createMuxMessage,
getCompactionFollowUpContent,
isCompactionSummaryMetadata,
} from "@/common/types/message";

import {
copyStreamLifecycleSnapshot,
Expand Down Expand Up @@ -131,6 +135,7 @@ interface StreamingContext {
// background activity completion can still suppress intermediate notifications
// after the workspace loses its live queued-message subscription.
hasQueuedFollowUp: boolean;
suppressNotification: boolean;
isReplay: boolean;
model: string;
routedThroughGateway?: boolean;
Expand Down Expand Up @@ -410,6 +415,9 @@ export class StreamingMessageAggregator {
private messages = new Map<string, MuxMessage>();
private activeStreams = new Map<string, StreamingContext>();

private backgroundHandoffCompletion: ReturnType<typeof buildAggregateResponseCompleteMetadata> =
undefined;

// Derived value cache - invalidated as a unit on every mutation.
// Adding a new cached value? Add it here and it will auto-invalidate.
private displayedMessageCache = new Map<
Expand Down Expand Up @@ -1412,6 +1420,29 @@ export class StreamingMessageAggregator {
};
}

private isDefaultPostCompactionContinueTurn(): boolean {
const messages = this.getAllMessages();
const latestMessage = messages.at(-1);
const previousMessage = messages.at(-2);
if (latestMessage?.role !== "user" || previousMessage?.role !== "assistant") {
return false;
}

if (latestMessage.metadata?.synthetic !== true) {
return false;
}

const summaryMetadata = previousMessage.metadata?.muxMetadata;
if (!isCompactionSummaryMetadata(summaryMetadata)) {
return false;
}

// The backend marks internal post-compaction resumes at the compaction follow-up
// source. This frontend check preserves the policy for replay/tests where only
// the synthetic user row and compaction summary are available.
return summaryMetadata.pendingFollowUp?.dispatchOptions?.source === "internal-resume";
}

private setPendingStreamStartTime(time: number | null): void {
this.pendingStreamStartTime = time;
if (time === null) {
Expand Down Expand Up @@ -1600,7 +1631,21 @@ export class StreamingMessageAggregator {
}

getActiveResponseCompleteMetadata() {
return buildAggregateResponseCompleteMetadata(this.activeStreams.values());
return (
buildAggregateResponseCompleteMetadata(this.activeStreams.values()) ??
this.backgroundHandoffCompletion
);
}

handleBackgroundStreamingGenerationAdvance(): void {
const completion = buildAggregateResponseCompleteMetadata(this.activeStreams.values());
this.backgroundHandoffCompletion =
completion?.suppressNotification === true ? completion : undefined;
this.clearActiveStreams();
}

clearBackgroundHandoffCompletion(): void {
this.backgroundHandoffCompletion = undefined;
}

setActiveQueuedFollowUp(hasQueuedFollowUp: boolean): void {
Expand Down Expand Up @@ -1807,8 +1852,13 @@ export class StreamingMessageAggregator {
// stream starts, older contexts must not keep suppressing later completions.
activeStream.hasQueuedFollowUp = false;
}
this.backgroundHandoffCompletion = undefined;
const routeProvider = resolveRouteProvider(data.routeProvider, data.routedThroughGateway);

const suppressNotification =
this.isDefaultPostCompactionContinueTurn() ||
this.getLatestUnresolvedCompactionRequest()?.parsed.followUpContent?.dispatchOptions
?.source === "internal-resume";
const now = Date.now();
const context: StreamingContext = {
serverStartTime: data.startTime,
Expand All @@ -1819,6 +1869,7 @@ export class StreamingMessageAggregator {
isIdleCompaction,
hasCompactionContinue,
hasQueuedFollowUp: false,
suppressNotification,
isReplay: data.replay === true,
model: data.model,
routedThroughGateway: data.routedThroughGateway,
Expand Down
Loading
Loading