diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 91a862c88..a1d82f14e 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -270,7 +270,18 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen return } - _ = p.putItem(createCtx, newItem) + // Caller canceled before receiving the item, need to remove from index and close + p.mu.WithLock(func() { + delete(p.index, newItem) + }) + + closeCtx := createCtx + if t := p.config.closeTimeout; t > 0 { + var cancel context.CancelFunc + closeCtx, cancel = context.WithTimeout(closeCtx, t) + defer cancel() + } + p.config.closeItemFunc(closeCtx, newItem) } }() diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index 42c38df98..d29f15ee2 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -1220,5 +1220,91 @@ func TestPool(t *testing.T) { //nolint:gocyclo _ = p.putItem(context.Background(), item) }) }) + t.Run("ContextCanceledDuringCreate", func(t *testing.T) { + // This test validates the fix for the session leak bug where + // context cancellation during session creation would cause the + // session to be added to the pool index but never properly tracked + var ( + createStarted = make(chan struct{}) + allowCreateDone = make(chan struct{}) + itemsCreated atomic.Int32 + itemsClosed atomic.Int32 + ) + p := New(rootCtx, + WithLimit[*testItem, testItem](2), + WithTrace[*testItem, testItem](defaultTrace), + WithCreateItemFunc[*testItem, testItem](func(ctx context.Context) (*testItem, error) { + close(createStarted) + <-allowCreateDone + if ctx.Err() != nil { + return nil, ctx.Err() + } + itemsCreated.Add(1) + return &testItem{ + onClose: func() error { + itemsClosed.Add(1) + return nil + }, + onIsAlive: func() bool { + return true + }, + }, nil + }), + ) + + // Start creating an item but cancel context before creation completes + ctx, cancel := context.WithCancel(rootCtx) + errCh := make(chan error, 1) + itemCh := make(chan *testItem, 1) + go func() { + item, err := p.createItemFunc(ctx) + itemCh <- item + errCh <- err + }() + + // Wait for creation to start + <-createStarted + + // Cancel the context while creation is in progress + cancel() + + // Allow creation to complete + close(allowCreateDone) + + // Wait for the create call to return + item := <-itemCh + err := <-errCh + + // Give time for the goroutine to clean up + time.Sleep(100 * time.Millisecond) + + if err == nil && item != nil { + // Case 1: Item was created successfully before context was checked + // This is valid - the item should be in the pool + stats := p.Stats() + require.Equal(t, 1, stats.Index, "item should be in pool index") + // Clean up - close the item + _ = item.Close(rootCtx) + } else { + // Case 2: Context was canceled before item creation + // The item should have been closed and not added to pool + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) + + // If item was created, it should have been closed + if itemsCreated.Load() > 0 { + require.Equal(t, int32(1), itemsClosed.Load(), "created item should have been closed") + } + + // Verify pool stats show no leaked sessions + stats := p.Stats() + require.Equal(t, 0, stats.Index, "pool index should be empty when item not returned") + require.Equal(t, 0, stats.Idle, "pool idle should be empty") + } + + // In both cases, createInProgress should be 0 + stats := p.Stats() + require.Equal(t, 0, stats.CreateInProgress, "no create should be in progress") + }) }) }