Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/blobstore/buffer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"cas_reader_buffer.go",
"cas_validating_chunk_reader.go",
"cas_validating_reader.go",
"chunk_buffer_pool.go",
"chunk_reader.go",
"chunk_reader_backed_reader.go",
"common_conversions.go",
Expand Down Expand Up @@ -55,6 +56,8 @@ go_test(
"new_proto_buffer_from_proto_test.go",
"new_validated_buffer_from_byte_slice_test.go",
"new_validated_buffer_from_reader_at_test.go",
"reader_backed_chunk_reader_bench_test.go",
"reader_backed_chunk_reader_test.go",
"with_error_handler_test.go",
],
deps = [
Expand Down
26 changes: 26 additions & 0 deletions pkg/blobstore/buffer/chunk_buffer_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package buffer

import "sync"

// chunkBufferPool caches []byte buffers used by ChunkReader
// implementations that allocate a fixed-size working buffer per
// stream. Buffers are stored as *[]byte to avoid the interface-boxing
// allocation that occurs when storing a non-pointer value in
// sync.Pool.
var chunkBufferPool sync.Pool

func getChunkBuffer(size int) *[]byte {
if v := chunkBufferPool.Get(); v != nil {
b := v.(*[]byte)
if cap(*b) >= size {
*b = (*b)[:size]
return b
}
}
b := make([]byte, size)
return &b
}

func putChunkBuffer(b *[]byte) {
chunkBufferPool.Put(b)
}
6 changes: 6 additions & 0 deletions pkg/blobstore/buffer/chunk_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
// responsible for providing space for the data. This interface is
// similar to how frame-based transfer protocols work, including the
// Bytestream protocol used by REv2.
//
// The byte slice returned by Read() is only guaranteed to be valid
// until the next call to Read() or Close() on the same ChunkReader.
// Implementations are free to reuse the backing storage across Read()
// calls in order to minimize allocations. Callers that need to retain
// the contents past the next Read()/Close() must copy them.
type ChunkReader interface {
Read() ([]byte, error)
Close()
Expand Down
9 changes: 9 additions & 0 deletions pkg/blobstore/buffer/multiplexed_chunk_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ func newMultiplexedChunkReader(r ChunkReader, additionalConsumers int) ChunkRead
}

func (r *multiplexedChunkReader) readAndShareWithOthers(currentConsumerContinues int) ([]byte, error) {
// All waiting consumers receive the exact same slice returned by
// the underlying ChunkReader.Read(). This is safe with respect
// to the "valid until next Read()/Close()" contract because the
// pendingConsumers counter ensures every consumer must call
// Read() (or Close()) again before another underlying Read()
// can be issued. By the time the next underlying Read()
// executes, every consumer has therefore released its reference
// to the previously returned slice, so the underlying reader is
// free to reuse the backing storage.
data, err := r.r.Read()
for _, c := range r.waitingConsumers {
c <- readResult{data: data, err: err}
Expand Down
12 changes: 10 additions & 2 deletions pkg/blobstore/buffer/reader_backed_chunk_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type readerBackedChunkReader struct {
r io.ReadCloser
maximumChunkSizeBytes int

buf *[]byte
err error
}

Expand All @@ -23,8 +24,11 @@ func newReaderBackedChunkReader(r io.ReadCloser, maximumChunkSizeBytes int) Chun

func (r *readerBackedChunkReader) Read() ([]byte, error) {
if r.err == nil {
b := make([]byte, r.maximumChunkSizeBytes)
n, err := io.ReadFull(r.r, b[:])
if r.buf == nil {
r.buf = getChunkBuffer(r.maximumChunkSizeBytes)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, you're trying to solve the issue that transmitting a large file causes O(n) allocations. However, with this specific change you're trying to reduce this to 'less-than-1' allocation by using a sync.Pool. Is that really necessary?

In other words, why don't we just write this instead?

r.buf = make([]byte, r.maximumChunkSizeBytes)

Sure, it's one more allocation. But are we sure that that's actually part of the bottleneck?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue is that bb-storage on our workload is GC-bound:
Stock bb-storage CPU flamegraph

And the biggest source of allocations is this place:
Stock bb-storage allocations flamegraph

We have all kinds of blob sizes in CAS. Many of them do fit in the default chunk size, but we also have a huge tail of blobs which are in the realm of gigabytes:
image

With this distribution of sizes it looks like we need to optimize both small and huge files, that is why I've looked into sync.Pool first, and then found that we can actually reuse one buffer (except for the gRPC part, which is really unfortunate).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, can't we first do some benchmarking to see whether making a single allocation is sufficient?

I am not a fan of using sync.Pool here. I get why sync.Pool exists. Namely, you often want to use it for holding objects that are complex to initialize. But in our case that shouldn't need to apply. We're just talking about simple byte arrays here. If we already need to use sync.Pool for storing simple byte slices, then Go's memory management must be really bad.

I also suspect that a big amount of waste is coming from the fact that we always allocate maximumChunkSizeBytes instead of something like:

max(min(maximumChunkSizeBytes, digest.GetSizeBytes()), 1)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over-allocating is wasteful, but not the kind of waste that introduces GC overhead. GC overhead is sensitive to the number of allocations and not especially to their size.

I agree that sync.Pool is a hack here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get why sync.Pool exists. Namely, you often want to use it for holding objects that are complex to initialize.

That's not exactly true. https://pkg.go.dev/sync#Pool:

Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. [...]

An appropriate use of a Pool is to manage a group of temporary items silently shared among and potentially reused by concurrent independent clients of a package. Pool provides a way to amortize allocation overhead across many clients.

An example of good use of a Pool is in the fmt package, which maintains a dynamically-sized store of temporary output buffers.

https://victoriametrics.com/blog/go-sync-pool/ provides several examples where the standard library utilizes pools for simple objects. https://cs.opensource.google/go/go/+/master:src/net/http/server.go;l=857-870;drc=15b9fc2659f77608548cb279c5e0565b0664cfca;bpv=1;bpt=1 is basically our exact case — just allocating a buffer of constant size.

The point of the pool here is to make sure that we allocate less buffers overall, so GC needs to do less work overall. It is usually negligible when we have other work to do, but in case of bb-storage it might be noticeable depending on the workload.

I have done some benchmarking with both sync.Pool and without it:

  • For large blobs pool does not matter, we are getting the main benefit from reusing the buffer inside the reader.
  • For small blobs pool does matter, we are saving ~30% of GC CPU, which means ~30% of total CPU usage when we are GC bound.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also suspect that a big amount of waste is coming from the fact that we always allocate maximumChunkSizeBytes instead of something like:

max(min(maximumChunkSizeBytes, digest.GetSizeBytes()), 1)

It might be a big amount of waste from the point of allocated memory, but I wouldn't say we have significant problems in that regard. And speaking about CPU usage:

  • The main driver of GC workload is the number of objects. Having smaller objects probably improves data locality slightly, but I don't think it is significant enough.
  • Having many different sizes might actually make GC/allocator work slightly more complex, as these objects will use different size classes.
  • It is not obvious how to recycle different-sized objects via sync.Pool.

But there is another interesting angle to all of that — Go has a significantly more complex data path for allocations > 32 KB. Maybe it makes sense to decrease the size of these buffers to 32 KB to get lock-free allocations.

@matshch matshch May 26, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have conducted several more experiments:

  • Decreasing buffer sizes to 32 KB (which are better supported by Go runtime) does not help, as we are getting more overhead from more gRPC messages than from heavier allocation/GC paths.
  • sync.Pool definitely helps to reduce CPU usage compared to allocating new constant-size buffers each time, especially in case of smaller blobs, as we can reuse them quicker.
  • Replacing allocations with new purpose-sized buffers helps a bit, but sync.Pool still outperforms this idea by 10-30% on blobs <64 KB.

}
b := *r.buf
n, err := io.ReadFull(r.r, b)
if err == io.ErrUnexpectedEOF {
r.err = io.EOF
} else {
Expand All @@ -38,5 +42,9 @@ func (r *readerBackedChunkReader) Read() ([]byte, error) {
}

func (r *readerBackedChunkReader) Close() {
if r.buf != nil {
putChunkBuffer(r.buf)
r.buf = nil
}
r.r.Close()
}
82 changes: 82 additions & 0 deletions pkg/blobstore/buffer/reader_backed_chunk_reader_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package buffer_test

import (
"bytes"
"testing"

"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
)

type readAtCloser struct{ *bytes.Reader }

func (readAtCloser) Close() error { return nil }

const benchChunkSize = 1 << 16

var benchBlobSizes = []struct {
name string
blobSize int64
}{
{"BlobSize=4KiB", 4 << 10},
{"BlobSize=64KiB", 64 << 10},
{"BlobSize=256KiB", 256 << 10},
{"BlobSize=4MiB", 4 << 20},
{"BlobSize=64MiB", 64 << 20},
}

// NewValidatedBufferFromReaderAt is the cheapest public entry point
// that resolves to newReaderBackedChunkReader without wrapping
// decorators.
func BenchmarkReaderBackedChunkReader(b *testing.B) {
for _, c := range benchBlobSizes {
b.Run(c.name, func(b *testing.B) {
data := make([]byte, c.blobSize)
for i := range data {
data[i] = byte(i)
}

b.SetBytes(c.blobSize)
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, c.blobSize)
r := buf.ToChunkReader(0, benchChunkSize)
for {
if _, err := r.Read(); err != nil {
break
}
}
r.Close()
}
})
}
}

func BenchmarkReaderBackedChunkReaderParallel(b *testing.B) {
for _, c := range benchBlobSizes {
b.Run(c.name, func(b *testing.B) {
data := make([]byte, c.blobSize)
for i := range data {
data[i] = byte(i)
}

b.SetBytes(c.blobSize)
b.ReportAllocs()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
buf := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, c.blobSize)
r := buf.ToChunkReader(0, benchChunkSize)
for {
if _, err := r.Read(); err != nil {
break
}
}
r.Close()
}
})
})
}
}
71 changes: 71 additions & 0 deletions pkg/blobstore/buffer/reader_backed_chunk_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package buffer_test

