Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions drpcpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Options struct {
type Pool[K comparable, V Conn] struct {
opts Options
mu sync.Mutex
closed bool
entries map[K]*list[K, V]
order list[K, V]
}
Expand Down Expand Up @@ -119,10 +120,20 @@ func (p *Pool[K, V]) log(what string, cb func() string) {

// Close evicts all entries from the Pool's cache, closing them and returning all
// of the combined errors from closing.
//
// Close also marks the pool as closed so that any connection subsequently
// returned via Put is closed immediately rather than cached. This matters
// because Close can only see the connections currently idle in the cache:
// connections that are checked out (serving an in-flight Invoke or NewStream)
// live outside the cache and are returned later via Put. Without the closed
// flag those late returns would resurrect the pool, re-arming an expiration
// timer and leaking the connection (and its manager goroutines) until it fires.
func (p *Pool[K, V]) Close() (err error) {
p.mu.Lock()
defer p.mu.Unlock()

p.closed = true

var eg errs.Group
for ent := p.order.head; ent != nil; ent = ent.global.next {
eg.Add(p.closeEntry(ent))
Expand Down Expand Up @@ -233,6 +244,13 @@ func (p *Pool[K, V]) Put(key K, val V) {
p.mu.Lock()
defer p.mu.Unlock()

// If the pool has been closed, don't cache the connection (which would
// resurrect the pool and leak the connection); close it instead.
if p.closed {
_ = val.Close()
return
}

local := p.entries[key]
if local == nil {
local = new(list[K, V])
Expand Down
38 changes: 38 additions & 0 deletions drpcpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,3 +672,41 @@ func BenchmarkPool(b *testing.B) {
invoke(ctx, conn)
}
}

// TestPoolPutAfterClose verifies that a connection returned to the pool after
// the pool is closed is closed immediately rather than cached. This models a
// long-lived stream whose connection is checked out (and therefore invisible to
// Close) when the pool is closed, and only handed back via Put once the stream
// finally ends. Caching it would re-arm the expiration timer and leak the
// connection until it fires.
func TestPoolPutAfterClose(t *testing.T) {
ctx := drpctest.NewTracker(t)
defer ctx.Close()

// Use a non-zero expiration so a cached connection would linger (and arm a
// timer) rather than be evicted by capacity limits.
pool := New[string, Conn](Options{Expiration: time.Hour})

closed := make(chan string, 1)
conn := &callbackConn{CloseFn: func() error { closed <- "key"; return nil }}

// Stage the connection in the pool, then check it out, modeling an in-flight
// stream. While checked out the connection lives outside the cache.
pool.Put("key", conn)
got, ok := pool.Take("key")
assert.True(t, ok)
assert.Equal(t, got, conn)
assert.Equal(t, len(closed), 0)

// Closing the pool cannot see the checked-out connection.
assert.NoError(t, pool.Close())
assert.Equal(t, len(closed), 0)

// Returning it now must close it instead of resurrecting the pool.
pool.Put("key", conn)
assert.Equal(t, <-closed, "key")

// And it must not have been retained.
_, ok = pool.Take("key")
assert.That(t, !ok)
}
Loading