diff --git a/.changeset/add-valkey-store-providers.md b/.changeset/add-valkey-store-providers.md new file mode 100644 index 000000000..0947d5885 --- /dev/null +++ b/.changeset/add-valkey-store-providers.md @@ -0,0 +1,18 @@ +--- +"@voltagent/a2a-server": major +"@voltagent/resumable-streams": minor +--- + +feat: add Valkey-backed TaskStore and ResumableStreamStore providers + +Adds `ValkeyTaskStore` to `@voltagent/a2a-server` and `createResumableStreamValkeyStore` to +`@voltagent/resumable-streams`, enabling distributed persistence via the `@valkey/valkey-glide` +client library. Both stores support configurable key prefixes, optional TTL-based expiration, and +standalone or cluster Valkey deployments. The `@valkey/valkey-glide` peer dependency is optional so +consumers who don't use Valkey are unaffected. + +**Breaking change in `@voltagent/a2a-server`:** `A2AServerConfig` now accepts an optional +`taskStore` property. When provided, it takes precedence over the `deps.taskStore` argument passed +to `A2AServer.initialize()`. The full precedence chain is: +`config.taskStore` > `deps.taskStore` > `InMemoryTaskStore`. A debug-level log is emitted when +`config.taskStore` overrides a non-null `deps.taskStore`. diff --git a/.gitignore b/.gitignore index fe0d10b5d..4ad1323ab 100644 --- a/.gitignore +++ b/.gitignore @@ -169,6 +169,9 @@ skills !packages/core/src/workspace/skills !packages/core/src/workspace/skills/** +# kiro +.kiro + dev-debug.log node_modules/ diff --git a/examples/with-valkey-store/.env.example b/examples/with-valkey-store/.env.example new file mode 100644 index 000000000..6bd9978be --- /dev/null +++ b/examples/with-valkey-store/.env.example @@ -0,0 +1,5 @@ +OPENAI_API_KEY= + +# Valkey connection (defaults shown) +VALKEY_HOST=localhost +VALKEY_PORT=6379 diff --git a/examples/with-valkey-store/.gitignore b/examples/with-valkey-store/.gitignore new file mode 100644 index 000000000..9c97bbd46 --- /dev/null +++ b/examples/with-valkey-store/.gitignore @@ -0,0 +1,3 @@ +node_modules +dist +.env diff --git a/examples/with-valkey-store/README.md b/examples/with-valkey-store/README.md new file mode 100644 index 000000000..71b8999d5 --- /dev/null +++ b/examples/with-valkey-store/README.md @@ -0,0 +1,170 @@ +
+ +VoltAgent banner + + +
+
+ +
+ Home Page | + Documentation | + Examples | + Discord | + Blog +
+
+ +
+ +
+ VoltAgent is an open source TypeScript framework for building and orchestrating AI agents.
+ Escape the limitations of no-code builders and the complexity of starting from scratch. +
+
+
+ +
+ +[![npm version](https://img.shields.io/npm/v/@voltagent/core.svg)](https://www.npmjs.com/package/@voltagent/core) +[![Contributor Covenant](https://img.shields.io/badge/Contributor%20Covenant-2.0-4baaaa.svg)](../../CODE_OF_CONDUCT.md) +[![Discord](https://img.shields.io/discord/1361559153780195478.svg?label=&logo=discord&logoColor=ffffff&color=7389D8&labelColor=6A7EC2)](https://s.voltagent.dev/discord) +[![Twitter Follow](https://img.shields.io/twitter/follow/voltagent_dev?style=social)](https://twitter.com/voltagent_dev) + +
+ +
+ +# VoltAgent with Valkey Store Example + +This example demonstrates how to use **Valkey** as a distributed backing store for both A2A task persistence and resumable streaming in VoltAgent. It uses the `@valkey/valkey-glide` client library for high-performance access to Valkey (standalone or cluster). + +## What you get + +- **ValkeyTaskStore** — Persists A2A task records to Valkey with configurable key prefixes and TTL-based expiration. +- **ValkeyResumableStreamStore** — Manages resumable streaming sessions via Valkey pub/sub and key-value operations. +- A minimal VoltAgent project with a `SupportAgent` exposed over the A2A protocol, backed entirely by Valkey. + +## Structure + +```text +examples/with-valkey-store +├── src/ +│ ├── agents/assistant.ts # Example agent definition +│ └── index.ts # VoltAgent bootstrap with Valkey stores +├── .env.example # Environment variable template +├── package.json +├── tsconfig.json +└── README.md +``` + +## Prerequisites + +- Node.js 20+ +- `pnpm` +- A running Valkey instance (or Redis-compatible server) +- `OPENAI_API_KEY` in your environment + +### Start Valkey locally with Docker + +```bash +docker run -d --name valkey -p 6379:6379 valkey/valkey:8 +``` + +## Run locally + +1. Copy the environment template and fill in your keys: + +```bash +cp .env.example .env +``` + +2. Install dependencies and start the dev server: + +```bash +pnpm install +pnpm --filter voltagent-example-with-valkey-store dev +``` + +The server listens on `http://localhost:3141`. + +## Configuration + +Environment variables: + +| Variable | Default | Description | +| ---------------- | ----------- | ------------------------------------ | +| `OPENAI_API_KEY` | — | OpenAI API key for the example agent | +| `VALKEY_HOST` | `localhost` | Valkey server hostname | +| `VALKEY_PORT` | `6379` | Valkey server port | + +### Key prefixes and TTL + +Both stores accept `keyPrefix` and `ttlSeconds` options: + +```typescript +// Task store — keys like "my-tasks:agentId::taskId" +const taskStore = await createValkeyTaskStore({ + client: valkeyClient, + keyPrefix: "my-tasks", + ttlSeconds: 3600, +}); + +// Stream store — keys like "my-streams:active:userId-conversationId" +const streamStore = await createResumableStreamValkeyStore({ + client: valkeyClient, + clientConfig: { addresses: [{ host: "localhost", port: 6379 }] }, + keyPrefix: "my-streams", + ttlSeconds: 600, +}); +``` + +### Cluster mode + +Both stores accept `GlideClient` or `GlideClusterClient`: + +```typescript +import { GlideClusterClient } from "@valkey/valkey-glide"; + +const clusterClient = await GlideClusterClient.createClient({ + addresses: [ + { host: "node1.example.com", port: 6379 }, + { host: "node2.example.com", port: 6379 }, + ], + useTLS: true, +}); + +const taskStore = new ValkeyTaskStore({ client: clusterClient }); +``` + +## Try it + +```bash +# Fetch the agent card +curl http://localhost:3141/.well-known/supportagent/agent-card.json | jq + +# Send a message +curl -X POST http://localhost:3141/a2a/supportagent \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": "1", + "method": "message/send", + "params": { + "message": { + "kind": "message", + "role": "user", + "messageId": "msg-1", + "parts": [{ "kind": "text", "text": "What time is it?" }] + } + } + }' +``` + +## Next steps + +- Adjust `ttlSeconds` to match your retention requirements (or omit it for no expiration). +- Use `GlideClusterClient` for production Valkey cluster deployments (e.g., AWS ElastiCache Valkey). +- Add TLS by setting `useTLS: true` in the client configuration. + +Happy hacking! 🚀 diff --git a/examples/with-valkey-store/package.json b/examples/with-valkey-store/package.json new file mode 100644 index 000000000..0b8cd1ecd --- /dev/null +++ b/examples/with-valkey-store/package.json @@ -0,0 +1,32 @@ +{ + "name": "voltagent-example-with-valkey-store", + "version": "0.0.0", + "dependencies": { + "@valkey/valkey-glide": "^2.3.1", + "@voltagent/a2a-server": "^2.0.3", + "@voltagent/core": "^2.7.2", + "@voltagent/internal": "^1.0.3", + "@voltagent/logger": "^2.0.2", + "@voltagent/resumable-streams": "^2.0.2", + "@voltagent/server-hono": "^2.0.12", + "ai": "^6.0.0", + "zod": "^3.25.76" + }, + "devDependencies": { + "@types/node": "^24.2.1", + "tsx": "^4.21.0", + "typescript": "^5.8.2" + }, + "private": true, + "repository": { + "type": "git", + "url": "https://github.com/VoltAgent/voltagent.git", + "directory": "examples/with-valkey-store" + }, + "scripts": { + "build": "tsc", + "dev": "tsx watch --env-file=.env ./src", + "start": "node dist/index.js" + }, + "type": "module" +} diff --git a/examples/with-valkey-store/src/agents/assistant.ts b/examples/with-valkey-store/src/agents/assistant.ts new file mode 100644 index 000000000..a5e74823b --- /dev/null +++ b/examples/with-valkey-store/src/agents/assistant.ts @@ -0,0 +1,22 @@ +import { Agent, createTool } from "@voltagent/core"; +import { z } from "zod"; + +const statusTool = createTool({ + name: "status", + description: "Return the current time in ISO format", + parameters: z.object({}), + async execute() { + return { + timestamp: new Date().toISOString(), + }; + }, +}); + +/** Pre-configured support agent with a `status` tool that returns the current time. */ +export const assistant = new Agent({ + id: "supportagent", + name: "SupportAgent", + instructions: "Reply with helpful answers and include the current time when relevant.", + model: "openai/gpt-4o-mini", + tools: [statusTool], +}); diff --git a/examples/with-valkey-store/src/index.ts b/examples/with-valkey-store/src/index.ts new file mode 100644 index 000000000..e25bea6e5 --- /dev/null +++ b/examples/with-valkey-store/src/index.ts @@ -0,0 +1,76 @@ +import { GlideClient } from "@valkey/valkey-glide"; +import { A2AServer } from "@voltagent/a2a-server"; +import { createValkeyTaskStore } from "@voltagent/a2a-server/valkey-store"; +import { VoltAgent } from "@voltagent/core"; +import { createPinoLogger } from "@voltagent/logger"; +import { createResumableStreamAdapter } from "@voltagent/resumable-streams"; +import { createResumableStreamValkeyStore } from "@voltagent/resumable-streams/valkey-store"; +import { honoServer } from "@voltagent/server-hono"; +import { assistant } from "./agents/assistant.js"; + +const logger = createPinoLogger({ + name: "with-valkey-store", + level: "debug", +}); + +const host = process.env.VALKEY_HOST ?? "localhost"; +const rawPort = process.env.VALKEY_PORT; +const port = rawPort !== undefined ? Number(rawPort) : 6379; +if (!Number.isInteger(port) || port < 1 || port > 65535) { + throw new Error(`Invalid VALKEY_PORT "${rawPort}": must be an integer between 1 and 65535`); +} + +/** + * Bootstraps a VoltAgent instance backed by Valkey for both A2A task + * persistence and resumable streaming. Connects to the Valkey server + * specified by `VALKEY_HOST` / `VALKEY_PORT` environment variables + * (defaulting to `localhost:6379`), then starts an HTTP server on port 3141. + */ +async function main() { + const valkeyClient = await GlideClient.createClient({ + addresses: [{ host, port }], + }); + logger.info(`Connected to Valkey at ${host}:${port}`); + + const taskStore = await createValkeyTaskStore({ + client: valkeyClient, + keyPrefix: "example-tasks", + ttlSeconds: 3600, + }); + + const streamStore = await createResumableStreamValkeyStore({ + client: valkeyClient, + clientConfig: { addresses: [{ host, port }] }, + keyPrefix: "example-streams", + ttlSeconds: 600, + }); + + const streamAdapter = await createResumableStreamAdapter({ + streamStore, + }); + + const a2aServerFactory = () => + new A2AServer({ + name: "SupportAgent", + version: "0.1.0", + description: "A2A server with Valkey-backed task and stream persistence", + taskStore, + }); + + new VoltAgent({ + agents: { assistant }, + a2aServers: { supportAgent: a2aServerFactory }, + server: honoServer({ + port: 3141, + resumableStream: { adapter: streamAdapter }, + }), + logger, + }); + + logger.info("VoltAgent with Valkey stores running on http://localhost:3141"); +} + +main().catch((err) => { + logger.error("Failed to start", { error: err }); + process.exit(1); +}); diff --git a/examples/with-valkey-store/tsconfig.json b/examples/with-valkey-store/tsconfig.json new file mode 100644 index 000000000..811b07146 --- /dev/null +++ b/examples/with-valkey-store/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "src", + "outDir": "dist", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "target": "ES2022", + "types": ["node"], + "esModuleInterop": true + }, + "include": ["src/**/*"], + "exclude": ["dist"] +} diff --git a/packages/a2a-server/package.json b/packages/a2a-server/package.json index b3d536dc9..34f6df3fc 100644 --- a/packages/a2a-server/package.json +++ b/packages/a2a-server/package.json @@ -20,6 +20,16 @@ "types": "./dist/index.d.ts", "default": "./dist/index.js" } + }, + "./valkey-store": { + "import": { + "types": "./dist/valkey-store.d.mts", + "default": "./dist/valkey-store.mjs" + }, + "require": { + "types": "./dist/valkey-store.d.ts", + "default": "./dist/valkey-store.js" + } } }, "files": [ @@ -29,8 +39,14 @@ "main": "dist/index.js", "module": "dist/index.mjs", "peerDependencies": { + "@valkey/valkey-glide": ">=2.3.1", "@voltagent/core": "^2.0.0" }, + "peerDependenciesMeta": { + "@valkey/valkey-glide": { + "optional": true + } + }, "repository": { "type": "git", "url": "https://github.com/VoltAgent/voltagent.git", diff --git a/packages/a2a-server/src/schemas.ts b/packages/a2a-server/src/schemas.ts new file mode 100644 index 000000000..cf2cb86ad --- /dev/null +++ b/packages/a2a-server/src/schemas.ts @@ -0,0 +1,72 @@ +import { z } from "zod"; + +/** + * Zod schemas for A2A task-related types. + * + * These are the **single source of truth** — the corresponding TypeScript + * types in `./types.ts` are derived via `z.infer` so the runtime validation + * and static types can never drift apart. + */ + +/** Zod schema for the set of valid task lifecycle states. */ +export const TaskStateSchema = z.enum([ + "submitted", + "working", + "input-required", + "completed", + "failed", + "canceled", +]); + +/** Zod schema for a text-only message part. */ +export const A2AMessagePartTextSchema = z.object({ + kind: z.literal("text"), + text: z.string(), +}); + +/** Zod schema for a message part. Currently only `text` parts exist; extend with `z.discriminatedUnion` when new kinds are added. */ +export const A2AMessagePartSchema = A2AMessagePartTextSchema; + +/** Zod schema for a single A2A message (user or agent). */ +export const A2AMessageSchema = z.object({ + kind: z.literal("message"), + role: z.enum(["user", "agent"]), + messageId: z.string(), + parts: z.array(A2AMessagePartSchema), + contextId: z.string().optional(), + taskId: z.string().optional(), + referenceTaskIds: z.array(z.string()).optional(), + extensions: z.array(z.string()).optional(), + metadata: z.record(z.unknown()).optional(), +}); + +/** Zod schema for a task's current status including state and timestamp. */ +export const TaskStatusSchema = z.object({ + state: TaskStateSchema, + message: A2AMessageSchema.optional(), + timestamp: z.string(), +}); + +/** Zod schema for a text-only artifact part. */ +export const TaskArtifactPartSchema = z.object({ + kind: z.literal("text"), + text: z.string(), +}); + +/** Zod schema for an artifact produced by an agent during task execution. */ +export const TaskArtifactSchema = z.object({ + name: z.string(), + parts: z.array(TaskArtifactPartSchema), + description: z.string().optional(), + metadata: z.record(z.unknown()).optional(), +}); + +/** Zod schema for a complete task record including status, history, and optional artifacts. */ +export const TaskRecordSchema = z.object({ + id: z.string(), + contextId: z.string(), + status: TaskStatusSchema, + history: z.array(A2AMessageSchema), + artifacts: z.array(TaskArtifactSchema).optional(), + metadata: z.record(z.unknown()).optional(), +}); diff --git a/packages/a2a-server/src/server.ts b/packages/a2a-server/src/server.ts index eb801f063..2172ae8b5 100644 --- a/packages/a2a-server/src/server.ts +++ b/packages/a2a-server/src/server.ts @@ -1,8 +1,9 @@ import { randomUUID } from "node:crypto"; -import { type Agent, convertUsage } from "@voltagent/core"; +import { type Agent, convertUsage, getGlobalLogger } from "@voltagent/core"; import { buildAgentCard } from "./adapters/agent"; import { fromVoltAgentMessage, toVoltAgentMessage } from "./adapters/message"; import { createSuccessResponse, normalizeError } from "./protocol"; +import { A2AMessageSchema } from "./schemas"; import { InMemoryTaskStore } from "./store"; import { appendMessage, @@ -55,6 +56,13 @@ function resolveAgentCardUrl(serverId: string, requestUrl?: string): string { } } +/** + * A2A (Agent-to-Agent) protocol server. + * + * Manages agent registration, JSON-RPC request routing, task lifecycle, and + * streaming responses. Call {@link initialize} with runtime dependencies + * before handling any requests. + */ export class A2AServer { private deps?: Required; private readonly config: A2AServerConfig; @@ -62,6 +70,7 @@ export class A2AServer { private readonly configuredAgents = new Map(); private readonly agentFilter: A2AFilterFunction | undefined; + /** Creates a new A2AServer from the given configuration, registering any pre-configured agents. */ constructor(config: A2AServerConfig) { this.config = config; this.agentFilter = config.filterAgents; @@ -76,13 +85,30 @@ export class A2AServer { } } + /** + * Initializes the server with runtime dependencies. + * + * Task store precedence: `config.taskStore` (constructor) > `deps.taskStore` > `InMemoryTaskStore`. + * If a `taskStore` was provided in the `A2AServerConfig` constructor, it takes priority over + * the one supplied here in `deps`. + */ initialize(deps: A2AServerDeps): void { + if (this.config.taskStore && deps.taskStore) { + getGlobalLogger() + .child({ component: "a2a-server" }) + .debug( + "config.taskStore is overriding deps.taskStore. " + + "The task store provided in A2AServerConfig takes precedence.", + ); + } + this.deps = { ...deps, - taskStore: deps.taskStore ?? new InMemoryTaskStore(), + taskStore: this.config.taskStore ?? deps.taskStore ?? new InMemoryTaskStore(), } as Required; } + /** Returns the server's public metadata (id, name, version, description, provider). */ getMetadata() { return { id: this.config.id, @@ -93,6 +119,7 @@ export class A2AServer { }; } + /** Builds and returns the {@link AgentCard} for the specified agent, including its endpoint URL. */ getAgentCard(agentId: string, context: A2ARequestContext = {}): AgentCard { const agent = this.resolveAgent(agentId, context); const url = resolveAgentCardUrl(agentId, context.requestUrl); @@ -108,6 +135,11 @@ export class A2AServer { }); } + /** + * Routes an incoming JSON-RPC request to the appropriate handler. + * + * Supported methods: `message/send`, `message/stream`, `tasks/get`, `tasks/cancel`. + */ async handleRequest( agentId: string, request: JsonRpcRequest, @@ -304,6 +336,33 @@ export class A2AServer { if (abortController.signal.aborted) { return await this.ensureCanceledRecord(agentId, record); } + + const failureText = + error instanceof Error ? error.message : "Task failed with an unknown error"; + const failureMessage: A2AMessage = { + kind: "message", + role: "agent", + messageId: randomUUID(), + taskId: record.id, + contextId: record.contextId, + parts: [{ kind: "text", text: failureText }], + }; + + record = appendMessage(record, failureMessage); + record = transitionStatus(record, { state: "failed", message: failureMessage }); + + try { + await taskStore.save({ agentId, data: record }); + } catch (saveErr) { + getGlobalLogger() + .child({ component: "a2a-server" }) + .warn("Failed to persist task failure status", { + agentId, + taskId: record.id, + saveError: saveErr, + }); + } + throw error; } finally { this.clearActiveOperation(agentId, record.id); @@ -664,24 +723,16 @@ export class A2AServer { if (!payload || typeof payload !== "object") { throw VoltA2AError.invalidParams("Params must be an object"); } - const candidate = payload as Partial; + const candidate = payload as Record; - if (!candidate.message || typeof candidate.message !== "object") { - throw VoltA2AError.invalidParams("'message' must be provided"); - } - - if (!Array.isArray(candidate.message.parts) || candidate.message.parts.length === 0) { - throw VoltA2AError.invalidParams("Message must include at least one part"); - } - - const hasInvalidPart = candidate.message.parts.some( - (part) => part.kind !== "text" || typeof part.text !== "string", - ); - if (hasInvalidPart) { - throw VoltA2AError.invalidParams("Only plain text message parts are supported"); + try { + A2AMessageSchema.parse(candidate.message); + } catch (error) { + const message = error instanceof Error ? error.message : "Invalid message payload"; + throw VoltA2AError.invalidParams(message); } - return candidate as MessageSendParams; + return candidate as unknown as MessageSendParams; } private validateTaskQueryParams(payload: unknown): TaskQueryParams { diff --git a/packages/a2a-server/src/types.ts b/packages/a2a-server/src/types.ts index 352db4eeb..1bb9ed6c4 100644 --- a/packages/a2a-server/src/types.ts +++ b/packages/a2a-server/src/types.ts @@ -4,15 +4,29 @@ import type { A2AServerLike as BaseA2AServerLike, A2AServerMetadata as BaseA2AServerMetadata, } from "@voltagent/internal/a2a"; - +import type { z } from "zod"; +import type { + A2AMessagePartSchema, + A2AMessagePartTextSchema, + A2AMessageSchema, + TaskArtifactPartSchema, + TaskArtifactSchema, + TaskRecordSchema, + TaskStateSchema, + TaskStatusSchema, +} from "./schemas"; + +/** Identifier for a JSON-RPC request — a string, number, or `null` for notifications. */ export type A2AJsonRpcId = string | number | null; +/** Standard JSON-RPC 2.0 error object returned inside a {@link JsonRpcResponse}. */ export interface JsonRpcError { code: number; message: string; data?: Data; } +/** Standard JSON-RPC 2.0 response envelope. */ export interface JsonRpcResponse { jsonrpc: "2.0"; id: A2AJsonRpcId; @@ -20,16 +34,19 @@ export interface JsonRpcResponse { error?: JsonRpcError | null; } +/** Wrapper around an async generator that yields {@link JsonRpcResponse} objects for streaming calls. */ export interface JsonRpcStream { kind: "stream"; id: A2AJsonRpcId; stream: AsyncGenerator>; } +/** Discriminated union of a single {@link JsonRpcResponse} or a {@link JsonRpcStream}. */ export type JsonRpcHandlerResult = | JsonRpcResponse | JsonRpcStream; +/** Incoming JSON-RPC 2.0 request envelope. */ export interface JsonRpcRequest { jsonrpc: "2.0"; id: A2AJsonRpcId; @@ -37,65 +54,37 @@ export interface JsonRpcRequest { params?: Params; } -export type TaskState = - | "submitted" - | "working" - | "input-required" - | "completed" - | "failed" - | "canceled"; - -export interface A2AMessagePartText { - kind: "text"; - text: string; -} +/** The set of valid task lifecycle states. Derived from {@link TaskStateSchema}. */ +export type TaskState = z.infer; -export type A2AMessagePart = A2AMessagePartText; - -export interface A2AMessage { - kind: "message"; - role: "user" | "agent"; - messageId: string; - parts: A2AMessagePart[]; - contextId?: string; - taskId?: string; - referenceTaskIds?: string[]; - extensions?: string[]; - metadata?: Record; -} +/** A text-only message part. Derived from {@link A2AMessagePartTextSchema}. */ +export type A2AMessagePartText = z.infer; -export interface TaskStatus { - state: TaskState; - message?: A2AMessage; - timestamp: string; -} +/** Union of all message part kinds. Derived from {@link A2AMessagePartSchema}. */ +export type A2AMessagePart = z.infer; -export interface TaskArtifactPart { - kind: "text"; - text: string; -} +/** A single message exchanged between user and agent. Derived from {@link A2AMessageSchema}. */ +export type A2AMessage = z.infer; -export interface TaskArtifact { - name: string; - parts: TaskArtifactPart[]; - description?: string; - metadata?: Record; -} +/** Current status of a task including lifecycle state and timestamp. Derived from {@link TaskStatusSchema}. */ +export type TaskStatus = z.infer; -export interface TaskRecord { - id: string; - contextId: string; - status: TaskStatus; - history: A2AMessage[]; - artifacts?: TaskArtifact[]; - metadata?: Record; -} +/** A text-only artifact part. Derived from {@link TaskArtifactPartSchema}. */ +export type TaskArtifactPart = z.infer; + +/** An artifact produced by an agent during task execution. Derived from {@link TaskArtifactSchema}. */ +export type TaskArtifact = z.infer; +/** A complete task record including status, history, and optional artifacts. Derived from {@link TaskRecordSchema}. */ +export type TaskRecord = z.infer; + +/** Persistence layer for loading and saving {@link TaskRecord} instances keyed by agent and task ID. */ export interface TaskStore { load(params: { agentId: string; taskId: string }): Promise; save(params: { agentId: string; data: TaskRecord }): Promise; } +/** Per-request context forwarded to agent invocations and filter functions. */ export interface A2ARequestContext { userId?: string; sessionId?: string; @@ -103,13 +92,16 @@ export interface A2ARequestContext { requestUrl?: string; } +/** Parameters passed to an {@link A2AFilterFunction} for filtering a list of items. */ export interface A2AFilterParams { items: T[]; context?: A2ARequestContext; } +/** Callback that filters a list of items (e.g. agents) based on the current request context. */ export type A2AFilterFunction = (params: A2AFilterParams) => T[]; +/** Parameters for the `message/send` and `message/stream` JSON-RPC methods. */ export interface MessageSendParams { id?: string; sessionId?: string; @@ -118,21 +110,27 @@ export interface MessageSendParams { message: A2AMessage; } +/** Parameters for the `tasks/get` JSON-RPC method. */ export interface TaskQueryParams { id: string; historyLength?: number; metadata?: Record; } +/** Parameters for the `tasks/cancel` JSON-RPC method. */ export interface TaskIdParams { id: string; metadata?: Record; } +/** Result of a `message/send` or `message/stream` call — the updated {@link TaskRecord}. */ export type MessageSendResult = TaskRecord; +/** Result of a `tasks/get` call — the requested {@link TaskRecord}. */ export type TaskGetResult = TaskRecord; +/** Result of a `tasks/cancel` call — the canceled {@link TaskRecord}. */ export type TaskCancelResult = TaskRecord; +/** Configuration supplied to the {@link A2AServer} constructor. */ export interface A2AServerConfig { id?: string; name: string; @@ -144,14 +142,22 @@ export interface A2AServerConfig { }; agents?: Record; filterAgents?: A2AFilterFunction; + /** + * Optional task store. When provided, takes precedence over `deps.taskStore` passed to + * `A2AServer.initialize()`. Falls back to an in-memory store if neither is set. + */ + taskStore?: TaskStore; } +/** Metadata describing an {@link A2AServer} instance (name, version, etc.). */ export interface A2AServerMetadata extends BaseA2AServerMetadata {} +/** Runtime dependencies injected into {@link A2AServer} via `initialize()`. */ export interface A2AServerDeps extends BaseA2AServerDeps { taskStore?: TaskStore; } +/** Minimal public surface of an A2A server, used by server-provider adapters. */ export interface A2AServerLike extends BaseA2AServerLike { getAgentCard?(agentId: string, context?: A2ARequestContext): AgentCard; handleRequest?( @@ -161,8 +167,10 @@ export interface A2AServerLike extends BaseA2AServerLike { ): Promise; } +/** Factory function that creates a new {@link A2AServerLike} instance on demand. */ export type A2AServerFactory = () => T; +/** A single skill advertised by an agent in its {@link AgentCard}. */ export interface AgentCardSkill { id: string; name: string; @@ -170,6 +178,7 @@ export interface AgentCardSkill { tags?: string[]; } +/** Public metadata card describing an agent's capabilities, skills, and endpoint URL. */ export interface AgentCard { name: string; description?: string; @@ -189,6 +198,7 @@ export interface AgentCard { skills: AgentCardSkill[]; } +/** Standard A2A JSON-RPC error codes, extending the base JSON-RPC 2.0 error range with task-specific codes. */ export const A2AErrorCode = { PARSE_ERROR: -32700, INVALID_REQUEST: -32600, @@ -203,7 +213,14 @@ export const A2AErrorCode = { export type A2AErrorCode = (typeof A2AErrorCode)[keyof typeof A2AErrorCode]; +/** + * Typed error class for A2A JSON-RPC failures. + * + * Provides static factory methods for every standard error code so callers + * never need to remember numeric codes. + */ export class VoltA2AError extends Error { + /** Creates a new {@link VoltA2AError} with the given code, message, and optional details. */ constructor( public code: A2AErrorCode, message: string, @@ -214,33 +231,39 @@ export class VoltA2AError extends Error { this.name = "VoltA2AError"; } + /** Converts this error into a plain {@link JsonRpcError} object suitable for serialisation. */ toJsonRpcError(): JsonRpcError { return { code: this.code, message: this.message, data: { taskId: this.taskId, - ...(this.data ? { details: this.data } : {}), + ...(this.data !== undefined ? { details: this.data } : {}), }, }; } + /** Creates a parse-error (-32700) for malformed JSON payloads. */ static parseError(details?: unknown) { return new VoltA2AError(A2AErrorCode.PARSE_ERROR, "Invalid JSON payload", details); } + /** Creates an invalid-request (-32600) error. */ static invalidRequest(message = "Invalid request", details?: unknown) { return new VoltA2AError(A2AErrorCode.INVALID_REQUEST, message, details); } + /** Creates a method-not-found (-32601) error for an unknown JSON-RPC method. */ static methodNotFound(method: string) { return new VoltA2AError(A2AErrorCode.METHOD_NOT_FOUND, `Unknown method '${method}'`); } + /** Creates an invalid-params (-32602) error. */ static invalidParams(message = "Invalid parameters", details?: unknown) { return new VoltA2AError(A2AErrorCode.INVALID_PARAMS, message, details); } + /** Creates a task-not-found (-32001) error for the given task ID. */ static taskNotFound(taskId: string) { return new VoltA2AError( A2AErrorCode.TASK_NOT_FOUND, @@ -250,6 +273,7 @@ export class VoltA2AError extends Error { ); } + /** Creates a task-not-cancelable (-32002) error for a task that can no longer be canceled. */ static taskNotCancelable(taskId: string) { return new VoltA2AError( A2AErrorCode.TASK_NOT_CANCELABLE, @@ -259,10 +283,12 @@ export class VoltA2AError extends Error { ); } + /** Creates an unsupported-operation (-32004) error. */ static unsupportedOperation(message = "Unsupported operation") { return new VoltA2AError(A2AErrorCode.UNSUPPORTED_OPERATION, message); } + /** Creates an internal-error (-32603) error. */ static internal(message = "Internal error", details?: unknown) { return new VoltA2AError(A2AErrorCode.INTERNAL_ERROR, message, details); } diff --git a/packages/a2a-server/src/valkey-store.spec.ts b/packages/a2a-server/src/valkey-store.spec.ts new file mode 100644 index 000000000..bb09fedcc --- /dev/null +++ b/packages/a2a-server/src/valkey-store.spec.ts @@ -0,0 +1,190 @@ +import { safeStringify } from "@voltagent/internal"; +import { TaskRecordSchema } from "./schemas"; +import type { TaskRecord } from "./types"; +import { ValkeyTaskStore, createValkeyTaskStore } from "./valkey-store"; + +// Mock @valkey/valkey-glide so tests don't require the actual package +vi.mock("@valkey/valkey-glide", () => ({ + TimeUnit: { Seconds: "EX" }, +})); + +function makeClient() { + return { + get: vi.fn(), + set: vi.fn(), + }; +} + +function makeTaskRecord(overrides: Partial = {}): TaskRecord { + return { + id: "task-1", + contextId: "ctx-1", + status: { state: "submitted", timestamp: new Date().toISOString() }, + history: [], + ...overrides, + }; +} + +describe("ValkeyTaskStore", () => { + it("load() returns deserialized TaskRecord when client.get returns a JSON string", async () => { + const client = makeClient(); + const record = makeTaskRecord(); + client.get.mockResolvedValue(safeStringify(record)); + + const store = new ValkeyTaskStore({ client } as any); + const result = await store.load({ agentId: "agent-1", taskId: "task-1" }); + + expect(result).toEqual(record); + }); + + it("load() returns null when client.get returns null", async () => { + const client = makeClient(); + client.get.mockResolvedValue(null); + + const store = new ValkeyTaskStore({ client } as any); + const result = await store.load({ agentId: "agent-1", taskId: "task-1" }); + + expect(result).toBeNull(); + }); + + it("load() handles GlideString Buffer values by converting to string", async () => { + const client = makeClient(); + const record = makeTaskRecord(); + client.get.mockResolvedValue(Buffer.from(safeStringify(record))); + + const store = new ValkeyTaskStore({ client } as any); + const result = await store.load({ agentId: "agent-1", taskId: "task-1" }); + + expect(result).toEqual(record); + }); + + it("save() calls client.set with the correct composite key {keyPrefix}:{agentId}::{taskId}", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-42" }); + + const store = new ValkeyTaskStore({ client, keyPrefix: "my-prefix" } as any); + await store.save({ agentId: "agent-x", data: record }); + + expect(client.set.mock.calls[0][0]).toBe("my-prefix:agent-x::task-42"); + }); + + it("save() uses default keyPrefix 'a2a-tasks' when none is provided", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-42" }); + + const store = new ValkeyTaskStore({ client } as any); + await store.save({ agentId: "agent-x", data: record }); + + expect(client.set.mock.calls[0][0]).toBe("a2a-tasks:agent-x::task-42"); + }); + + it("save() serializes the TaskRecord as a JSON string containing the task data", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-99" }); + + const store = new ValkeyTaskStore({ client } as any); + await store.save({ agentId: "agent-1", data: record }); + + const storedValue = client.set.mock.calls[0][1]; + expect(typeof storedValue).toBe("string"); + const parsed = JSON.parse(storedValue); + expect(parsed.id).toBe("task-99"); + expect(parsed.contextId).toBe(record.contextId); + }); + + it("save() calls client.set with expiry options when ttlSeconds is configured", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-ttl" }); + + const store = new ValkeyTaskStore({ client, ttlSeconds: 300 } as any); + await store.save({ agentId: "agent-1", data: record }); + + const options = client.set.mock.calls[0][2]; + expect(options).toEqual({ expiry: { type: "EX", count: 300 } }); + }); + + it("save() calls client.set without expiry options when ttlSeconds is not configured", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + const record = makeTaskRecord({ id: "task-no-ttl" }); + + const store = new ValkeyTaskStore({ client } as any); + await store.save({ agentId: "agent-1", data: record }); + + expect(client.set.mock.calls[0]).toHaveLength(2); + }); + + it("load() propagates errors thrown by client.get", async () => { + const client = makeClient(); + client.get.mockRejectedValue(new Error("connection refused")); + + const store = new ValkeyTaskStore({ client } as any); + await expect(store.load({ agentId: "agent-1", taskId: "task-1" })).rejects.toThrow( + "connection refused", + ); + }); + + it("load() throws a descriptive error when stored data is corrupted JSON", async () => { + const client = makeClient(); + client.get.mockResolvedValue("not-valid-json{{{"); + + const store = new ValkeyTaskStore({ client, keyPrefix: "pfx" } as any); + await expect(store.load({ agentId: "agent-1", taskId: "task-1" })).rejects.toThrow( + /Failed to parse stored TaskRecord for key "pfx:agent-1::task-1"/, + ); + }); + + it("load() throws when stored data is valid JSON but fails schema validation", async () => { + const client = makeClient(); + // Valid JSON but missing required TaskRecord fields (id, contextId, status, history) + client.get.mockResolvedValue(safeStringify({ bogus: true })); + + const store = new ValkeyTaskStore({ client, keyPrefix: "pfx" } as any); + await expect(store.load({ agentId: "agent-1", taskId: "task-1" })).rejects.toThrow( + /Invalid TaskRecord for key "pfx:agent-1::task-1"/, + ); + }); + + it("save() propagates errors thrown by client.set", async () => { + const client = makeClient(); + client.set.mockRejectedValue(new Error("write timeout")); + const record = makeTaskRecord({ id: "task-err" }); + + const store = new ValkeyTaskStore({ client } as any); + await expect(store.save({ agentId: "agent-1", data: record })).rejects.toThrow("write timeout"); + }); + + it("exposes activeCancellations Set for task cancellation signaling", () => { + const client = makeClient(); + const store = new ValkeyTaskStore({ client } as any); + + expect(store.activeCancellations).toBeInstanceOf(Set); + }); +}); + +describe("createValkeyTaskStore", () => { + it("returns a ValkeyTaskStore instance with eagerly-resolved TimeUnit", async () => { + const client = makeClient(); + client.set.mockResolvedValue("OK"); + + const store = await createValkeyTaskStore({ client, ttlSeconds: 60 } as any); + + expect(store).toBeInstanceOf(ValkeyTaskStore); + + const record = makeTaskRecord({ id: "task-factory" }); + await store.save({ agentId: "agent-1", data: record }); + + const options = client.set.mock.calls[0][2]; + expect(options).toEqual({ expiry: { type: "EX", count: 60 } }); + }); + + it("returns a ValkeyTaskStore without resolving TimeUnit when no ttlSeconds", async () => { + const client = makeClient(); + const store = await createValkeyTaskStore({ client } as any); + expect(store).toBeInstanceOf(ValkeyTaskStore); + }); +}); diff --git a/packages/a2a-server/src/valkey-store.ts b/packages/a2a-server/src/valkey-store.ts new file mode 100644 index 000000000..0ec5dd13c --- /dev/null +++ b/packages/a2a-server/src/valkey-store.ts @@ -0,0 +1,192 @@ +import { randomUUID } from "node:crypto"; +import type { GlideClient, GlideClusterClient, TimeUnit } from "@valkey/valkey-glide"; +import { safeStringify } from "@voltagent/internal"; +import { TaskRecordSchema } from "./schemas"; +import type { TaskRecord, TaskStore } from "./types"; + +/** + * Configuration options for {@link ValkeyTaskStore}. + */ +export interface ValkeyTaskStoreOptions { + /** Valkey client instance (standalone {@link GlideClient} or {@link GlideClusterClient}). */ + client: GlideClient | GlideClusterClient; + /** Key prefix for all task records stored in Valkey. Defaults to `"a2a-tasks"`. */ + keyPrefix?: string; + /** Optional TTL in seconds applied to every persisted task record. Must be a positive finite number. */ + ttlSeconds?: number; +} + +const VALKEY_GLIDE_REQUIRED = + "@valkey/valkey-glide is required for ValkeyTaskStore. Install it with: pnpm add @valkey/valkey-glide"; + +/** + * Creates a {@link ValkeyTaskStore} with eagerly-resolved Valkey dependencies. + * + * Validates `ttlSeconds` and pre-resolves the `TimeUnit.Seconds` enum from + * `@valkey/valkey-glide` so that subsequent `save()` calls do not need to + * perform a dynamic import. + * + * @param options - Store configuration including the Valkey client, optional key prefix, and TTL. + * @returns A fully initialised {@link ValkeyTaskStore} instance. + * @throws If `ttlSeconds` is provided but is not a positive finite number. + * @throws If `@valkey/valkey-glide` is not installed when `ttlSeconds` is set. + */ +export async function createValkeyTaskStore( + options: ValkeyTaskStoreOptions, +): Promise { + if ( + options.ttlSeconds !== undefined && + (!Number.isFinite(options.ttlSeconds) || options.ttlSeconds <= 0) + ) { + throw new Error("ttlSeconds must be a positive finite number"); + } + + let timeUnitSeconds: TimeUnit | undefined; + if (options.ttlSeconds !== undefined) { + try { + const mod = await import("@valkey/valkey-glide"); + timeUnitSeconds = mod.TimeUnit.Seconds; + } catch { + throw new Error(VALKEY_GLIDE_REQUIRED); + } + } + return new ValkeyTaskStore(options, timeUnitSeconds); +} + +/** + * Valkey-backed implementation of {@link TaskStore}. + * + * **Important:** `activeCancellations` is an in-process `Set` and is **not** propagated across + * server instances sharing the same Valkey backend. For cross-instance cancellation, `A2AServer` + * would need to subscribe to a Valkey pub/sub channel for cancellation events instead of relying + * on process-local `AbortController` signaling. + */ +export class ValkeyTaskStore implements TaskStore { + private readonly client: GlideClient | GlideClusterClient; + private readonly keyPrefix: string; + private readonly ttlSeconds?: number; + /** @internal */ + private timeUnitSeconds?: TimeUnit; + + // In-process only, not propagated across instances via Valkey. + readonly activeCancellations = new Set(); + + /** + * Creates a new ValkeyTaskStore. + * + * Prefer {@link createValkeyTaskStore} which eagerly resolves the Valkey + * `TimeUnit` dependency. Direct construction is supported but the caller + * must supply the resolved `timeUnitSeconds` when `ttlSeconds` is used. + * + * @param options - Store configuration. + * @param timeUnitSeconds - Pre-resolved `TimeUnit.Seconds` value from `@valkey/valkey-glide`. + * @throws If `ttlSeconds` is provided but is not a positive finite number. + */ + constructor(options: ValkeyTaskStoreOptions, /** @internal */ timeUnitSeconds?: TimeUnit) { + if ( + options.ttlSeconds !== undefined && + (!Number.isFinite(options.ttlSeconds) || options.ttlSeconds <= 0) + ) { + throw new Error("ttlSeconds must be a positive finite number"); + } + this.client = options.client; + this.keyPrefix = options.keyPrefix ?? "a2a-tasks"; + this.ttlSeconds = options.ttlSeconds; + this.timeUnitSeconds = timeUnitSeconds; + } + + /** + * Lazily resolves and caches the `TimeUnit.Seconds` enum value from + * `@valkey/valkey-glide`. Called internally by {@link save} when TTL is configured. + */ + private async getTimeUnitSeconds(): Promise { + if (this.timeUnitSeconds !== undefined) return this.timeUnitSeconds; + try { + const mod = await import("@valkey/valkey-glide"); + this.timeUnitSeconds = mod.TimeUnit.Seconds; + return this.timeUnitSeconds; + } catch { + throw new Error(VALKEY_GLIDE_REQUIRED); + } + } + + /** + * Loads a task record from Valkey by agent and task ID. + * + * @param params - The agent ID and task ID identifying the record. + * @returns The deserialised {@link TaskRecord}, or `null` if not found. + * @throws If the stored value cannot be parsed as valid JSON. + */ + async load(params: { agentId: string; taskId: string }): Promise { + const key = this.makeKey(params.agentId, params.taskId); + const result = await this.client.get(key); + if (result === null) return null; + + let parsed: unknown; + try { + parsed = JSON.parse(String(result)); + } catch (error) { + const detail = error instanceof Error ? error.message : "Unknown error"; + throw new Error(`Failed to parse stored TaskRecord for key "${key}": ${detail}`); + } + + const validation = TaskRecordSchema.safeParse(parsed); + if (!validation.success) { + throw new Error( + `Invalid TaskRecord for key "${key}": ${safeStringify(validation.error.issues)}`, + ); + } + + return validation.data; + } + + /** + * Persists a task record to Valkey. + * + * The record is serialised with {@link safeStringify} and stored under a + * composite key derived from the agent ID and the record's task ID. When + * `ttlSeconds` is configured the key is set with an expiry. + * + * Note: unlike {@link load}, `save` does **not** run Zod validation. The + * caller is trusted to supply a well-typed `TaskRecord`, and skipping + * validation on the write path avoids the per-call overhead. Any schema + * drift will surface on the next `load()`. + * + * @param params - The agent ID and the {@link TaskRecord} to persist. + */ + async save(params: { agentId: string; data: TaskRecord }): Promise { + const taskId = params.data.id ?? randomUUID(); + const normalized: TaskRecord = + taskId === params.data.id ? params.data : { ...params.data, id: taskId }; + const key = this.makeKey(params.agentId, taskId); + const json = safeStringify(normalized); + + if (json.startsWith("SAFE_STRINGIFY_ERROR:")) { + throw new Error( + `Failed to serialize TaskRecord for agent "${params.agentId}", task "${taskId}": ${json}`, + ); + } + + if (this.ttlSeconds !== undefined) { + const seconds = await this.getTimeUnitSeconds(); + await this.client.set(key, json, { + expiry: { type: seconds, count: this.ttlSeconds }, + }); + } else { + await this.client.set(key, json); + } + } + + /** + * Builds the Valkey key for a given agent/task pair. + * + * Colons inside `agentId` and `taskId` are escaped to prevent collisions + * with the `keyPrefix:agentId::taskId` delimiter scheme. + */ + private makeKey(agentId: string, taskId: string): string { + // Escape colons in user-provided IDs to prevent key collisions with the delimiter. + const safeAgentId = agentId.replace(/:/g, "\\:"); + const safeTaskId = taskId.replace(/:/g, "\\:"); + return `${this.keyPrefix}:${safeAgentId}::${safeTaskId}`; + } +} diff --git a/packages/a2a-server/tsup.config.ts b/packages/a2a-server/tsup.config.ts index 0819104fd..ff573024e 100644 --- a/packages/a2a-server/tsup.config.ts +++ b/packages/a2a-server/tsup.config.ts @@ -2,7 +2,7 @@ import { defineConfig } from "tsup"; import { markAsExternalPlugin } from "../shared/tsup-plugins/mark-as-external"; export default defineConfig({ - entry: ["src/index.ts"], + entry: ["src/index.ts", "src/valkey-store.ts"], format: ["cjs", "esm"], splitting: false, sourcemap: true, diff --git a/packages/resumable-streams/package.json b/packages/resumable-streams/package.json index 8c2c03a27..369d69bc8 100644 --- a/packages/resumable-streams/package.json +++ b/packages/resumable-streams/package.json @@ -20,6 +20,16 @@ "types": "./dist/index.d.ts", "default": "./dist/index.js" } + }, + "./valkey-store": { + "import": { + "types": "./dist/valkey-store.d.mts", + "default": "./dist/valkey-store.mjs" + }, + "require": { + "types": "./dist/valkey-store.d.ts", + "default": "./dist/valkey-store.js" + } } }, "files": [ @@ -35,9 +45,15 @@ "main": "dist/index.js", "module": "dist/index.mjs", "peerDependencies": { + "@valkey/valkey-glide": ">=2.3.1", "@voltagent/core": "^2.0.0", "ai": "^6.0.0" }, + "peerDependenciesMeta": { + "@valkey/valkey-glide": { + "optional": true + } + }, "publishConfig": { "access": "public" }, diff --git a/packages/resumable-streams/src/resumable-streams.ts b/packages/resumable-streams/src/resumable-streams.ts index cad154466..80c282164 100644 --- a/packages/resumable-streams/src/resumable-streams.ts +++ b/packages/resumable-streams/src/resumable-streams.ts @@ -84,7 +84,8 @@ const getResumableStreamDisabledInfo = (value: unknown) => { return { reason, docsUrl }; }; -const markResumableStreamStoreType = ( +/** @internal */ +export const markResumableStreamStoreType = ( value: T, type: string, displayName?: string, @@ -201,7 +202,8 @@ const buildStreamKey = ({ conversationId, userId }: ResumableStreamContext) => { return `${userId}-${conversationId}`; }; -const buildActiveStreamKey = (keyPrefix: string, context: ResumableStreamContext) => +/** @internal */ +export const buildActiveStreamKey = (keyPrefix: string, context: ResumableStreamContext) => `${keyPrefix}:active:${buildStreamKey(context)}`; const buildActiveStreamQuery = (context: ResumableStreamContext, streamId?: string): string => { @@ -216,7 +218,8 @@ const buildActiveStreamQuery = (context: ResumableStreamContext, streamId?: stri return params.toString(); }; -const createActiveStreamStoreFromPublisher = ( +/** @internal */ +export const createActiveStreamStoreFromPublisher = ( publisher: ResumableStreamPublisher, keyPrefix: string, ): ResumableStreamActiveStore => ({ @@ -250,7 +253,8 @@ const createActiveStreamStoreFromPublisher = ( }, }); -const mergeStreamAndActiveStore = ( +/** @internal */ +export const mergeStreamAndActiveStore = ( streamStore: T, activeStreamStore: ResumableStreamActiveStore, ): T & ResumableStreamActiveStore => ({ @@ -260,6 +264,12 @@ const mergeStreamAndActiveStore = ( clearActiveStream: activeStreamStore.clearActiveStream, }); +/** + * Creates an in-memory active-stream store that tracks which stream ID is + * currently active for each conversation/user pair. + * + * @returns A {@link ResumableStreamActiveStore} backed by a `Map`. + */ export function createMemoryResumableStreamActiveStore(): ResumableStreamActiveStore { const activeStreams = new Map(); @@ -383,6 +393,14 @@ const createInMemoryPubSub = () => { return { publisher, subscriber }; }; +/** + * Creates an in-memory resumable stream store backed by the `resumable-stream` library. + * + * Useful for development and testing. Data does not survive process restarts. + * + * @param options - Optional key prefix and `waitUntil` callback. + * @returns A {@link ResumableStreamStore} with active-stream tracking. + */ export async function createResumableStreamMemoryStore( options: ResumableStreamStoreOptions = {}, ): Promise { @@ -402,6 +420,15 @@ export async function createResumableStreamMemoryStore( return markResumableStreamStoreType(mergedStore, "memory", "Memory"); } +/** + * Creates a Redis-backed resumable stream store. + * + * If `publisher` / `subscriber` clients are not provided, they are created + * automatically from the `REDIS_URL` or `KV_URL` environment variable. + * + * @param options - Redis connection and key prefix options. + * @returns A {@link ResumableStreamStore} with active-stream tracking. + */ export async function createResumableStreamRedisStore( options: ResumableStreamRedisStoreOptions = {}, ): Promise { @@ -446,6 +473,13 @@ export async function createResumableStreamRedisStore( return markResumableStreamStoreType(mergedStore, "redis", "Redis"); } +/** + * Creates a resumable stream store from user-supplied publisher and subscriber instances. + * + * @param options - Must include both `publisher` and `subscriber`. + * @returns A {@link ResumableStreamStore} with active-stream tracking. + * @throws If `publisher` or `subscriber` is missing. + */ export async function createResumableStreamGenericStore( options: ResumableStreamGenericStoreOptions, ): Promise { @@ -468,6 +502,14 @@ export async function createResumableStreamGenericStore( return markResumableStreamStoreType(mergedStore, "custom", "Custom"); } +/** + * Creates a resumable stream store backed by the VoltOps managed service. + * + * Returns a disabled store when the required API keys are not configured. + * + * @param options - VoltOps client or API key configuration. + * @returns A {@link ResumableStreamStore} with active-stream tracking, or a disabled stub. + */ export async function createResumableStreamVoltOpsStore( options: ResumableStreamVoltOpsStoreOptions = {}, ): Promise { @@ -571,6 +613,16 @@ export async function createResumableStreamVoltOpsStore( return markResumableStreamStoreType(mergedStore, "voltops", "VoltOps"); } +/** + * Builds a {@link ResumableStreamAdapter} from a stream store and active-stream store. + * + * If `activeStreamStore` is not provided explicitly, it is inferred from `streamStore` + * when the store implements the {@link ResumableStreamActiveStore} interface. + * + * @param config - Must include `streamStore`; `activeStreamStore` is inferred when possible. + * @returns A fully wired {@link ResumableStreamAdapter}. + * @throws If `streamStore` is missing or `activeStreamStore` cannot be resolved. + */ export async function createResumableStreamAdapter( config: ResumableStreamAdapterConfig, ): Promise { @@ -624,6 +676,17 @@ export async function createResumableStreamAdapter( return adapter; } +/** + * Merges an existing resumable stream adapter into server-provider dependencies. + * + * If `deps` already contains a `resumableStream`, the provided adapter is ignored + * (with a warning). Disabled adapters are also filtered out. + * + * @param deps - Current server-provider dependencies. + * @param adapter - Optional adapter to inject. + * @param logger - Optional logger for warnings. + * @returns Updated dependencies with the resolved adapter, if any. + */ export async function resolveResumableStreamDeps( deps: ServerProviderDeps, adapter: ResumableStreamAdapter | undefined, @@ -647,6 +710,14 @@ export async function resolveResumableStreamDeps( }; } +/** + * Validates a {@link ResumableStreamAdapter}, returning `undefined` + * when the adapter is disabled or invalid. + * + * @param adapter - The adapter to validate. + * @param logger - Optional logger for warnings. + * @returns The validated adapter, or `undefined`. + */ export function resolveResumableStreamAdapter( adapter: ResumableStreamAdapter | undefined, logger?: Logger, diff --git a/packages/resumable-streams/src/valkey-store.spec.ts b/packages/resumable-streams/src/valkey-store.spec.ts new file mode 100644 index 000000000..9c4a8b164 --- /dev/null +++ b/packages/resumable-streams/src/valkey-store.spec.ts @@ -0,0 +1,360 @@ +import { createResumableStreamValkeyStore } from "./valkey-store"; + +vi.mock("@valkey/valkey-glide", () => ({ + GlideClient: { createClient: vi.fn() }, + GlideClientConfiguration: { PubSubChannelModes: { Exact: 0 } }, + TimeUnit: { Seconds: "EX" }, +})); + +vi.mock("resumable-stream/generic", () => ({ + createResumableStreamContext: vi.fn().mockReturnValue({ + createNewResumableStream: vi.fn(), + resumeExistingStream: vi.fn(), + }), +})); + +function makeGlideClient() { + return { + get: vi.fn(), + set: vi.fn(), + incr: vi.fn(), + publish: vi.fn(), + del: vi.fn(), + }; +} + +function makeOptions(clientOverrides = {}) { + return { + client: { ...makeGlideClient(), ...clientOverrides }, + clientConfig: { addresses: [{ host: "localhost", port: 6379 }] }, + }; +} + +async function getPublisher(clientOverrides = {}) { + const { createResumableStreamContext } = await import("resumable-stream/generic"); + const mockCtx = vi.mocked(createResumableStreamContext); + mockCtx.mockClear(); + + const opts = makeOptions(clientOverrides); + await createResumableStreamValkeyStore(opts as any); + + const callArgs = mockCtx.mock.calls[0][0] as any; + return { publisher: callArgs.publisher, subscriber: callArgs.subscriber, client: opts.client }; +} + +describe("ValkeyResumableStreamStore — Publisher adapter", () => { + it("set with { EX: 60 } calls client.set with expiry", async () => { + const { publisher, client } = await getPublisher(); + (client as any).set.mockResolvedValue("OK"); + + await publisher.set("my-key", "my-value", { EX: 60 }); + + expect((client as any).set).toHaveBeenCalledWith("my-key", "my-value", { + expiry: { type: "EX", count: 60 }, + }); + }); + + it("set without EX calls client.set with just key and value", async () => { + const { publisher, client } = await getPublisher(); + (client as any).set.mockResolvedValue("OK"); + + await publisher.set("my-key", "my-value"); + + expect((client as any).set).toHaveBeenCalledWith("my-key", "my-value"); + expect((client as any).set).toHaveBeenCalledTimes(1); + expect((client as any).set.mock.calls[0]).toHaveLength(2); + }); + + it("get converts GlideString (Buffer) to string", async () => { + const { publisher, client } = await getPublisher(); + (client as any).get.mockResolvedValue(Buffer.from("hello")); + + const result = await publisher.get("some-key"); + + expect(result).toBe("hello"); + }); + + it("get returns null when client returns null", async () => { + const { publisher, client } = await getPublisher(); + (client as any).get.mockResolvedValue(null); + + const result = await publisher.get("missing-key"); + + expect(result).toBeNull(); + }); + + it("incr delegates to client.incr", async () => { + const { publisher, client } = await getPublisher(); + (client as any).incr.mockResolvedValue(5); + + const result = await publisher.incr("counter-key"); + + expect((client as any).incr).toHaveBeenCalledWith("counter-key"); + expect(result).toBe(5); + }); + + it("publish delegates to client.publish", async () => { + const { publisher, client } = await getPublisher(); + (client as any).publish.mockResolvedValue(1); + + const result = await publisher.publish("my-channel", "my-message"); + + expect((client as any).publish).toHaveBeenCalledWith("my-message", "my-channel"); + expect(result).toBe(1); + }); + + it("del calls client.del([key])", async () => { + const { publisher, client } = await getPublisher(); + (client as any).del.mockResolvedValue(1); + + await publisher.del("some-key"); + + expect((client as any).del).toHaveBeenCalledWith(["some-key"]); + }); +}); + +describe("ValkeyResumableStreamStore — Subscriber adapter", () => { + it("subscribe calls GlideClient.createClient with correct pubsubSubscriptions config", async () => { + const { GlideClient } = await import("@valkey/valkey-glide"); + const mockCreateClient = vi.mocked(GlideClient.createClient); + const mockSubClient = { close: vi.fn() }; + mockCreateClient.mockResolvedValue(mockSubClient as any); + + const { subscriber } = await getPublisher(); + const callback = vi.fn(); + + await subscriber.subscribe("test-channel", callback); + + expect(mockCreateClient).toHaveBeenCalledWith( + expect.objectContaining({ + addresses: [{ host: "localhost", port: 6379 }], + pubsubSubscriptions: expect.objectContaining({ + channelsAndPatterns: expect.objectContaining({ + 0: expect.any(Set), + }), + callback: expect.any(Function), + }), + }), + ); + + const callArg = mockCreateClient.mock.calls[0][0] as any; + expect(callArg.pubsubSubscriptions.channelsAndPatterns[0].has("test-channel")).toBe(true); + }); + + it("subscribe callback invokes the provided callback with msg.message", async () => { + const { GlideClient } = await import("@valkey/valkey-glide"); + const mockCreateClient = vi.mocked(GlideClient.createClient); + const mockSubClient = { close: vi.fn() }; + mockCreateClient.mockClear(); + mockCreateClient.mockResolvedValue(mockSubClient as any); + + const { subscriber } = await getPublisher(); + const callback = vi.fn(); + + mockCreateClient.mockClear(); + await subscriber.subscribe("test-channel", callback); + + const callArg = mockCreateClient.mock.calls[0][0] as any; + callArg.pubsubSubscriptions.callback({ message: "hello-world" }, null); + + expect(callback).toHaveBeenCalledWith("hello-world"); + }); + + it("unsubscribe calls close() on the subscription client and removes it", async () => { + const { GlideClient } = await import("@valkey/valkey-glide"); + const mockCreateClient = vi.mocked(GlideClient.createClient); + const mockSubClient = { close: vi.fn() }; + mockCreateClient.mockResolvedValue(mockSubClient as any); + + const { subscriber } = await getPublisher(); + await subscriber.subscribe("test-channel", vi.fn()); + await subscriber.unsubscribe("test-channel"); + + expect(mockSubClient.close).toHaveBeenCalledTimes(1); + + // Unsubscribing again should resolve safely (client already removed) + await subscriber.unsubscribe("test-channel"); + expect(mockSubClient.close).toHaveBeenCalledTimes(1); + }); +}); + +describe("ValkeyResumableStreamStore — factory", () => { + it("returns object with all required store methods", async () => { + const opts = makeOptions(); + const store = await createResumableStreamValkeyStore(opts as any); + + expect(typeof store.createNewResumableStream).toBe("function"); + expect(typeof store.resumeExistingStream).toBe("function"); + expect(typeof store.getActiveStreamId).toBe("function"); + expect(typeof store.setActiveStreamId).toBe("function"); + expect(typeof store.clearActiveStream).toBe("function"); + expect(typeof store.close).toBe("function"); + }); + + it("setActiveStreamId applies ttlSeconds as EX when configured", async () => { + const client = makeGlideClient(); + client.set.mockResolvedValue("OK"); + client.get.mockResolvedValue(null); + + const opts = { + ...makeOptions(), + client, + ttlSeconds: 600, + }; + const store = await createResumableStreamValkeyStore(opts as any); + + await store.setActiveStreamId( + { conversationId: "conv-1", userId: "user-1" } as any, + "stream-42", + ); + + expect(client.set).toHaveBeenCalledWith("resumable-stream:active:user-1-conv-1", "stream-42", { + expiry: { type: "EX", count: 600 }, + }); + }); + + it("setActiveStreamId does NOT apply EX when ttlSeconds is not configured", async () => { + const client = makeGlideClient(); + client.set.mockResolvedValue("OK"); + client.get.mockResolvedValue(null); + + const opts = { + ...makeOptions(), + client, + }; + const store = await createResumableStreamValkeyStore(opts as any); + + await store.setActiveStreamId( + { conversationId: "conv-1", userId: "user-1" } as any, + "stream-99", + ); + + const setCalls = client.set.mock.calls; + const activeSetCall = setCalls.find( + (c: any[]) => typeof c[0] === "string" && c[0].includes("active:"), + ); + expect(activeSetCall).toBeDefined(); + expect(activeSetCall).toHaveLength(2); + }); + + it("getActiveStreamId returns stored stream ID", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue("stream-123"); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + const result = await store.getActiveStreamId({ + conversationId: "conv-1", + userId: "user-1", + } as any); + + expect(result).toBe("stream-123"); + expect(client.get).toHaveBeenCalledWith("resumable-stream:active:user-1-conv-1"); + }); + + it("getActiveStreamId returns null when no active stream exists", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue(null); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + const result = await store.getActiveStreamId({ + conversationId: "conv-1", + userId: "user-1", + } as any); + + expect(result).toBeNull(); + }); + + it("getActiveStreamId returns null when stored value is empty string", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue(""); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + const result = await store.getActiveStreamId({ + conversationId: "conv-1", + userId: "user-1", + } as any); + + expect(result).toBeNull(); + }); + + it("clearActiveStream deletes key when streamId matches current value", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue("stream-42"); + client.del.mockResolvedValue(1); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + await store.clearActiveStream({ + conversationId: "conv-1", + userId: "user-1", + streamId: "stream-42", + } as any); + + expect(client.del).toHaveBeenCalledWith(["resumable-stream:active:user-1-conv-1"]); + }); + + it("clearActiveStream does NOT delete key when streamId does not match", async () => { + const client = makeGlideClient(); + client.get.mockResolvedValue("stream-other"); + client.del.mockResolvedValue(1); + client.set.mockResolvedValue("OK"); + + const opts = { ...makeOptions(), client }; + const store = await createResumableStreamValkeyStore(opts as any); + + await store.clearActiveStream({ + conversationId: "conv-1", + userId: "user-1", + streamId: "stream-42", + } as any); + + expect(client.del).not.toHaveBeenCalled(); + }); + + it("store is tagged with valkey type marker", async () => { + const opts = makeOptions(); + const store = (await createResumableStreamValkeyStore(opts as any)) as any; + + expect(store.__voltagentResumableStoreType).toBe("valkey"); + }); + + it("close() closes all subscription clients and clears the map", async () => { + const { GlideClient } = await import("@valkey/valkey-glide"); + const mockCreateClient = vi.mocked(GlideClient.createClient); + const mockSubClient1 = { close: vi.fn() }; + const mockSubClient2 = { close: vi.fn() }; + mockCreateClient + .mockResolvedValueOnce(mockSubClient1 as any) + .mockResolvedValueOnce(mockSubClient2 as any); + + const opts = makeOptions(); + const store = await createResumableStreamValkeyStore(opts as any); + + const { createResumableStreamContext } = await import("resumable-stream/generic"); + const mockCtx = vi.mocked(createResumableStreamContext); + const callArgs = mockCtx.mock.calls[mockCtx.mock.calls.length - 1][0] as any; + await callArgs.subscriber.subscribe("channel-1", vi.fn()); + await callArgs.subscriber.subscribe("channel-2", vi.fn()); + + await store.close(); + + expect(mockSubClient1.close).toHaveBeenCalledTimes(1); + expect(mockSubClient2.close).toHaveBeenCalledTimes(1); + + // Calling close() again should be safe (map is cleared) + await store.close(); + expect(mockSubClient1.close).toHaveBeenCalledTimes(1); + expect(mockSubClient2.close).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/resumable-streams/src/valkey-store.ts b/packages/resumable-streams/src/valkey-store.ts new file mode 100644 index 000000000..fb444ffe0 --- /dev/null +++ b/packages/resumable-streams/src/valkey-store.ts @@ -0,0 +1,251 @@ +import type { GlideClient, GlideClusterClient, TimeUnit } from "@valkey/valkey-glide"; +import { + buildActiveStreamKey, + createActiveStreamStoreFromPublisher, + markResumableStreamStoreType, + mergeStreamAndActiveStore, +} from "./resumable-streams"; +import type { ResumableStreamActiveStore, ResumableStreamStore } from "./types"; + +const DEFAULT_KEY_PREFIX = "resumable-stream"; + +/** + * Connection configuration passed to the Valkey GLIDE client. + * + * At minimum, `addresses` must contain one `{ host, port }` entry. Additional + * properties (TLS, timeouts, etc.) are forwarded to the underlying GLIDE + * client constructor. + */ +export interface ValkeyConnectionConfig { + addresses: Array<{ host: string; port: number }>; + useTLS?: boolean; + requestTimeout?: number; + clientName?: string; + [key: string]: unknown; +} + +/** + * Options for creating a Valkey-backed resumable stream store via + * {@link createResumableStreamValkeyStore}. + */ +export interface ResumableStreamValkeyStoreOptions { + /** Valkey client instance (standalone {@link GlideClient} or {@link GlideClusterClient}). */ + client: GlideClient | GlideClusterClient; + /** Connection config reused when creating per-channel subscription clients. */ + clientConfig: ValkeyConnectionConfig; + /** Key prefix for all Valkey keys managed by this store. Defaults to `"resumable-stream"`. */ + keyPrefix?: string; + /** Optional TTL in seconds applied to active-stream keys. Must be a positive finite number. */ + // Applied to active stream keys only; stream data keys are managed by resumable-stream/generic + ttlSeconds?: number; + /** + * Maximum number of concurrent subscription channels. Each subscription creates a + * dedicated GlideClient TCP connection (required by the Glide pub/sub model), so this + * also caps the number of open connections. Defaults to 1000. + */ + maxSubscriptions?: number; + /** Optional callback (e.g. from a serverless runtime) to keep the process alive while background work completes. */ + waitUntil?: ((promise: Promise) => void) | null; +} + +/** + * A resumable stream store backed by Valkey, combining stream creation/resumption + * with active-stream tracking and a {@link close} method for cleanup. + */ +export type ValkeyResumableStreamStore = ResumableStreamStore & + ResumableStreamActiveStore & { + close(): Promise; + }; + +/** + * Creates a Valkey-backed resumable stream store. + * + * The returned store uses the provided {@link GlideClient} (or + * {@link GlideClusterClient}) for key-value and pub/sub operations required by + * the `resumable-stream/generic` library. Each pub/sub subscription creates a + * dedicated GLIDE client connection (required by the GLIDE pub/sub model). + * + * @param options - Store configuration including the Valkey client, connection + * config, optional key prefix, TTL, and subscription limits. + * @returns A {@link ValkeyResumableStreamStore} ready for use. + * @throws If `@valkey/valkey-glide` is not installed or is an incompatible version. + * @throws If `ttlSeconds` is provided but is not a positive finite number. + */ +export async function createResumableStreamValkeyStore( + options: ResumableStreamValkeyStoreOptions, +): Promise { + let GlideClientClass: typeof GlideClient; + let GlideClientConfigurationClass: { PubSubChannelModes: { Exact: number } }; + let timeUnit: typeof TimeUnit; + + try { + const mod = await import("@valkey/valkey-glide"); + GlideClientClass = mod.GlideClient; + // PubSubChannelModes isn't exported as a value type; use a single guarded access. + const PubSubExact = (mod as any).GlideClientConfiguration?.PubSubChannelModes?.Exact; + if (PubSubExact === undefined) { + throw new Error( + "GlideClientConfiguration.PubSubChannelModes.Exact is not available. " + + "The installed version of @valkey/valkey-glide may be incompatible.", + ); + } + GlideClientConfigurationClass = { PubSubChannelModes: { Exact: PubSubExact } }; + timeUnit = mod.TimeUnit; + } catch (err) { + if (err instanceof Error && err.message.includes("PubSubChannelModes")) { + throw err; + } + throw new Error( + "@valkey/valkey-glide is required for createResumableStreamValkeyStore. " + + "Install it with: pnpm add @valkey/valkey-glide", + ); + } + + const keyPrefix = options.keyPrefix ?? DEFAULT_KEY_PREFIX; + const { client, clientConfig } = options; + + if ( + options.ttlSeconds !== undefined && + (!Number.isFinite(options.ttlSeconds) || options.ttlSeconds <= 0) + ) { + throw new Error("ttlSeconds must be a positive finite number"); + } + + // Publisher adapter + const publisher = { + async connect() {}, + async publish(channel: string, message: string) { + return client.publish(message, channel); + }, + async set(key: string, value: string, setOptions?: { EX?: number }) { + if (setOptions?.EX !== undefined) { + return client.set(key, value, { + expiry: { type: timeUnit.Seconds, count: setOptions.EX }, + }); + } + return client.set(key, value); + }, + async get(key: string): Promise { + const result = await client.get(key); + return result !== null ? String(result) : null; + }, + async incr(key: string) { + return client.incr(key); + }, + async del(key: string) { + return client.del([key]); + }, + }; + + // Subscriber adapter — one dedicated client per channel (Glide pub/sub requirement). + // Detect whether the caller provided a cluster client so subscription clients match. + let GlideClusterClientClass: typeof GlideClusterClient | undefined; + try { + const mod = await import("@valkey/valkey-glide"); + GlideClusterClientClass = mod.GlideClusterClient; + } catch { + // Already handled above; GlideClusterClient is only needed for cluster mode. + } + const isClusterMode = + GlideClusterClientClass !== undefined && client instanceof GlideClusterClientClass; + + const maxSubscriptions = options.maxSubscriptions ?? 1000; + const subscriptionClients = new Map(); + // Guard against concurrent subscribe calls interleaving across awaits. + const pendingSubscriptions = new Set(); + + const subscriber = { + async connect() {}, + async subscribe(channel: string, callback: (message: string) => void) { + // Close any existing client for this channel to avoid resource leaks on duplicate calls + const existing = subscriptionClients.get(channel); + if (existing) { + existing.close(); + subscriptionClients.delete(channel); + } + + if (pendingSubscriptions.has(channel)) { + throw new Error(`A subscription for channel "${channel}" is already being established.`); + } + + if (subscriptionClients.size + pendingSubscriptions.size >= maxSubscriptions) { + throw new Error( + `Maximum subscription limit (${maxSubscriptions}) reached. Unsubscribe from existing channels before subscribing to new ones.`, + ); + } + + pendingSubscriptions.add(channel); + try { + const pubsubConfig = { + ...clientConfig, + pubsubSubscriptions: { + channelsAndPatterns: { + [GlideClientConfigurationClass.PubSubChannelModes.Exact]: new Set([channel]), + }, + callback: (msg: { message: unknown }, _ctx: unknown) => callback(String(msg.message)), + }, + }; + + const subClient = + isClusterMode && GlideClusterClientClass + ? await GlideClusterClientClass.createClient(pubsubConfig) + : await GlideClientClass.createClient(pubsubConfig); + + subscriptionClients.set(channel, subClient); + } finally { + pendingSubscriptions.delete(channel); + } + }, + async unsubscribe(channel: string) { + const subClient = subscriptionClients.get(channel); + if (subClient) { + subClient.close(); + subscriptionClients.delete(channel); + } + }, + }; + + const { createResumableStreamContext } = await import("resumable-stream/generic"); + + const streamStore = createResumableStreamContext({ + keyPrefix, + waitUntil: options.waitUntil ?? null, + publisher, + subscriber, + }) as ResumableStreamStore; + + const activeStreamStore = createActiveStreamStoreFromPublisher(publisher, keyPrefix); + + // Wire ttlSeconds into setActiveStreamId so active stream keys expire + const ttlSeconds = options.ttlSeconds; + const ttlActiveStreamStore = + ttlSeconds !== undefined + ? { + ...activeStreamStore, + async setActiveStreamId( + context: Parameters[0], + streamId: string, + ) { + const key = buildActiveStreamKey(keyPrefix, context); + await publisher.set(key, streamId, { EX: ttlSeconds }); + }, + } + : activeStreamStore; + + const mergedStore = mergeStreamAndActiveStore(streamStore, ttlActiveStreamStore); + const taggedStore = markResumableStreamStoreType(mergedStore, "valkey", "Valkey"); + + return { + ...taggedStore, + /** + * Closes all internally-created subscription clients. The main `client` passed in + * `options` is **not** closed — the caller retains ownership of its lifecycle. + */ + async close() { + for (const subClient of subscriptionClients.values()) { + subClient.close(); + } + subscriptionClients.clear(); + }, + }; +} diff --git a/packages/resumable-streams/tsconfig.json b/packages/resumable-streams/tsconfig.json index d4612442c..a8814988a 100644 --- a/packages/resumable-streams/tsconfig.json +++ b/packages/resumable-streams/tsconfig.json @@ -23,8 +23,8 @@ "esModuleInterop": true, "skipLibCheck": true, "forceConsistentCasingInFileNames": true, - "types": ["node"] + "types": ["vitest/globals", "node"] }, - "include": ["src/**/*.ts"], - "exclude": ["node_modules", "dist"] + "include": ["src/**/*.ts", "src/**/*.spec-d.ts"], + "exclude": ["node_modules", "dist", "src/**/*.spec.ts"] } diff --git a/packages/resumable-streams/tsup.config.ts b/packages/resumable-streams/tsup.config.ts index e019584d6..c2af14e7f 100644 --- a/packages/resumable-streams/tsup.config.ts +++ b/packages/resumable-streams/tsup.config.ts @@ -2,7 +2,7 @@ import { defineConfig } from "tsup"; import { markAsExternalPlugin } from "../shared/tsup-plugins/mark-as-external"; export default defineConfig({ - entry: ["src/index.ts"], + entry: ["src/index.ts", "src/valkey-store.ts"], format: ["cjs", "esm"], splitting: false, sourcemap: true, diff --git a/packages/resumable-streams/vitest.config.ts b/packages/resumable-streams/vitest.config.ts new file mode 100644 index 000000000..236b2f5eb --- /dev/null +++ b/packages/resumable-streams/vitest.config.ts @@ -0,0 +1,17 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["**/*.spec.ts"], + environment: "node", + coverage: { + provider: "v8", + reporter: ["text", "json", "html"], + include: ["src/**/*.ts"], + exclude: ["src/**/*.d.ts", "src/**/index.ts"], + }, + globals: true, + testTimeout: 10000, + hookTimeout: 10000, + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 62ea9b37d..6ede9d5f4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2980,6 +2980,46 @@ importers: specifier: ^5.8.2 version: 5.9.2 + examples/with-valkey-store: + dependencies: + '@valkey/valkey-glide': + specifier: ^2.3.1 + version: 2.3.1 + '@voltagent/a2a-server': + specifier: ^2.0.3 + version: link:../../packages/a2a-server + '@voltagent/core': + specifier: ^2.7.2 + version: link:../../packages/core + '@voltagent/internal': + specifier: ^1.0.3 + version: link:../../packages/internal + '@voltagent/logger': + specifier: ^2.0.2 + version: link:../../packages/logger + '@voltagent/resumable-streams': + specifier: ^2.0.2 + version: link:../../packages/resumable-streams + '@voltagent/server-hono': + specifier: ^2.0.12 + version: link:../../packages/server-hono + ai: + specifier: ^6.0.0 + version: 6.0.3(zod@3.25.76) + zod: + specifier: ^3.25.76 + version: 3.25.76 + devDependencies: + '@types/node': + specifier: ^24.2.1 + version: 24.6.2 + tsx: + specifier: ^4.21.0 + version: 4.21.0 + typescript: + specifier: ^5.8.2 + version: 5.9.3 + examples/with-vector-search: dependencies: '@ai-sdk/openai': @@ -3636,6 +3676,9 @@ importers: '@a2a-js/sdk': specifier: ^0.2.5 version: 0.2.5 + '@valkey/valkey-glide': + specifier: '>=2.3.1' + version: 2.3.1 '@voltagent/internal': specifier: ^1.0.2 version: link:../internal @@ -3646,6 +3689,9 @@ importers: '@voltagent/core': specifier: ^2.0.2 version: link:../core + fast-check: + specifier: ^3.23.2 + version: 3.23.2 packages/ag-ui: dependencies: @@ -4223,6 +4269,9 @@ importers: packages/resumable-streams: dependencies: + '@valkey/valkey-glide': + specifier: '>=2.3.1' + version: 2.3.1 '@voltagent/core': specifier: ^2.6.6 version: link:../core @@ -21209,6 +21258,69 @@ packages: - graphql dev: false + /@valkey/valkey-glide-darwin-arm64@2.3.1: + resolution: {integrity: sha512-S3uCwGoBqQuVieBsevftlXzy/lFkDf49wQkg0UaRGMoWhmaik8d9rwTzKky1+otLjWCblWtMDcqIWGMs5ppo8Q==} + cpu: [arm64] + os: [darwin] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-darwin-x64@2.3.1: + resolution: {integrity: sha512-7F+UoyMhO0QiAHEDZApy0+PNR7v7Gct5SMiRH+8RuUk8Nf1Cc3mNVtFXk0jwwbhatiWiSNEaMDkTRPWYZN6IcQ==} + cpu: [x64] + os: [darwin] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-linux-arm64-gnu@2.3.1: + resolution: {integrity: sha512-Aigxpq6oSxPhdX7V0JZsQrBW4pzcNibCb1m6XorGdWpG62sttV/3DKz/06V+h1dSGfqpkjWPxRJS1EmTqPYmZw==} + cpu: [arm64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-linux-arm64-musl@2.3.1: + resolution: {integrity: sha512-vmohWbUncgH+Rz1OnXPeSDVChCFMy0MYL38qcmAS05aGwL2sgMjipsEWiAsc82IAauKsVE7Ou68YZcyYK/mhag==} + cpu: [arm64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-linux-x64-gnu@2.3.1: + resolution: {integrity: sha512-v/pJefhNO2fjEKq7uQAked896S2N3ZymiO/ei2v8JRxbYk1M+/Ffl6NH6SG+Mo/k2dyeRN1B5PXNq79jv149Rg==} + cpu: [x64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide-linux-x64-musl@2.3.1: + resolution: {integrity: sha512-GAcWs6MrS4v6/mjoaYyd8ay+L6Zl2L9YjkuKsupWzjSsT+PEVV/3yX0ecNeK7SGMn5Wfd3InZ3gfGPEsWEhc0g==} + cpu: [x64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@valkey/valkey-glide@2.3.1: + resolution: {integrity: sha512-HzKCoNRSJYG83vKrYVJk+By2a0qVt/VuQfziCtdM6Q3Lt2WUE1PqxUAwO1Br2flSgrTxNdGgaGmX8qdnM25d5w==} + engines: {node: '>=16'} + dependencies: + long: 5.3.2 + protobufjs: 7.5.3 + optionalDependencies: + '@valkey/valkey-glide-darwin-arm64': 2.3.1 + '@valkey/valkey-glide-darwin-x64': 2.3.1 + '@valkey/valkey-glide-linux-arm64-gnu': 2.3.1 + '@valkey/valkey-glide-linux-arm64-musl': 2.3.1 + '@valkey/valkey-glide-linux-x64-gnu': 2.3.1 + '@valkey/valkey-glide-linux-x64-musl': 2.3.1 + dev: false + /@vercel/nft@0.29.4(supports-color@10.2.2): resolution: {integrity: sha512-6lLqMNX3TuycBPABycx7A9F1bHQR7kiQln6abjFbPrf5C/05qHM9M5E4PeTE59c7z8g6vHnx1Ioihb2AQl7BTA==} engines: {node: '>=18'} @@ -21295,7 +21407,7 @@ packages: '@babel/core': 7.28.5 '@babel/plugin-syntax-typescript': 7.27.1(@babel/core@7.28.5) '@babel/plugin-transform-typescript': 7.28.0(@babel/core@7.28.5) - '@rolldown/pluginutils': 1.0.0-rc.9 + '@rolldown/pluginutils': 1.0.0-rc.17 '@vue/babel-plugin-jsx': 1.5.0(@babel/core@7.28.5) vite: 7.2.7(@types/node@24.2.1)(jiti@2.6.1) vue: 3.5.22(typescript@5.9.3)