Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/migrations/v1_0/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,24 @@ async for chunk in client.send_message(request):
...
```

Alternatively, use the `ArtifactsAggregator` helper to automatically assemble artifacts from the stream:

```python
from a2a.client import ArtifactsAggregator

# Assemble a single artifact by ID
# Note: each ArtifactsAggregator instance consumes the stream once — create a new instance per operation
aggregator = ArtifactsAggregator.from_stream(client.send_message(request))
artifact = await aggregator.get_artifact('my-artifact-id')
if artifact is not None:
for part in artifact.parts:
print(part.text)


# Or assemble all artifacts from the stream — requires a new request and a new aggregator instance
aggregator = ArtifactsAggregator.from_stream(client.send_message(request))
artifacts = await aggregator.get_all_artifacts()
```

---

Expand Down
2 changes: 2 additions & 0 deletions src/a2a/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Client-side components for interacting with an A2A agent."""

from a2a.client.artifact_aggregator import ArtifactsAggregator
from a2a.client.auth import (
AuthInterceptor,
CredentialService,
Expand Down Expand Up @@ -30,6 +31,7 @@
'A2AClientError',
'A2AClientTimeoutError',
'AgentCardResolutionError',
'ArtifactsAggregator',
'AuthInterceptor',
'BaseClient',
'Client',
Expand Down
96 changes: 96 additions & 0 deletions src/a2a/client/artifact_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from collections.abc import AsyncIterator

from a2a.types import Artifact, StreamResponse


class ArtifactsAggregator:
"""Client-side utility for assembling Artifact objects from a stream of StreamResponse events.

Interprets the append and last_chunk flags of TaskArtifactUpdateEvent to
reconstruct complete artifacts from chunked streaming responses. Each instance
wraps a single stream that can be consumed only once.
"""

def __init__(self, stream: AsyncIterator[StreamResponse]) -> None:
self._stream = stream

@classmethod
def from_stream(

Check failure on line 18 in src/a2a/client/artifact_aggregator.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

ruff (D102)

src/a2a/client/artifact_aggregator.py:18:9: D102 Missing docstring in public method
cls, stream: AsyncIterator[StreamResponse]
) -> 'ArtifactsAggregator':
return cls(stream)

async def get_artifact(self, artifact_id: str) -> Artifact | None:
"""Assemble and return a single Artifact by its ID from the stream.

Iterates over the stream and collects all parts belonging to the artifact
with the given ID, stopping when last_chunk is True.

Args:
artifact_id: The ID of the artifact to assemble.

Returns:
The assembled Artifact with all collected parts,
or None if the artifact_id was not found in the stream.

Note:
Consumes the stream. Do not call this method and get_all_artifacts
on the same instance.
"""
artifact = None

async for event in self._stream:
if not event.HasField('artifact_update'):
continue

if event.artifact_update.artifact.artifact_id == artifact_id:
if artifact is None or not event.artifact_update.append:
artifact = Artifact(
name=event.artifact_update.artifact.name,
description=event.artifact_update.artifact.description,
metadata=event.artifact_update.artifact.metadata,
extensions=event.artifact_update.artifact.extensions,
artifact_id=artifact_id,
)

artifact.parts.extend(event.artifact_update.artifact.parts)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using CopyFrom is more idiomatic for Protobuf messages and ensures that all fields (including any future additions to the Artifact schema) are correctly preserved without manual assignment. Additionally, using an if/else block avoids redundant extend calls on the initial chunk, as CopyFrom already includes the parts present in that event.

                if artifact is None or not event.artifact_update.append:
                    artifact = Artifact()
                    artifact.CopyFrom(event.artifact_update.artifact)
                else:
                    artifact.parts.extend(event.artifact_update.artifact.parts)

if event.artifact_update.last_chunk:
break
return artifact

async def get_all_artifacts(self) -> list[Artifact]:
"""Assemble and return all Artifacts from the stream.

Iterates over the entire stream and assembles all artifacts, handling
interleaved chunks from multiple artifacts using artifact_id as the key.
If append is False, the parts for that artifact are reset before adding
the new parts.

Returns:
A list of assembled Artifact objects.

Note:
Consumes the stream. Do not call this method and get_artifact
on the same instance.
"""
artifacts: dict[str, Artifact] = {}

async for event in self._stream:
if not event.HasField('artifact_update'):
continue

artifact_id = event.artifact_update.artifact.artifact_id

if artifact_id not in artifacts or not event.artifact_update.append:
artifacts[artifact_id] = Artifact(
name=event.artifact_update.artifact.name,
description=event.artifact_update.artifact.description,
metadata=event.artifact_update.artifact.metadata,
extensions=event.artifact_update.artifact.extensions,
artifact_id=artifact_id,
)

artifacts[artifact_id].parts.extend(
event.artifact_update.artifact.parts
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to get_artifact, leveraging CopyFrom here improves robustness against schema changes and simplifies the reconstruction logic. The if/else structure also prevents duplicating parts on the first chunk of each artifact.

            if artifact_id not in artifacts or not event.artifact_update.append:
                artifacts[artifact_id] = Artifact()
                artifacts[artifact_id].CopyFrom(event.artifact_update.artifact)
            else:
                artifacts[artifact_id].parts.extend(
                    event.artifact_update.artifact.parts
                )

return list(artifacts.values())
Loading
Loading