Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 74 additions & 57 deletions src/http2_server.jl
Original file line number Diff line number Diff line change
Expand Up @@ -730,58 +730,71 @@ function _write_data_frames_h2_server!(
end
max_frame_size = min(Int(_H2_SERVER_MAX_DATA_FRAME_SIZE), peer_max)
write_deadline_ns > 0 && _set_write_deadline!(conn, write_deadline_ns)
# Each "batch" drains as much of the body as the peer's current send
# window allows, builds it into a single contiguous byte buffer
# (interleaved 9-byte DATA-frame headers + payload slices), and writes
# the whole batch to the socket under one lock acquisition. The previous
# implementation paid one IOBuffer allocation, one frame-header
# allocation, and one socket write per frame; this version pays one
# allocation and one socket write per *batch*.
# Each "batch" drains as much of the body as the peer's current send window
# allows and emits it as DATA frames. Each frame is written as a 9-byte
# header (from a single reused scratch buffer) followed by the payload slice
# taken DIRECTLY from `data` (zero-copy: a unit-range view of a Vector{UInt8}
# or Base.CodeUnits is a stride-1 StridedVector, so `write(conn, ::view)`
# passes the underlying pointer straight to the socket — exactly like the
# HTTP/1 body path).
#
# The previous implementation allocated a fresh Vector sized to the whole
# windowed body on EVERY response and copied the body into it. Under HTTP/2's
# per-stream `@spawn` those large, short-lived allocations are retained by the
# glibc malloc per-thread arenas and never returned to the OS, so process RSS
# climbs in proportion to response body size even though the Julia heap stays
# flat. Writing the body in place removes that per-response allocation; the
# only buffer here is the reused 9-byte frame header.
header = Vector{UInt8}(undef, 9)
while offset <= total_len
# Probe the current allowed window with the size remaining.
# Reservation is bounded by stream/conn windows only; per-frame
# max-frame-size is applied below when slicing the reservation
# into DATA frames.
# into DATA frames. Reservation may block on a peer WINDOW_UPDATE,
# so it must happen OUTSIDE write_lock.
first_chunk = _reserve_h2_send_window!(send_state, stream_id, total_len - offset + 1, write_deadline_ns)
first_chunk <= 0 && continue
# Pre-size an output Vector that fits this batch exactly.
n_frames_first = cld(first_chunk, max_frame_size)
batch_len = first_chunk + 9 * n_frames_first
out = Vector{UInt8}(undef, batch_len)
out_pos = 1
# Emit frames covering this reservation.
remaining_in_res = first_chunk
while remaining_in_res > 0
payload = min(remaining_in_res, max_frame_size, total_len - offset + 1)
final_chunk = (offset + payload - 1) == total_len
_stamp_h2_data_header!(out, out_pos, payload, end_stream && final_chunk, stream_id)
out_pos += 9
_copy_h2_payload!(out, out_pos, data, offset, payload)
out_pos += payload
offset += payload
remaining_in_res -= payload
end
# IMPORTANT: write the bytes we have in `out` BEFORE attempting
# another reservation. `_reserve_h2_send_window!` blocks if the
# peer's send window is exhausted, and the peer won't send
# WINDOW_UPDATE until it has received the bytes we just buffered.
# Trying to "drain more before writing" deadlocks against tests
# like `HTTP/2 server honors peer response-header filtering and
# stream flow control` that intentionally use a small initial
# window and gate further DATA on a WINDOW_UPDATE frame.
if out_pos - 1 != length(out)
resize!(out, out_pos - 1)
end
# IMPORTANT: write all frames for this reservation BEFORE attempting
# another reservation. `_reserve_h2_send_window!` blocks if the peer's
# send window is exhausted, and the peer won't send WINDOW_UPDATE until
# it has received the bytes we just wrote. We hold write_lock across the
# whole reservation so its frames stay contiguous on the wire.
lock(write_lock)
try
_write_all_h2_server!(conn, out)
remaining_in_res = first_chunk
while remaining_in_res > 0
payload = min(remaining_in_res, max_frame_size, total_len - offset + 1)
final_chunk = (offset + payload - 1) == total_len
_stamp_h2_data_header!(header, 1, payload, end_stream && final_chunk, stream_id)
_write_all_h2_server!(conn, header)
_write_h2_payload_slice!(conn, data, offset, payload)
offset += payload
remaining_in_res -= payload
end
finally
unlock(write_lock)
end
end
return nothing
end

# Send exactly `n` body bytes, beginning at index `offset` of `data`, to `conn`
# without first copying them into an intermediate buffer.
#
# The bytes are handed to `write` as a unit-range view over `data`. For dense
# bodies such as Vector{UInt8} and Base.CodeUnits, that view is a stride-1
# StridedVector, which is intended to let `write(conn, ::view)` use the
# underlying memory directly (NOTE(review): this relies on the `write` methods
# defined for TCP.Conn / TLS.Conn elsewhere — confirm). Any other
# AbstractVector{UInt8} goes through the generic `write` path for the slice only,
# never for the whole body.
#
# Throws `ProtocolError` when `write` reports a byte count different from `n`.
@inline function _write_h2_payload_slice!(
    conn::Union{TCP.Conn,TLS.Conn},
    data::AbstractVector{UInt8},
    offset::Int,
    n::Int,
)::Nothing
    last_index = offset + n - 1
    written = write(conn, view(data, offset:last_index))
    if written != n
        throw(ProtocolError("h2 server payload write made no progress"))
    end
    return nothing
end

function _h2_server_has_active_streams(states_lock::ReentrantLock, states::Dict{UInt32,_H2ServerStreamState})::Bool
lock(states_lock)
try
Expand Down Expand Up @@ -1261,13 +1274,11 @@ end
body_end_stream::Bool,
total_len::Int,
)::Nothing
# Build a single contiguous buffer holding the optional HEADERS frame
# plus interleaved DATA frame headers and payload slices, then issue
# one socket write. Empirically this matches the throughput of the
# `Reseau.TCP.writev` (sendmsg/iovec) path on this workload while
# avoiding the Reseau version dependency. The Reseau `writev` API is
# still useful for other protocol implementations (gRPC, QUIC) where
# the cost of the body memcpy here would dominate.
# Emit the optional HEADERS frame and the DATA frames for this batch,
# writing each frame's payload directly from `body` (no body copy). See
# `_write_h2_batch_via_single_buffer!` for the per-response-allocation
# rationale. A future `Reseau.TCP.writev` (sendmsg/iovec) could coalesce the
# frame headers and payload slices into a single vectored syscall.
_write_h2_batch_via_single_buffer!(conn, stream_id, body,
headers_bytes, body_offset, body_len, server_max, body_end_stream, total_len)
return nothing
Expand All @@ -1284,27 +1295,33 @@ end
body_end_stream::Bool,
total_len::Int,
)::Nothing
n_data_frames = cld(body_len, server_max)
out_len = (headers_bytes === nothing ? 0 : length(headers_bytes)) + body_len + 9 * n_data_frames
out = Vector{UInt8}(undef, out_len)
out_pos = 1
if headers_bytes !== nothing
unsafe_copyto!(out, out_pos, headers_bytes, 1, length(headers_bytes))
out_pos += length(headers_bytes)
end
# Caller holds write_lock. Emit the optional HEADERS frame, then each DATA
# frame as a 9-byte header (reused scratch) followed by its payload slice
# taken DIRECTLY from `body` (zero-copy: a unit-range view of a Vector{UInt8}
# or Base.CodeUnits is a stride-1 StridedVector, so `write(conn, ::view)`
# passes the underlying pointer straight to the socket).
#
# The previous implementation built one body-sized buffer (HEADERS + DATA
# headers + a full copy of the body) and issued a single write. That copy is
# the dominant per-response allocation on the buffered-response fast path;
# under HTTP/2's per-stream `@spawn` the freed buffers are retained by the
# glibc malloc arenas and inflate process RSS in proportion to response body
# size (HTTP/2-only — the HTTP/1 path writes the body in place). Writing the
# body in place here removes that allocation; only the small HEADERS-frame
# bytes and the reused 9-byte DATA header remain.
headers_bytes === nothing || _write_all_h2_server!(conn, headers_bytes)
header = Vector{UInt8}(undef, 9)
rem = body_len
cur = body_offset
while rem > 0
payload = min(rem, server_max, total_len - cur + 1)
final_chunk = (cur + payload - 1) == total_len
_stamp_h2_data_header!(out, out_pos, payload, body_end_stream && final_chunk, stream_id)
out_pos += 9
_copy_h2_payload!(out, out_pos, body, cur, payload)
out_pos += payload
_stamp_h2_data_header!(header, 1, payload, body_end_stream && final_chunk, stream_id)
_write_all_h2_server!(conn, header)
_write_h2_payload_slice!(conn, body, cur, payload)
cur += payload
rem -= payload
end
write(conn, out)
return nothing
end

Expand Down
Loading