From dea11c94e0c1e795bc9dfea04c14168c46d84343 Mon Sep 17 00:00:00 2001 From: Artem Leshchev Date: Tue, 19 May 2026 15:12:21 -0500 Subject: [PATCH 1/4] pkg/blobstore/grpcservers: add ByteStream Read benchmarks Two benchmarks share a Read driver and differ only in load: BenchmarkByteStreamReadUnderGCPressure exercises the server Read path in a regime where per-RPC allocation cost dominates: enough concurrent Read RPCs that fresh chunk buffers are allocated faster than the GC can reclaim them. The bb-storage instances that motivated this benchmark exhibited a positive feedback loop where rising GC CPU stretched out each RPC, which kept more in-flight buffers alive, which raised GC CPU further. The benchmark does not attempt to reproduce that runaway behavior but it does measure per-RPC allocation cost in a regime where the Go runtime is forced to track it continuously. It pins GOMAXPROCS so the ratio of allocator throughput to available GC CPU is reproducible across hosts, runs 1024 in-flight RPCs (parallelism*GOMAXPROCS, with HTTP/2's 100-stream default lifted) so the allocator stays under sustained pressure, and sets GOMEMLIMIT to a fixed multiple of the steady-state working set so the runtime has to assist GC on every allocation. BenchmarkByteStreamRead exercises the same Read path without touching any runtime knobs: GOMAXPROCS, GOMEMLIMIT and parallelism are left at their defaults. It measures whatever the host has on hand and serves as a regression guard for changes to the chunk-buffer lifecycle outside the artificial pressure regime. Both sweep blob sizes from 64 KiB to 64 MiB and report four custom metrics via b.ReportMetric, computed over the benchmark window using runtime/metrics: %gc-cpu : GC CPU (assist + dedicated + idle + pause) as a fraction of total CPU time. gc-pause-us/op : mean STW pause time per RPC, estimated by summing midpoints of the /sched/pauses/total/gc histogram and dividing by b.N. gc-cycles/Kop : completed GC cycles per thousand RPCs. heap-MiB-peak : max HeapInuse observed by a 10ms sampler. --- pkg/blobstore/grpcservers/BUILD.bazel | 5 + .../byte_stream_server_bench_test.go | 424 ++++++++++++++++++ 2 files changed, 429 insertions(+) create mode 100644 pkg/blobstore/grpcservers/byte_stream_server_bench_test.go diff --git a/pkg/blobstore/grpcservers/BUILD.bazel b/pkg/blobstore/grpcservers/BUILD.bazel index 556ad91e8..b15cbabe1 100644 --- a/pkg/blobstore/grpcservers/BUILD.bazel +++ b/pkg/blobstore/grpcservers/BUILD.bazel @@ -32,6 +32,7 @@ go_library( go_test( name = "grpcservers_test", srcs = [ + "byte_stream_server_bench_test.go", "byte_stream_server_test.go", "content_addressable_storage_server_test.go", "indirect_content_addressable_storage_server_test.go", @@ -39,7 +40,10 @@ go_test( deps = [ ":grpcservers", "//internal/mock", + "//pkg/blobstore", "//pkg/blobstore/buffer", + "//pkg/blobstore/slicing", + "//pkg/capabilities", "//pkg/digest", "//pkg/proto/icas", "//pkg/testutil", @@ -51,6 +55,7 @@ go_test( "@org_golang_google_genproto_googleapis_rpc//status", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//credentials/insecure", "@org_golang_google_grpc//status", "@org_golang_google_grpc//test/bufconn", "@org_uber_go_mock//gomock", diff --git a/pkg/blobstore/grpcservers/byte_stream_server_bench_test.go b/pkg/blobstore/grpcservers/byte_stream_server_bench_test.go new file mode 100644 index 000000000..a0b5d35d6 --- /dev/null +++ b/pkg/blobstore/grpcservers/byte_stream_server_bench_test.go @@ -0,0 +1,424 @@ +package grpcservers_test + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "io" + "math" + "net" + "runtime" + "runtime/debug" + "runtime/metrics" + "sync/atomic" + "testing" + "time" + + remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" + "github.com/buildbarn/bb-storage/pkg/blobstore" + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" + "github.com/buildbarn/bb-storage/pkg/blobstore/grpcservers" + "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" + "github.com/buildbarn/bb-storage/pkg/capabilities" + "github.com/buildbarn/bb-storage/pkg/digest" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" + "github.com/klauspost/compress/zstd" + + "google.golang.org/genproto/googleapis/bytestream" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +// benchBlobAccess is a minimal in-memory BlobAccess. Get() wraps the +// stored bytes in a bytes.Reader and returns a Buffer constructed via +// NewValidatedBufferFromReaderAt, exercising the standard ChunkReader +// code path through the server. +type benchBlobAccess struct { + capabilities.Provider + blobs map[digest.Digest][]byte +} + +func (b *benchBlobAccess) Get(_ context.Context, d digest.Digest) buffer.Buffer { + data, ok := b.blobs[d] + if !ok { + panic("benchmark requested an unknown digest") + } + return buffer.NewValidatedBufferFromReaderAt( + readAtCloser{bytes.NewReader(data)}, + int64(len(data)), + ) +} + +func (benchBlobAccess) GetFromComposite(context.Context, digest.Digest, digest.Digest, slicing.BlobSlicer) buffer.Buffer { + panic("not implemented") +} + +func (benchBlobAccess) Put(context.Context, digest.Digest, buffer.Buffer) error { + panic("not implemented") +} + +func (benchBlobAccess) FindMissing(context.Context, digest.Set) (digest.Set, error) { + panic("not implemented") +} + +type readAtCloser struct{ *bytes.Reader } + +func (readAtCloser) Close() error { return nil } + +// makeBenchBlobs generates n random blobs of the given size, on the Go +// heap. Returns the BlobAccess together with the list of ByteStream +// resource names the client should request. The aggregate resident +// footprint is n*size; callers must size GOMEMLIMIT accordingly so the +// arena itself does not bind GC pacing. +func makeBenchBlobs(b *testing.B, n, size int) (blobstore.BlobAccess, []string) { + b.Helper() + blobs := make(map[digest.Digest][]byte, n) + names := make([]string, 0, n) + for i := 0; i < n; i++ { + data := make([]byte, size) + if _, err := rand.Read(data); err != nil { + b.Fatal(err) + } + sum := sha256.Sum256(data) + hash := hex.EncodeToString(sum[:]) + d := digest.MustNewDigest("", remoteexecution.DigestFunction_SHA256, hash, int64(size)) + blobs[d] = data + names = append(names, d.GetByteStreamReadPath(remoteexecution.Compressor_IDENTITY)) + } + return &benchBlobAccess{blobs: blobs}, names +} + +// startBenchServer spins up an in-process ByteStream server backed by +// ba and returns a connected client plus a cleanup function. +func startBenchServer(b *testing.B, ba blobstore.BlobAccess) (bytestream.ByteStreamClient, func()) { + b.Helper() + l := bufconn.Listen(1 << 20) + // MaxConcurrentStreams lifts the HTTP/2 100-stream default + // so parallelism*GOMAXPROCS in-flight RPCs are not queued at + // the gRPC layer. + server := grpc.NewServer(grpc.MaxConcurrentStreams(2048)) + // 64 KiB matches the production readChunkSize. + bytestream.RegisterByteStreamServer(server, grpcservers.NewByteStreamServer(ba, 64*1024, bb_zstd.NewUnboundedPool( + []zstd.EOption{zstd.WithEncoderConcurrency(1)}, + []zstd.DOption{zstd.WithDecoderConcurrency(1)}, + ))) + go func() { + _ = server.Serve(l) + }() + conn, err := grpc.DialContext( + context.Background(), + "bufnet", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return l.Dial() }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + b.Fatal(err) + } + cleanup := func() { + conn.Close() + server.Stop() + } + return bytestream.NewByteStreamClient(conn), cleanup +} + +// drainRead issues a Read RPC and discards every chunk. Returns the +// total number of bytes read. +func drainRead(ctx context.Context, client bytestream.ByteStreamClient, name string) (int64, error) { + stream, err := client.Read(ctx, &bytestream.ReadRequest{ResourceName: name}) + if err != nil { + return 0, err + } + var total int64 + for { + resp, err := stream.Recv() + if err == io.EOF { + return total, nil + } + if err != nil { + return total, err + } + total += int64(len(resp.Data)) + } +} + +// gcMetrics wraps runtime/metrics to compute the GC CPU fraction and +// STW pause time accrued over a specific window. The underlying +// counters (/cpu/classes/gc/total:cpu-seconds, /cpu/classes/total:cpu-seconds, +// /sched/pauses/total/gc:seconds) are cumulative since process start, +// so a window measurement is the difference between two reads. +type gcMetrics struct { + samples []metrics.Sample +} + +func newGCMetrics() *gcMetrics { + return &gcMetrics{samples: []metrics.Sample{ + {Name: "/cpu/classes/gc/total:cpu-seconds"}, + {Name: "/cpu/classes/total:cpu-seconds"}, + {Name: "/sched/pauses/total/gc:seconds"}, + }} +} + +func (g *gcMetrics) read() (gcSec, totalSec, pauseSec float64) { + metrics.Read(g.samples) + return g.samples[0].Value.Float64(), g.samples[1].Value.Float64(), histogramSum(g.samples[2].Value.Float64Histogram()) +} + +// histogramSum estimates the total time recorded in a runtime/metrics +// histogram by weighting each bucket count by its midpoint. Buckets +// may have -Inf or +Inf edges; those degenerate to the finite neighbor +// so a single sample at an extreme bucket does not dominate the sum. +func histogramSum(h *metrics.Float64Histogram) float64 { + var sum float64 + for i, c := range h.Counts { + if c == 0 { + continue + } + lo := h.Buckets[i] + hi := h.Buckets[i+1] + var mid float64 + switch { + case math.IsInf(lo, -1) && math.IsInf(hi, +1): + mid = 0 + case math.IsInf(lo, -1): + mid = hi + case math.IsInf(hi, +1): + mid = lo + default: + mid = (lo + hi) / 2 + } + sum += mid * float64(c) + } + return sum +} + +// BenchmarkByteStreamReadUnderGCPressure exercises the server Read +// path in a regime where per-RPC allocation cost dominates: enough +// concurrent Read RPCs that fresh chunk buffers are allocated faster +// than the GC can reclaim them. The bb-storage instances that +// motivated this benchmark exhibited a positive feedback loop where +// rising GC CPU stretched out each RPC, which kept more in-flight +// buffers alive, which raised GC CPU further. The benchmark does not +// attempt to reproduce that runaway behavior -- it would not +// converge -- but it does measure per-RPC allocation cost in a regime +// where the Go runtime is forced to track it continuously. +// +// To get there the benchmark (a) pins GOMAXPROCS so the ratio of +// allocator throughput to available GC CPU is reproducible across +// hosts, (b) fans out enough in-flight RPCs that the allocator +// stays under sustained pressure, and (c) sets GOMEMLIMIT to a +// fixed multiple of the steady-state working set so the runtime +// has to assist GC on every allocation. None of these knobs model +// any particular deployment. +// +// Outside this regime -- large blobs where per-RPC wall time is +// dominated by moving bytes rather than per-chunk allocation, or +// generous memory budgets where GC has room to defer work -- the +// Read path's per-chunk allocation cost is negligible. The 16 MiB +// and 64 MiB sub-benchmarks cover that case and serve as a +// regression guard. +// +// Custom metrics, all computed over the benchmark window: +// +// - %gc-cpu : GC CPU (assist + dedicated + idle + pause) +// as a fraction of total CPU time, from /cpu/classes/gc/total +// and /cpu/classes/total. +// - gc-pause-us/op : mean STW pause time per RPC, estimated from +// the /sched/pauses/total/gc histogram by summing bucket +// midpoints and dividing by b.N. +// - gc-cycles/Kop : completed GC cycles per thousand RPCs. +// - heap-MiB-peak : max HeapInuse observed by a 10ms sampler. +func BenchmarkByteStreamReadUnderGCPressure(b *testing.B) { + const ( + gomaxprocs = 8 + // parallelism*GOMAXPROCS = 1024 in-flight RPCs. Lower + // values do not push the 256 KiB sub-benchmark into the + // GC-bound regime that exposes the per-chunk allocation + // cost. + parallelism = 128 + // numBlobs sets the working set's logical cardinality. + // With parallelism*GOMAXPROCS goroutines drawing names + // from a round-robin counter, a value much smaller than + // the goroutine count means each goroutine cycles through + // the same set of digests every few RPCs, but no single + // digest is hammered to the exclusion of the others. + numBlobs = 64 + // Each in-flight RPC briefly holds a chunk buffer plus + // some gRPC stream state; transientBudget sizes + // GOMEMLIMIT's headroom above the resident arena so that + // only those transient allocations push against the + // limit. The 3/2 ratio leaves just enough slack above the + // steady-state working set that the GC has to run + // continuously. At small blob sizes that is enough to + // drive the runtime well past 50% GC CPU and the actual + // heap blows through the limit because GC cannot keep up; + // at large blob sizes per-RPC throughput is bounded by + // data movement rather than allocation, GC has time to + // catch up, and the heap stays near the limit. + transientBudgetNum = 3 + transientBudgetDen = 2 + minTransientBudget = 64 << 20 + ) + + // Pin GOMAXPROCS so the GC CPU fraction is reproducible. The + // benchmark's GC pressure is a function of (allocator + // throughput) / (available CPU); without pinning, the same + // workload on a many-core host has so much idle CPU that GC + // never saturates and the regime under test never materializes. + prevProcs := runtime.GOMAXPROCS(gomaxprocs) + defer runtime.GOMAXPROCS(prevProcs) + + sizes := []struct { + name string + blobBytes int + }{ + {"64KiB", 64 << 10}, + {"256KiB", 256 << 10}, + {"1MiB", 1 << 20}, + {"4MiB", 4 << 20}, + {"16MiB", 16 << 20}, + {"64MiB", 64 << 20}, + } + for _, sz := range sizes { + b.Run("BlobSize="+sz.name, func(b *testing.B) { + // workingSet is an upper bound on bytes the server + // holds across all in-flight RPCs at any moment. + // The transient budget tracks it so GOMEMLIMIT + // scales consistently with blob size. + workingSet := gomaxprocs * parallelism * sz.blobBytes + transientBudget := workingSet * transientBudgetNum / transientBudgetDen + if transientBudget < minTransientBudget { + transientBudget = minTransientBudget + } + // The arena (numBlobs * blobBytes) is resident + // throughout the run, so include it in the memory + // limit; otherwise the limit binds on the fixture + // rather than on the transient allocations. + memoryLimit := numBlobs*sz.blobBytes + transientBudget + runByteStreamRead(b, sz.blobBytes, numBlobs, memoryLimit, parallelism) + }) + } +} + +// BenchmarkByteStreamRead is the companion to +// BenchmarkByteStreamReadUnderGCPressure that runs the same Read +// path without touching any runtime knobs: GOMAXPROCS, GOMEMLIMIT +// and parallelism are left at their defaults. Per-RPC wall time +// reflects whatever the host has on hand, and changes to the +// chunk-buffer lifecycle must not regress this case. +func BenchmarkByteStreamRead(b *testing.B) { + const numBlobs = 64 + sizes := []struct { + name string + blobBytes int + }{ + {"64KiB", 64 << 10}, + {"256KiB", 256 << 10}, + {"1MiB", 1 << 20}, + {"4MiB", 4 << 20}, + {"16MiB", 16 << 20}, + {"64MiB", 64 << 20}, + } + for _, sz := range sizes { + b.Run("BlobSize="+sz.name, func(b *testing.B) { + runByteStreamRead(b, sz.blobBytes, numBlobs, 0, 0) + }) + } +} + +// runByteStreamRead is the shared driver for the Read benchmarks. A +// non-positive memoryLimit skips GOMEMLIMIT, and a non-positive +// parallelism leaves b.RunParallel at its default (GOMAXPROCS). +func runByteStreamRead(b *testing.B, blobSize, numBlobs, memoryLimit, parallelism int) { + if memoryLimit > 0 { + prevLimit := debug.SetMemoryLimit(-1) + debug.SetMemoryLimit(int64(memoryLimit)) + defer debug.SetMemoryLimit(prevLimit) + } + + ba, names := makeBenchBlobs(b, numBlobs, blobSize) + client, cleanup := startBenchServer(b, ba) + defer cleanup() + + // Warm the connection and codec caches with a few serial RPCs + // so the measured window does not include connection setup or + // gRPC's BDP discovery for the flow-control window. (Allocator + // caches scale with concurrency and are not meaningfully warmed + // here, but the parallel phase reaches steady state quickly.) + warmCtx, warmCancel := context.WithTimeout(context.Background(), 30*time.Second) + for i := 0; i < 32; i++ { + if _, err := drainRead(warmCtx, client, names[i%numBlobs]); err != nil { + b.Fatal(err) + } + } + warmCancel() + + runtime.GC() + gm := newGCMetrics() + gcBefore, totalBefore, pauseBefore := gm.read() + var msBefore runtime.MemStats + runtime.ReadMemStats(&msBefore) + + var peakHeap atomic.Uint64 + done := make(chan struct{}) + go func() { + t := time.NewTicker(10 * time.Millisecond) + defer t.Stop() + var ms runtime.MemStats + for { + select { + case <-done: + return + case <-t.C: + runtime.ReadMemStats(&ms) + for { + cur := peakHeap.Load() + if ms.HeapInuse <= cur || peakHeap.CompareAndSwap(cur, ms.HeapInuse) { + break + } + } + } + } + }() + + b.SetBytes(int64(blobSize)) + b.ReportAllocs() + if parallelism > 0 { + // Drive parallelism*GOMAXPROCS in-flight RPCs so the + // allocator stays under sustained pressure. b.RunParallel's + // default p=GOMAXPROCS goroutines on a busy host leave + // enough idle CPU per goroutine that GC never saturates. + b.SetParallelism(parallelism) + } + var counter atomic.Uint64 + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + ctx := context.Background() + for pb.Next() { + i := counter.Add(1) - 1 + if _, err := drainRead(ctx, client, names[i%uint64(numBlobs)]); err != nil { + b.Fatal(err) + } + } + }) + + b.StopTimer() + close(done) + gcAfter, totalAfter, pauseAfter := gm.read() + var msAfter runtime.MemStats + runtime.ReadMemStats(&msAfter) + + ops := float64(b.N) + gcDelta := gcAfter - gcBefore + totalDelta := totalAfter - totalBefore + pauseDelta := pauseAfter - pauseBefore + if totalDelta > 0 { + b.ReportMetric(gcDelta/totalDelta*100, "%gc-cpu") + } + b.ReportMetric(pauseDelta/ops*1e6, "gc-pause-us/op") + b.ReportMetric(float64(msAfter.NumGC-msBefore.NumGC)/ops*1000, "gc-cycles/Kop") + b.ReportMetric(float64(peakHeap.Load())/(1<<20), "heap-MiB-peak") +} From 8d97513e180d190d736abd09274b85780c00fa2b Mon Sep 17 00:00:00 2001 From: Artem Leshchev Date: Thu, 14 May 2026 08:41:31 -0500 Subject: [PATCH 2/4] pkg/blobstore/buffer: add benchmark for readerBackedChunkReader Exercises readerBackedChunkReader through the public Buffer API. --- pkg/blobstore/buffer/BUILD.bazel | 1 + .../reader_backed_chunk_reader_bench_test.go | 82 +++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 pkg/blobstore/buffer/reader_backed_chunk_reader_bench_test.go diff --git a/pkg/blobstore/buffer/BUILD.bazel b/pkg/blobstore/buffer/BUILD.bazel index 9f2995862..7db5f520a 100644 --- a/pkg/blobstore/buffer/BUILD.bazel +++ b/pkg/blobstore/buffer/BUILD.bazel @@ -55,6 +55,7 @@ go_test( "new_proto_buffer_from_proto_test.go", "new_validated_buffer_from_byte_slice_test.go", "new_validated_buffer_from_reader_at_test.go", + "reader_backed_chunk_reader_bench_test.go", "with_error_handler_test.go", ], deps = [ diff --git a/pkg/blobstore/buffer/reader_backed_chunk_reader_bench_test.go b/pkg/blobstore/buffer/reader_backed_chunk_reader_bench_test.go new file mode 100644 index 000000000..4d558672f --- /dev/null +++ b/pkg/blobstore/buffer/reader_backed_chunk_reader_bench_test.go @@ -0,0 +1,82 @@ +package buffer_test + +import ( + "bytes" + "testing" + + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" +) + +type readAtCloser struct{ *bytes.Reader } + +func (readAtCloser) Close() error { return nil } + +const benchChunkSize = 1 << 16 + +var benchBlobSizes = []struct { + name string + blobSize int64 +}{ + {"BlobSize=4KiB", 4 << 10}, + {"BlobSize=64KiB", 64 << 10}, + {"BlobSize=256KiB", 256 << 10}, + {"BlobSize=4MiB", 4 << 20}, + {"BlobSize=64MiB", 64 << 20}, +} + +// NewValidatedBufferFromReaderAt is the cheapest public entry point +// that resolves to newReaderBackedChunkReader without wrapping +// decorators. +func BenchmarkReaderBackedChunkReader(b *testing.B) { + for _, c := range benchBlobSizes { + b.Run(c.name, func(b *testing.B) { + data := make([]byte, c.blobSize) + for i := range data { + data[i] = byte(i) + } + + b.SetBytes(c.blobSize) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + buf := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, c.blobSize) + r := buf.ToChunkReader(0, benchChunkSize) + for { + if _, err := r.Read(); err != nil { + break + } + } + r.Close() + } + }) + } +} + +func BenchmarkReaderBackedChunkReaderParallel(b *testing.B) { + for _, c := range benchBlobSizes { + b.Run(c.name, func(b *testing.B) { + data := make([]byte, c.blobSize) + for i := range data { + data[i] = byte(i) + } + + b.SetBytes(c.blobSize) + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + buf := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, c.blobSize) + r := buf.ToChunkReader(0, benchChunkSize) + for { + if _, err := r.Read(); err != nil { + break + } + } + r.Close() + } + }) + }) + } +} From 66f0f6f7db95f3b085922d4f52c8374583e72dd0 Mon Sep 17 00:00:00 2001 From: Artem Leshchev Date: Thu, 14 May 2026 08:42:11 -0500 Subject: [PATCH 3/4] pkg/blobstore/buffer: reuse chunk buffers across Read() calls readerBackedChunkReader previously allocated a fresh maximumChunkSizeBytes slice on every Read() call. With the default 64 KiB chunk size and production traffic around 20-30 KRPS, this allocation site accounted for the bulk of GC pressure on bb_storage, with the runtime spending up to 90% of CPU in GC under load. The reader now owns a single buffer for its entire lifetime, acquired lazily on first Read() from a package-private sync.Pool and returned on Close(). Per-reader ownership eliminates the per-chunk allocation for large blobs; the pool prevents the runtime from re-allocating that buffer once per RPC, which dominates GC work at small blob sizes (64 KiB - 256 KiB) where each RPC reads only a few chunks. The ChunkReader contract is tightened to reflect the long-standing implicit invariant that returned slices are only valid until the next Read()/Close() call -- every in-tree consumer already respected this. multiplexedChunkReader's existing channel-based barrier already serializes consumers, which keeps buffer reuse safe across CloneStream(). A similar make() per Read() pattern exists in pkg/blobstore/grpcclients/cas_blob_access.go's zstdByteStreamChunkReader and is left for a follow-up PR. --- pkg/blobstore/buffer/BUILD.bazel | 2 + pkg/blobstore/buffer/chunk_buffer_pool.go | 26 +++++++ pkg/blobstore/buffer/chunk_reader.go | 6 ++ .../buffer/multiplexed_chunk_reader.go | 9 +++ .../buffer/reader_backed_chunk_reader.go | 12 +++- .../buffer/reader_backed_chunk_reader_test.go | 71 +++++++++++++++++++ 6 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 pkg/blobstore/buffer/chunk_buffer_pool.go create mode 100644 pkg/blobstore/buffer/reader_backed_chunk_reader_test.go diff --git a/pkg/blobstore/buffer/BUILD.bazel b/pkg/blobstore/buffer/BUILD.bazel index 7db5f520a..fe81f60eb 100644 --- a/pkg/blobstore/buffer/BUILD.bazel +++ b/pkg/blobstore/buffer/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "cas_reader_buffer.go", "cas_validating_chunk_reader.go", "cas_validating_reader.go", + "chunk_buffer_pool.go", "chunk_reader.go", "chunk_reader_backed_reader.go", "common_conversions.go", @@ -56,6 +57,7 @@ go_test( "new_validated_buffer_from_byte_slice_test.go", "new_validated_buffer_from_reader_at_test.go", "reader_backed_chunk_reader_bench_test.go", + "reader_backed_chunk_reader_test.go", "with_error_handler_test.go", ], deps = [ diff --git a/pkg/blobstore/buffer/chunk_buffer_pool.go b/pkg/blobstore/buffer/chunk_buffer_pool.go new file mode 100644 index 000000000..526b37fb3 --- /dev/null +++ b/pkg/blobstore/buffer/chunk_buffer_pool.go @@ -0,0 +1,26 @@ +package buffer + +import "sync" + +// chunkBufferPool caches []byte buffers used by ChunkReader +// implementations that allocate a fixed-size working buffer per +// stream. Buffers are stored as *[]byte to avoid the interface-boxing +// allocation that occurs when storing a non-pointer value in +// sync.Pool. +var chunkBufferPool sync.Pool + +func getChunkBuffer(size int) *[]byte { + if v := chunkBufferPool.Get(); v != nil { + b := v.(*[]byte) + if cap(*b) >= size { + *b = (*b)[:size] + return b + } + } + b := make([]byte, size) + return &b +} + +func putChunkBuffer(b *[]byte) { + chunkBufferPool.Put(b) +} diff --git a/pkg/blobstore/buffer/chunk_reader.go b/pkg/blobstore/buffer/chunk_reader.go index b84dc205f..82a25ab5a 100644 --- a/pkg/blobstore/buffer/chunk_reader.go +++ b/pkg/blobstore/buffer/chunk_reader.go @@ -10,6 +10,12 @@ import ( // responsible for providing space for the data. This interface is // similar to how frame-based transfer protocols work, including the // Bytestream protocol used by REv2. +// +// The byte slice returned by Read() is only guaranteed to be valid +// until the next call to Read() or Close() on the same ChunkReader. +// Implementations are free to reuse the backing storage across Read() +// calls in order to minimize allocations. Callers that need to retain +// the contents past the next Read()/Close() must copy them. type ChunkReader interface { Read() ([]byte, error) Close() diff --git a/pkg/blobstore/buffer/multiplexed_chunk_reader.go b/pkg/blobstore/buffer/multiplexed_chunk_reader.go index 28d7b3260..331068de5 100644 --- a/pkg/blobstore/buffer/multiplexed_chunk_reader.go +++ b/pkg/blobstore/buffer/multiplexed_chunk_reader.go @@ -32,6 +32,15 @@ func newMultiplexedChunkReader(r ChunkReader, additionalConsumers int) ChunkRead } func (r *multiplexedChunkReader) readAndShareWithOthers(currentConsumerContinues int) ([]byte, error) { + // All waiting consumers receive the exact same slice returned by + // the underlying ChunkReader.Read(). This is safe with respect + // to the "valid until next Read()/Close()" contract because the + // pendingConsumers counter ensures every consumer must call + // Read() (or Close()) again before another underlying Read() + // can be issued. By the time the next underlying Read() + // executes, every consumer has therefore released its reference + // to the previously returned slice, so the underlying reader is + // free to reuse the backing storage. data, err := r.r.Read() for _, c := range r.waitingConsumers { c <- readResult{data: data, err: err} diff --git a/pkg/blobstore/buffer/reader_backed_chunk_reader.go b/pkg/blobstore/buffer/reader_backed_chunk_reader.go index 3a54154d4..22b81600a 100644 --- a/pkg/blobstore/buffer/reader_backed_chunk_reader.go +++ b/pkg/blobstore/buffer/reader_backed_chunk_reader.go @@ -8,6 +8,7 @@ type readerBackedChunkReader struct { r io.ReadCloser maximumChunkSizeBytes int + buf *[]byte err error } @@ -23,8 +24,11 @@ func newReaderBackedChunkReader(r io.ReadCloser, maximumChunkSizeBytes int) Chun func (r *readerBackedChunkReader) Read() ([]byte, error) { if r.err == nil { - b := make([]byte, r.maximumChunkSizeBytes) - n, err := io.ReadFull(r.r, b[:]) + if r.buf == nil { + r.buf = getChunkBuffer(r.maximumChunkSizeBytes) + } + b := *r.buf + n, err := io.ReadFull(r.r, b) if err == io.ErrUnexpectedEOF { r.err = io.EOF } else { @@ -38,5 +42,9 @@ func (r *readerBackedChunkReader) Read() ([]byte, error) { } func (r *readerBackedChunkReader) Close() { + if r.buf != nil { + putChunkBuffer(r.buf) + r.buf = nil + } r.r.Close() } diff --git a/pkg/blobstore/buffer/reader_backed_chunk_reader_test.go b/pkg/blobstore/buffer/reader_backed_chunk_reader_test.go new file mode 100644 index 000000000..3d288f504 --- /dev/null +++ b/pkg/blobstore/buffer/reader_backed_chunk_reader_test.go @@ -0,0 +1,71 @@ +package buffer_test + +import ( + "bytes" + "io" + "testing" + + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" + "github.com/stretchr/testify/require" +) + +func TestReaderBackedChunkReaderReusesBuffer(t *testing.T) { + const ( + chunkSize = 16 + blobSize = int64(chunkSize * 4) + ) + data := make([]byte, blobSize) + for i := range data { + data[i] = byte(i) + } + + b := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, blobSize) + r := b.ToChunkReader(0, chunkSize) + defer r.Close() + + first, err := r.Read() + require.NoError(t, err) + require.Equal(t, chunkSize, len(first)) + firstAddr := &first[0] + + for i := 0; i < 3; i++ { + next, err := r.Read() + require.NoError(t, err) + require.Equal(t, chunkSize, len(next)) + require.Same(t, firstAddr, &next[0], + "successive chunks must share backing storage") + } + + _, err = r.Read() + require.Equal(t, io.EOF, err) +} + +// TestReaderBackedChunkReaderClosePoolsBuffer retries because +// sync.Pool may drop entries between calls (e.g. on GC). +func TestReaderBackedChunkReaderClosePoolsBuffer(t *testing.T) { + const chunkSize = 4096 + data := make([]byte, chunkSize) + + var observedReuse bool + for attempt := 0; attempt < 100 && !observedReuse; attempt++ { + b1 := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, int64(chunkSize)) + r1 := b1.ToChunkReader(0, chunkSize) + chunk, err := r1.Read() + require.NoError(t, err) + addr1 := &chunk[0] + _, err = r1.Read() + require.Equal(t, io.EOF, err) + r1.Close() + + b2 := buffer.NewValidatedBufferFromReaderAt(readAtCloser{bytes.NewReader(data)}, int64(chunkSize)) + r2 := b2.ToChunkReader(0, chunkSize) + chunk2, err := r2.Read() + require.NoError(t, err) + if &chunk2[0] == addr1 { + observedReuse = true + } + r2.Close() + } + require.True(t, observedReuse, + "expected sync.Pool to return a previously-released buffer at least once across 100 attempts") +} From b993b6789309fba5a378e6bfc30e8ca6360f16f8 Mon Sep 17 00:00:00 2001 From: Artem Leshchev Date: Mon, 18 May 2026 15:44:21 -0500 Subject: [PATCH 4/4] pkg/blobstore/grpcservers: use grpc.PreparedMsg to safely reuse chunk buffers The previous commit made readerBackedChunkReader reuse its 64 KiB buffer across Read() calls. That alone violates the grpc.SendMsg contract, which documents that 'It is not safe to modify the message after calling SendMsg' because stats handlers and tracing may capture the message to process lazily. grpc.PreparedMsg is the documented escape hatch: PreparedMsg.Encode marshals and copies the wire bytes synchronously, and prepareMsg short-circuits on *PreparedMsg, so the source []byte is no longer referenced by gRPC after SendMsg returns. --- pkg/blobstore/grpcservers/BUILD.bazel | 1 + .../grpcservers/byte_stream_server.go | 23 +++++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/pkg/blobstore/grpcservers/BUILD.bazel b/pkg/blobstore/grpcservers/BUILD.bazel index b15cbabe1..8a41db5f9 100644 --- a/pkg/blobstore/grpcservers/BUILD.bazel +++ b/pkg/blobstore/grpcservers/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@org_golang_google_genproto_googleapis_bytestream//:bytestream", + "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", "@org_golang_google_protobuf//types/known/emptypb", diff --git a/pkg/blobstore/grpcservers/byte_stream_server.go b/pkg/blobstore/grpcservers/byte_stream_server.go index a80baa728..9d3ad7ba6 100644 --- a/pkg/blobstore/grpcservers/byte_stream_server.go +++ b/pkg/blobstore/grpcservers/byte_stream_server.go @@ -13,6 +13,7 @@ import ( bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "google.golang.org/genproto/googleapis/bytestream" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -56,7 +57,18 @@ func (s *byteStreamServer) Read(in *bytestream.ReadRequest, out bytestream.ByteS if readErr != nil { return readErr } - if writeErr := out.Send(&bytestream.ReadResponse{Data: readBuf}); writeErr != nil { + // A ChunkReader may reuse the slice returned by + // Read() across calls. Encoding into a grpc.PreparedMsg + // copies the wire-format bytes synchronously, so the + // underlying buffer is no longer referenced by gRPC after + // SendMsg returns. This is the only contract-safe way to + // hand a reusable buffer to grpc.SendMsg; see + // google.golang.org/grpc issues #5857 and #8186. + var prepared grpc.PreparedMsg + if err := prepared.Encode(out, &bytestream.ReadResponse{Data: readBuf}); err != nil { + return err + } + if writeErr := out.SendMsg(&prepared); writeErr != nil { return writeErr } } @@ -81,7 +93,14 @@ type readStreamWriter struct { } func (w *readStreamWriter) Write(p []byte) (int, error) { - if err := w.out.Send(&bytestream.ReadResponse{Data: p}); err != nil { + // Each Write encodes the chunk into a grpc.PreparedMsg before sending, + // so the caller's buffer is no longer referenced by gRPC after Write + // returns. See the IDENTITY path above for rationale. + var prepared grpc.PreparedMsg + if err := prepared.Encode(w.out, &bytestream.ReadResponse{Data: p}); err != nil { + return 0, err + } + if err := w.out.SendMsg(&prepared); err != nil { return 0, err } return len(p), nil