From 35caf47b928ff181c8dd4bc5b643ea8eebd37a59 Mon Sep 17 00:00:00 2001 From: Finley Ge Date: Mon, 15 Jun 2026 12:50:11 +0800 Subject: [PATCH] feat: add FastGPT debug session integration --- apps/cli/README.en.md | 11 +- apps/cli/README.md | 11 +- apps/cli/src/commands/debug.spec.ts | 70 +++ apps/cli/src/commands/debug.ts | 137 +++++- apps/cli/src/debug/gateway.spec.ts | 69 ++- apps/cli/src/debug/gateway.ts | 100 +++- apps/connection-gateway/src/routes.ts | 33 +- apps/server/.env.template | 6 + apps/server/main.ts | 3 + apps/server/src/deps.ts | 11 +- apps/server/src/routes/debug-session.route.ts | 388 +++++++++++++++ .../session-registry.port.ts | 5 + .../ports/plugin/plugin-debug-session.port.ts | 43 ++ .../plugin/plugin-runtime-manager.port.ts | 3 +- .../plugin-debug-session.vo.test.ts | 28 ++ .../value-objects/plugin-debug-session.vo.ts | 62 +++ .../src/connection-gateway/service.test.ts | 61 +++ .../src/connection-gateway/service.ts | 63 +++ .../connection-gateway/session-registry.ts | 38 ++ packages/infrastructure/src/env/index.ts | 6 + .../src/plugin/debug-session.repo.test.ts | 51 ++ .../src/plugin/debug-session.repo.ts | 324 ++++++++++++ .../debug-runtime.driver.test.ts | 1 + .../debug-runtime.driver.ts | 33 +- .../infrastructure/src/plugin/tool.impl.ts | 23 +- .../contracts/dto/connection-gateway.dto.ts | 7 + .../contracts/dto/plugin-debug-session.dto.ts | 90 ++++ .../route/plugin-debug-session.contract.ts | 82 +++ ...6-15_12-19-59-fastgpt-debug-integration.md | 465 ++++++++++++++++++ 29 files changed, 2159 insertions(+), 65 deletions(-) create mode 100644 apps/server/src/routes/debug-session.route.ts create mode 100644 packages/domain/src/ports/plugin/plugin-debug-session.port.ts create mode 100644 packages/domain/src/value-objects/plugin-debug-session.vo.test.ts create mode 100644 packages/domain/src/value-objects/plugin-debug-session.vo.ts create mode 100644 packages/infrastructure/src/plugin/debug-session.repo.test.ts create mode 100644 packages/infrastructure/src/plugin/debug-session.repo.ts create mode 100644 packages/interface-adapter/src/contracts/dto/plugin-debug-session.dto.ts create mode 100644 packages/interface-adapter/src/contracts/route/plugin-debug-session.contract.ts create mode 100644 plan/2026-06-15_12-19-59-fastgpt-debug-integration.md diff --git a/apps/cli/README.en.md b/apps/cli/README.en.md index 58d0f825..8d33d473 100644 --- a/apps/cli/README.en.md +++ b/apps/cli/README.en.md @@ -31,7 +31,16 @@ Command-line tool for FastGPT plugin development. It is used to create, build, t ### Remote Debugging -Local plugins can connect to a test-environment plugin-server through Connection Gateway. The CLI needs the gateway TCP endpoint for the long-lived channel; the HTTP endpoint is used to create and clean up the session. +Local plugins can connect to a test-environment plugin-server through a FastGPT connect link. The recommended path is for FastGPT to authenticate the user and create the debug session, while the CLI only exchanges a one-time ticket for short-lived connection info. + +```bash +fastgpt-plugin debug ./plugins/getTime ./plugins/dbops \ + --connect "https://fastgpt.example.com/debug-plugin/connect?ticket=..." +``` + +The connect link returns the gateway TCP endpoint, `debug:tmbId:{tmbId}:session:{debugSessionId}` source, precreated session, and scoped connect token. The CLI does not need `CONNECTION_GATEWAY_AUTH_TOKEN` or `JWT_SECRET`. + +For low-level local integration, the CLI can still connect to Connection Gateway directly: ```bash fastgpt-plugin debug ./plugins/getTime ./plugins/dbops \ diff --git a/apps/cli/README.md b/apps/cli/README.md index ec244655..f25e4d46 100644 --- a/apps/cli/README.md +++ b/apps/cli/README.md @@ -31,7 +31,16 @@ FastGPT 插件开发的命令行工具,用于创建、构建和测试 FastGPT ### 远程调试 -本地插件可以通过 Connection Gateway 接入测试环境的 plugin-server。CLI 只需要能访问 gateway 的 TCP 地址;HTTP 地址用于创建/清理 session。 +本地插件可以通过 FastGPT 生成的 connect link 接入测试环境的 plugin-server。推荐路径是由 FastGPT 完成用户鉴权并创建 debug session,CLI 只使用一次性 ticket 换取短期连接信息。 + +```bash +fastgpt-plugin debug ./plugins/getTime ./plugins/dbops \ + --connect "https://fastgpt.example.com/debug-plugin/connect?ticket=..." +``` + +connect link 会返回 gateway TCP 地址、`debug:tmbId:{tmbId}:session:{debugSessionId}` source、预创建 session 和 scoped connect token。CLI 不需要 `CONNECTION_GATEWAY_AUTH_TOKEN` 或 `JWT_SECRET`。 + +本地底层联调仍可直接连接 Connection Gateway: ```bash fastgpt-plugin debug ./plugins/getTime ./plugins/dbops \ diff --git a/apps/cli/src/commands/debug.spec.ts b/apps/cli/src/commands/debug.spec.ts index 3c2011ae..5b9314be 100644 --- a/apps/cli/src/commands/debug.spec.ts +++ b/apps/cli/src/commands/debug.spec.ts @@ -66,6 +66,7 @@ describe('debug command', () => { loggerSpy.success.mockReset(); loggerSpy.info.mockReset(); loggerSpy.error.mockReset(); + vi.unstubAllGlobals(); delete process.env.CONNECTION_GATEWAY_AUTH_TOKEN; await rm(tempUploadDir, { recursive: true, force: true }); }); @@ -178,6 +179,75 @@ describe('debug command', () => { expect(exitSpy).not.toHaveBeenCalled(); }); + it('应能通过 connect link 换取预创建远程调试连接信息', async () => { + vi.stubGlobal( + 'fetch', + vi.fn(async () => + new Response( + JSON.stringify({ + data: { + tcpUrl: 'tcp://tcp.example.com:39430', + source: 'debug:tmbId:tmb-1:session:debug-1', + sessionId: 'session-debug', + connectToken: 'scoped-token', + expiresAt: Date.now() + 60_000, + session: { + id: 'session-debug', + consumerType: 'plugin-debug', + subject: 'tmb-1', + sessionScope: { + userId: 'tmb-1', + source: 'debug:tmbId:tmb-1:session:debug-1' + }, + transport: 'tcp', + capabilities: ['gateway.bind', 'invoke'], + generation: 0, + ownerNodeId: 'node-a', + status: 'connecting', + connectedAt: Date.now(), + lastSeenAt: Date.now(), + expiresAt: Date.now() + 60_000 + } + } + }) + ) + ) + ); + + await run([ + process.execPath, + 'cli', + 'debug', + GETTIME_TOOL_DIR, + '--connect', + 'https://fastgpt.example.com/debug/connect?ticket=t1' + ]); + + expect(globalThis.fetch).toHaveBeenCalledWith( + 'https://fastgpt.example.com/debug/connect?ticket=t1' + ); + expect(vi.mocked(connectDebugGateway).mock.calls[0]?.[0].options).toMatchObject({ + tcpHost: 'tcp.example.com', + tcpPort: 39430, + source: 'debug:tmbId:tmb-1:session:debug-1', + precreatedSession: { + connectToken: 'scoped-token', + session: expect.objectContaining({ + id: 'session-debug' + }) + } + }); + expect(vi.mocked(connectDebugGateway).mock.calls[0]?.[0].options).not.toHaveProperty( + 'authToken' + ); + expect(vi.mocked(connectDebugGateway).mock.calls[0]?.[0].options).not.toHaveProperty( + 'jwtSecret' + ); + expect(getLoggerOutput(loggerSpy.success)).toContain( + '远程调试已就绪: debug:tmbId:tmb-1:session:debug-1 getTime' + ); + }); + it('应能通过 CONNECTION_GATEWAY_AUTH_TOKEN 配置 gateway token', async () => { process.env.CONNECTION_GATEWAY_AUTH_TOKEN = 'gateway-token-from-env'; diff --git a/apps/cli/src/commands/debug.ts b/apps/cli/src/commands/debug.ts index a78fe2f3..17f0ad26 100644 --- a/apps/cli/src/commands/debug.ts +++ b/apps/cli/src/commands/debug.ts @@ -15,7 +15,9 @@ import { } from '@fastgpt-plugin/cli/debug/session'; import { logger } from '@fastgpt-plugin/cli/helpers'; import type { Command } from 'commander'; +import z from 'zod'; +import { ConnectionGatewaySessionSchema } from '@domain/value-objects/connection-gateway.vo'; import type { SystemVarType } from '@domain/value-objects/system-var.vo'; import type { ToolStreamMessageType } from '@domain/value-objects/tool.vo'; @@ -30,6 +32,7 @@ type DebugCommandOptions = { systemVarFile?: string; uploadDir?: string; gateway?: boolean; + connect?: string; gatewayBaseUrl?: string; gatewayAuthToken?: string; gatewayJwtSecret?: string; @@ -61,6 +64,7 @@ export class DebugCommand extends BaseCommand { .option('--system-var-file ', 'systemVar JSON 文件路径') .option('--upload-dir ', '虚拟 uploadFile 的输出目录') .option('--gateway', '连接 Connection Gateway,等待远程调试请求', false) + .option('--connect ', 'FastGPT debug connect link,通过 ticket 换取远程调试连接信息') .option('--gateway-base-url ', 'Connection Gateway HTTP 地址') .option('--gateway-auth-token ', 'Connection Gateway AUTH_TOKEN') .option('--gateway-jwt-secret ', 'Connection Gateway JWT_SECRET') @@ -86,8 +90,12 @@ export class DebugCommand extends BaseCommand { ); const isMultiEntry = entries.length > 1; - if (isMultiEntry && !options.gateway) { - throw new Error('多个插件同时调试需要使用 --gateway。'); + if (options.connect) { + options.gateway = true; + } + + if (isMultiEntry && !options.gateway && !options.connect) { + throw new Error('多个插件同时调试需要使用 --gateway 或 --connect。'); } if (isMultiEntry && options.run) { @@ -240,7 +248,11 @@ export class DebugCommand extends BaseCommand { private resolveGatewayOptions( options: DebugCommandOptions, snapshot: DebugPluginSnapshot - ): DebugGatewayClientOptions { + ): Promise | DebugGatewayClientOptions { + if (options.connect) { + return this.resolveConnectGatewayOptions(options); + } + const userId = options.gatewayUserId ?? process.env.CONNECTION_GATEWAY_USER_ID ?? 'debug-user'; const tcpEndpoint = resolveGatewayTcpEndpoint(options); @@ -275,6 +287,33 @@ export class DebugCommand extends BaseCommand { }; } + private async resolveConnectGatewayOptions( + options: DebugCommandOptions + ): Promise { + const info = await exchangeConnectLink(options.connect as string); + const tcpEndpoint = parseGatewayTcpUrl(info.tcpUrl); + + return { + baseUrl: '', + tcpHost: tcpEndpoint.host, + tcpPort: tcpEndpoint.port, + userId: info.tmbId, + source: info.source, + tokenTtlMs: Math.max(1, info.expiresAt - Date.now()), + reconnect: options.gatewayNoReconnect ? false : options.gatewayReconnect ?? true, + reconnectIntervalMs: toPositiveInt( + options.gatewayReconnectIntervalMs ?? + process.env.CONNECTION_GATEWAY_RECONNECT_INTERVAL_MS ?? + '2000', + 'gateway-reconnect-interval-ms' + ), + precreatedSession: { + session: info.session, + connectToken: info.connectToken + } + }; + } + private resolveGatewaySource( options: DebugCommandOptions, _snapshot: DebugPluginSnapshot @@ -303,16 +342,17 @@ export class DebugCommand extends BaseCommand { }>; options: DebugCommandOptions; }): Promise { - const source = this.resolveGatewaySource(options, sessions[0].snapshot); const targets: DebugGatewayTarget[] = sessions.map((session) => ({ runtime: session.runtime, snapshot: session.snapshot })); + const gatewayOptions = await this.resolveGatewayOptions(options, sessions[0].snapshot); const gateway = await connectDebugGateway({ targets, - options: this.resolveGatewayOptions(options, sessions[0].snapshot), + options: gatewayOptions, onLog: (message) => logger.info(message) }); + const source = gateway.session.sessionScope.source ?? gatewayOptions.source ?? '-'; targets.forEach((target) => { logger.success(`远程调试已就绪: ${source} ${target.snapshot.pluginId}`); @@ -451,18 +491,7 @@ function resolveGatewayTcpEndpoint(options: DebugCommandOptions): { host: string const tcpUrl = options.gatewayTcpUrl ?? process.env.CONNECTION_GATEWAY_TCP_URL; if (tcpUrl) { - const parsed = new URL(tcpUrl); - if (parsed.protocol !== 'tcp:') { - throw new Error('gateway-tcp-url 必须使用 tcp:// 协议。'); - } - if (!parsed.hostname || !parsed.port) { - throw new Error('gateway-tcp-url 必须包含 host 和 port。'); - } - - return { - host: parsed.hostname, - port: toPositiveInt(parsed.port, 'gateway-tcp-url port') - }; + return parseGatewayTcpUrl(tcpUrl); } return { @@ -474,6 +503,80 @@ function resolveGatewayTcpEndpoint(options: DebugCommandOptions): { host: string }; } +const ConnectInfoSchema = z.object({ + tcpUrl: z.string().min(1), + source: z.string().min(1), + sessionId: z.string().min(1), + connectToken: z.string().min(1), + expiresAt: z.number().int().positive(), + session: ConnectionGatewaySessionSchema.optional() +}); + +async function exchangeConnectLink(connectUrl: string) { + const response = await fetch(connectUrl); + const text = await response.text(); + const payload = text ? JSON.parse(text) : {}; + + if (!response.ok) { + throw new Error(`connect link 请求失败: ${response.status} ${text}`); + } + + const info = ConnectInfoSchema.parse(payload.data ?? payload); + const session = + info.session ?? + ConnectionGatewaySessionSchema.parse({ + id: info.sessionId, + consumerType: 'plugin-debug', + subject: parseTmbIdFromDebugSource(info.source), + sessionScope: { + userId: parseTmbIdFromDebugSource(info.source), + source: info.source + }, + transport: 'tcp', + capabilities: ['gateway.bind', 'invoke'], + generation: 0, + ownerNodeId: 'remote', + status: 'connecting', + connectedAt: Date.now(), + lastSeenAt: Date.now(), + expiresAt: info.expiresAt, + metadata: { + connectToken: info.connectToken + } + }); + + return { + ...info, + tmbId: parseTmbIdFromDebugSource(info.source), + session + }; +} + +function parseTmbIdFromDebugSource(source: string): string { + const parts = source.split(':'); + const index = parts.indexOf('tmbId'); + const tmbId = index >= 0 ? parts[index + 1] : undefined; + if (!tmbId) { + throw new Error(`debug source 缺少 tmbId: ${source}`); + } + return tmbId; +} + +function parseGatewayTcpUrl(tcpUrl: string): { host: string; port: number } { + const parsed = new URL(tcpUrl); + if (parsed.protocol !== 'tcp:') { + throw new Error('gateway-tcp-url 必须使用 tcp:// 协议。'); + } + if (!parsed.hostname || !parsed.port) { + throw new Error('gateway-tcp-url 必须包含 host 和 port。'); + } + + return { + host: parsed.hostname, + port: toPositiveInt(parsed.port, 'gateway-tcp-url port') + }; +} + function findDuplicateValues(values: string[]): string[] { const seen = new Set(); const duplicates = new Set(); diff --git a/apps/cli/src/debug/gateway.spec.ts b/apps/cli/src/debug/gateway.spec.ts index 44dd017f..1bf88b65 100644 --- a/apps/cli/src/debug/gateway.spec.ts +++ b/apps/cli/src/debug/gateway.spec.ts @@ -3,6 +3,8 @@ import net from 'node:net'; import { afterEach, describe, expect, it, vi } from 'vitest'; +import { ConnectionGatewaySessionSchema } from '@domain/value-objects/connection-gateway.vo'; + import { connectDebugGateway } from './gateway'; describe('connectDebugGateway', () => { @@ -93,6 +95,69 @@ describe('connectDebugGateway', () => { }) ); }); + + it('binds a precreated session without gateway internal HTTP calls', async () => { + const receivedFrames: unknown[] = []; + const connectSpy = vi.spyOn(net, 'connect'); + connectSpy.mockImplementation( + ((_port: number, _host: string, connectListener?: () => void) => { + const socket = new FakeSocket((chunk) => { + receivedFrames.push(...decodeFrames(chunk)); + }); + queueMicrotask(() => { + connectListener?.(); + }); + return socket as unknown as net.Socket; + }) as typeof net.connect + ); + vi.spyOn(globalThis, 'fetch').mockImplementation(fetchMock); + + const client = await connectDebugGateway({ + targets: [makeTarget()], + options: { + ...makeOptions(), + authToken: undefined, + jwtSecret: undefined, + source: 'debug:tmbId:tmb-1:session:debug-1', + precreatedSession: { + session: { + ...makeSession(), + subject: 'tmb-1', + sessionScope: { + userId: 'tmb-1', + source: 'debug:tmbId:tmb-1:session:debug-1' + }, + capabilities: ['gateway.bind', 'invoke'] + }, + connectToken: 'scoped-token' + } + } + }); + + client.close(); + await client.closed; + + expect(fetchMock).not.toHaveBeenCalled(); + expect(receivedFrames).toEqual([ + expect.objectContaining({ + type: 'event', + capability: 'gateway.bind', + payload: expect.objectContaining({ + token: 'scoped-token', + metadata: { + pluginDebug: { + targets: [ + expect.objectContaining({ + source: 'debug:tmbId:tmb-1:session:debug-1', + pluginId: 'getTime' + }) + ] + } + } + }) + }) + ]); + }); }); class FakeSocket extends EventEmitter { @@ -155,7 +220,7 @@ function makeTarget() { } function makeSession() { - return { + return ConnectionGatewaySessionSchema.parse({ id: 'session-a', consumerType: 'plugin-debug', subject: 'user:u1', @@ -171,7 +236,7 @@ function makeSession() { connectedAt: Date.now(), lastSeenAt: Date.now(), expiresAt: Date.now() + 30_000 - }; + }); } function decodeFrames(chunk: Buffer): unknown[] { diff --git a/apps/cli/src/debug/gateway.ts b/apps/cli/src/debug/gateway.ts index 246ad698..49f08e17 100644 --- a/apps/cli/src/debug/gateway.ts +++ b/apps/cli/src/debug/gateway.ts @@ -24,10 +24,14 @@ const TOKEN_HEADER = { export type DebugGatewayClientOptions = { baseUrl: string; - authToken: string; - jwtSecret: string; + authToken?: string; + jwtSecret?: string; tcpHost: string; tcpPort: number; + precreatedSession?: { + session: ConnectionGatewaySession; + connectToken: string; + }; userId: string; teamId?: string; source?: string; @@ -182,11 +186,13 @@ async function connectSingleDebugGateway({ createdAt: Date.now(), payload: { kind: 'plugin-debug.bind', + token: options.precreatedSession?.connectToken ?? createConnectionToken(options), sources: targets.map((target) => ({ source: session.sessionScope.source, pluginId: target.snapshot.pluginId, version: target.snapshot.version - })) + })), + metadata: makePluginDebugMetadata(targets, session.sessionScope.source) } }).catch(async (error) => { socket.destroy(error instanceof Error ? error : new Error(String(error))); @@ -212,6 +218,10 @@ async function deleteGatewaySession( session: ConnectionGatewaySession, options: DebugGatewayClientOptions ): Promise { + if (options.precreatedSession || !options.authToken) { + return; + } + const response = await fetch( `${normalizeBaseUrl(options.baseUrl)}/internal/sessions/${encodeURIComponent(session.id)}`, { @@ -356,6 +366,14 @@ async function createGatewaySession( targets: DebugGatewayTarget[], options: DebugGatewayClientOptions ): Promise { + if (options.precreatedSession) { + return options.precreatedSession.session; + } + + if (!options.jwtSecret || !options.authToken) { + throw new Error('Gateway auth token and JWT secret are required when creating a session'); + } + const now = Date.now(); const source = options.source ?? makeDefaultDebugSource(options.userId); const claims = { @@ -382,24 +400,7 @@ async function createGatewaySession( body: JSON.stringify({ token, transport: 'tcp', - metadata: { - pluginDebug: { - targets: targets.map((target) => ({ - source, - pluginId: target.snapshot.pluginId, - version: target.snapshot.version, - name: target.snapshot.name, - description: target.snapshot.description, - toolDescription: target.snapshot.toolDescription, - author: target.snapshot.author, - tags: target.snapshot.tags, - permissions: target.snapshot.permissions, - secretSchema: target.snapshot.secretSchema, - isToolSet: target.snapshot.isToolSet, - tools: target.snapshot.tools - })) - } - } + metadata: makePluginDebugMetadata(targets, source) }) }); const payload = await parseJsonResponse(response); @@ -408,10 +409,67 @@ async function createGatewaySession( return ConnectionGatewaySessionSchema.parse(session); } +function createConnectionToken(options: DebugGatewayClientOptions): string | undefined { + if (options.precreatedSession) { + return options.precreatedSession.connectToken; + } + + if (!options.jwtSecret) { + return undefined; + } + + const now = Date.now(); + const source = options.source ?? makeDefaultDebugSource(options.userId); + return signConnectionToken( + { + consumerType: CONNECTION_GATEWAY_PLUGIN_DEBUG_CONSUMER_TYPE, + subject: options.subject ?? `user:${options.userId}`, + sessionScope: { + userId: options.userId, + ...(options.teamId ? { teamId: options.teamId } : {}), + source + }, + transport: 'tcp' as const, + capabilities: [ + CONNECTION_GATEWAY_BIND_CAPABILITY, + CONNECTION_GATEWAY_PLUGIN_DEBUG_INVOKE_CAPABILITY + ], + issuedAt: now, + expiresAt: now + options.tokenTtlMs, + nonce: randomUUID() + }, + options.jwtSecret + ); +} + function makeDefaultDebugSource(userId: string): string { return `debug:user:${userId}`; } +function makePluginDebugMetadata( + targets: DebugGatewayTarget[], + source?: string +): Record { + return { + pluginDebug: { + targets: targets.map((target) => ({ + source, + pluginId: target.snapshot.pluginId, + version: target.snapshot.version, + name: target.snapshot.name, + description: target.snapshot.description, + toolDescription: target.snapshot.toolDescription, + author: target.snapshot.author, + tags: target.snapshot.tags, + permissions: target.snapshot.permissions, + secretSchema: target.snapshot.secretSchema, + isToolSet: target.snapshot.isToolSet, + tools: target.snapshot.tools + })) + } + }; +} + function signConnectionToken(claims: unknown, secret: string): string { const header = encodeBase64Url(JSON.stringify(TOKEN_HEADER)); const payload = encodeBase64Url(JSON.stringify(claims)); diff --git a/apps/connection-gateway/src/routes.ts b/apps/connection-gateway/src/routes.ts index 06a28255..5854bc2a 100644 --- a/apps/connection-gateway/src/routes.ts +++ b/apps/connection-gateway/src/routes.ts @@ -6,7 +6,8 @@ import { ConnectionGatewayRequestAcceptedDTOSchema, ConnectionGatewayRequestDTOSchema, ConnectionGatewaySessionStatusViewDTOSchema, - ConnectionGatewayStreamRequestDTOSchema + ConnectionGatewayStreamRequestDTOSchema, + ConnectionGatewayUpdateSessionMetadataRequestDTOSchema } from '@interface-adapter/contracts/dto/connection-gateway.dto'; import { cors } from 'hono/cors'; import { requestId } from 'hono/request-id'; @@ -228,6 +229,36 @@ export function createConnectionGatewayApp(deps: Pick { + try { + const body = ConnectionGatewayUpdateSessionMetadataRequestDTOSchema.parse( + c.req.valid('json') + ); + const session = await deps.service.updateSessionMetadata({ + sessionId: requiredParam(c.req.param('sessionId')), + metadata: body.metadata + }); + + return R.success(c, { session }); + } catch (error) { + return R.fail(c, statusFromError(error), normalizeError(error)); + } + } + ); + app.openapi( createRoute({ method: 'post', diff --git a/apps/server/.env.template b/apps/server/.env.template index 58ed1eff..befee75b 100644 --- a/apps/server/.env.template +++ b/apps/server/.env.template @@ -11,8 +11,14 @@ FASTGPT_BASE_URL=http://localhost:3000 # ================ Connection Gateway ===================== # plugin-server 通过 HTTP internal API 访问 gateway;TCP 连接地址由 CLI / 外部长连接消费者配置。 CONNECTION_GATEWAY_BASE_URL=http://localhost:3010 +# plugin-server 返回给 CLI 的 gateway TCP 公网地址;gateway HTTP 可以仅内网可达。 +CONNECTION_GATEWAY_TCP_URL=tcp://localhost:3011 # 调用 connection-gateway internal API 的 Bearer token;生产环境建议显式配置,本地/远程 gateway token 不同时必须显式配置。 CONNECTION_GATEWAY_AUTH_TOKEN=replace-with-connection-gateway-auth-token +# Debug session 有效期(ms)。 +CONNECTION_GATEWAY_DEBUG_SESSION_TTL_MS=1800000 +# CLI connect ticket 一次性兑换有效期(ms)。 +CONNECTION_GATEWAY_DEBUG_TICKET_TTL_MS=300000 # Debug runtime 等待本地调试插件响应的超时时间(ms)。 CONNECTION_GATEWAY_DEBUG_REQUEST_TIMEOUT_MS=120000 diff --git a/apps/server/main.ts b/apps/server/main.ts index 8d9e338f..eb4f1388 100644 --- a/apps/server/main.ts +++ b/apps/server/main.ts @@ -11,6 +11,7 @@ import { getErrText } from '@shared/utils/err'; import deps from './src/deps'; import { init } from './src/init'; +import { makeDebugSessionRoute } from './src/routes/debug-session.route'; import { makeModelRoute } from './src/routes/model.route'; import { makePluginRoute } from './src/routes/plugin.route'; import { makeRuntimeRoute } from './src/routes/runtime.route'; @@ -23,6 +24,7 @@ const logger = getLogger(root); logger.debug(serverEnv); const modelRoute = makeModelRoute(deps); +const debugSessionRoute = makeDebugSessionRoute(deps); const pluginRoute = makePluginRoute(deps); const runtimeRoute = makeRuntimeRoute(deps); const toolRoute = makeToolRoute({ toolManager: deps.toolManager, logger: getLogger(mod.tool) }); @@ -34,6 +36,7 @@ app.openAPIRegistry.registerComponent('securitySchemes', 'bearerAuth', { }); app.route('/api', modelRoute); +app.route('/api', debugSessionRoute); app.route('/api', pluginRoute); app.route('/api', runtimeRoute); app.route('/api', toolRoute); diff --git a/apps/server/src/deps.ts b/apps/server/src/deps.ts index e8bbd2b1..957984ef 100644 --- a/apps/server/src/deps.ts +++ b/apps/server/src/deps.ts @@ -3,6 +3,7 @@ import { LocalFileStorageRepo } from '@infrastructure/file-storage/local-file-st import { RemoteFileStorageRepo } from '@infrastructure/file-storage/remote-file-storage.repo'; import { FileTTLManager } from '@infrastructure/file-ttl/file-ttl.impl'; import { DebugPluginRepoOverlay } from '@infrastructure/plugin/debug-plugin.repo'; +import { RedisPluginDebugSessionRepo } from '@infrastructure/plugin/debug-session.repo'; import { PluginRepo } from '@infrastructure/plugin/plugin.repo'; import { CompositePluginRuntimeManager } from '@infrastructure/plugin/plugin-runtime/composite-runtime.manager'; import { ConnectionGatewayDebugRuntimeManager } from '@infrastructure/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver'; @@ -67,7 +68,7 @@ const connectionGatewayDebugRuntimeManager = new ConnectionGatewayDebugRuntimeMa baseUrl: serverEnv.CONNECTION_GATEWAY_BASE_URL, authToken: serverEnv.CONNECTION_GATEWAY_AUTH_TOKEN, requestTimeoutMs: serverEnv.CONNECTION_GATEWAY_DEBUG_REQUEST_TIMEOUT_MS, - sourceForUser: ({ userId }) => `debug:user:${userId}` + sourceForTmbId: ({ tmbId }) => `debug:tmbId:${tmbId}` }); export const pluginRuntimeManager = new CompositePluginRuntimeManager({ @@ -81,6 +82,11 @@ export const toolManager = ToolManager.getInstance({ fastgptBaseUrl: serverEnv.FASTGPT_BASE_URL }); +export const pluginDebugSessionRepo = new RedisPluginDebugSessionRepo( + redisClient.getClient, + serverEnv.JWT_SECRET +); + const deps = { localFileStorageRepo, pluginPKGFileResolver, @@ -91,7 +97,8 @@ const deps = { mongoClient, fileTTLManager, toolManager, - pluginRuntimeManager + pluginRuntimeManager, + pluginDebugSessionRepo }; export default deps; diff --git a/apps/server/src/routes/debug-session.route.ts b/apps/server/src/routes/debug-session.route.ts new file mode 100644 index 00000000..ea2b81d8 --- /dev/null +++ b/apps/server/src/routes/debug-session.route.ts @@ -0,0 +1,388 @@ +import { + PluginDebugSessionCreateRequestDTOSchema, + PluginDebugSessionRevokeRequestDTOSchema, + PluginDebugSessionTicketExchangeRequestDTOSchema +} from '@interface-adapter/contracts/dto/plugin-debug-session.dto'; +import { PluginDebugSessionContract } from '@interface-adapter/contracts/route/plugin-debug-session.contract'; +import z from 'zod'; + +import type { PluginDebugSessionPort } from '@domain/ports/plugin/plugin-debug-session.port'; +import type { PluginRepoPort } from '@domain/ports/plugin/plugin-repo.port'; +import { ConnectionGatewaySessionSchema } from '@domain/value-objects/connection-gateway.vo'; +import { HmacConnectionGatewayToken } from '@infrastructure/connection-gateway/token'; +import { serverEnv } from '@infrastructure/env'; +import { createOpenAPIHono, createRoute, R } from '@infrastructure/hono/utils/response'; +import { getLogger, mod } from '@infrastructure/logger'; + +export type DebugSessionRouteDeps = { + pluginDebugSessionRepo: PluginDebugSessionPort; + pluginRepo: PluginRepoPort; +}; + +const DebugSessionStatusQuerySchema = z.object({ + tmbId: z.string().min(1) +}); + +const gatewayTokenSigner = new HmacConnectionGatewayToken(serverEnv.JWT_SECRET); + +export const makeDebugSessionRoute = (deps: DebugSessionRouteDeps) => { + const route = createOpenAPIHono(); + const logger = getLogger(mod.plugin); + + route.openapi( + createRoute({ + ...PluginDebugSessionContract.Create.meta, + request: { + body: { + content: { + 'application/json': { + schema: PluginDebugSessionCreateRequestDTOSchema + } + } + } + }, + responses: { + 200: { + description: 'HTTP 200 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.Create.response[200]! + } + } + }, + 400: { + description: 'HTTP 400 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.Create.response[400]! + } + } + } + } + }), + async (c) => { + try { + const body = c.req.valid('json'); + const ttlMs = body.ttlMs ?? serverEnv.CONNECTION_GATEWAY_DEBUG_SESSION_TTL_MS; + const ticketTtlMs = Math.min(ttlMs, serverEnv.CONNECTION_GATEWAY_DEBUG_TICKET_TTL_MS); + const { session, ticket } = await deps.pluginDebugSessionRepo.create({ + tmbId: body.tmbId, + ttlMs, + ticketTtlMs + }); + + return R.success(c, { + debugSessionId: session.debugSessionId, + tmbId: session.tmbId, + source: session.source, + ticket, + ticketExpiresAt: Date.now() + ticketTtlMs, + expiresAt: session.expiresAt + }); + } catch (error) { + logger.error('Create debug session failed', { error }); + return R.fail(c, 400, error instanceof Error ? error : String(error)); + } + } + ); + + route.openapi( + createRoute({ + ...PluginDebugSessionContract.ExchangeTicket.meta, + request: { + body: { + content: { + 'application/json': { + schema: PluginDebugSessionTicketExchangeRequestDTOSchema + } + } + } + }, + responses: { + 200: { + description: 'HTTP 200 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.ExchangeTicket.response[200]! + } + } + }, + 400: { + description: 'HTTP 400 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.ExchangeTicket.response[400]! + } + } + }, + 404: { + description: 'HTTP 404 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.ExchangeTicket.response[404]! + } + } + } + } + }), + async (c) => { + try { + const body = c.req.valid('json'); + const { session } = await deps.pluginDebugSessionRepo.exchangeTicket(body.ticket); + const connectToken = await gatewayTokenSigner.sign({ + consumerType: 'plugin-debug', + subject: session.tmbId, + sessionScope: { + userId: session.tmbId, + source: session.source + }, + transport: 'tcp', + capabilities: ['gateway.bind', 'invoke'], + issuedAt: Date.now(), + expiresAt: session.expiresAt + }); + const gatewaySession = await createGatewaySession({ + token: connectToken, + source: session.source + }); + await deps.pluginDebugSessionRepo.setGatewaySession({ + tmbId: session.tmbId, + debugSessionId: session.debugSessionId, + gatewaySessionId: gatewaySession.id + }); + + return R.success(c, { + tcpUrl: serverEnv.CONNECTION_GATEWAY_TCP_URL, + source: session.source, + sessionId: gatewaySession.id, + session: gatewaySession, + connectToken, + expiresAt: session.expiresAt + }); + } catch (error) { + logger.error('Exchange debug session ticket failed', { error }); + return R.fail(c, 404, error instanceof Error ? error : String(error)); + } + } + ); + + route.openapi( + createRoute({ + ...PluginDebugSessionContract.Status.meta, + request: { + query: DebugSessionStatusQuerySchema + }, + responses: { + 200: { + description: 'HTTP 200 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.Status.response[200]! + } + } + }, + 404: { + description: 'HTTP 404 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.Status.response[404]! + } + } + } + } + }), + async (c) => { + const query = c.req.valid('query'); + const debugSessionId = requiredParam(c.req.param('debugSessionId')); + const session = await deps.pluginDebugSessionRepo.get({ + tmbId: query.tmbId, + debugSessionId + }); + + if (!session) { + return R.fail(c, 404, 'Debug session not found'); + } + + const gatewayStatus = await getGatewayStatusBySource(session.source).catch(() => null); + const [plugins] = + gatewayStatus?.session && gatewayStatus.ownerAlive + ? await deps.pluginRepo.list({ sources: [session.source] }) + : [[]]; + + return R.success(c, { + debugSessionId: session.debugSessionId, + tmbId: session.tmbId, + source: session.source, + status: toDebugSessionStatus(session.status, gatewayStatus), + plugins, + gateway: gatewayStatus + ? { + sessionId: gatewayStatus.session?.id, + ownerAlive: gatewayStatus.ownerAlive, + mailboxLag: gatewayStatus.mailboxLag + } + : undefined, + expiresAt: session.expiresAt + }); + } + ); + + route.openapi( + createRoute({ + ...PluginDebugSessionContract.Revoke.meta, + request: { + body: { + content: { + 'application/json': { + schema: PluginDebugSessionRevokeRequestDTOSchema + } + } + } + }, + responses: { + 200: { + description: 'HTTP 200 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.Revoke.response[200]! + } + } + }, + 404: { + description: 'HTTP 404 response', + content: { + 'application/json': { + schema: PluginDebugSessionContract.Revoke.response[404]! + } + } + } + } + }), + async (c) => { + const body = c.req.valid('json'); + const debugSessionId = requiredParam(c.req.param('debugSessionId')); + const session = await deps.pluginDebugSessionRepo.revoke({ + tmbId: body.tmbId, + debugSessionId + }); + + if (session?.gatewaySessionId) { + await deleteGatewaySession(session.gatewaySessionId).catch((error) => { + logger.warn('Delete revoked gateway session failed', { + debugSessionId, + gatewaySessionId: session.gatewaySessionId, + error + }); + }); + } + + return R.success(c, { revoked: Boolean(session) }); + } + ); + + return route; +}; + +async function createGatewaySession(input: { token: string; source: string }) { + const response = await fetch(`${gatewayBaseUrl()}/internal/sessions`, { + method: 'POST', + headers: { + Authorization: `Bearer ${serverEnv.CONNECTION_GATEWAY_AUTH_TOKEN}`, + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ + token: input.token, + transport: 'tcp', + metadata: { + pluginDebug: { + targets: [] + } + } + }) + }); + const payload = await parseGatewayResponse(response); + return z + .object({ + data: z.object({ + session: ConnectionGatewaySessionSchema + }) + }) + .parse(payload).data.session; +} + +async function getGatewayStatusBySource(source: string): Promise { + const response = await fetch( + `${gatewayBaseUrl()}/internal/sessions/by-source/${encodeURIComponent(source)}/status`, + { + headers: { + Authorization: `Bearer ${serverEnv.CONNECTION_GATEWAY_AUTH_TOKEN}` + } + } + ); + const payload = await parseGatewayResponse(response); + return GatewayStatusViewSchema.parse(payload.data); +} + +async function deleteGatewaySession(sessionId: string): Promise { + const response = await fetch(`${gatewayBaseUrl()}/internal/sessions/${encodeURIComponent(sessionId)}`, { + method: 'DELETE', + headers: { + Authorization: `Bearer ${serverEnv.CONNECTION_GATEWAY_AUTH_TOKEN}` + } + }); + + await parseGatewayResponse(response); +} + +async function parseGatewayResponse(response: Response): Promise { + const text = await response.text(); + const payload = text ? JSON.parse(text) : {}; + if (!response.ok) { + throw new Error(`Gateway request failed: ${response.status} ${text}`); + } + return payload; +} + +function gatewayBaseUrl(): string { + return serverEnv.CONNECTION_GATEWAY_BASE_URL.replace(/\/+$/, ''); +} + +function requiredParam(value: string | undefined): string { + if (!value) { + throw new Error('Required route param is missing'); + } + + return value; +} + +const GatewayStatusViewSchema = z.object({ + session: z + .object({ + id: z.string(), + status: z.string() + }) + .nullable(), + ownerAlive: z.boolean(), + mailboxLag: z.number().int().nonnegative() +}); + +type GatewayStatusView = z.infer; + +function toDebugSessionStatus( + sessionStatus: 'pending' | 'connected' | 'disconnected' | 'revoked' | 'expired', + gatewayStatus: GatewayStatusView | null +) { + if (sessionStatus === 'revoked' || sessionStatus === 'expired') { + return sessionStatus; + } + + if (!gatewayStatus?.session) { + return sessionStatus === 'pending' ? 'pending' : 'disconnected'; + } + + if (gatewayStatus.session.status === 'connected' && gatewayStatus.ownerAlive) { + return 'connected'; + } + + return 'disconnected'; +} diff --git a/packages/domain/src/ports/connection-gateway/session-registry.port.ts b/packages/domain/src/ports/connection-gateway/session-registry.port.ts index 354dc9b9..11da3db2 100644 --- a/packages/domain/src/ports/connection-gateway/session-registry.port.ts +++ b/packages/domain/src/ports/connection-gateway/session-registry.port.ts @@ -31,6 +31,11 @@ export interface ConnectionGatewaySessionRegistryPort { status: ConnectionGatewaySessionStatus; now?: number; }): Promise; + updateMetadata(input: { + sessionId: string; + metadata: Record; + now?: number; + }): Promise; remove(sessionId: string): Promise; countActive(): Promise; } diff --git a/packages/domain/src/ports/plugin/plugin-debug-session.port.ts b/packages/domain/src/ports/plugin/plugin-debug-session.port.ts new file mode 100644 index 00000000..a565cbbf --- /dev/null +++ b/packages/domain/src/ports/plugin/plugin-debug-session.port.ts @@ -0,0 +1,43 @@ +import type { + PluginDebugSession, + PluginDebugSessionId, + PluginDebugSessionTmbId +} from '@domain/value-objects/plugin-debug-session.vo'; + +export type CreatePluginDebugSessionInput = { + tmbId: PluginDebugSessionTmbId; + ttlMs: number; + ticketTtlMs: number; + now?: number; +}; + +export type CreatePluginDebugSessionOutput = { + session: PluginDebugSession; + ticket: string; + revokedSession?: PluginDebugSession; +}; + +export type ExchangePluginDebugSessionTicketOutput = { + session: PluginDebugSession; +}; + +export interface PluginDebugSessionPort { + create(input: CreatePluginDebugSessionInput): Promise; + exchangeTicket(ticket: string, now?: number): Promise; + get(input: { + tmbId: PluginDebugSessionTmbId; + debugSessionId: PluginDebugSessionId; + now?: number; + }): Promise; + revoke(input: { + tmbId: PluginDebugSessionTmbId; + debugSessionId: PluginDebugSessionId; + now?: number; + }): Promise; + setGatewaySession(input: { + tmbId: PluginDebugSessionTmbId; + debugSessionId: PluginDebugSessionId; + gatewaySessionId: string; + now?: number; + }): Promise; +} diff --git a/packages/domain/src/ports/plugin/plugin-runtime-manager.port.ts b/packages/domain/src/ports/plugin/plugin-runtime-manager.port.ts index 6fb9be99..64b85306 100644 --- a/packages/domain/src/ports/plugin/plugin-runtime-manager.port.ts +++ b/packages/domain/src/ports/plugin/plugin-runtime-manager.port.ts @@ -17,7 +17,8 @@ export type PluginRuntimeInvokeOptions = { timeout?: number; priority?: number; debug?: { - userId: string; + tmbId?: string; + userId?: string; source?: string; }; }; diff --git a/packages/domain/src/value-objects/plugin-debug-session.vo.test.ts b/packages/domain/src/value-objects/plugin-debug-session.vo.test.ts new file mode 100644 index 00000000..cd08ba8f --- /dev/null +++ b/packages/domain/src/value-objects/plugin-debug-session.vo.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from 'vitest'; + +import { + isPluginDebugSessionSource, + makePluginDebugSessionSource, + parsePluginDebugSessionSource +} from './plugin-debug-session.vo'; + +describe('plugin debug session source', () => { + it('builds and parses a tmbId scoped debug source', () => { + const source = makePluginDebugSessionSource({ + tmbId: 'tmb-1', + debugSessionId: 'debug-1' + }); + + expect(source).toBe('debug:tmbId:tmb-1:session:debug-1'); + expect(parsePluginDebugSessionSource(source)).toEqual({ + tmbId: 'tmb-1', + debugSessionId: 'debug-1' + }); + expect(isPluginDebugSessionSource(source)).toBe(true); + }); + + it('rejects plugin-scoped and legacy debug sources', () => { + expect(parsePluginDebugSessionSource('debug:user:u1')).toBeNull(); + expect(parsePluginDebugSessionSource('debug:tmbId:t1:plugin:p1')).toBeNull(); + }); +}); diff --git a/packages/domain/src/value-objects/plugin-debug-session.vo.ts b/packages/domain/src/value-objects/plugin-debug-session.vo.ts new file mode 100644 index 00000000..71a6d414 --- /dev/null +++ b/packages/domain/src/value-objects/plugin-debug-session.vo.ts @@ -0,0 +1,62 @@ +import z from 'zod'; + +export const PluginDebugSessionIdSchema = z.string().min(1); +export type PluginDebugSessionId = z.infer; + +export const PluginDebugSessionTmbIdSchema = z.string().min(1); +export type PluginDebugSessionTmbId = z.infer; + +export const PluginDebugSessionSourceSchema = z + .string() + .min(1) + .refine((source) => parsePluginDebugSessionSource(source) !== null, { + message: 'Invalid plugin debug session source' + }); +export type PluginDebugSessionSource = z.infer; + +export const PluginDebugSessionStatusSchema = z.enum([ + 'pending', + 'connected', + 'disconnected', + 'revoked', + 'expired' +]); +export type PluginDebugSessionStatus = z.infer; + +export const PluginDebugSessionSchema = z.object({ + debugSessionId: PluginDebugSessionIdSchema, + tmbId: PluginDebugSessionTmbIdSchema, + source: z.string().min(1), + status: PluginDebugSessionStatusSchema, + ticketHash: z.string().min(1), + gatewaySessionId: z.string().min(1).optional(), + createdAt: z.number().int().positive(), + expiresAt: z.number().int().positive(), + revokedAt: z.number().int().positive().optional() +}); +export type PluginDebugSession = z.infer; + +export function makePluginDebugSessionSource(input: { + tmbId: string; + debugSessionId: string; +}): string { + return `debug:tmbId:${input.tmbId}:session:${input.debugSessionId}`; +} + +export function parsePluginDebugSessionSource( + source: string +): { tmbId: string; debugSessionId: string } | null { + const match = /^debug:tmbId:([^:]+):session:([^:]+)$/.exec(source); + if (!match) { + return null; + } + + return { + tmbId: match[1], + debugSessionId: match[2] + }; +} + +export function isPluginDebugSessionSource(source: string | undefined): source is string { + return typeof source === 'string' && parsePluginDebugSessionSource(source) !== null; +} diff --git a/packages/infrastructure/src/connection-gateway/service.test.ts b/packages/infrastructure/src/connection-gateway/service.test.ts index 96561a5d..c451a91f 100644 --- a/packages/infrastructure/src/connection-gateway/service.test.ts +++ b/packages/infrastructure/src/connection-gateway/service.test.ts @@ -274,6 +274,67 @@ describe('ConnectionGatewayService', () => { }); }); + it('updates metadata from a token-bound bind envelope', async () => { + const { service, token } = makeService(); + const signed = await token.sign({ + consumerType: 'plugin-debug', + subject: 'tmb-1', + sessionScope: { + userId: 'tmb-1', + source: 'debug:tmbId:tmb-1:session:debug-1' + }, + transport: 'tcp', + capabilities: ['gateway.bind', 'invoke'], + issuedAt: now, + expiresAt: now + 60_000 + }); + const session = await service.createSession({ + token: signed, + transport: 'tcp', + now + }); + + await service.bindSession({ + sessionId: session.id, + envelope: { + ...makeBindEnvelope(session.id, session.generation), + payload: { + kind: 'plugin-debug.bind', + token: signed, + metadata: { + pluginDebug: { + targets: [ + { + source: 'debug:tmbId:tmb-1:session:debug-1', + pluginId: 'getTime', + version: '1.0.0' + } + ] + } + } + } + } + }); + + await expect( + service.getLatestStatusBySource('debug:tmbId:tmb-1:session:debug-1') + ).resolves.toMatchObject({ + session: expect.objectContaining({ + id: session.id, + metadata: { + pluginDebug: { + targets: [ + expect.objectContaining({ + pluginId: 'getTime' + }) + ] + } + } + }), + ownerAlive: true + }); + }); + it('keeps connecting and closed sessions visible but fails requests closed', async () => { const { service, token } = makeService(); const session = await service.createSession({ diff --git a/packages/infrastructure/src/connection-gateway/service.ts b/packages/infrastructure/src/connection-gateway/service.ts index 9060e7fb..aa22675d 100644 --- a/packages/infrastructure/src/connection-gateway/service.ts +++ b/packages/infrastructure/src/connection-gateway/service.ts @@ -216,6 +216,23 @@ export class ConnectionGatewayService { envelope: ConnectionGatewayEnvelope; }): Promise { const envelope = ConnectionGatewayEnvelopeSchema.parse(input.envelope); + const bindToken = getBindToken(envelope.payload); + if (bindToken) { + const claims = await this.deps.tokenVerifier.verify({ + token: bindToken, + expectedTransport: 'tcp', + requiredCapability: 'gateway.bind' + }); + const existing = await this.deps.sessionRegistry.get(input.sessionId); + if (!existing || existing.subject !== claims.subject) { + throw createError(ErrorCode.connectionGatewayInvalidToken); + } + const expectedSource = existing.sessionScope.source; + if (expectedSource && claims.sessionScope.source !== expectedSource) { + throw createError(ErrorCode.connectionGatewayInvalidToken); + } + } + const session = await this.assertEnvelopeSession(input.sessionId, envelope, { requireCapability: false, requireConnected: false @@ -232,6 +249,13 @@ export class ConnectionGatewayService { ownerNodeId: this.deps.options.nodeId, status: 'connected' }); + const metadata = getBindMetadata(envelope.payload); + if (metadata) { + await this.deps.sessionRegistry.updateMetadata({ + sessionId: session.id, + metadata + }); + } await this.renewOwnerLease(session.id); this.pushLog(session.id, 'info', 'Gateway TCP session bound', { consumerType: session.consumerType, @@ -292,6 +316,23 @@ export class ConnectionGatewayService { } } + async updateSessionMetadata(input: { + sessionId: string; + metadata: Record; + }): Promise { + const session = await this.deps.sessionRegistry.updateMetadata({ + sessionId: input.sessionId, + metadata: input.metadata + }); + + if (!session) { + throw createError(ErrorCode.connectionGatewaySessionNotFound); + } + + this.pushLog(session.id, 'info', 'Gateway session metadata updated'); + return session; + } + async renewOwnerLease(sessionId: string, now = Date.now()): Promise { const renewed = await this.deps.sessionRegistry.renewOwnerLease({ sessionId, @@ -451,3 +492,25 @@ function isTerminalReplyEnvelope(envelope: ConnectionGatewayEnvelope): boolean { const payload = envelope.payload as { event?: unknown }; return payload.event === 'end' || payload.event === 'error'; } + +function getBindMetadata(payload: unknown): Record | undefined { + if (!payload || typeof payload !== 'object' || Array.isArray(payload)) { + return undefined; + } + + const metadata = (payload as { metadata?: unknown }).metadata; + if (!metadata || typeof metadata !== 'object' || Array.isArray(metadata)) { + return undefined; + } + + return metadata as Record; +} + +function getBindToken(payload: unknown): string | undefined { + if (!payload || typeof payload !== 'object' || Array.isArray(payload)) { + return undefined; + } + + const token = (payload as { token?: unknown }).token; + return typeof token === 'string' && token.length > 0 ? token : undefined; +} diff --git a/packages/infrastructure/src/connection-gateway/session-registry.ts b/packages/infrastructure/src/connection-gateway/session-registry.ts index 73effa37..f7a905e5 100644 --- a/packages/infrastructure/src/connection-gateway/session-registry.ts +++ b/packages/infrastructure/src/connection-gateway/session-registry.ts @@ -83,6 +83,25 @@ export class InMemoryConnectionGatewaySessionRegistry return true; } + async updateMetadata(input: { + sessionId: string; + metadata: Record; + now?: number; + }): Promise { + const session = await this.get(input.sessionId); + if (!session) { + return null; + } + + const next: ConnectionGatewaySession = { + ...session, + metadata: input.metadata, + lastSeenAt: input.now ?? Date.now() + }; + this.sessions.set(session.id, next); + return next; + } + async remove(sessionId: string): Promise { this.sessions.delete(sessionId); } @@ -192,6 +211,25 @@ export class RedisConnectionGatewaySessionRegistry implements ConnectionGatewayS return true; } + async updateMetadata(input: { + sessionId: string; + metadata: Record; + now?: number; + }): Promise { + const session = await this.get(input.sessionId); + if (!session) { + return null; + } + + const next: ConnectionGatewaySession = { + ...session, + metadata: input.metadata, + lastSeenAt: input.now ?? Date.now() + }; + await this.saveSessionAndIndexes(next); + return next; + } + async remove(sessionId: string): Promise { const session = await this.get(sessionId); const multi = this.redis.multi().del(this.sessionKey(sessionId)); diff --git a/packages/infrastructure/src/env/index.ts b/packages/infrastructure/src/env/index.ts index af0afb42..e7cf540c 100644 --- a/packages/infrastructure/src/env/index.ts +++ b/packages/infrastructure/src/env/index.ts @@ -160,6 +160,9 @@ const ServerEnvSchema = { // Connection Gateway client 配置 CONNECTION_GATEWAY_BASE_URL: z.url().default('http://localhost:3010'), CONNECTION_GATEWAY_AUTH_TOKEN: GatewayAuthTokenSchema, + CONNECTION_GATEWAY_TCP_URL: z.string().default('tcp://localhost:3011'), + CONNECTION_GATEWAY_DEBUG_SESSION_TTL_MS: PositiveIntSchema.default(30 * 60_000), + CONNECTION_GATEWAY_DEBUG_TICKET_TTL_MS: PositiveIntSchema.default(5 * 60_000), CONNECTION_GATEWAY_DEBUG_REQUEST_TIMEOUT_MS: PositiveIntSchema.default(120_000), // 安全配置 @@ -251,6 +254,9 @@ export type ServerEnv = { JWT_SECRET: string; CONNECTION_GATEWAY_BASE_URL: string; CONNECTION_GATEWAY_AUTH_TOKEN: string; + CONNECTION_GATEWAY_TCP_URL: string; + CONNECTION_GATEWAY_DEBUG_SESSION_TTL_MS: number; + CONNECTION_GATEWAY_DEBUG_TICKET_TTL_MS: number; CONNECTION_GATEWAY_DEBUG_REQUEST_TIMEOUT_MS: number; HTTP_PROXY?: string; HTTPS_PROXY?: string; diff --git a/packages/infrastructure/src/plugin/debug-session.repo.test.ts b/packages/infrastructure/src/plugin/debug-session.repo.test.ts new file mode 100644 index 00000000..3f57ccc8 --- /dev/null +++ b/packages/infrastructure/src/plugin/debug-session.repo.test.ts @@ -0,0 +1,51 @@ +import { describe, expect, it } from 'vitest'; + +import { InMemoryPluginDebugSessionRepo } from './debug-session.repo'; + +describe('InMemoryPluginDebugSessionRepo', () => { + it('creates a tmbId scoped session and consumes tickets once', async () => { + const repo = new InMemoryPluginDebugSessionRepo('secret'); + const now = Date.now(); + const { session, ticket } = await repo.create({ + tmbId: 'tmb-1', + ttlMs: 60_000, + ticketTtlMs: 10_000, + now + }); + + expect(session.source).toBe(`debug:tmbId:tmb-1:session:${session.debugSessionId}`); + await expect(repo.exchangeTicket(ticket, now + 1)).resolves.toEqual({ session }); + await expect(repo.exchangeTicket(ticket, now + 2)).rejects.toThrow( + 'Debug session ticket not found' + ); + }); + + it('revokes the previous active session for the same tmbId', async () => { + const repo = new InMemoryPluginDebugSessionRepo('secret'); + const first = await repo.create({ + tmbId: 'tmb-1', + ttlMs: 60_000, + ticketTtlMs: 10_000, + now: 1_000 + }); + const second = await repo.create({ + tmbId: 'tmb-1', + ttlMs: 60_000, + ticketTtlMs: 10_000, + now: 2_000 + }); + + expect(second.revokedSession).toMatchObject({ + debugSessionId: first.session.debugSessionId, + status: 'revoked' + }); + await expect(repo.exchangeTicket(first.ticket, 2_001)).rejects.toThrow( + 'Debug session ticket not found' + ); + await expect(repo.exchangeTicket(second.ticket, 2_001)).resolves.toMatchObject({ + session: { + debugSessionId: second.session.debugSessionId + } + }); + }); +}); diff --git a/packages/infrastructure/src/plugin/debug-session.repo.ts b/packages/infrastructure/src/plugin/debug-session.repo.ts new file mode 100644 index 00000000..5ff4fc9e --- /dev/null +++ b/packages/infrastructure/src/plugin/debug-session.repo.ts @@ -0,0 +1,324 @@ +import { createHmac, randomBytes, randomUUID } from 'node:crypto'; + +import type Redis from 'ioredis'; + +import type { + CreatePluginDebugSessionInput, + CreatePluginDebugSessionOutput, + ExchangePluginDebugSessionTicketOutput, + PluginDebugSessionPort +} from '@domain/ports/plugin/plugin-debug-session.port'; +import { + makePluginDebugSessionSource, + type PluginDebugSession, + PluginDebugSessionSchema +} from '@domain/value-objects/plugin-debug-session.vo'; + +const KEY_PREFIX = 'plugin-debug:sessions'; + +export class InMemoryPluginDebugSessionRepo implements PluginDebugSessionPort { + private readonly sessions = new Map(); + private readonly tickets = new Map(); + private readonly activeByTmbId = new Map(); + + constructor(private readonly hashSecret = 'test-secret') {} + + async create(input: CreatePluginDebugSessionInput): Promise { + const now = input.now ?? Date.now(); + const debugSessionId = randomUUID(); + const ticket = createOpaqueTicket(); + const session: PluginDebugSession = { + debugSessionId, + tmbId: input.tmbId, + source: makePluginDebugSessionSource({ tmbId: input.tmbId, debugSessionId }), + status: 'pending', + ticketHash: hashTicket(ticket, this.hashSecret), + createdAt: now, + expiresAt: now + input.ttlMs + }; + const activeSession = await this.getActive(input.tmbId, now); + const revokedSession = activeSession + ? await this.revoke({ + tmbId: activeSession.tmbId, + debugSessionId: activeSession.debugSessionId, + now + }) + : null; + + this.sessions.set(this.sessionKey(session), session); + this.tickets.set(session.ticketHash, this.sessionKey(session)); + this.activeByTmbId.set(session.tmbId, session.debugSessionId); + + return { + session, + ticket, + ...(revokedSession ? { revokedSession } : {}) + }; + } + + async exchangeTicket( + ticket: string, + now = Date.now() + ): Promise { + const ticketHash = hashTicket(ticket, this.hashSecret); + const sessionKey = this.tickets.get(ticketHash); + if (!sessionKey) { + throw new Error('Debug session ticket not found'); + } + + this.tickets.delete(ticketHash); + const session = this.sessions.get(sessionKey); + if (!session || session.expiresAt <= now || session.status === 'revoked') { + throw new Error('Debug session ticket expired'); + } + + return { session }; + } + + async get(input: { + tmbId: string; + debugSessionId: string; + now?: number; + }): Promise { + const session = this.sessions.get(this.sessionKey(input)); + if (!session) { + return null; + } + + return normalizeSessionExpiry(session, input.now ?? Date.now()); + } + + async revoke(input: { + tmbId: string; + debugSessionId: string; + now?: number; + }): Promise { + const session = this.sessions.get(this.sessionKey(input)); + if (!session) { + return null; + } + + const now = input.now ?? Date.now(); + const next: PluginDebugSession = { + ...session, + status: 'revoked', + revokedAt: session.revokedAt ?? now + }; + this.sessions.set(this.sessionKey(next), next); + this.tickets.delete(next.ticketHash); + if (this.activeByTmbId.get(next.tmbId) === next.debugSessionId) { + this.activeByTmbId.delete(next.tmbId); + } + return next; + } + + async setGatewaySession(input: { + tmbId: string; + debugSessionId: string; + gatewaySessionId: string; + now?: number; + }): Promise { + const session = await this.get(input); + if (!session) { + return null; + } + + const next: PluginDebugSession = { + ...session, + gatewaySessionId: input.gatewaySessionId + }; + this.sessions.set(this.sessionKey(next), next); + return next; + } + + private async getActive(tmbId: string, now: number): Promise { + const debugSessionId = this.activeByTmbId.get(tmbId); + if (!debugSessionId) { + return null; + } + + return this.get({ tmbId, debugSessionId, now }); + } + + private sessionKey(input: { tmbId: string; debugSessionId: string }): string { + return `${input.tmbId}:${input.debugSessionId}`; + } +} + +export class RedisPluginDebugSessionRepo implements PluginDebugSessionPort { + constructor( + private readonly redis: Redis, + private readonly hashSecret: string + ) {} + + async create(input: CreatePluginDebugSessionInput): Promise { + const now = input.now ?? Date.now(); + const debugSessionId = randomUUID(); + const ticket = createOpaqueTicket(); + const session: PluginDebugSession = { + debugSessionId, + tmbId: input.tmbId, + source: makePluginDebugSessionSource({ tmbId: input.tmbId, debugSessionId }), + status: 'pending', + ticketHash: hashTicket(ticket, this.hashSecret), + createdAt: now, + expiresAt: now + input.ttlMs + }; + const activeSession = await this.getActive(input.tmbId, now); + const revokedSession = activeSession + ? await this.revoke({ + tmbId: activeSession.tmbId, + debugSessionId: activeSession.debugSessionId, + now + }) + : null; + + await this.saveSession(session); + await this.redis + .multi() + .set(this.ticketKey(session.ticketHash), this.sessionKey(session), 'PX', input.ticketTtlMs) + .set(this.activeKey(session.tmbId), session.debugSessionId, 'PX', input.ttlMs) + .exec(); + + return { + session, + ticket, + ...(revokedSession ? { revokedSession } : {}) + }; + } + + async exchangeTicket( + ticket: string, + now = Date.now() + ): Promise { + const ticketHash = hashTicket(ticket, this.hashSecret); + const ticketKey = this.ticketKey(ticketHash); + const sessionKey = await this.redis.call('GETDEL', ticketKey); + if (!sessionKey) { + throw new Error('Debug session ticket not found'); + } + + const session = await this.getByKey(String(sessionKey), now); + if (!session || session.status === 'revoked') { + throw new Error('Debug session ticket expired'); + } + + return { session }; + } + + async get(input: { + tmbId: string; + debugSessionId: string; + now?: number; + }): Promise { + return this.getByKey(this.sessionKey(input), input.now ?? Date.now()); + } + + async revoke(input: { + tmbId: string; + debugSessionId: string; + now?: number; + }): Promise { + const session = await this.get(input); + if (!session) { + return null; + } + + const now = input.now ?? Date.now(); + const next: PluginDebugSession = { + ...session, + status: 'revoked', + revokedAt: session.revokedAt ?? now + }; + await this.saveSession(next); + + const multi = this.redis.multi().del(this.ticketKey(next.ticketHash)); + if ((await this.redis.get(this.activeKey(next.tmbId))) === next.debugSessionId) { + multi.del(this.activeKey(next.tmbId)); + } + await multi.exec(); + + return next; + } + + async setGatewaySession(input: { + tmbId: string; + debugSessionId: string; + gatewaySessionId: string; + now?: number; + }): Promise { + const session = await this.get(input); + if (!session) { + return null; + } + + const next: PluginDebugSession = { + ...session, + gatewaySessionId: input.gatewaySessionId + }; + await this.saveSession(next); + return next; + } + + private async getActive(tmbId: string, now: number): Promise { + const debugSessionId = await this.redis.get(this.activeKey(tmbId)); + if (!debugSessionId) { + return null; + } + + return this.get({ tmbId, debugSessionId, now }); + } + + private async getByKey(key: string, now: number): Promise { + const value = await this.redis.get(key); + if (!value) { + return null; + } + + const session = PluginDebugSessionSchema.parse(JSON.parse(value)); + return normalizeSessionExpiry(session, now); + } + + private async saveSession(session: PluginDebugSession): Promise { + await this.redis.set( + this.sessionKey(session), + JSON.stringify(session), + 'PX', + Math.max(1, session.expiresAt - Date.now()) + ); + } + + private sessionKey(input: { tmbId: string; debugSessionId: string }): string { + return `${KEY_PREFIX}:by-id:${input.tmbId}:${input.debugSessionId}`; + } + + private ticketKey(ticketHash: string): string { + return `${KEY_PREFIX}:by-ticket:${ticketHash}`; + } + + private activeKey(tmbId: string): string { + return `${KEY_PREFIX}:active-by-tmb:${tmbId}`; + } +} + +function normalizeSessionExpiry( + session: PluginDebugSession, + now: number +): PluginDebugSession | null { + if (session.expiresAt > now) { + return session; + } + + return { + ...session, + status: session.status === 'revoked' ? 'revoked' : 'expired' + }; +} + +function createOpaqueTicket(): string { + return randomBytes(32).toString('base64url'); +} + +function hashTicket(ticket: string, secret: string): string { + return createHmac('sha256', secret).update(ticket).digest('base64url'); +} diff --git a/packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.test.ts b/packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.test.ts index b2e122ec..901527f8 100644 --- a/packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.test.ts +++ b/packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.test.ts @@ -74,6 +74,7 @@ describe('ConnectionGatewayDebugRuntimeManager', () => { baseUrl: 'http://gateway.local', authToken: 'token', requestTimeoutMs: 1_000, + sourceForTmbId: ({ tmbId }) => `debug:tmbId:${tmbId}`, sourceForUser: ({ userId }) => `debug:user:${userId}` }); diff --git a/packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.ts b/packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.ts index 012b38ad..2f733650 100644 --- a/packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.ts +++ b/packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.ts @@ -21,12 +21,14 @@ export type ConnectionGatewayDebugRuntimeManagerOptions = { baseUrl: string; authToken: string; requestTimeoutMs: number; - sourceForUser(input: { userId: string }): string; + sourceForTmbId(input: { tmbId: string }): string; + sourceForUser?: (input: { userId: string }) => string; }; export type ConnectionGatewayDebugRuntimeInvokeOptions = PluginRuntimeInvokeOptions & { debug?: { - userId: string; + tmbId?: string; + userId?: string; source?: string; }; }; @@ -90,21 +92,16 @@ export class ConnectionGatewayDebugRuntimeManager return failureResult(createError(ErrorCode.pluginRuntimeEventNotSupported)); } - const debugUserId = options?.debug?.userId; - if (!debugUserId) { + const source = this.resolveSource(options?.debug); + if (!source) { return failureResult( createError(ErrorCode.pluginRuntimePluginNotFound, { - message: 'Missing debug user id for connection-gateway runtime' + message: 'Missing debug source for connection-gateway runtime' }) ); } try { - const source = - options.debug?.source ?? - this.options.sourceForUser({ - userId: debugUserId - }); const session = await this.findSessionBySource(source); const responses = this.publishRequestAndReadStream({ session, @@ -229,6 +226,22 @@ export class ConnectionGatewayDebugRuntimeManager private get baseUrl(): string { return this.options.baseUrl.replace(/\/+$/, ''); } + + private resolveSource(debug: ConnectionGatewayDebugRuntimeInvokeOptions['debug']): string | null { + if (debug?.source) { + return debug.source; + } + + if (debug?.tmbId) { + return this.options.sourceForTmbId({ tmbId: debug.tmbId }); + } + + if (debug?.userId && this.options.sourceForUser) { + return this.options.sourceForUser({ userId: debug.userId }); + } + + return null; + } } function toObjectPayload(payload: unknown): Record { diff --git a/packages/infrastructure/src/plugin/tool.impl.ts b/packages/infrastructure/src/plugin/tool.impl.ts index 1d73728e..9d2d5315 100644 --- a/packages/infrastructure/src/plugin/tool.impl.ts +++ b/packages/infrastructure/src/plugin/tool.impl.ts @@ -214,7 +214,7 @@ export class ToolManager implements ToolManagerPort { ...(isDebugPluginSource(source) ? { debug: { - userId: getDebugUserIdFromSource(source), + ...getDebugIdentityFromSource(source), source } } @@ -256,19 +256,24 @@ function isDebugPluginSource(source: PluginSourceType | undefined): source is st return typeof source === 'string' && source.startsWith('debug:'); } -function getDebugUserIdFromSource(source: string): string { +function getDebugIdentityFromSource(source: string): { tmbId?: string; userId?: string } { const parts = source.split(':'); + const tmbIndex = parts.indexOf('tmbId'); + const tmbId = tmbIndex >= 0 ? parts[tmbIndex + 1] : undefined; + if (tmbId) { + return { tmbId }; + } + const userIndex = parts.indexOf('user'); const userId = userIndex >= 0 ? parts[userIndex + 1] : undefined; - - if (!userId) { - throw createError(ErrorCode.pluginRuntimePluginNotFound, { - message: 'Debug source must include user id', - data: { source } - }); + if (userId) { + return { userId }; } - return userId; + throw createError(ErrorCode.pluginRuntimePluginNotFound, { + message: 'Debug source must include tmbId or user id', + data: { source } + }); } type ToolRunErrorContext = { diff --git a/packages/interface-adapter/src/contracts/dto/connection-gateway.dto.ts b/packages/interface-adapter/src/contracts/dto/connection-gateway.dto.ts index c5cb10b3..d15290ee 100644 --- a/packages/interface-adapter/src/contracts/dto/connection-gateway.dto.ts +++ b/packages/interface-adapter/src/contracts/dto/connection-gateway.dto.ts @@ -24,6 +24,13 @@ export type ConnectionGatewayCreateSessionResponseDTO = z.infer< typeof ConnectionGatewayCreateSessionResponseDTOSchema >; +export const ConnectionGatewayUpdateSessionMetadataRequestDTOSchema = z.object({ + metadata: z.record(z.string(), z.unknown()) +}); +export type ConnectionGatewayUpdateSessionMetadataRequestDTO = z.infer< + typeof ConnectionGatewayUpdateSessionMetadataRequestDTOSchema +>; + export const ConnectionGatewayRequestDTOSchema = z.object({ envelope: ConnectionGatewayEnvelopeSchema, stream: z.boolean().default(false) diff --git a/packages/interface-adapter/src/contracts/dto/plugin-debug-session.dto.ts b/packages/interface-adapter/src/contracts/dto/plugin-debug-session.dto.ts new file mode 100644 index 00000000..066e18b2 --- /dev/null +++ b/packages/interface-adapter/src/contracts/dto/plugin-debug-session.dto.ts @@ -0,0 +1,90 @@ +import z from 'zod'; + +import { ConnectionGatewaySessionSchema } from '@domain/value-objects/connection-gateway.vo'; + +import { PluginListItemDTOSchema } from './plugin.dto'; + +export const PluginDebugSessionStatusDTOSchema = z.enum([ + 'pending', + 'connected', + 'disconnected', + 'revoked', + 'expired' +]); + +export const PluginDebugSessionCreateRequestDTOSchema = z.object({ + tmbId: z.string().min(1), + ttlMs: z.number().int().positive().optional() +}); + +export const PluginDebugSessionCreateResponseDTOSchema = z.object({ + debugSessionId: z.string().min(1), + tmbId: z.string().min(1), + source: z.string().min(1), + ticket: z.string().min(1), + ticketExpiresAt: z.number().int().positive(), + expiresAt: z.number().int().positive() +}); + +export const PluginDebugSessionTicketExchangeRequestDTOSchema = z.object({ + ticket: z.string().min(1) +}); + +export const PluginDebugSessionTicketExchangeResponseDTOSchema = z.object({ + tcpUrl: z.string().min(1), + source: z.string().min(1), + sessionId: z.string().min(1), + session: ConnectionGatewaySessionSchema, + connectToken: z.string().min(1), + expiresAt: z.number().int().positive() +}); + +export const PluginDebugSessionGetParamsDTOSchema = z.object({ + debugSessionId: z.string().min(1), + tmbId: z.string().min(1) +}); + +export const PluginDebugSessionStatusResponseDTOSchema = z.object({ + debugSessionId: z.string().min(1), + tmbId: z.string().min(1), + source: z.string().min(1), + status: PluginDebugSessionStatusDTOSchema, + plugins: z.array(PluginListItemDTOSchema), + gateway: z + .object({ + sessionId: z.string().min(1).optional(), + ownerAlive: z.boolean(), + mailboxLag: z.number().int().nonnegative() + }) + .optional(), + expiresAt: z.number().int().positive() +}); + +export const PluginDebugSessionRevokeParamsDTOSchema = z.object({ + debugSessionId: z.string().min(1) +}); + +export const PluginDebugSessionRevokeRequestDTOSchema = z.object({ + tmbId: z.string().min(1), + reason: z.string().optional() +}); + +export const PluginDebugSessionRevokeResponseDTOSchema = z.object({ + revoked: z.boolean() +}); + +export type PluginDebugSessionCreateRequestDTO = z.infer< + typeof PluginDebugSessionCreateRequestDTOSchema +>; +export type PluginDebugSessionCreateResponseDTO = z.infer< + typeof PluginDebugSessionCreateResponseDTOSchema +>; +export type PluginDebugSessionTicketExchangeRequestDTO = z.infer< + typeof PluginDebugSessionTicketExchangeRequestDTOSchema +>; +export type PluginDebugSessionTicketExchangeResponseDTO = z.infer< + typeof PluginDebugSessionTicketExchangeResponseDTOSchema +>; +export type PluginDebugSessionStatusResponseDTO = z.infer< + typeof PluginDebugSessionStatusResponseDTOSchema +>; diff --git a/packages/interface-adapter/src/contracts/route/plugin-debug-session.contract.ts b/packages/interface-adapter/src/contracts/route/plugin-debug-session.contract.ts new file mode 100644 index 00000000..28273ec1 --- /dev/null +++ b/packages/interface-adapter/src/contracts/route/plugin-debug-session.contract.ts @@ -0,0 +1,82 @@ +import { type ContractMetaType,defineContract, jsonResponse } from '../contract.type'; +import { ErrorResponseDTOSchema } from '../dto/common.dto'; +import { + PluginDebugSessionCreateRequestDTOSchema, + PluginDebugSessionCreateResponseDTOSchema, + PluginDebugSessionRevokeRequestDTOSchema, + PluginDebugSessionRevokeResponseDTOSchema, + PluginDebugSessionStatusResponseDTOSchema, + PluginDebugSessionTicketExchangeRequestDTOSchema, + PluginDebugSessionTicketExchangeResponseDTOSchema +} from '../dto/plugin-debug-session.dto'; + +import { authToken } from './auth'; + +const tags = ['plugin'] satisfies ContractMetaType['tags']; + +export const PluginDebugSessionContract = { + Create: defineContract({ + meta: { + method: 'post', + path: '/plugin/debug-sessions', + operationId: 'pluginDebugSession.create', + description: 'Create a plugin debug session for a FastGPT tmbId', + summary: 'Create debug session', + tags, + security: authToken + }, + request: PluginDebugSessionCreateRequestDTOSchema, + response: { + 200: jsonResponse({ data: PluginDebugSessionCreateResponseDTOSchema }), + 400: jsonResponse({ error: ErrorResponseDTOSchema }) + } + }), + ExchangeTicket: defineContract({ + meta: { + method: 'post', + path: '/plugin/debug-sessions/tickets:exchange', + operationId: 'pluginDebugSession.exchangeTicket', + description: 'Exchange a one-time debug session ticket for scoped gateway connection info', + summary: 'Exchange debug ticket', + tags, + security: authToken + }, + request: PluginDebugSessionTicketExchangeRequestDTOSchema, + response: { + 200: jsonResponse({ data: PluginDebugSessionTicketExchangeResponseDTOSchema }), + 400: jsonResponse({ error: ErrorResponseDTOSchema }), + 404: jsonResponse({ error: ErrorResponseDTOSchema }) + } + }), + Status: defineContract({ + meta: { + method: 'get', + path: '/plugin/debug-sessions/:debugSessionId', + operationId: 'pluginDebugSession.status', + description: 'Get a plugin debug session status and mounted debug plugins', + summary: 'Get debug session status', + tags, + security: authToken + }, + response: { + 200: jsonResponse({ data: PluginDebugSessionStatusResponseDTOSchema }), + 404: jsonResponse({ error: ErrorResponseDTOSchema }) + } + }), + Revoke: defineContract({ + meta: { + method: 'post', + path: '/plugin/debug-sessions/:debugSessionId/revoke', + operationId: 'pluginDebugSession.revoke', + description: 'Revoke a plugin debug session and close its gateway session', + summary: 'Revoke debug session', + tags, + security: authToken + }, + request: PluginDebugSessionRevokeRequestDTOSchema, + response: { + 200: jsonResponse({ data: PluginDebugSessionRevokeResponseDTOSchema }), + 404: jsonResponse({ error: ErrorResponseDTOSchema }) + } + }) +} as const; diff --git a/plan/2026-06-15_12-19-59-fastgpt-debug-integration.md b/plan/2026-06-15_12-19-59-fastgpt-debug-integration.md new file mode 100644 index 00000000..aee14589 --- /dev/null +++ b/plan/2026-06-15_12-19-59-fastgpt-debug-integration.md @@ -0,0 +1,465 @@ +--- +mode: plan +cwd: /Volumes/Code/fastgpt-plugin +task: FastGPT debug channel integration +complexity: complex +tool: update_plan +total_thoughts: 7 +created_at: 2026-06-15T12:19:59+08:00 +--- + +# FastGPT Debug Channel Integration + +## 任务概述 + +本阶段把 TCP debug 从“手动传 gateway 参数的本地工具”升级为 FastGPT 可集成的调试通道控制面。 + +当前已经完成的底座: + +- `connection-gateway` 可以维护 TCP 长链接、session、mailbox、owner lease、metrics。 +- CLI 可以通过一个 TCP channel 挂载多个本地插件。 +- `plugin-server` 已能通过 debug `source` 查询 gateway 中的本地插件 metadata,并把 invoke 转发到 CLI。 + +本计划聚焦 FastGPT 集成需要的 debug session、ticket 兑换、source 规范和断连控制。TUI、daemon、生产 runtime、渠道 WebSocket consumer adapter 保持后续独立 PR。 + +## 已确认契约 + +### Source + +统一使用 `tmbId` 作为 FastGPT 侧身份边界: + +```text +debug:tmbId:{tmbId}:session:{debugSessionId} +``` + +约束: + +- `tmbId` 是 FastGPT 中 user + team 的唯一标识。 +- `debugSessionId` 用于隔离同一个 `tmbId` 的新旧调试会话。 +- 一个 source 对应一个 debug channel。 +- 一个 debug channel 可以挂载多个本地插件。 +- `pluginId` 只存在于插件 metadata 和 invoke payload 中,不参与 source 生成。 + +### Ticket + +FastGPT 创建 debug session 后给 CLI 一个 opaque connect link。CLI 请求该链接,通过一次性 ticket 换取连接信息。 + +CLI 不接触以下全局 secret: + +- `CONNECTION_GATEWAY_AUTH_TOKEN` +- `JWT_SECRET` +- gateway internal HTTP auth token + +CLI 只能获得短期、单 session、可撤销的连接凭证。 + +### 控制面职责 + +FastGPT 负责: + +- 用户鉴权。 +- 解析当前用户的 `tmbId`。 +- 创建/刷新/断开 debug session 的产品入口。 +- 给 CLI 暴露公网 connect link。 + +fastgpt-plugin/plugin-server 负责: + +- debug session 创建、ticket 兑换、revoke、status。 +- 生成 `debug:tmbId:{tmbId}:session:{debugSessionId}`。 +- 调用 connection-gateway internal API 创建 scoped connect token。 +- 查询 debug source 下的插件列表和插件详情。 +- invoke 时按 debug source 走 gateway runtime,断连时 fail closed。 + +connection-gateway 负责: + +- 维护长链接 session、mailbox、owner lease、metrics。 +- 校验 scoped connection token。 +- 不感知 FastGPT 用户模型,不直接处理 `tmbId` 鉴权。 + +CLI 负责: + +- 加载本地一个或多个插件。 +- 请求 connect link 并兑换连接信息。 +- 用返回的 `tcpUrl`、`sessionId`、`connectToken` 建立 TCP channel。 +- 上报多插件 metadata,处理远程 invoke。 +- 收到 revoke/close 或连接失效后退出或重连失败。 + +## 推荐端到端流程 + +```mermaid +sequenceDiagram + participant User as "Developer" + participant FastGPT as "FastGPT" + participant Plugin as "fastgpt-plugin server" + participant Gateway as "connection-gateway" + participant CLI as "debug CLI" + + User->>FastGPT: "Create debug session" + FastGPT->>FastGPT: "Auth and resolve tmbId" + FastGPT->>Plugin: "POST /debug-sessions { tmbId }" + Plugin->>Plugin: "Create debugSessionId, source, ticket" + Plugin-->>FastGPT: "source, debugSessionId, connectLink" + FastGPT-->>User: "Show/copy connect link" + + User->>CLI: "fastgpt-plugin debug --connect ./plugin-a ./plugin-b" + CLI->>FastGPT: "GET connect link with ticket" + FastGPT->>Plugin: "POST /debug-sessions/:id/tickets:exchange" + Plugin->>Gateway: "Create scoped TCP session token" + Plugin-->>FastGPT: "tcpUrl, source, sessionId, connectToken" + FastGPT-->>CLI: "Connection info" + CLI->>Gateway: "Bind TCP session" + CLI->>Gateway: "Publish plugin metadata bundle" + + FastGPT->>Plugin: "List plugins by source" + Plugin->>Gateway: "Get latest status by source" + Plugin-->>FastGPT: "Debug plugin metadata" + + FastGPT->>Plugin: "Invoke plugin with tmbId + source + pluginId" + Plugin->>Gateway: "Publish request and stream replies" + Gateway-->>CLI: "Invoke envelope" + CLI-->>Gateway: "Stream/response envelopes" + Plugin-->>FastGPT: "Tool stream/result" + + FastGPT->>Plugin: "Revoke debug session" + Plugin->>Gateway: "Close gateway session" + Gateway-->>CLI: "Connection closes" +``` + +## API 设计草案 + +### 创建 debug session + +面向 FastGPT 后端调用。 + +```http +POST /api/plugins/debug-sessions +Authorization: Bearer +Content-Type: application/json +``` + +Request: + +```json +{ + "tmbId": "tmb_xxx", + "ttlMs": 1800000 +} +``` + +Response: + +```json +{ + "debugSessionId": "dbg_xxx", + "source": "debug:tmbId:tmb_xxx:session:dbg_xxx", + "ticket": "opaque-one-time-ticket", + "ticketExpiresAt": 1781500000000 +} +``` + +实现约束: + +- 同一个 `tmbId` 默认只保留一个 active debug session。 +- 创建新 session 时可以 revoke 旧 session。 +- ticket 必须短 TTL、一次性使用。 +- ticket 存储需要可横向扩展,优先复用 Redis。 + +### 兑换 ticket + +面向 FastGPT 后端代理调用,CLI 访问 FastGPT 公网 link,FastGPT 再请求 plugin-server。 + +```http +POST /api/plugins/debug-sessions/tickets:exchange +Authorization: Bearer +Content-Type: application/json +``` + +Request: + +```json +{ + "ticket": "opaque-one-time-ticket" +} +``` + +Response: + +```json +{ + "tcpUrl": "tcp://tcp.example.com:39430", + "source": "debug:tmbId:tmb_xxx:session:dbg_xxx", + "sessionId": "gateway-session-id", + "connectToken": "scoped-connection-token", + "expiresAt": 1781500000000 +} +``` + +实现约束: + +- 兑换成功后 ticket 立即失效。 +- `connectToken` 的 claims 绑定 `consumerType=plugin-debug`、`transport=tcp`、`subject=tmbId`、`sessionScope.source`。 +- 返回的 `tcpUrl` 来自 plugin-server 配置,不由 CLI 拼接。 + +### 查询 debug session 状态 + +面向 FastGPT 后端调用。 + +```http +GET /api/plugins/debug-sessions/{debugSessionId}?tmbId={tmbId} +``` + +Response: + +```json +{ + "debugSessionId": "dbg_xxx", + "tmbId": "tmb_xxx", + "source": "debug:tmbId:tmb_xxx:session:dbg_xxx", + "status": "pending|connected|disconnected|revoked|expired", + "plugins": [ + { + "pluginId": "getTime", + "version": "0.1.0", + "name": "Get Time", + "isToolSet": false + } + ], + "gateway": { + "sessionId": "gateway-session-id", + "ownerAlive": true, + "mailboxLag": 0 + } +} +``` + +### 断开 debug session + +面向 FastGPT 后端调用。 + +```http +POST /api/plugins/debug-sessions/{debugSessionId}:revoke +Authorization: Bearer +Content-Type: application/json +``` + +Request: + +```json +{ + "tmbId": "tmb_xxx", + "reason": "user-disconnect" +} +``` + +实现约束: + +- revoke 幂等。 +- revoke 后 ticket、pending reconnect、gateway session 均不可继续使用。 +- 已断开的 session 查询应返回 `revoked` 或 `disconnected`,不能 fallback 到生产插件。 + +## 代码实施计划 + +### 1. Domain contract + +新增 debug session value object 和端口: + +- `DebugSessionId` +- `DebugSessionSource` +- `DebugSessionTicket` +- `PluginDebugSessionPort` + +关键函数: + +- `makeDebugSessionSource({ tmbId, debugSessionId })` +- `parseDebugSessionSource(source)` +- `isDebugSessionSource(source)` + +向后兼容策略: + +- 旧的 `debug:user:{userId}` 测试 source 可以保留到当前 PR 内部测试迁移完成。 +- 新增逻辑默认使用 `debug:tmbId:{tmbId}:session:{debugSessionId}`。 + +### 2. Infrastructure session store + +优先实现 Redis store: + +- `create({ tmbId, ttlMs })` +- `exchangeTicket(ticket)` +- `get({ tmbId, debugSessionId })` +- `revoke({ tmbId, debugSessionId })` +- `getActiveByTmbId(tmbId)` + +Redis key 建议: + +```text +plugin-debug:session:{debugSessionId} +plugin-debug:ticket:{ticketHash} +plugin-debug:active-by-tmb:{tmbId} +``` + +注意事项: + +- ticket 存 hash,不存明文。 +- exchange 使用原子 delete/consume,避免并发重复兑换。 +- active session index 需要 TTL 同步。 + +### 3. Gateway token minting + +plugin-server 侧增加只给 debug session 使用的 gateway token minting 封装: + +- 输入:`tmbId`、`source`、`debugSessionId`、`ttlMs` +- 输出:`sessionId`、`connectToken`、`expiresAt` + +token claims: + +```json +{ + "consumerType": "plugin-debug", + "subject": "tmb_xxx", + "sessionScope": { + "userId": "tmb_xxx", + "source": "debug:tmbId:tmb_xxx:session:dbg_xxx" + }, + "transport": "tcp", + "capabilities": ["gateway.bind", "plugin-debug.invoke"], + "expiresAt": 1781500000000 +} +``` + +当前 `ConnectionGatewaySessionScopeSchema` 仍要求 `userId`。本阶段可把 `userId` 填为 `tmbId` 来保持向后兼容;后续再单独把 gateway scope 字段泛化为 `subjectId` 或 `principalId`。 + +### 4. Server routes + +在 `apps/server` 增加 debug session route,建议单独文件: + +- `apps/server/src/routes/debug-session.route.ts` + +并在 app 初始化中挂载。 + +路由只暴露给 FastGPT 后端或受信内部调用方;鉴权沿用现有 server auth middleware,不增加公网裸接口。 + +### 5. Runtime invoke options 改造 + +把 debug invoke 选项从 `userId` 迁到 `tmbId/source`: + +```ts +debug?: { + tmbId: string; + source: string; +} +``` + +兼容策略: + +- 当前内部可短期接受 `userId`,但新调用路径使用 `tmbId`。 +- `ConnectionGatewayDebugRuntimeManager` 优先使用显式 `source`。 +- 缺少 `source` 时使用 `sourceForTmbId` 生成 active source,或直接 fail,避免误路由。 + +### 6. Debug plugin metadata 查询 + +`DebugPluginRepoOverlay` 已按 source 查询 gateway metadata。需要补齐: + +- source 格式测试覆盖 `debug:tmbId:{tmbId}:session:{debugSessionId}`。 +- status API 输出多插件 bundle。 +- session missing、owner dead、closed 时保持 fail closed。 + +### 7. CLI connect link + +当前 CLI 支持手动传: + +- `--gateway-base-url` +- `--gateway-auth-token` +- `--gateway-jwt-secret` +- `--gateway-tcp-url` +- `--gateway-user-id` +- `--gateway-source` + +本 PR 先新增非 TUI 的 agent-friendly 入口: + +```bash +fastgpt-plugin debug ./plugin-a ./plugin-b --connect +``` + +行为: + +- CLI 请求 `` 获取 connection info。 +- 使用返回的 `tcpUrl`、`source`、`sessionId`、`connectToken` 建立 TCP session。 +- 保留旧参数用于本地开发和回归测试。 +- 新文档推荐 `--connect`,不推荐暴露 gateway global secret。 + +## 测试计划 + +### Unit + +- `makeDebugSessionSource` 和 parser。 +- ticket TTL、一次性兑换、重复兑换失败。 +- 同一 `tmbId` 创建新 session 时旧 session revoke。 +- revoke 幂等。 +- gateway token claims 正确绑定 `tmbId/source/session`。 +- debug runtime 使用显式 source 调用。 +- debug repo 查询多插件 bundle。 + +### Integration + +- 创建 debug session -> 兑换 ticket -> CLI bind -> list plugins -> invoke -> revoke。 +- 一个 CLI channel 挂载两个 plugin,FastGPT 侧能分别 list/detail/invoke。 +- 断连后 list/detail/invoke fail closed。 +- ticket 过期、重复使用、错误 `tmbId` 均失败。 + +### Manual smoke + +本地: + +```bash +pnpm build:connection-gateway +pnpm dev:connection-gateway +pnpm dev:server +fastgpt-plugin debug ./examples/get-time ./examples/other --connect +``` + +远程测试环境: + +- gateway TCP 只提供给 CLI。 +- gateway HTTP 只需要 plugin-server 内网可达。 +- FastGPT 公网只暴露 connect link。 + +## 风险与注意事项 + +- 不能把 gateway global auth token 或 JWT secret 返回给 CLI。 +- `tmbId` 需要由 FastGPT 鉴权后传入,plugin-server 默认信任内部调用方。 +- ticket store 必须支持多节点;内存 store 只能用于测试。 +- debug route 一旦选中,断连时必须 fail closed。 +- source 不包含 pluginId,避免多插件同时 debug 时路由错误。 +- CLI reconnect 需要尊重 revoke;revoke 后重连必须失败。 +- gateway scope 当前字段名仍是 `userId`,短期填 `tmbId`,避免把 gateway schema 泛化混入本 PR。 + +## PR 边界 + +本 PR 建议包含: + +- debug session source/ticket/session store。 +- plugin-server debug session API。 +- CLI `--connect` 非交互入口。 +- `tmbId` debug invoke option 和相关测试。 +- 文档更新。 + +本 PR 不包含: + +- CLI TUI。 +- CLI daemon。 +- FastGPT 主仓 UI。 +- WebSocket channel consumer adapter。 +- gateway scope schema 大改名。 +- 生产 runtime 切换或 fallback 策略变更。 + +## 参考文件 + +- `apps/cli/src/commands/debug.ts` +- `apps/cli/src/debug/gateway.ts` +- `apps/server/src/deps.ts` +- `apps/server/src/routes/plugin.route.ts` +- `packages/domain/src/ports/plugin/plugin-runtime-manager.port.ts` +- `packages/domain/src/value-objects/connection-gateway.vo.ts` +- `packages/infrastructure/src/connection-gateway/service.ts` +- `packages/infrastructure/src/connection-gateway/token.ts` +- `packages/infrastructure/src/plugin/debug-plugin.repo.ts` +- `packages/infrastructure/src/plugin/plugin-runtime/drivers/connection-gateway/debug-runtime.driver.ts`