fix(streaming): support bytes chunks + fix async DB access in generators (#1236, #1219)#1412
Conversation
#1219) Two related StreamingResponse bugs surfaced from user reports: - #1236: yielding `bytes` raised "'bytes' object cannot be converted to 'PyString'". The Rust stream driver only extracted `str`; it now downcasts `PyBytes` first (used as-is) and falls back to UTF-8 encoding `str`, so binary streaming (file downloads, octet-stream) works. Non-str/bytes values are logged and end the stream instead of being silently dropped. - #1219: awaiting async resources (e.g. async SQLAlchemy) inside a streaming generator raised "attached to a different loop". AsyncGeneratorWrapper drove the generator on a freshly-created loop and swallowed all errors. It now drives the generator on the loop that was running when the StreamingResponse was constructed (the handler's loop, where those resources are bound) via run_coroutine_threadsafe, falling back to a dedicated background loop only for sync handlers. Generator errors now propagate (surfaced in logs) instead of silently truncating the stream. Tests: 4 unit tests (loop capture, error propagation, background-loop fallback, bytes pass-through) + 2 integration tests (sync and async bytes streaming). Existing SSE suite still green. ruff/isort/cargo fmt clean. Supersedes #1308 (which fixed only the bytes case). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
📝 WalkthroughWalkthroughThis PR enables raw binary streaming in Robyn's response pipeline. The Rust stream handler gains a ChangesBinary Streaming Support
Sequence Diagram(s)sequenceDiagram
rect rgba(70, 130, 180, 0.5)
Note over Client,Actix: Binary Streaming Request Flow
end
participant Client
participant Actix as Actix-Web Handler
participant PyGen as Python Generator
participant Wrapper as AsyncGeneratorWrapper
participant RustStream as create_python_stream
Client->>Actix: GET /stream/bytes or /stream/bytes_async
Actix->>RustStream: start streaming
RustStream->>PyGen: __next__() or via Wrapper
alt async generator
PyGen->>Wrapper: Wrapper.__next__
Wrapper->>Wrapper: schedule __anext__ on loop
Wrapper-->>PyGen: bytes or str chunk
else sync generator
PyGen-->>RustStream: bytes or str chunk
end
RustStream->>RustStream: PyBytes? → raw bytes / String? → UTF-8 bytes
RustStream-->>Client: chunk in octet-stream response
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
unit_tests/test_streaming_response.py (1)
51-62: ⚡ Quick winAdd a cleanup assertion for the owned background loop
This test validates fallback behavior, but not loop/thread teardown. Please also assert the owned thread stops after exhaustion to guard against thread-leak regressions.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@unit_tests/test_streaming_response.py` around lines 51 - 62, After the assertion that validates the wrapper yields the expected values in the test_wrapper_without_running_loop_uses_background_loop function, add an additional assertion to verify that the background loop/thread owned by AsyncGeneratorWrapper (indicated by _owns_loop being True) is properly cleaned up and stopped after the generator is exhausted. This ensures the wrapper does not leak the background thread resource after the generator completes iteration.integration_tests/test_binary_streaming.py (1)
17-21: ⚡ Quick winAssert
Content-Typefor the async bytes endpoint tooThe sync case checks header contract; the async case should mirror it to catch regression in response metadata.
Test delta
def test_stream_bytes_async(session): @@ r = requests.get(f"{BASE_URL}/stream/bytes_async", timeout=TIMEOUT) assert r.status_code == 200 + assert r.headers.get("Content-Type") == "application/octet-stream" assert r.content == EXPECTED🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@integration_tests/test_binary_streaming.py` around lines 17 - 21, The test_stream_bytes_async function is missing a Content-Type header assertion to match the sync case (test_stream_bytes) and ensure the response metadata contract is maintained. Add an assert statement to verify the Content-Type header value in the response object r.headers after the existing status_code assertion, using the same expected content type that the sync test validates.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@robyn/responses.py`:
- Around line 95-96: The daemon thread created by self._thread at lines 95-96 is
only stopped via the _finish() method, but if streaming terminates early before
the wrapper is fully exhausted, _finish() may never be called, leaving the
thread alive. Ensure the thread is properly cleaned up by implementing resource
management that guarantees _finish() is called regardless of how streaming ends,
such as using a context manager pattern with __enter__ and __exit__ methods, or
implementing __del__ to clean up the _thread, or wrapping the streaming logic
with try/finally to ensure cleanup happens on all code paths.
---
Nitpick comments:
In `@integration_tests/test_binary_streaming.py`:
- Around line 17-21: The test_stream_bytes_async function is missing a
Content-Type header assertion to match the sync case (test_stream_bytes) and
ensure the response metadata contract is maintained. Add an assert statement to
verify the Content-Type header value in the response object r.headers after the
existing status_code assertion, using the same expected content type that the
sync test validates.
In `@unit_tests/test_streaming_response.py`:
- Around line 51-62: After the assertion that validates the wrapper yields the
expected values in the test_wrapper_without_running_loop_uses_background_loop
function, add an additional assertion to verify that the background loop/thread
owned by AsyncGeneratorWrapper (indicated by _owns_loop being True) is properly
cleaned up and stopped after the generator is exhausted. This ensures the
wrapper does not leak the background thread resource after the generator
completes iteration.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a6c75185-22ea-4dae-9fc1-e33ad77ba86d
📒 Files selected for processing (5)
integration_tests/base_routes.pyintegration_tests/test_binary_streaming.pyrobyn/responses.pysrc/types/response.rsunit_tests/test_streaming_response.py
… generators Adds a "Streaming raw bytes" section to the streaming docs covering StreamingResponse with bytes chunks (file downloads / octet-stream, #1236) and awaiting async resources (DB sessions, HTTP clients) inside an async generator (#1219). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The owned-loop branch of AsyncGeneratorWrapper started a daemon thread running the loop, but it was only stopped in _finish() — which never runs if streaming ends before exhaustion (client disconnect, or Rust ending the stream on an unsupported chunk type), leaking the thread. - The thread target and the cleanup now use staticmethods that take the loop as an argument instead of bound methods, so they don't keep the wrapper alive (a bound-method target would have created a self-reference that prevents GC). - A weakref.finalize stops the loop when the wrapper is dropped, so the loop is closed and the thread exits even on early termination. _finish() calls it for the prompt/normal path. Adds a unit test that abandons the stream after one chunk and asserts the background thread is joined. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Good catch on the background-loop thread leak — fixed in bd0cf84.
Added a unit test that abandons the stream after one chunk and asserts the background thread is joined. Also verified the fix end-to-end against a live server: streaming a generator that |
Fixes two related
StreamingResponsebugs that came out of user reports.#1236 — yielding
bytescrashedStreaming binary data (
yield b"...") raised'bytes' object cannot be converted to 'PyString'because the Rust stream driver only extractedstr. It now downcastsPyBytesfirst (used as-is) and falls back to UTF-8-encodingstr, so file downloads /application/octet-streamwork. A value that's neitherstrnorbytesis logged and ends the stream instead of being silently dropped.#1219 — awaiting async DB inside a generator crashed
await session.execute(...)(async SQLAlchemy) inside a streaming generator raisedTask ... attached to a different loop.AsyncGeneratorWrapperdrove the generator on a freshly-created event loop and swallowed every error (print+raise StopIteration), so the stream silently truncated.It now drives the generator on the loop that was running when the
StreamingResponsewas constructed — i.e. the handler's loop, where the async resources are bound — viarun_coroutine_threadsafe, falling back to a dedicated background loop only for sync handlers. Generator errors now propagate (and show up in the server logs) instead of disappearing.Tests
unit_tests/test_streaming_response.py): generator runs on the constructing loop, errors propagate, background-loop fallback for sync handlers, bytes pass-through.integration_tests/test_binary_streaming.py): sync and asyncbytesstreaming.ruff/ruff format/isort/cargo fmt --checkall clean.Supersedes #1308 (which fixed only the bytes case) — happy to close that in favour of this.
Closes #1236
Closes #1219
🤖 Generated with Claude Code
Summary by CodeRabbit
bytes(including async streaming) for binary downloads.StreamingResponse.