import (
"bytes"
"io"
"testing"

"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/stretchr/testify/require"
)

func TestReaderBackedChunkReaderReusesBuffer(t *testing.T) {
const (
chunkSize = 16
blobSize = int64(chunkSize * 4)
)
data := make([]byte, blobSize)
for i := range data {
data[i] = byte(i)
}

b := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, blobSize)
r := b.ToChunkReader(0, chunkSize)
defer r.Close()

first, err := r.Read()
require.NoError(t, err)
require.Equal(t, chunkSize, len(first))
firstAddr := &first[0]

for i := 0; i < 3; i++ {
next, err := r.Read()
require.NoError(t, err)
require.Equal(t, chunkSize, len(next))
require.Same(t, firstAddr, &next[0],
"successive chunks must share backing storage")
}

_, err = r.Read()
require.Equal(t, io.EOF, err)
}

// TestReaderBackedChunkReaderClosePoolsBuffer retries because
// sync.Pool may drop entries between calls (e.g. on GC).
func TestReaderBackedChunkReaderClosePoolsBuffer(t *testing.T) {
const chunkSize = 4096
data := make([]byte, chunkSize)

var observedReuse bool
for attempt := 0; attempt < 100 && !observedReuse; attempt++ {
b1 := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, int64(chunkSize))
r1 := b1.ToChunkReader(0, chunkSize)
chunk, err := r1.Read()
require.NoError(t, err)
addr1 := &chunk[0]
_, err = r1.Read()
require.Equal(t, io.EOF, err)
r1.Close()

b2 := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, int64(chunkSize))
r2 := b2.ToChunkReader(0, chunkSize)
chunk2, err := r2.Read()
require.NoError(t, err)
if &chunk2[0] == addr1 {
observedReuse = true
}
r2.Close()
}
require.True(t, observedReuse,
"expected sync.Pool to return a previously-released buffer at least once across 100 attempts")
}
6 changes: 6 additions & 0 deletions pkg/blobstore/grpcservers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//pkg/zstd",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/emptypb",
Expand All @@ -32,14 +33,18 @@ go_library(
go_test(
name = "grpcservers_test",
srcs = [
"byte_stream_server_bench_test.go",
"byte_stream_server_test.go",
"content_addressable_storage_server_test.go",
"indirect_content_addressable_storage_server_test.go",
],
deps = [
":grpcservers",
"//internal/mock",
"//pkg/blobstore",
"//pkg/blobstore/buffer",
"//pkg/blobstore/slicing",
"//pkg/capabilities",
"//pkg/digest",
"//pkg/proto/icas",
"//pkg/testutil",
Expand All @@ -51,6 +56,7 @@ go_test(
"@org_golang_google_genproto_googleapis_rpc//status",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//status",
"@org_golang_google_grpc//test/bufconn",
"@org_uber_go_mock//gomock",
Expand Down
23 changes: 21 additions & 2 deletions pkg/blobstore/grpcservers/byte_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd"

"google.golang.org/genproto/googleapis/bytestream"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -56,7 +57,18 @@ func (s *byteStreamServer) Read(in *bytestream.ReadRequest, out bytestream.ByteS
if readErr != nil {
return readErr
}
if writeErr := out.Send(&bytestream.ReadResponse{Data: readBuf}); writeErr != nil {
// A ChunkReader may reuse the slice returned by
// Read() across calls. Encoding into a grpc.PreparedMsg
// copies the wire-format bytes synchronously, so the
// underlying buffer is no longer referenced by gRPC after
// SendMsg returns. This is the only contract-safe way to
// hand a reusable buffer to grpc.SendMsg; see
// google.golang.org/grpc issues #5857 and #8186.
var prepared grpc.PreparedMsg
if err := prepared.Encode(out, &bytestream.ReadResponse{Data: readBuf}); err != nil {
return err
}
if writeErr := out.SendMsg(&prepared); writeErr != nil {
return writeErr
}
}
Expand All @@ -81,7 +93,14 @@ type readStreamWriter struct {
}

func (w *readStreamWriter) Write(p []byte) (int, error) {
if err := w.out.Send(&bytestream.ReadResponse{Data: p}); err != nil {
// Each Write encodes the chunk into a grpc.PreparedMsg before sending,
// so the caller's buffer is no longer referenced by gRPC after Write
// returns. See the IDENTITY path above for rationale.
var prepared grpc.PreparedMsg
if err := prepared.Encode(w.out, &bytestream.ReadResponse{Data: p}); err != nil {
return 0, err
}
if err := w.out.SendMsg(&prepared); err != nil {
return 0, err
}
return len(p), nil
Expand Down
Loading
Loading