drpcstream: introduce shared BufferPool for ring buffer #55
I had an idea which I ran through Claude; below is a summary of it. I'm not planning to do it now, but we can reconsider in the future if profiling shows any gain.
Buffer pool: further optimization ideas
Right now the data copy chain for an incoming message looks like this:
We could eliminate copy #2 by having the packet assembler get its buffer from the pool directly. The assembler already has a TODO for buffer reuse. Instead of reusing its own backing array across packets (lines 84-87), it would
Another idea: size-bucketed pools (e.g. 1KiB, 16KiB, 32KiB) so that
I think we should keep the pool simple for now. The assembler integration is the more interesting optimization since it removes a full copy per message. Worth revisiting once we have benchmarks to measure the actual impact.
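For reference, the size-bucketed idea mentioned above could look roughly like the sketch below. This is only an illustration under assumptions: `BucketedPool`, `NewBucketedPool`, and the `Get(n)`/`Put` signatures are hypothetical names that do not exist in this PR, and the bucket sizes are simply the three examples from the comment.

```go
package main

import (
	"fmt"
	"sync"
)

// bucketSizes are the example size classes from the comment (1KiB, 16KiB, 32KiB).
var bucketSizes = []int{1 << 10, 16 << 10, 32 << 10}

// BucketedPool is a hypothetical pool holding one sync.Pool per size class.
type BucketedPool struct {
	pools []sync.Pool
}

// NewBucketedPool creates one sync.Pool per entry in bucketSizes.
func NewBucketedPool() *BucketedPool {
	p := &BucketedPool{pools: make([]sync.Pool, len(bucketSizes))}
	for i, size := range bucketSizes {
		size := size // capture the per-bucket size for the closure
		p.pools[i].New = func() any {
			b := make([]byte, 0, size)
			return &b
		}
	}
	return p
}

// Get returns an empty buffer from the smallest bucket that can hold n bytes.
// Requests larger than the biggest bucket get a plain, unpooled allocation.
func (p *BucketedPool) Get(n int) *[]byte {
	for i, size := range bucketSizes {
		if n <= size {
			return p.pools[i].Get().(*[]byte)
		}
	}
	b := make([]byte, 0, n)
	return &b
}

// Put resets b and returns it to the bucket whose size equals its capacity.
// Buffers that match no bucket (oversized or regrown) are left to the GC.
func (p *BucketedPool) Put(b *[]byte) {
	for i, size := range bucketSizes {
		if cap(*b) == size {
			*b = (*b)[:0]
			p.pools[i].Put(b)
			return
		}
	}
}

func main() {
	p := NewBucketedPool()
	b := p.Get(2000) // served from the 16KiB bucket
	*b = append(*b, make([]byte, 2000)...)
	fmt.Println(len(*b), cap(*b))
	p.Put(b)
}
```

Matching on exact capacity in `Put` keeps each bucket homogeneous, at the cost of dropping any buffer that was regrown by `append`.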
| } | ||
|
|
||
| rb.buf[rb.head] = append(rb.buf[rb.head][:0], data...) | ||
| b := rb.pool.Get() |
These can be moved outside the lock now.
| data = append([]byte(nil), data...) | ||
| s.recvQueue.Done() | ||
| data = append([]byte(nil), *b...) | ||
| s.pool.Put(b) |
We should continue to use s.recvQueue.Done(). Refer to other comments for context.
cthumuluru-crdb
left a comment
Added a couple of comments. Besides that, the changes look good.
| head int // next write position (producer) | ||
| tail int // next read position (consumer) | ||
| count int // number of occupied slots | ||
| pool *BufferPool // shared pool buffers are obtained from |
nit: with Close(), we no longer wait for the held buffer to be released. I don't see any risk in that, but it also means the consumer may return the buffer to the pool after the packet buffer is closed (for example, due to context cancellation). The buffer should still be returned to the pool, since the pool is shared across streams and lives at the connection level.
Can you add a comment that the buffer pool is shared across streams and is owned by the manager, not by the ring buffer (just to be more specific about the ownership)?
I will add the comment, but as for the rest of the comment, I couldn't understand the problem outlined.
I see you were talking about adding the comment only. Updated.
| // increasing stream ids within a single transport. | ||
| func New(ctx context.Context, sid uint64, wr *drpcwire.MuxWriter) *Stream { | ||
| return NewWithOptions(ctx, sid, wr, Options{}) | ||
| func New(ctx context.Context, sid uint64, wr *drpcwire.MuxWriter, pool *BufferPool) *Stream { |
Since the buffer pool is an internal option of a stream, I'd prefer passing it around via Internal options.
I don't understand this one. It's not an optional parameter, so I don't see the value in wrapping it in the Internal struct.
| m.pendingStreams = make(map[uint64]*pendingStream) | ||
|
|
||
| m.streams = newActiveStreams() | ||
| m.recvPool = drpcstream.NewBufferPool() |
Since the manager uses it, how about moving it to the package where the manager lives? Also, keeping the name as buffer pool would let us use it for the send buffer if needed in the future.
I want to move it to drpcwire in an upcoming PR, as it's lower in the dependency hierarchy and contains similar fundamental pieces. I don't want to move it to drpcmanager or drpcstream because we would have to create interfaces all over the place.
Add a BufferPool backed by sync.Pool that is shared across all streams within a Manager. The ring buffer now obtains a pooled buffer on Enqueue and copies the message into it, instead of growing a fixed slice per slot. Dequeue hands the buffer's data to the consumer and advances the tail immediately, and Done releases the buffer back to the pool once the consumer is finished with it.
Keeping the Dequeue/Done contract means the consumer works with a plain []byte and never has to know whether the queue is backed by a pool or by fixed buffers. Because Dequeue advances the tail right away rather than waiting for Done, Close no longer has to block until in-flight buffers are released.
The pool is a required parameter in the Stream constructor, created once per Manager and passed to all streams it creates.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
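The Enqueue/Dequeue/Done flow described in this commit message can be sketched as follows. This is a simplified illustration, not the code in the PR: the queue here is non-blocking (the real one presumably waits when full or empty), the `held` field, `newRingBuffer`, and the 1KiB initial capacity are assumptions, and a single consumer that calls Done before the next Dequeue is assumed.

```go
package main

import (
	"fmt"
	"sync"
)

// BufferPool is a sketch of a sync.Pool-backed pool of byte buffers.
type BufferPool struct{ p sync.Pool }

func NewBufferPool() *BufferPool {
	bp := &BufferPool{}
	bp.p.New = func() any {
		b := make([]byte, 0, 1024) // initial capacity is an assumption
		return &b
	}
	return bp
}

func (bp *BufferPool) Get() *[]byte { return bp.p.Get().(*[]byte) }

// Put truncates the buffer and returns it to the pool.
func (bp *BufferPool) Put(b *[]byte) {
	*b = (*b)[:0]
	bp.p.Put(b)
}

// ringBuffer is a simplified, non-blocking stand-in for the stream's queue.
type ringBuffer struct {
	mu    sync.Mutex
	slots []*[]byte
	head  int         // next write position (producer)
	tail  int         // next read position (consumer)
	count int         // number of occupied slots
	held  *[]byte     // buffer handed out by Dequeue, awaiting Done (hypothetical)
	pool  *BufferPool // shared pool; owned by the manager, not the ring buffer
}

func newRingBuffer(n int, pool *BufferPool) *ringBuffer {
	return &ringBuffer{slots: make([]*[]byte, n), pool: pool}
}

// Enqueue copies data into a pooled buffer and stores it in the next slot.
// It reports false, and returns the buffer to the pool, when the queue is full.
func (rb *ringBuffer) Enqueue(data []byte) bool {
	b := rb.pool.Get()
	*b = append((*b)[:0], data...)

	rb.mu.Lock()
	full := rb.count == len(rb.slots)
	if !full {
		rb.slots[rb.head] = b
		rb.head = (rb.head + 1) % len(rb.slots)
		rb.count++
	}
	rb.mu.Unlock()
	if full {
		rb.pool.Put(b)
	}
	return !full
}

// Dequeue returns the oldest message and advances the tail immediately,
// so the slot is free for the producer before Done is called.
func (rb *ringBuffer) Dequeue() ([]byte, bool) {
	rb.mu.Lock()
	defer rb.mu.Unlock()
	if rb.count == 0 {
		return nil, false
	}
	b := rb.slots[rb.tail]
	rb.slots[rb.tail] = nil
	rb.tail = (rb.tail + 1) % len(rb.slots)
	rb.count--
	rb.held = b
	return *b, true
}

// Done returns the buffer from the last Dequeue to the pool.
func (rb *ringBuffer) Done() {
	rb.mu.Lock()
	b := rb.held
	rb.held = nil
	rb.mu.Unlock()
	if b != nil {
		rb.pool.Put(b)
	}
}

func main() {
	rb := newRingBuffer(2, NewBufferPool())
	rb.Enqueue([]byte("hello"))
	if data, ok := rb.Dequeue(); ok {
		msg := append([]byte(nil), data...) // copy before releasing the buffer
		rb.Done()
		fmt.Println(string(msg))
	}
}
```

The consumer only ever sees a plain `[]byte`; the pool stays an implementation detail behind Done.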
Add a BufferPool backed by sync.Pool that is shared across all streams
within a Manager. The ring buffer now obtains buffers from the pool on
Enqueue and transfers ownership to the caller on Dequeue, which advances
the tail immediately. This removes the two-step Dequeue/Done protocol
and simplifies Close (no longer needs to wait for held buffers).
The pool is a required parameter in the Stream constructor, created once
per Manager and passed to all streams it creates.
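The ownership-transfer variant described in this message can be sketched roughly as below. It is a minimal illustration under assumptions: a plain `sync.Pool` stands in for the BufferPool, and `queue`, `recv`, and the slice-backed storage are hypothetical simplifications of the ring buffer and stream code.

```go
package main

import (
	"fmt"
	"sync"
)

// pool stands in for the shared BufferPool (assumption: 1KiB initial capacity).
var pool = sync.Pool{New: func() any {
	b := make([]byte, 0, 1024)
	return &b
}}

// release truncates b and returns it to the pool.
func release(b *[]byte) {
	*b = (*b)[:0]
	pool.Put(b)
}

// queue is a hypothetical, non-blocking stand-in for the ring buffer.
type queue struct {
	mu    sync.Mutex
	items []*[]byte
}

// Enqueue copies data into a pooled buffer and appends it to the queue.
func (q *queue) Enqueue(data []byte) {
	b := pool.Get().(*[]byte)
	*b = append((*b)[:0], data...)
	q.mu.Lock()
	q.items = append(q.items, b)
	q.mu.Unlock()
}

// Dequeue removes the oldest buffer and transfers ownership to the caller,
// who is then responsible for returning it to the pool. It returns nil when
// the queue is empty.
func (q *queue) Dequeue() *[]byte {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil
	}
	b := q.items[0]
	q.items[0] = nil
	q.items = q.items[1:]
	return b
}

// Close releases the buffers still queued. It does not wait for buffers
// already handed to consumers, because the queue no longer owns them.
func (q *queue) Close() {
	q.mu.Lock()
	items := q.items
	q.items = nil
	q.mu.Unlock()
	for _, b := range items {
		release(b)
	}
}

// recv mirrors the consumer side: copy the payload out, then return the
// buffer to the pool.
func recv(q *queue) ([]byte, bool) {
	b := q.Dequeue()
	if b == nil {
		return nil, false
	}
	data := append([]byte(nil), *b...)
	release(b)
	return data, true
}

func main() {
	q := &queue{}
	q.Enqueue([]byte("hello"))
	if data, ok := recv(q); ok {
		fmt.Println(string(data))
	}
	q.Close()
}
```

Because the consumer owns the buffer after Dequeue, it can return it to the pool even after Close has run, which is safe as long as the pool outlives the queue.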