Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ts/docs/architecture/dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,12 @@ Handles `@`-prefixed system commands:
- `@explain` — Explanation of cached translations
- `@feedback` — Inspect and export user-feedback entries
recorded by the chat UI (`list`, `top`, `filter`, `export`, `count`)
- `@ports` — List all registered TCP ports (per `(agent, role, port)`
group) with the agent-server's own listen port and the current number
of clients connected to each agent's WS server. Agents that don't
publish a count via `SessionContext.notifyClientCountChanged` render
`N/A` in the Clients column. Currently the browser and code agents
publish counts; others are diagnostics-only.

Each command has a `CommandDescriptor` that defines expected parameters,
subcommands, and help text.
Expand Down
8 changes: 8 additions & 0 deletions ts/packages/agentRpc/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,14 @@ export async function createAgentRpcClient(
const context = contextMap.get(param.contextId);
return context.notifyReadinessChanged();
},
notifyClientCountChanged: async (param: {
contextId: number;
role: string;
count: number;
}) => {
const context = contextMap.get(param.contextId);
return context.notifyClientCountChanged(param.role, param.count);
},
storageRead: async (param: {
contextId: number;
session: boolean;
Expand Down
10 changes: 10 additions & 0 deletions ts/packages/agentRpc/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,16 @@ export function createAgentRpcServer(
contextId,
});
},
notifyClientCountChanged: async (
role: string,
count: number,
): Promise<void> => {
return rpc.invoke("notifyClientCountChanged", {
contextId,
role,
count,
});
},
};
}

