Skip to content

feat: add Valkey-backed TaskStore and ResumableStreamStore#1259

Open
rileydes-improving wants to merge 22 commits into
VoltAgent:mainfrom
rileydes-improving:feat/valkey-store-providers
Open

feat: add Valkey-backed TaskStore and ResumableStreamStore#1259
rileydes-improving wants to merge 22 commits into
VoltAgent:mainfrom
rileydes-improving:feat/valkey-store-providers

Conversation

@rileydes-improving
Copy link
Copy Markdown

@rileydes-improving rileydes-improving commented Apr 30, 2026

PR Checklist

Please check if your PR fulfills the following requirements:

Bugs / Features

What is the current behavior?

The @voltagent/a2a-server and @voltagent/resumable-streams packages only support in-memory storage. Task state and stream data are lost on restart and cannot be shared across multiple server instances.

What is the new behavior?

Adds Valkey-backed store implementations to both packages using the @valkey/valkey-glide client library:

@voltagent/a2a-server

  • New ValkeyTaskStore class implementing the TaskStore interface with composite key generation, JSON serialization, and optional TTL-based expiration.
  • createValkeyTaskStore factory function for easy instantiation.
  • New taskStore option on A2AServerConfig so a store can be provided at construction time.

@voltagent/resumable-streams

  • New createResumableStreamValkeyStore factory that wires Valkey pub/sub into the resumable-stream generic context, with per-channel dedicated GlideClient connections and a configurable max subscription limit (default 1000).
  • TTL support for active stream keys via optional ttlSeconds.
  • Exports internal utilities (markResumableStreamStoreType, buildActiveStreamKey, createActiveStreamStoreFromPublisher, mergeStreamAndActiveStore) for reuse by custom store implementations.

Shared

  • @valkey/valkey-glide added as an optional peer dependency in both packages so consumers who don't use Valkey are unaffected.
  • Both stores support standalone and cluster Valkey deployments, configurable key prefixes, and optional TTL.

Example

  • New examples/with-valkey-store project demonstrating end-to-end integration of both stores with a SupportAgent, including environment config template and README.

Tests

  • Comprehensive unit tests for ValkeyTaskStore (load, save, error handling, cancellations, factory function).
  • Comprehensive unit tests for createResumableStreamValkeyStore (publisher/subscriber adapters, TTL, subscription limits, close cleanup).

Notes for reviewers

  • The @valkey/valkey-glide peer dependency is optional, no impact on existing consumers.
  • The resumable-streams Valkey store creates one dedicated GlideClient per subscription channel, which is a requirement of the Glide pub/sub model. The maxSubscriptions option (default 1000) caps resource usage.

Summary by CodeRabbit

  • New Features

    • Valkey-backed storage for task persistence and resumable streams (configurable key prefixes, optional TTL, supports standalone/cluster)
    • New example app showcasing Valkey integration and a sample assistant with a runtime tool
  • Documentation

    • Added example README, env/sample files, package config and run instructions
  • Tests

    • New test suites for Valkey-backed task and resumable-stream stores
  • Breaking Changes

    • Server config can now specify a task store that overrides the injected default (config > deps > in-memory)

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Add ValkeyTaskStore class for distributed task persistence using Valkey
- Support optional TTL configuration for automatic task expiration
- Add createValkeyTaskStore factory function with lazy loading of @valkey/valkey-glide
- Make @valkey/valkey-glide an optional peer dependency
- Add taskStore configuration option to A2AServerConfig
- Update task store initialization to respect precedence:
config.taskStore > deps.taskStore > InMemoryTaskStore
- Export ValkeyTaskStore from main index

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Add unit tests for load() method covering deserialization,
  null handling, and Buffer conversion
- Add unit tests for save() method covering composite key
  generation, default keyPrefix, JSON serialization, and
  TTL options
- Add error handling tests for corrupted JSON and client
  connection failures
- Add tests for activeCancellations Set exposure
- Add factory function tests for createValkeyTaskStore with
and without TTL configuration
- Mock @valkey/valkey-glide to avoid runtime dependency
  in tests
- Include helper functions for creating mock clients and
TaskRecord fixtures

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Add Valkey/Glide-based store for resumable streams with
  pub/sub support
- Export internal utilities for reuse by store implementations
  (markResumableStreamStoreType, buildActiveStreamKey,
  createActiveStreamStoreFromPublisher,
  mergeStreamAndActiveStore)
- Add ValkeyConnectionConfig and
  ResumableStreamValkeyStoreOptions types for configuration
- Implement subscription management with configurable max
  concurrent connections (default 1000)
- Add TTL support for active stream keys via optional
  ttlSeconds parameter
- Add @valkey/valkey-glide as optional peer dependency
- Export createResumableStreamValkeyStore factory

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Add test suite for ValkeyResumableStreamStore with publisher and subscriber adapters
- Test publisher operations: set with/without expiry, get with Buffer conversion, incr, publish, del
- Test subscriber operations: subscribe with pubsub config, callback invocation, unsubscribe cleanup
- Test factory method returns all required store methods and applies TTL configuration
- Add vitest configuration for resumable-streams package
- Update tsconfig.json to support test environment
- Ensure proper mocking of @valkey/valkey-glide and resumable-stream/generic modules

Signed-off-by: Riley Des <riley.desserre@improving.com>
resumable streams

