diff --git a/src/http2_server.jl b/src/http2_server.jl index e388778b9..4c7f335ab 100644 --- a/src/http2_server.jl +++ b/src/http2_server.jl @@ -730,51 +730,47 @@ function _write_data_frames_h2_server!( end max_frame_size = min(Int(_H2_SERVER_MAX_DATA_FRAME_SIZE), peer_max) write_deadline_ns > 0 && _set_write_deadline!(conn, write_deadline_ns) - # Each "batch" drains as much of the body as the peer's current send - # window allows, builds it into a single contiguous byte buffer - # (interleaved 9-byte DATA-frame headers + payload slices), and writes - # the whole batch to the socket under one lock acquisition. The previous - # implementation paid one IOBuffer allocation, one frame-header - # allocation, and one socket write per frame; this version pays one - # allocation and one socket write per *batch*. + # Each "batch" drains as much of the body as the peer's current send window + # allows and emits it as DATA frames. Each frame is written as a 9-byte + # header (from a single reused scratch buffer) followed by the payload slice + # taken DIRECTLY from `data` (zero-copy: a unit-range view of a Vector{UInt8} + # or Base.CodeUnits is a stride-1 StridedVector, so `write(conn, ::view)` + # passes the underlying pointer straight to the socket — exactly like the + # HTTP/1 body path). + # + # The previous implementation allocated a fresh Vector sized to the whole + # windowed body on EVERY response and copied the body into it. Under HTTP/2's + # per-stream `@spawn` those large, short-lived allocations are retained by the + # glibc malloc per-thread arenas and never returned to the OS, so process RSS + # climbs in proportion to response body size even though the Julia heap stays + # flat. Writing the body in place removes that per-response allocation; the + # only buffer here is the reused 9-byte frame header. + header = Vector{UInt8}(undef, 9) while offset <= total_len # Probe the current allowed window with the size remaining. 
# Reservation is bounded by stream/conn windows only; per-frame # max-frame-size is applied below when slicing the reservation - # into DATA frames. + # into DATA frames. Reservation may block on a peer WINDOW_UPDATE, + # so it must happen OUTSIDE write_lock. first_chunk = _reserve_h2_send_window!(send_state, stream_id, total_len - offset + 1, write_deadline_ns) first_chunk <= 0 && continue - # Pre-size an output Vector that fits this batch exactly. - n_frames_first = cld(first_chunk, max_frame_size) - batch_len = first_chunk + 9 * n_frames_first - out = Vector{UInt8}(undef, batch_len) - out_pos = 1 - # Emit frames covering this reservation. - remaining_in_res = first_chunk - while remaining_in_res > 0 - payload = min(remaining_in_res, max_frame_size, total_len - offset + 1) - final_chunk = (offset + payload - 1) == total_len - _stamp_h2_data_header!(out, out_pos, payload, end_stream && final_chunk, stream_id) - out_pos += 9 - _copy_h2_payload!(out, out_pos, data, offset, payload) - out_pos += payload - offset += payload - remaining_in_res -= payload - end - # IMPORTANT: write the bytes we have in `out` BEFORE attempting - # another reservation. `_reserve_h2_send_window!` blocks if the - # peer's send window is exhausted, and the peer won't send - # WINDOW_UPDATE until it has received the bytes we just buffered. - # Trying to "drain more before writing" deadlocks against tests - # like `HTTP/2 server honors peer response-header filtering and - # stream flow control` that intentionally use a small initial - # window and gate further DATA on a WINDOW_UPDATE frame. - if out_pos - 1 != length(out) - resize!(out, out_pos - 1) - end + # IMPORTANT: write all frames for this reservation BEFORE attempting + # another reservation. `_reserve_h2_send_window!` blocks if the peer's + # send window is exhausted, and the peer won't send WINDOW_UPDATE until + # it has received the bytes we just wrote. 
We hold write_lock across the + # whole reservation so its frames stay contiguous on the wire. lock(write_lock) try - _write_all_h2_server!(conn, out) + remaining_in_res = first_chunk + while remaining_in_res > 0 + payload = min(remaining_in_res, max_frame_size, total_len - offset + 1) + final_chunk = (offset + payload - 1) == total_len + _stamp_h2_data_header!(header, 1, payload, end_stream && final_chunk, stream_id) + _write_all_h2_server!(conn, header) + _write_h2_payload_slice!(conn, data, offset, payload) + offset += payload + remaining_in_res -= payload + end finally unlock(write_lock) end @@ -782,6 +778,23 @@ function _write_data_frames_h2_server!( return nothing end +# Write `n` bytes of the response body starting at `data[offset]` straight to the +# socket. For the body types the response path actually uses (Vector{UInt8} and +# Base.CodeUnits, both DenseVector) a unit-range view is a stride-1 StridedVector, +# so `write(conn, ::view)` hits the zero-copy pointer path; for any other +# AbstractVector, `write` still receives only the slice. Throws if not all `n` bytes go out. +@inline function _write_h2_payload_slice!( + conn::Union{TCP.Conn,TLS.Conn}, + data::AbstractVector{UInt8}, + offset::Int, + n::Int, +)::Nothing + slice = @view data[offset:(offset + n - 1)] + nw = write(conn, slice) + nw == n || throw(ProtocolError("h2 server payload write made no progress")) + return nothing +end + function _h2_server_has_active_streams(states_lock::ReentrantLock, states::Dict{UInt32,_H2ServerStreamState})::Bool lock(states_lock) try @@ -1261,13 +1274,11 @@ end body_end_stream::Bool, total_len::Int, )::Nothing - # Build a single contiguous buffer holding the optional HEADERS frame - # plus interleaved DATA frame headers and payload slices, then issue - # one socket write. Empirically this matches the throughput of the - # `Reseau.TCP.writev` (sendmsg/iovec) path on this workload while - # avoiding the Reseau version dependency. 
The Reseau `writev` API is - # still useful for other protocol implementations (gRPC, QUIC) where - # the cost of the body memcpy here would dominate. + # Emit the optional HEADERS frame and the DATA frames for this batch, + # writing each frame's payload directly from `body` (no body copy). See + # `_write_h2_batch_via_single_buffer!` (historical name: it no longer builds one + # buffer) for the per-response-allocation rationale. `Reseau.TCP.writev` + # (sendmsg/iovec) could later coalesce frame headers and payload slices into one syscall. _write_h2_batch_via_single_buffer!(conn, stream_id, body, headers_bytes, body_offset, body_len, server_max, body_end_stream, total_len) return nothing @@ -1284,27 +1295,33 @@ end body_end_stream::Bool, total_len::Int, )::Nothing - n_data_frames = cld(body_len, server_max) - out_len = (headers_bytes === nothing ? 0 : length(headers_bytes)) + body_len + 9 * n_data_frames - out = Vector{UInt8}(undef, out_len) - out_pos = 1 - if headers_bytes !== nothing - unsafe_copyto!(out, out_pos, headers_bytes, 1, length(headers_bytes)) - out_pos += length(headers_bytes) - end + # Caller holds write_lock. Emit the optional HEADERS frame, then each DATA + # frame as a 9-byte header (reused scratch) followed by its payload slice + # taken DIRECTLY from `body` (zero-copy: a unit-range view of a Vector{UInt8} + # or Base.CodeUnits is a stride-1 StridedVector, so `write(conn, ::view)` + # passes the underlying pointer straight to the socket). + # + # The previous implementation built one body-sized buffer (HEADERS + DATA + # headers + a full copy of the body) and issued a single write. That copy is + # the dominant per-response allocation on the buffered-response fast path; + # under HTTP/2's per-stream `@spawn` the freed buffers are retained by the + # glibc malloc arenas and inflate process RSS in proportion to response body + # size (HTTP/2-only — the HTTP/1 path writes the body in place). 
Writing the + # body in place here removes that allocation; only the small HEADERS-frame + # bytes and the reused 9-byte DATA header remain. + headers_bytes === nothing || _write_all_h2_server!(conn, headers_bytes) + header = Vector{UInt8}(undef, 9) rem = body_len cur = body_offset while rem > 0 payload = min(rem, server_max, total_len - cur + 1) final_chunk = (cur + payload - 1) == total_len - _stamp_h2_data_header!(out, out_pos, payload, body_end_stream && final_chunk, stream_id) - out_pos += 9 - _copy_h2_payload!(out, out_pos, body, cur, payload) - out_pos += payload + _stamp_h2_data_header!(header, 1, payload, body_end_stream && final_chunk, stream_id) + _write_all_h2_server!(conn, header) + _write_h2_payload_slice!(conn, body, cur, payload) cur += payload rem -= payload end - write(conn, out) return nothing end