Expand Down
5 changes: 5 additions & 0 deletions ts/packages/agentRpc/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ export type AgentContextInvokeFunctions = {
indexes: (param: { contextId: number; type: string }) => Promise<any>;
reloadAgentSchema: (param: { contextId: number }) => Promise<void>;
notifyReadinessChanged: (param: { contextId: number }) => Promise<void>;
notifyClientCountChanged: (param: {
contextId: number;
role: string;
count: number;
}) => Promise<void>;
popupQuestion: (param: {
contextId: number;
message: string;
Expand Down
23 changes: 23 additions & 0 deletions ts/packages/agentSdk/src/agentInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,29 @@ export interface SessionContext<T = unknown> {
// surface in the trigger path (the next refresh will retry).
notifyReadinessChanged(): Promise<void>;

/**
* Notify the dispatcher that the number of clients currently
* connected to one of this agent's registered listeners has
* changed. Surfaced by the `@system ports` diagnostic command.
*
* `role` must match a role this agent previously passed to
* {@link registerPort}; writes for which no live registration
* exists are dropped silently (defends against late notifications
Comment thread
TalZaccai marked this conversation as resolved.
* arriving after the agent's session context has closed).
Comment thread
TalZaccai marked this conversation as resolved.
*
* `count` is the current total of connected clients for the
* `(agent, role, this session)` tuple, **after** the connect /
* disconnect that triggered this call has been applied to the
* agent's internal tracking. Pass `0` when the last client goes
* away so the column doesn't show a stale positive count.
*
* Best-effort: errors are swallowed (and debug-logged) so an
* event-driven trigger (e.g. WebSocket onClose) never throws into
* the event-emitter path. Agents that don't open ports never need
* to call this.
*/
notifyClientCountChanged(role: string, count: number): Promise<void>;

/**
* Register a port this agent has just bound (typically with
* `bind(0)` so the OS picks a free ephemeral port). The dispatcher
Expand Down
2 changes: 2 additions & 0 deletions ts/packages/agentServer/protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ createDiscoveryHandlers((agentName, role) =>

Passing a lookup callback (rather than the `IPortRegistrar` itself) keeps this package free of an `agent-dispatcher` dependency.

Live port allocations (including any client counts agents have published via `SessionContext.notifyClientCountChanged`) are surfaced in-process by the dispatcher's `@system ports` command.

---

## Trademarks
Expand Down
37 changes: 36 additions & 1 deletion ts/packages/agents/browser/src/agent/agentWebSocketServer.mts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ interface SessionHandlers {
getPreferredClientType?: () => "extension" | "electron" | undefined;
onClientConnected?: (client: BrowserClient) => void;
onClientDisconnected?: (client: BrowserClient) => void;
/**
* Fired after the {@link clients} map mutation completes for any
* connect / disconnect affecting this session, with the post-
* mutation total of tracked clients for the session. Used by the
* agent to push counts up through `SessionContext.notifyClientCountChanged`
* so `@system ports` can surface them. Off-by-one safe: the new
* count is computed AFTER the connect / disconnect is reflected in
* the map.
*/
onClientCountChanged?: (count: number) => void;
onWebAgentMessage?: (client: BrowserClient, data: any) => void;
activeClientId: string | null;
}
Expand Down Expand Up @@ -86,7 +96,14 @@ export class AgentWebSocketServer {
const server = new WebSocketServer({
port,
verifyClient: (info, cb) => {
const origin = info.origin || info.req.headers.origin;
// `info.req.headers.origin` is `string | string[] |
// undefined`; coerce arrays to the first element so
// `isAllowedAgentOrigin` (which calls `startsWith` /
// `new URL`) only ever sees a string.
const rawOrigin = info.origin || info.req.headers.origin;
const origin = Array.isArray(rawOrigin)
? rawOrigin[0]
: rawOrigin;
if (isAllowedAgentOrigin(origin)) {
cb(true);
} else {
Expand Down Expand Up @@ -190,6 +207,12 @@ export class AgentWebSocketServer {
handlers.onClientConnected(client);
}
}

// Push the initial count up now that the session knows
// about its pre-connected clients.
if (handlers.onClientCountChanged) {
handlers.onClientCountChanged(preConnected.size);
}
}

debug(`Session registered: ${sessionId}`);
Expand Down Expand Up @@ -324,6 +347,11 @@ export class AgentWebSocketServer {
session.onClientConnected(client);
}

// Off-by-one safe: fired AFTER the sessionMap mutation above.
if (session?.onClientCountChanged) {
session.onClientCountChanged(sessionMap.size);
}

ws.on("message", (message: string) => {
client.lastActivity = new Date();

Expand Down Expand Up @@ -368,11 +396,18 @@ export class AgentWebSocketServer {
}

const sm = this.clients.get(client.sessionId);
let postCount = 0;
if (sm) {
sm.delete(clientId);
postCount = sm.size;
if (sm.size === 0) this.clients.delete(client.sessionId);
}

// Off-by-one safe: fired AFTER the sessionMap delete.
if (s?.onClientCountChanged) {
s.onClientCountChanged(postCount);
}

if (s && s.activeClientId === clientId) {
this.selectNewActiveClientForSession(client.sessionId);
}
Expand Down
5 changes: 5 additions & 0 deletions ts/packages/agents/browser/src/agent/browserActionHandler.mts
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,11 @@ async function updateBrowserContext(
// timeout from a stale "ready" cache.
void context.notifyReadinessChanged();
},
onClientCountChanged: (count: number) => {
// Surface to `@system ports`. Best-effort; the SDK
// method swallows errors internally.
void context.notifyClientCountChanged("default", count);
},
onWebAgentMessage: async (client: BrowserClient, data: any) => {
if (
data.method === "webAgent/message" &&
Expand Down
9 changes: 7 additions & 2 deletions ts/packages/agents/browser/src/agent/originAllowlist.mts
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ export function isAllowedAgentOrigin(origin: string | undefined): boolean {
}
// Node's URL parser preserves IPv6 brackets in `hostname`
// (e.g. `new URL("http://[::1]:8080").hostname === "[::1]"`),
// so match the bracketed form.
// so match the bracketed form. Also accept the unbracketed
// `::1` for robustness against URL parser/serializer
// differences across runtimes (other SSRF guards in the repo,
// e.g. examples/workflow/engine/src/builtinTasks.ts, accept
// both).
return (
u.hostname === "localhost" ||
u.hostname === "127.0.0.1" ||
u.hostname === "[::1]"
u.hostname === "[::1]" ||
u.hostname === "::1"
);
Comment thread
TalZaccai marked this conversation as resolved.
Outdated
} catch {
return false;
Expand Down
1 change: 1 addition & 0 deletions ts/packages/agents/browser/src/agent/websiteMemory.mts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ export async function resolveURLWithHistory(
indexes: async () => [],
reloadAgentSchema: async () => {},
notifyReadinessChanged: async () => {},
notifyClientCountChanged: async () => {},
};

// Use searchWebMemories with URL resolution optimized parameters
Expand Down
49 changes: 49 additions & 0 deletions ts/packages/agents/code/src/codeActionHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ let sharedWebSocketServer: CodeAgentWebSocketServer | undefined;
let sharedStartingPromise: Promise<CodeAgentWebSocketServer> | undefined;
let sharedClosingPromise: Promise<void> | undefined;
let sharedWebSocketRefCount = 0;
// Sessions currently sharing the bound port. The shared WS server has
// no per-session client tracking (code's WS protocol carries no session
// id), so a single global count is fanned out to every active session
// — each session's registrar entry surfaces the same global number via
// `@system ports`. Sessions are added on first-schema-enable and
// removed on last-schema-disable.
const sharedActiveSessions = new Set<SessionContext<CodeActionContext>>();
const sharedPendingCalls: Map<
number,
{
Expand Down Expand Up @@ -205,6 +212,21 @@ function attachSharedOnMessage(server: CodeAgentWebSocketServer): void {
debug("Error parsing WebSocket message:", error);
}
};
// Fan out client-count updates to active sessions. To prevent the
// `@system ports` summing from double-counting (each session is
// registered to the SAME physical server), attribute the global
// count to a single "primary" session (the first one in insertion
// order) and report 0 from the rest. The SDK method swallows
// errors internally.
server.onClientCountChanged = (count: number) => {
const primary = sharedActiveSessions.values().next().value;
for (const sc of sharedActiveSessions) {
void sc.notifyClientCountChanged(
"default",
sc === primary ? count : 0,
);
}
};
}

// Start (or attach to an in-flight start of) the shared WebSocket server.
Expand Down Expand Up @@ -264,6 +286,19 @@ async function updateCodeContext(
server.port,
);
sharedWebSocketRefCount++;
sharedActiveSessions.add(context);
// Publish the current (global) count to the primary
// session (first in insertion order) and 0 to others
// so `@system ports` summing doesn't double-count. If
// this session is now becoming the primary (i.e. it's
// the first to enable), it gets the real count;
// otherwise it reports 0 and any future
// onClientCountChanged fanout will keep it at 0.
const primary = sharedActiveSessions.values().next().value;
void context.notifyClientCountChanged(
"default",
context === primary ? server.getConnectedCount() : 0,
);
}
} catch (e) {
// Roll back the per-session schema bookkeeping so a subsequent
Expand All @@ -285,6 +320,9 @@ async function updateCodeContext(
// released by the backstop.
agentContext.portRegistration?.release();
delete agentContext.portRegistration;
const wasPrimary =
sharedActiveSessions.values().next().value === context;
sharedActiveSessions.delete(context);

sharedWebSocketRefCount = Math.max(0, sharedWebSocketRefCount - 1);
if (sharedWebSocketRefCount === 0 && sharedWebSocketServer) {
Expand All @@ -297,6 +335,17 @@ async function updateCodeContext(
sharedClosingPromise = undefined;
});
await sharedClosingPromise;
} else if (wasPrimary && sharedWebSocketServer) {
// Primary session went away — transfer the (global) count
// to the new primary so `@system ports` keeps reporting
// the real number instead of 0.
const newPrimary = sharedActiveSessions.values().next().value;
if (newPrimary) {
void newPrimary.notifyClientCountChanged(
"default",
sharedWebSocketServer.getConnectedCount(),
);
}
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion ts/packages/agents/code/src/codeAgentWebSocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ export class CodeAgentWebSocketServer {
private clients: Map<string, WebSocket> = new Map();
private clientIdCounter = 0;
public onMessage?: (message: string) => void;
/**
* Fired after the {@link clients} map mutation completes for any
* connect / disconnect, with the post-mutation total. Used by the
* code agent to push counts up via `SessionContext.notifyClientCountChanged`
* so `@system ports` can surface them. NOTE: code's WS protocol
* has no session identity, so this count is global across every
* session sharing the bound port — not per-session.
*/
public onClientCountChanged?: (count: number) => void;

/**
* @param server the underlying ws server, already bound and listening.
Expand Down Expand Up @@ -104,6 +113,7 @@ export class CodeAgentWebSocketServer {
const clientId = `client-${++this.clientIdCounter}-${Date.now()}`;
debug("New client connected");
this.clients.set(clientId, ws);
this.onClientCountChanged?.(this.clients.size);

// Store client ID on the WebSocket for reference
(ws as any).clientId = clientId;
Expand All @@ -118,11 +128,14 @@ export class CodeAgentWebSocketServer {
ws.on("close", () => {
debug("Client disconnected");
this.clients.delete(clientId);
this.onClientCountChanged?.(this.clients.size);
});

ws.on("error", (error) => {
debug("Client error:", error);
this.clients.delete(clientId);
if (this.clients.delete(clientId)) {
this.onClientCountChanged?.(this.clients.size);
}
});
});
}
Expand Down
9 changes: 7 additions & 2 deletions ts/packages/agents/code/src/originAllowlist.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,16 @@ export function isAllowedAgentOrigin(origin: string | undefined): boolean {
}
// Node's URL parser preserves IPv6 brackets in `hostname`
// (e.g. `new URL("http://[::1]:8080").hostname === "[::1]"`),
// so match the bracketed form.
// so match the bracketed form. Also accept the unbracketed
Comment thread
TalZaccai marked this conversation as resolved.
Outdated
// `::1` for robustness against URL parser/serializer
// differences across runtimes (other SSRF guards in the repo,
// e.g. examples/workflow/engine/src/builtinTasks.ts, accept
// both).
return (
u.hostname === "localhost" ||
u.hostname === "127.0.0.1" ||
u.hostname === "[::1]"
u.hostname === "[::1]" ||
u.hostname === "::1"
);
} catch {
return false;
Expand Down
3 changes: 3 additions & 0 deletions ts/packages/agents/code/test/codeUpdateContext.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ function makeStubContext(agentContext: any): StubContext {
registerCalls.push({ role, port });
return { release: releaseSpy };
},
notifyClientCountChanged(_role: string, _count: number) {
// no-op stub; tested elsewhere via registrar unit tests
},
// The rest of SessionContext isn't touched by updateCodeContext.
} as unknown as SessionContext<any>;
return {
Expand Down
6 changes: 6 additions & 0 deletions ts/packages/dispatcher/dispatcher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ By default agents runs out of proc in their own process. This is to ensure that
| `@config explanation on\|off` | Toggle LLM explanation (Turn off to stop updating construction store) |
| `@config log db on\|off` | Toggle sending logging information to a remote database (default: on) |

### Diagnostics

| Command | Description |
| --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `@system ports` | List every TCP port registered with the dispatcher's `PortRegistrar`, grouped by `(agent, role, port)`. Includes the agent-server's own listen port and the connected-client count for any agent that publishes one via `SessionContext.notifyClientCountChanged` (currently the browser and code agents). Rows from agents that don't publish a count render `N/A`. |

### User feedback

When the user rates an agent message via the chat UI's thumbs-up/down buttons or moves a bubble to the trash, the dispatcher persists each event to the per-session `displayLog.json` (as `user-feedback` and `user-message-hidden` entries) and emits a `userFeedback` telemetry event through `Logger.logEvent`. The `@feedback` command group lets you inspect and export those entries.
Expand Down
Loading
Loading