- Add complete example demonstrating ValkeyTaskStore and
  ValkeyResumableStreamStore integration
- Include SupportAgent implementation backed by Valkey
  for distributed persistence
- Add environment configuration template with Valkey
  connection defaults
- Include comprehensive README with setup instructions,
  configuration options, and usage examples
- Add changeset documenting new Valkey store providers
  in a2a-server and resumable-streams packages
- Demonstrate both standalone and cluster Valkey deployments
  with optional TTL-based expiration

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Add ttlSeconds validation guard in createValkeyTaskStore
- Add debug log when config.taskStore overrides deps.taskStore
- Add class-level JSDoc documenting activeCancellations process-local limitation
- Mark timeUnitSeconds constructor param as @internal" 2>&1

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Simplify PubSubChannelModes.Exact cast to single optional-chain access
- Await close() in subscribe and unsubscribe to prevent resource leaks
- Use String(msg.message) instead of unsafe as-string cast in pub/sub callback
- Add ttlSeconds validation guard in createResumableStreamValkeyStore
- Document close() ownership semantics (caller retains main client)
- Add @internal tags to four newly-exported helper functions

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Document taskStore config precedence change in changeset description
- Update example tsconfig moduleResolution from Node to NodeNext

Signed-off-by: Riley Des <riley.desserre@improving.com>
@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented Apr 30, 2026

🦋 Changeset detected

Latest commit: 49941b1

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 4 packages
Name Type
@voltagent/a2a-server Major
@voltagent/resumable-streams Minor
@voltagent/server-elysia Patch
@voltagent/server-hono Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 30, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3999759e-f75c-43f6-995b-65ade8ba3c84

📥 Commits

Reviewing files that changed from the base of the PR and between ee37043 and 49941b1.

📒 Files selected for processing (1)
  • packages/a2a-server/src/server.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/a2a-server/src/server.ts

📝 Walkthrough

Walkthrough

Adds Valkey-backed persistence: a ValkeyTaskStore for A2A task persistence and createResumableStreamValkeyStore for resumable streams; introduces Zod schemas and z.infer-based exported types, updates A2AServer to accept/configure taskStore precedence, adds package subpath exports and optional peer dependency on @valkey/valkey-glide, plus tests, example, and docs.

Changes

Valkey-backed persistence providers

