Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ts/packages/agentRpc/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ export async function createAgentRpcClient(
const context = contextMap.get(param.contextId);
return context.reloadAgentSchema();
},
notifyReadinessChanged: async (param: { contextId: number }) => {
const context = contextMap.get(param.contextId);
return context.notifyReadinessChanged();
},
storageRead: async (param: {
contextId: number;
session: boolean;
Expand Down
5 changes: 5 additions & 0 deletions ts/packages/agentRpc/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,11 @@ export function createAgentRpcServer(
contextId,
});
},
notifyReadinessChanged: async (): Promise<void> => {
return rpc.invoke("notifyReadinessChanged", {
contextId,
});
},
};
}

Expand Down
1 change: 1 addition & 0 deletions ts/packages/agentRpc/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ export type AgentContextInvokeFunctions = {
}) => Promise<void>;
indexes: (param: { contextId: number; type: string }) => Promise<any>;
reloadAgentSchema: (param: { contextId: number }) => Promise<void>;
notifyReadinessChanged: (param: { contextId: number }) => Promise<void>;
popupQuestion: (param: {
contextId: number;
message: string;
Expand Down
12 changes: 12 additions & 0 deletions ts/packages/agentSdk/src/agentInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,18 @@ export interface SessionContext<T = unknown> {
// The dispatcher will call getDynamicSchema/getDynamicGrammar to get the updated content.
reloadAgentSchema(): Promise<void>;

// Notify the dispatcher that this agent's readiness state may have
// changed due to an external event (e.g. an extension client just
// connected, an OAuth token was refreshed). The dispatcher re-runs
// the agent's `checkReadiness` and updates its cache so the next
// pre-flight gate sees the fresh state — without the user having to
// run `@config agent refresh <name>`.
//
// No-op for agents that don't implement `checkReadiness`. Best
// effort: errors are swallowed so a transient probe failure doesn't
// surface in the trigger path (the next refresh will retry).
notifyReadinessChanged(): Promise<void>;

/**
* Register a port this agent has just bound (typically with
* `bind(0)` so the OS picks a free ephemeral port). The dispatcher
Expand Down
33 changes: 33 additions & 0 deletions ts/packages/agentServer/protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ The fixed channel name for conversation lifecycle RPC is exported as `AgentServe
export const AgentServerChannelName = "agent-server";
```

The fixed channel name for the read-only port discovery RPC is exported as `DiscoveryChannelName`:

```typescript
export const DiscoveryChannelName = "discovery";
```

Session-namespaced channels (one pair per joined conversation) are constructed via helper functions:

```typescript
Expand Down Expand Up @@ -67,6 +73,33 @@ getClientType(connectionId: string): string | undefined
unregisterClient(connectionId: string): void
```

## Discovery channel

External clients (Chrome extension, VS Code extension, CLI) look up the live port of any in-process app-agent through a read-only RPC channel hosted at the well-known `discovery` channel name. The dispatcher's `PortRegistrar` is the source of truth; the channel only exposes a single `lookupPort` method:

```typescript
export type DiscoveryInvokeFunctions = {
lookupPort: (param: {
agentName: string;
role?: string;
}) => Promise<{ port: number | null }>;
};
```

`null` means "no allocation found, try again later" — clients should poll/back off rather than treat it as fatal. The well-known `agentName === "agent-server"` resolves the host's own listening port for clients that bootstrap from a different known port.

To stay in lockstep across hosts, both the standalone `agentServer` process and the standalone Electron `shell` build their handler set from the shared factory:

```typescript
import { createDiscoveryHandlers } from "@typeagent/agent-server-protocol";

createDiscoveryHandlers((agentName, role) =>
portRegistrar.lookup(agentName, role),
);
```

Passing a lookup callback (rather than the `IPortRegistrar` itself) keeps this package free of an `agent-dispatcher` dependency.

---

## Trademarks
Expand Down
1 change: 1 addition & 0 deletions ts/packages/agentServer/protocol/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
AGENT_SERVER_DISCOVERY_NAME,
DiscoveryChannelName,
DiscoveryInvokeFunctions,
createDiscoveryHandlers,
ConversationInfo,
JoinConversationResult,
UserIdentity,
Expand Down
24 changes: 24 additions & 0 deletions ts/packages/agentServer/protocol/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,30 @@ export type DiscoveryInvokeFunctions = {
}) => Promise<{ port: number | null }>;
};

/**
* Build the read-only discovery RPC handler set from a lookup callback.
*
* Both the agent-server and the standalone Electron shell host this
* channel — the agent-server multiplexes it onto its main WS, the
* standalone shell stands up a dedicated WS for it. They share this
* factory so the wire-level behavior (including null-for-not-found
* normalization) stays in lockstep.
*
* The callback shape — rather than passing the `IPortRegistrar`
* directly — keeps this package free of an `agent-dispatcher` dep,
* which would otherwise create a downward dependency from the
* protocol-only package onto the dispatcher core.
*/
export function createDiscoveryHandlers(
lookup: (agentName: string, role?: string) => number | undefined,
): DiscoveryInvokeFunctions {
return {
lookupPort: async ({ agentName, role }) => ({
port: lookup(agentName, role) ?? null,
}),
};
}

/** Build the dispatcher channel name for a given conversation. */
export function getDispatcherChannelName(conversationId: string): string {
return `dispatcher:${conversationId}`;
Expand Down
22 changes: 7 additions & 15 deletions ts/packages/agentServer/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
AGENT_SERVER_DEFAULT_PORT,
AGENT_SERVER_DISCOVERY_NAME,
DiscoveryChannelName,
DiscoveryInvokeFunctions,
createDiscoveryHandlers,
DispatcherConnectOptions,
UserIdentity,
getDispatcherChannelName,
Expand Down Expand Up @@ -448,23 +448,15 @@ async function main() {
// on the same WS as agent-server so clients only need one
// connection. Mutations to the registrar are NOT exposed
// here — only agents themselves can register, via the
// in-process SessionContext.registerPort.
const discoveryFunctions: DiscoveryInvokeFunctions = {
lookupPort: async ({ agentName, role }) => {
// The agent-server's own port is registered as a
// real allocation under AGENT_SERVER_DISCOVERY_NAME
// / DEFAULT_ROLE (see registerSelfPort below), so
// no special-case is needed here — the lookup just
// works for both well-known and agent-defined
// names.
const port = portRegistrar.lookup(agentName, role);
return { port: port ?? null };
},
};
// in-process SessionContext.registerPort. The handler
// factory is shared with the standalone Electron shell so
// both hosts speak the same protocol byte-for-byte.
createRpc(
"agent-server:discovery",
channelProvider.createChannel(DiscoveryChannelName),
discoveryFunctions,
createDiscoveryHandlers((agentName, role) =>
portRegistrar.lookup(agentName, role),
),
);
},
);
Expand Down
18 changes: 14 additions & 4 deletions ts/packages/agents/browser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,27 @@ To build the browser extension, run `pnpm run build` in this folder. For debug s

### Agent WebSocket Server

The browser agent exposes a WebSocket server (`AgentWebSocketServer`) on port 8081. Two types of clients connect to it:

- **Chrome extension** (`src/extension/serviceWorker/websocket.ts`) — connects from the browser's service worker using `chrome.runtime.id` as its client ID.
The browser agent exposes a WebSocket server (`AgentWebSocketServer`) on a
port assigned dynamically by the OS at bind time. The actual port is
published to the host's `PortRegistrar` under `(browser, default)` and
discovered by external clients via the discovery channel hosted at
`ws://localhost:8999/` (default). Both supported hosts publish this
channel: the standalone `agentServer` process and the standalone
Electron `shell` (which hosts an in-process discovery WS so the same
extension config works against either host). To pin the port for
debugging, set `BROWSER_WEBSOCKET_PORT=<n>` before launching the host.

Two types of clients connect to the browser agent:

- **Chrome extension** (`src/extension/serviceWorker/websocket.ts`) — connects from the browser's service worker using `chrome.runtime.id` as its client ID. Calls `discoverPort("browser", "default")` to look up the live port before connecting.
- **Inline browser** (`packages/shell/src/main/browserIpc.ts`) — connects from the Electron shell using `inlineBrowser` as its client ID.

#### Connection URL format

Every client embeds its identity in the WebSocket connection URL as query parameters:

```
ws://localhost:8081?channel=browser&role=client&clientId=<id>&sessionId=<sessionId>
ws://localhost:<port>?channel=browser&role=client&clientId=<id>&sessionId=<sessionId>
```

| Parameter | Description |
Expand Down
1 change: 1 addition & 0 deletions ts/packages/agents/browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"@typeagent/agent-flows": "workspace:*",
"@typeagent/agent-rpc": "workspace:*",
"@typeagent/agent-sdk": "workspace:*",
"@typeagent/agent-server-client": "workspace:*",
"@typeagent/agent-server-protocol": "workspace:*",
"@typeagent/common-utils": "workspace:*",
"@typeagent/config": "workspace:*",
Expand Down
128 changes: 117 additions & 11 deletions ts/packages/agents/browser/src/agent/agentWebSocketServer.mts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import { WebSocketServer, WebSocket } from "ws";
import { IncomingMessage } from "http";
import { AddressInfo } from "net";
import { isAllowedAgentOrigin } from "./originAllowlist.mjs";
import {
createChannelProviderAdapter,
type ChannelProviderAdapter,
Expand Down Expand Up @@ -43,14 +45,93 @@ interface SessionHandlers {
}

export class AgentWebSocketServer {
private server: WebSocketServer;
private clients = new Map<string, Map<string, BrowserClient>>();
private sessionHandlers = new Map<string, SessionHandlers>();

constructor(port: number = 8081) {
this.server = new WebSocketServer({ port });
/**
* @param server The underlying ws server, already bound and listening.
* @param port The actually bound port (OS-assigned when the caller
* passed 0).
*
* Construction is private — use {@link AgentWebSocketServer.start}
* so callers always get a server that is guaranteed to be bound
* before they read {@link port} or pass it to the registrar.
*/
private constructor(
private readonly server: WebSocketServer,
public readonly port: number,
) {
this.setupHandlers();
debug(`Agent WebSocket server started on port ${port}`);
debug(`Agent WebSocket server listening on port ${port}`);
}

/**
* Bind a new server on `port`. Resolves only after the
* `listening` event so callers can synchronously read
* {@link port}; rejects on the first `error` event so bind
* failures (EADDRINUSE under fixed-port overrides) surface
* loudly instead of being swallowed by an attached error
* handler.
*
* Pass `0` to let the OS pick a free ephemeral port; the
* actual port is then available via {@link port}.
*
* Origin allowlist is enforced via `verifyClient`: see
* `isAllowedAgentOrigin` for the policy. Connections from
* disallowed Origins are rejected with HTTP 403 before any
* `connection` event fires.
*/
public static start(port: number = 0): Promise<AgentWebSocketServer> {
return new Promise((resolve, reject) => {
const server = new WebSocketServer({
port,
verifyClient: (info, cb) => {
const origin = info.origin || info.req.headers.origin;
if (isAllowedAgentOrigin(origin)) {
cb(true);
} else {
debug(
`Rejecting WebSocket upgrade from disallowed Origin: ${origin}`,
);
cb(false, 403, "Origin not allowed");
}
},
});
let settled = false;
const onError = (error: Error) => {
if (settled) {
debug("Server error after listening:", error);
return;
}
settled = true;
server.removeListener("listening", onListening);
debug("Server bind error:", error);
reject(error);
};
const onListening = () => {
if (settled) return;
settled = true;
server.removeListener("error", onError);
const address = server.address() as AddressInfo | null;
if (!address || typeof address === "string") {
server.close();
reject(
new Error(
"ws server.address() did not return an AddressInfo",
),
);
return;
}
// Re-attach a permanent error handler so post-listen errors
// are logged rather than crashing the process.
server.on("error", (error) => {
debug("Server error:", error);
});
resolve(new AgentWebSocketServer(server, address.port));
};
server.once("error", onError);
server.once("listening", onListening);
});
}

/**
Expand Down Expand Up @@ -144,10 +225,6 @@ export class AgentWebSocketServer {
this.server.on("connection", (ws: WebSocket, req: IncomingMessage) => {
this.handleNewConnection(ws, req);
});

this.server.on("error", (error) => {
console.error(`Agent WebSocket server error:`, error);
});
}

private handleNewConnection(ws: WebSocket, req: IncomingMessage): void {
Expand Down Expand Up @@ -469,8 +546,37 @@ export class AgentWebSocketServer {
return false;
}

public stop(): void {
this.server.close();
debug("Agent WebSocket server stopped");
/**
* Close all client connections and the underlying server.
* Resolves when the server has fully released its port — important
* for a rapid disable→enable cycle under a fixed-port override
* (`BROWSER_WEBSOCKET_PORT`), where a synchronous return would race
* the new bind into EADDRINUSE.
*
* Iterates every session's client map and closes each `WebSocket`
* before awaiting `server.close()`. Without this, a client whose
* session was never registered (connected before `registerSession`
* could fire) would survive `server.close()` waiting on the underlying
* socket.
*/
public close(): Promise<void> {
debug("Closing AgentWebSocketServer");
for (const sessionMap of this.clients.values()) {
for (const client of sessionMap.values()) {
if (client.channelProvider) {
client.channelProvider.notifyDisconnected();
}
try {
client.socket.close();
} catch {
// Already closed or never opened.
}
}
}
this.clients.clear();
this.sessionHandlers.clear();
return new Promise((resolve) => {
this.server.close(() => resolve());
});
}
}
Loading
Loading