Layer / File(s) Summary
Data Shape / Validation
packages/a2a-server/src/schemas.ts, packages/a2a-server/src/types.ts
Add Zod schemas and switch exported Task/Message/Record types to z.infer<>; add A2AServerConfig.taskStore?: TaskStore.
Core Implementation — Task Store
packages/a2a-server/src/valkey-store.ts, packages/a2a-server/src/server.ts
Add ValkeyTaskStore and createValkeyTaskStore() (namespaced keys, JSON parse + schema validation, TTL via dynamic import of @valkey/valkey-glide, activeCancellations); A2AServer.initialize applies config.taskStore ?? deps.taskStore ?? new InMemoryTaskStore() and logs override; error path in handleMessageSend appends failure message, marks task failed, persists update and logs on save failure.
Core Implementation — Resumable Streams
packages/resumable-streams/src/valkey-store.ts, packages/resumable-streams/src/resumable-streams.ts
Add createResumableStreamValkeyStore() with publisher/subscriber adapters, per-channel pubsub clients, duplicate-subscribe protection, optional TTL for active-stream keys, close() to clean up internal subscribers; promote four previously internal helpers to exported /** @internal */ symbols.
Runtime Errors & API Surface
packages/a2a-server/src/types.ts, packages/a2a-server/src/server.ts
Add VoltA2AError.toJsonRpcError() and multiple static factory constructors; validateMessageSendParams now uses A2AMessageSchema.parse and maps schema errors to VoltA2AError.invalidParams.
Packaging / Exports / Build
packages/a2a-server/package.json, packages/resumable-streams/package.json, packages/*/tsup.config.ts
Add ./valkey-store subpath exports for both packages; add optional peerDependency @valkey/valkey-glide (>=2.3.1) via peerDependenciesMeta; include src/valkey-store.ts in tsup entry lists.
Tests & Tooling
packages/a2a-server/src/valkey-store.spec.ts, packages/resumable-streams/src/valkey-store.spec.ts, packages/resumable-streams/vitest.config.ts, packages/resumable-streams/tsconfig.json
Add Vitest suites exercising Valkey behavior with mocked Glide client and per-channel pubsub flows; add package-level Vitest config and test type adjustments.
Example & Docs
./.changeset/add-valkey-store-providers.md, examples/with-valkey-store/*
Add changeset and example app demonstrating Valkey client init, ValkeyTaskStore and ValkeyResumableStreamStore usage, README, env example, agent and index.
Repository config
./.gitignore, examples/with-valkey-store/.gitignore
Ignore .kiro at repo root and add .gitignore for the new example package.

Sequence Diagram(s)

sequenceDiagram
    participant App as Example App
    participant A2A as A2AServer
    participant VTS as ValkeyTaskStore
    participant Valkey as Valkey Glide Client
    participant Agent as Support Agent

    App->>A2A: initialize(factory with taskStore=ValkeyTaskStore)
    A2A->>A2A: config.taskStore overrides deps.taskStore if present
    A2A->>VTS: keep reference to task store

    Agent->>A2A: POST /message/send
    A2A->>VTS: save(agentId, TaskRecord)
    VTS->>VTS: ensure id, stringify JSON
    VTS->>Valkey: set(key, value, { expiry: { type: "EX", count } })
    Valkey-->>VTS: OK
    VTS-->>A2A: void

    Agent->>A2A: GET /task/{taskId}
    A2A->>VTS: load(agentId, taskId)
    VTS->>Valkey: get(key)
    Valkey-->>VTS: JSON|string|null
    VTS->>VTS: parse & validate via TaskRecordSchema
    VTS-->>A2A: TaskRecord|null
Loading
sequenceDiagram
    participant App as Example App
    participant RSS as ResumableStreamValkeyStore
    participant Pub as Publisher Adapter
    participant Sub as Subscriber Adapter
    participant Valkey as Valkey Glide Client

    App->>RSS: createResumableStreamValkeyStore(client, ttlSeconds)
    RSS->>Pub: create publisher adapter
    Pub->>Valkey: set(key, value, { expiry: { type: "EX", count } })
    RSS->>Sub: create subscriber adapter (per-channel clients)
    Sub->>Valkey: create per-channel pubsub client (Exact mode)
    RSS->>RSS: merge stream + active-store, return store
    App->>RSS: setActiveStreamId(ctx, id)
    RSS->>Pub: set(activeKey, id, { expiry: ... })
    App->>RSS: close()
    RSS->>Sub: close per-channel pubsub clients
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 I hopped into Valkey's glade today,

Keys escaped and TTLs kept at play,
Tasks tucked safe and streams awake on cue,
A rabbit cheers the stores both brave and new,
Persistence hops — hurrah, we flew!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 64.71% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately summarizes the main change: adding Valkey-backed TaskStore and ResumableStreamStore implementations.
Description check ✅ Passed The PR description is comprehensive, addressing all template sections including checklist items, current behavior, new behavior, and detailed notes for reviewers.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
Review rate limit: 7/8 reviews remaining, refill in 7 minutes and 30 seconds.

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 issues found across 23 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name=".changeset/add-valkey-store-providers.md">

<violation number="1" location=".changeset/add-valkey-store-providers.md:2">
P2: `@voltagent/a2a-server` is marked `minor` even though the changeset body says it contains a breaking change; this should be a `major` bump.</violation>
</file>

<file name="examples/with-valkey-store/tsconfig.json">

<violation number="1" location="examples/with-valkey-store/tsconfig.json:6">
P1: `module: "ESNext"` is incompatible with `moduleResolution: "NodeNext"` in modern TypeScript and can cause the example's TS config to fail validation/build.</violation>
</file>

<file name="packages/resumable-streams/src/valkey-store.spec.ts">

<violation number="1" location="packages/resumable-streams/src/valkey-store.spec.ts:176">
P2: Invalid matcher combination: `.resolves.not.toThrow()` does not meaningfully assert that the unsubscribe promise completes without throwing.</violation>
</file>

<file name="packages/resumable-streams/src/valkey-store.ts">

<violation number="1" location="packages/resumable-streams/src/valkey-store.ts:126">
P2: Concurrent subscribe calls can leak an untracked subscription client and bypass the maxSubscriptions limit because the map checks and the map update are separated by an await.</violation>

<violation number="2" location="packages/resumable-streams/src/valkey-store.ts:126">
P1: Subscription clients are always created as standalone `GlideClient` instances, so cluster-mode stores can use the wrong pub/sub client type.</violation>
</file>

<file name="packages/a2a-server/src/valkey-store.ts">

<violation number="1" location="packages/a2a-server/src/valkey-store.ts:54">
P2: Exported constructor bypasses TTL validation, allowing invalid expiry values to reach Valkey writes.</violation>

<violation number="2" location="packages/a2a-server/src/valkey-store.ts:99">
P1: Raw `agentId`/`taskId` are interpolated into the Valkey key without escaping, so delimiter-containing IDs can collide and overwrite unrelated task records.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread examples/with-valkey-store/tsconfig.json Outdated
Comment thread packages/resumable-streams/src/valkey-store.ts Outdated
Comment thread packages/a2a-server/src/valkey-store.ts Outdated
Comment thread .changeset/add-valkey-store-providers.md Outdated
Comment thread packages/resumable-streams/src/valkey-store.spec.ts Outdated
Comment thread packages/resumable-streams/src/valkey-store.ts Outdated
Comment thread packages/a2a-server/src/valkey-store.ts
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

🧹 Nitpick comments (2)
packages/a2a-server/src/valkey-store.spec.ts (1)

33-176: Include line 151 in the list of as any casts to replace.

The file contains 14 instances of broad as any casts across lines 33, 43, 54, 65, 76, 87, 102, 114, 124, 134, 145, 151, 162, and 175. Replace these with typed mocks to maintain type safety in the TypeScript-first codebase and strengthen API-drift detection.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/a2a-server/src/valkey-store.spec.ts` around lines 33 - 176, The
tests overuse "as any" when constructing ValkeyTaskStore and calling
createValkeyTaskStore (including the instance at the place you noted) — replace
those broad casts with properly typed mocks: introduce a typed mock/interface
for the Valkey client (e.g., ValkeyClient or the client interface used by
ValkeyTaskStore), update makeClient to return jest.Mocked<ValkeyClient> (or a
typed manual mock) and cast using that type (e.g., const client = makeClient()
as jest.Mocked<ValkeyClient>), then use those typed mock clients when
instantiating ValkeyTaskStore and calling createValkeyTaskStore so all previous
"as any" casts (including the flagged occurrence) are removed and the tests keep
TypeScript type-safety.
packages/resumable-streams/src/valkey-store.spec.ts (1)

39-347: Reduce as any usage throughout this test file to maintain type safety.

This test file contains 41 instances of as any casts that suppress useful type checks. Examples include lines 39, 41, 48, 52, 59, 63, 64, 65, and many others. These casts make it easier to miss API regressions and reduce the value of TypeScript's type system. Consider using proper types, generics, or more specific type assertions where possible.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/resumable-streams/src/valkey-store.spec.ts` around lines 39 - 347,
Many tests overuse "as any" (e.g., in getPublisher, makeGlideClient, and
assertions that inspect mock calls) which erodes type safety; replace those
casts with precise types and mocked types: use the actual factory return type
(e.g., let store: Awaited<ReturnType<typeof createResumableStreamValkeyStore>>),
type publisher/subscriber as the adapter interfaces returned from
createResumableStreamValkeyStore, type client as ReturnType<typeof
makeGlideClient> or jest/vi Mocked<GlideClient> (and mock
GlideClient.createClient with MockedFunction), and type
createResumableStreamContext mocks via MockedFunction or ReturnType<typeof
createResumableStreamContext>; update call-site assertions to use typed
mock.calls (no "as any") so TypeScript checks argument shapes for publisher.set,
client.set/get/incr/publish/del and subscriber.subscribe/unsubscribe, and remove
or narrow all "as any" occurrences accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.changeset/add-valkey-store-providers.md:
- Around line 2-3: The changeset currently lists "@voltagent/a2a-server" as a
minor bump but the changes described on lines 14–18 are breaking; update the
changeset entry so "@voltagent/a2a-server": major instead of minor to reflect
the breaking change, and verify any other affected packages (e.g.,
"@voltagent/resumable-streams") are bumped consistently if they are impacted by
the same breaking behavior described in lines 14–18.

In `@examples/with-valkey-store/README.md`:
- Around line 50-59: The unlabeled fenced code block in README.md (the directory
tree example under examples/with-valkey-store) triggers markdownlint MD040;
update that triple-backtick fence to specify a language (e.g., add "text" so it
becomes ```text) so the block is labeled and the lint warning is resolved—locate
the directory-tree block in README.md and add the language identifier to the
opening fence.

In `@examples/with-valkey-store/src/index.ts`:
- Around line 17-23: Validate the VALKEY_PORT env value before calling
GlideClient.createClient: parse process.env.VALKEY_PORT (if set) to a number,
ensure it is an integer within the valid TCP port range (1–65535) and not
NaN/Infinity, and if it fails validation throw or exit with a clear error
mentioning VALKEY_PORT; then set the port variable to the validated number or
the default 6379 and proceed to call GlideClient.createClient in main using the
host and port variables.

In `@examples/with-valkey-store/tsconfig.json`:
- Around line 6-8: Update the tsconfig.json to use a compatible module setting:
change the "module" compiler option from "ESNext" to "NodeNext" so it matches
"moduleResolution": "NodeNext"; locate the "module" and "moduleResolution"
entries in the tsconfig and set "module" to "NodeNext".

In `@packages/a2a-server/src/index.ts`:
- Line 5: Remove the root re-export of "./valkey-store" from index.ts so the
Valkey types are not pulled into the package's public declarations; instead
surface that module only via a dedicated subpath export (e.g., keep a separate
"./valkey-store" export entry in package exports) or make the types in
valkey-store.ts purely runtime (use dynamic import) or isolated as "export type"
only so they don't appear in generated .d.ts. Concretely: delete or comment out
the line exporting "./valkey-store" from the root index export, and add/ensure a
package subpath export for "./valkey-store" (or convert its API to
runtime-only/dynamic import or type-only exports) so consumers who don't depend
on `@valkey/valkey-glide` aren’t forced to install it.

In `@packages/a2a-server/src/valkey-store.ts`:
- Around line 54-59: The constructor of ValkeyTaskStore accepts
options.ttlSeconds but does not validate it, allowing invalid TTLs to flow into
set operations; mirror the validation performed in createValkeyTaskStore by
checking options.ttlSeconds in the ValkeyTaskStore constructor (class
ValkeyTaskStore, constructor) and throw a clear error for invalid values (e.g.,
non-number, non-positive, or out-of-range) or normalize it as done by
createValkeyTaskStore; update the constructor to perform the same validation
logic so direct new ValkeyTaskStore(...) calls have the same TTL guarantees as
createValkeyTaskStore.
- Around line 84-86: The save method is using params.data.id directly which can
be undefined and produces keys like "::undefined"; update the save flow in
ValkeyStore.save to normalize/generate a task id when missing (mirror
InMemoryTaskStore behavior) before calling makeKey and safeStringify — e.g.,
ensure TaskRecord.id is assigned (generate or derive a stable id) and use that
normalized id when calling makeKey(params.agentId, id) and when persisting the
JSON so keys never contain "undefined".

In `@packages/resumable-streams/package.json`:
- Around line 37-45: The package marks `@valkey/valkey-glide` as optional in
peerDependenciesMeta but re-exports types (ResumableStreamValkeyStoreOptions,
ValkeyConnectionConfig, ValkeyResumableStreamStore) from ./valkey-store
(index.ts -> valkey-store) which imports `@valkey/valkey-glide` types at
top-level, so update the package.json to make `@valkey/valkey-glide` a required
peer dependency (remove the "optional": true entry) OR alternatively stop
re-exporting those Valkey types from the public API (move them behind an
internal module or lazy-type-export) so TypeScript consumers don’t need
`@valkey/valkey-glide` for type-checking. Ensure changes target the
peerDependencies/peerDependenciesMeta entries for `@valkey/valkey-glide` or
refactor the exports in valkey-store accordingly.

In `@packages/resumable-streams/src/valkey-store.ts`:
- Around line 83-85: The publish method in valkey-store.ts is passing arguments
to client.publish in the wrong order; update the publish(channel: string,
message: string) implementation to call client.publish with the message first
and the channel second (i.e., client.publish(message, channel)) so that the
adapter conforms to GLIDE's Node API; modify the async publish function in the
Valkey store adapter to swap the arguments and return that result.
- Around line 126-134: The subscription creation always calls
GlideClientClass.createClient which forces standalone behavior; detect whether
options.client is a GlideClusterClient (or exposes cluster-specific API) and
call the cluster client creation path instead (e.g., use
GlideClusterClient.createClient with the cluster-aware pubsub config and channel
mode constants) so sharded/cluster subscriptions are honored; update the code
that builds subClient (the block invoking GlideClientClass.createClient, the
clientConfig/ pubsubSubscriptions usage, and any references to
GlideClientConfigurationClass.PubSubChannelModes.Exact) to branch on the actual
client type or presence of cluster methods and pass the appropriate cluster
configuration symbols when a cluster client is provided.

---

Nitpick comments:
In `@packages/a2a-server/src/valkey-store.spec.ts`:
- Around line 33-176: The tests overuse "as any" when constructing
ValkeyTaskStore and calling createValkeyTaskStore (including the instance at the
place you noted) — replace those broad casts with properly typed mocks:
introduce a typed mock/interface for the Valkey client (e.g., ValkeyClient or
the client interface used by ValkeyTaskStore), update makeClient to return
jest.Mocked<ValkeyClient> (or a typed manual mock) and cast using that type
(e.g., const client = makeClient() as jest.Mocked<ValkeyClient>), then use those
typed mock clients when instantiating ValkeyTaskStore and calling
createValkeyTaskStore so all previous "as any" casts (including the flagged
occurrence) are removed and the tests keep TypeScript type-safety.

In `@packages/resumable-streams/src/valkey-store.spec.ts`:
- Around line 39-347: Many tests overuse "as any" (e.g., in getPublisher,
makeGlideClient, and assertions that inspect mock calls) which erodes type
safety; replace those casts with precise types and mocked types: use the actual
factory return type (e.g., let store: Awaited<ReturnType<typeof
createResumableStreamValkeyStore>>), type publisher/subscriber as the adapter
interfaces returned from createResumableStreamValkeyStore, type client as
ReturnType<typeof makeGlideClient> or jest/vi Mocked<GlideClient> (and mock
GlideClient.createClient with MockedFunction), and type
createResumableStreamContext mocks via MockedFunction or ReturnType<typeof
createResumableStreamContext>; update call-site assertions to use typed
mock.calls (no "as any") so TypeScript checks argument shapes for publisher.set,
client.set/get/incr/publish/del and subscriber.subscribe/unsubscribe, and remove
or narrow all "as any" occurrences accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8c7bce3a-6df1-4191-83b4-351b0c972519

📥 Commits

Reviewing files that changed from the base of the PR and between ad4c44c and d625eba.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (22)
  • .changeset/add-valkey-store-providers.md
  • .gitignore
  • examples/with-valkey-store/.env.example
  • examples/with-valkey-store/.gitignore
  • examples/with-valkey-store/README.md
  • examples/with-valkey-store/package.json
  • examples/with-valkey-store/src/agents/assistant.ts
  • examples/with-valkey-store/src/index.ts
  • examples/with-valkey-store/tsconfig.json
  • packages/a2a-server/package.json
  • packages/a2a-server/src/index.ts
  • packages/a2a-server/src/server.ts
  • packages/a2a-server/src/types.ts
  • packages/a2a-server/src/valkey-store.spec.ts
  • packages/a2a-server/src/valkey-store.ts
  • packages/resumable-streams/package.json
  • packages/resumable-streams/src/index.ts
  • packages/resumable-streams/src/resumable-streams.ts
  • packages/resumable-streams/src/valkey-store.spec.ts
  • packages/resumable-streams/src/valkey-store.ts
  • packages/resumable-streams/tsconfig.json
  • packages/resumable-streams/vitest.config.ts

Comment thread .changeset/add-valkey-store-providers.md Outdated
Comment thread examples/with-valkey-store/README.md Outdated
Comment thread examples/with-valkey-store/src/index.ts
Comment thread examples/with-valkey-store/tsconfig.json Outdated
Comment thread packages/a2a-server/src/index.ts Outdated
Comment thread packages/a2a-server/src/valkey-store.ts
Comment thread packages/a2a-server/src/valkey-store.ts Outdated
Comment thread packages/resumable-streams/package.json
Comment thread packages/resumable-streams/src/valkey-store.ts
Comment thread packages/resumable-streams/src/valkey-store.ts Outdated
…ndling

- Swap publish(channel, message) to publish(message, channel) to match
  the GLIDE API signature
- Use GlideClusterClient.createClient for subscription clients when the
  main client is a cluster instance
- Add pendingSubscriptions guard to prevent race conditions and
  subscription limit bypass on concurrent subscribe calls
- Simplify close() to call synchronous subClient.close() directly
- Update test assertion for corrected publish argument order
- Replace invalid .resolves.not.toThrow() matcher with direct await
- Add docstrings to all exported interfaces, types, classes, methods, and
factory functions

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Validate ttlSeconds in the ValkeyTaskStore constructor so direct
  instantiation has the same guards as createValkeyTaskStore
- Escape colons in agentId and taskId within makeKey to prevent
  delimiter-based key collisions
- Add docstrings to all exported interfaces, types, classes, methods, and
factory functions

Signed-off-by: Riley Des <riley.desserre@improving.com>
The changeset body describes a breaking change to A2AServerConfig but
the version was marked as minor.

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Replace console.debug with getGlobalLogger() in A2AServer.initialize
- Replace wildcard re-export of valkey-store with explicit named exports
  to avoid leaking @valkey/valkey-glide types to non-Valkey consumers

Signed-off-by: Riley Des <riley.desserre@improving.com>
- Change module from ESNext to NodeNext in tsconfig
- Add VALKEY_PORT validation (integer, 1-65535 range)
- Add text language identifier to fenced code block (MD040)

Signed-off-by: Riley Des <riley.desserre@improving.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/a2a-server/src/valkey-store.ts`:
- Around line 122-124: The parsed JSON is being force-cast to TaskRecord without
runtime validation; add a runtime type guard to validate the parsed object
(e.g., an isTaskRecord(payload): boolean) that checks required properties and
their types (IDs, status enum/string, timestamps, payload shapes, etc.), use
JSON.parse(String(result)) and then run the guard before returning, and throw or
handle an error when validation fails so malformed persisted values are not
returned from the function that currently parses result into TaskRecord.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 111f8a04-00bb-47d1-a771-a8f898ae563a

📥 Commits

Reviewing files that changed from the base of the PR and between d625eba and a3675e2.

📒 Files selected for processing (9)
  • .changeset/add-valkey-store-providers.md
  • examples/with-valkey-store/README.md
  • examples/with-valkey-store/src/index.ts
  • examples/with-valkey-store/tsconfig.json
  • packages/a2a-server/src/index.ts
  • packages/a2a-server/src/server.ts
  • packages/a2a-server/src/valkey-store.ts
  • packages/resumable-streams/src/valkey-store.spec.ts
  • packages/resumable-streams/src/valkey-store.ts
✅ Files skipped from review due to trivial changes (4)
  • .changeset/add-valkey-store-providers.md
  • examples/with-valkey-store/tsconfig.json
  • packages/resumable-streams/src/valkey-store.ts
  • examples/with-valkey-store/README.md
🚧 Files skipped from review as they are similar to previous changes (3)
  • packages/a2a-server/src/index.ts
  • packages/a2a-server/src/server.ts
  • packages/resumable-streams/src/valkey-store.spec.ts

Comment thread packages/a2a-server/src/valkey-store.ts
- Add missing test for Zod validation failure on malformed TaskRecord
- Use safeStringify(validation.error.issues) for structured error output
- Document intentional save/load validation asymmetry in JSDoc

Signed-off-by: Riley Des <riley.desserre@improving.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/a2a-server/src/valkey-store.ts`:
- Around line 157-167: The code currently calls safeStringify(params.data) and
persists its result via client.set in both branches (the ttlSeconds path using
getTimeUnitSeconds and the else path) without checking for the safe-stringify
error sentinel; update the logic around makeKey, safeStringify, ttlSeconds,
getTimeUnitSeconds and client.set to detect when json startsWith
"SAFE_STRINGIFY_ERROR:" and immediately throw a descriptive Error (including
agentId and data.id) instead of calling client.set (so load() won't later fail
on a corrupted record).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9cbd2dff-a9d3-4c09-b0c6-fd67d6b0fe59

📥 Commits

Reviewing files that changed from the base of the PR and between a3675e2 and d418794.

📒 Files selected for processing (4)
  • packages/a2a-server/src/schemas.ts
  • packages/a2a-server/src/types.ts
  • packages/a2a-server/src/valkey-store.spec.ts
  • packages/a2a-server/src/valkey-store.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/a2a-server/src/valkey-store.spec.ts
  • packages/a2a-server/src/types.ts

Comment thread packages/a2a-server/src/valkey-store.ts Outdated
…th entry point

Move Valkey store types and factory functions from the main package
entry point to a dedicated `./valkey-store` sub-path export. This
prevents TypeScript consumers who don't use Valkey from needing
`@valkey/valkey-glide` installed for type resolution, keeping it
correctly optional.

- Add `src/valkey-store.ts` as a second tsup entry point
- Add `./valkey-store` export map in package.json
- Remove Valkey re-exports from `src/index.ts`

Signed-off-by: Riley Des <riley.desserre@improving.com>
Update imports to use the new `./valkey-store` sub-path exports from
`@voltagent/a2a-server` and `@voltagent/resumable-streams`. Fix missing
`.js` extension on relative import required by NodeNext module resolution.

Signed-off-by: Riley Des <riley.desserre@improving.com>
… undefined keys

Signed-off-by: Riley Des <riley.desserre@improving.com>
…g task record

Signed-off-by: Riley Des <riley.desserre@improving.com>
…pes, and server

- Add JSDoc documentation to all exported Zod schemas in schemas.ts
- Add JSDoc documentation to type definitions and interfaces in types.ts
- Add JSDoc documentation to A2AServer class and public methods
- Add JSDoc documentation to example agent configuration

Signed-off-by: Riley Des <riley.desserre@improving.com>
@rileydes-improving
Copy link
Copy Markdown
Author

@coderabbitai resume

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 4, 2026

✅ Actions performed

Reviews resumed.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
packages/a2a-server/src/server.ts (2)

289-338: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Mark one-shot send failures as failed before rethrowing.

The record is persisted as working before generation starts, but the non-abort error path just rethrows. If agent.generateText() fails, the task stays stuck in working instead of transitioning to failed.

Suggested fix
     } catch (error) {
       if (abortController.signal.aborted) {
         return await this.ensureCanceledRecord(agentId, record);
       }
+      record = transitionStatus(record, {
+        state: "failed",
+        message: {
+          kind: "message",
+          role: "agent",
+          messageId: randomUUID(),
+          taskId: record.id,
+          contextId: record.contextId,
+          parts: [
+            {
+              kind: "text",
+              text: error instanceof Error ? error.message : "Task failed",
+            },
+          ],
+        },
+      });
+      await taskStore.save({ agentId, data: record });
       throw error;
     } finally {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/a2a-server/src/server.ts` around lines 289 - 338, The catch block
after agent.generateText must mark the task record as failed and persist it
before rethrowing: when catching a non-abort error, call
transitionStatus(record, { state: "failed", message: /* a short failure message
or error info */ }) to update the record, save it with taskStore.save({ agentId,
data: record }), then rethrow the error; update the catch path that currently
only checks abortController.signal.aborted (around agent.generateText,
registerActiveOperation, ensureCanceledRecord, transitionStatus and
taskStore.save usage) to perform these steps so the task doesn't stay in
"working".

694-716: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use the new Zod schema here instead of a partial hand-rolled check.

This only validates parts, so malformed messages can still slip through with missing or invalid kind, role, or messageId. That bypasses A2AMessageSchema and can leave the store with invalid records.

Suggested fix
+import { A2AMessageSchema } from "./schemas";
+
   private validateMessageSendParams(payload: unknown): MessageSendParams {
     if (!payload || typeof payload !== "object") {
       throw VoltA2AError.invalidParams("Params must be an object");
     }
     const candidate = payload as Partial<MessageSendParams>;
+
+    if (!A2AMessageSchema.safeParse(candidate.message).success) {
+      throw VoltA2AError.invalidParams("Invalid message payload");
+    }
 
-    if (!candidate.message || typeof candidate.message !== "object") {
-      throw VoltA2AError.invalidParams("'message' must be provided");
-    }
-
-    if (!Array.isArray(candidate.message.parts) || candidate.message.parts.length === 0) {
-      throw VoltA2AError.invalidParams("Message must include at least one part");
-    }
-
-    const hasInvalidPart = candidate.message.parts.some(
-      (part) => part.kind !== "text" || typeof part.text !== "string",
-    );
-    if (hasInvalidPart) {
-      throw VoltA2AError.invalidParams("Only plain text message parts are supported");
-    }
-
     return candidate as MessageSendParams;
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/a2a-server/src/server.ts` around lines 694 - 716, The current
validateMessageSendParams function performs partial, hand-rolled checks that
miss fields like kind, role, and messageId; replace this with the existing Zod
schema by importing A2AMessageSchema and using it to parse/validate the incoming
payload (or at least candidate.message) so malformed messages are rejected. In
practice, remove the manual checks inside validateMessageSendParams, cast
payload to unknown→object only for typing, call A2AMessageSchema.parse(payload
or payload.message) and let Zod throw on invalid data, then return the validated
value as MessageSendParams; keep the function name validateMessageSendParams and
ensure any thrown Zod errors are converted to VoltA2AError.invalidParams as
needed to preserve error semantics.
packages/a2a-server/src/types.ts (1)

234-243: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Preserve falsy error details in toJsonRpcError().

this.data ? ... drops valid payloads like 0, false, or "", so those error details disappear during serialization.

Suggested fix
-      ...(this.data ? { details: this.data } : {}),
+      ...(this.data !== undefined ? { details: this.data } : {}),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/a2a-server/src/types.ts` around lines 234 - 243, The
toJsonRpcError() method drops valid falsy values because it uses a truthy check
(this.data ? ...), so change the condition in toJsonRpcError to only treat
missing data as undefined (e.g., use this.data !== undefined or typeof this.data
!== "undefined") when deciding to include details; update the object
construction in toJsonRpcError to include { details: this.data } whenever data
is defined (including 0, false, ""), referencing the toJsonRpcError function and
the this.data field.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@packages/a2a-server/src/server.ts`:
- Around line 289-338: The catch block after agent.generateText must mark the
task record as failed and persist it before rethrowing: when catching a
non-abort error, call transitionStatus(record, { state: "failed", message: /* a
short failure message or error info */ }) to update the record, save it with
taskStore.save({ agentId, data: record }), then rethrow the error; update the
catch path that currently only checks abortController.signal.aborted (around
agent.generateText, registerActiveOperation, ensureCanceledRecord,
transitionStatus and taskStore.save usage) to perform these steps so the task
doesn't stay in "working".
- Around line 694-716: The current validateMessageSendParams function performs
partial, hand-rolled checks that miss fields like kind, role, and messageId;
replace this with the existing Zod schema by importing A2AMessageSchema and
using it to parse/validate the incoming payload (or at least candidate.message)
so malformed messages are rejected. In practice, remove the manual checks inside
validateMessageSendParams, cast payload to unknown→object only for typing, call
A2AMessageSchema.parse(payload or payload.message) and let Zod throw on invalid
data, then return the validated value as MessageSendParams; keep the function
name validateMessageSendParams and ensure any thrown Zod errors are converted to
VoltA2AError.invalidParams as needed to preserve error semantics.

In `@packages/a2a-server/src/types.ts`:
- Around line 234-243: The toJsonRpcError() method drops valid falsy values
because it uses a truthy check (this.data ? ...), so change the condition in
toJsonRpcError to only treat missing data as undefined (e.g., use this.data !==
undefined or typeof this.data !== "undefined") when deciding to include details;
update the object construction in toJsonRpcError to include { details: this.data
} whenever data is defined (including 0, false, ""), referencing the
toJsonRpcError function and the this.data field.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bea12016-12e0-42a8-99c6-64cd47c38816

📥 Commits

Reviewing files that changed from the base of the PR and between a80210c and 0fca0dd.

📒 Files selected for processing (6)
  • examples/with-valkey-store/src/agents/assistant.ts
  • packages/a2a-server/src/schemas.ts
  • packages/a2a-server/src/server.ts
  • packages/a2a-server/src/types.ts
  • packages/a2a-server/src/valkey-store.ts
  • packages/resumable-streams/src/resumable-streams.ts
🚧 Files skipped from review as they are similar to previous changes (3)
  • examples/with-valkey-store/src/agents/assistant.ts
  • packages/resumable-streams/src/resumable-streams.ts
  • packages/a2a-server/src/valkey-store.ts

- Mark task as failed and persist before rethrowing on non-abort errors
  in handleMessageSend so tasks don't stay stuck in "working" state
- Replace hand-rolled checks in validateMessageSendParams with Zod
  schema validation (A2AMessageSchema) to reject malformed messages
- Use strict undefined check in toJsonRpcError to preserve falsy data
  values (0, false, "") in serialized error details

Signed-off-by: Riley Des <riley.desserre@improving.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
packages/a2a-server/src/server.ts (1)

711-724: ⚡ Quick win

Partial validation with as unknown as MessageSendParams bypasses type safety for non-message fields.

Only candidate.message is schema-validated, but the entire payload is double-cast through unknown to MessageSendParams. Fields like id, sessionId, metadata, and configuration accept arbitrary runtime values with no type guard. Consider introducing a MessageSendParamsSchema that wraps A2AMessageSchema and validates the full params shape, then return the parsed result directly.

As per coding guidelines: "Maintain type safety in TypeScript-first codebase."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/a2a-server/src/server.ts` around lines 711 - 724, The
validateMessageSendParams function currently only validates candidate.message
using A2AMessageSchema and then force-casts the whole payload to
MessageSendParams, which bypasses runtime checks for id, sessionId, metadata,
configuration, etc.; replace this by creating a MessageSendParamsSchema
(wrapping/including A2AMessageSchema for the message field) and use
MessageSendParamsSchema.parse(candidate) to validate the entire shape, then
return the parsed result (typed as MessageSendParams); keep
VoltA2AError.invalidParams for parse failures by catching the parse error and
rethrowing with its message as is done now.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/a2a-server/src/server.ts`:
- Around line 351-355: The failure path appends failureMessage and transitions
the record then calls taskStore.save which may throw and mask the original agent
error; update the block around taskStore.save({ agentId, data: record }) to
catch any exceptions from the save (e.g., try { await taskStore.save(...) }
catch (saveErr) { processLogger?.warn(...) or swallow/saveErr } ) and ensure the
original thrown variable error is rethrown unconditionally (throw error) after
the save attempt; keep the calls to appendMessage and transitionStatus as-is and
reference appendMessage, transitionStatus, taskStore.save, agentId, record and
error when making the change.

---

Nitpick comments:
In `@packages/a2a-server/src/server.ts`:
- Around line 711-724: The validateMessageSendParams function currently only
validates candidate.message using A2AMessageSchema and then force-casts the
whole payload to MessageSendParams, which bypasses runtime checks for id,
sessionId, metadata, configuration, etc.; replace this by creating a
MessageSendParamsSchema (wrapping/including A2AMessageSchema for the message
field) and use MessageSendParamsSchema.parse(candidate) to validate the entire
shape, then return the parsed result (typed as MessageSendParams); keep
VoltA2AError.invalidParams for parse failures by catching the parse error and
rethrowing with its message as is done now.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e2c67f02-05e5-4233-9baa-44c0613bc5ef

📥 Commits

Reviewing files that changed from the base of the PR and between 0fca0dd and ee37043.

📒 Files selected for processing (2)
  • packages/a2a-server/src/server.ts
  • packages/a2a-server/src/types.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/a2a-server/src/types.ts

Comment thread packages/a2a-server/src/server.ts
Signed-off-by: Riley Des <riley.desserre@improving.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant