Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 19 additions & 28 deletions internal/indexworker/indexworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/url"
"sync"
"sync/atomic"
"testing"

"github.com/gobuffalo/pop/v6"
Expand Down Expand Up @@ -225,54 +226,44 @@ func (ts *IndexWorkerTestSuite) TestOutOfBandIndexRemoval() {
assert.True(ts.T(), found, "The removed index should have been recreated")
}

// Test concurrent access - only one worker should create indexes
// Test concurrent access - workers coordinate via advisory lock
func (ts *IndexWorkerTestSuite) TestConcurrentWorkers() {
ctx := context.Background()

// Number of concurrent workers
numWorkers := 3
numWorkers := 5
var wg sync.WaitGroup
wg.Add(numWorkers)

// Track which workers actually created indexes
results := make(chan error, numWorkers)
var successCount, lockSkipCount, errorCount int32

for i := 0; i < numWorkers; i++ {
go func(workerID int) {
go func() {
defer wg.Done()

// Each worker needs its own logger to avoid race conditions
logger := logrus.NewEntry(logrus.New())
logger.Logger.SetLevel(logrus.DebugLevel)

// CreateIndexes returns nil on success or ErrAdvisoryLockAlreadyAcquired
err := CreateIndexes(ctx, ts.config, logger)
results <- err
}(i)
switch {
case err == nil:
atomic.AddInt32(&successCount, 1)
case errors.Is(err, ErrAdvisoryLockAlreadyAcquired):
atomic.AddInt32(&lockSkipCount, 1)
default:
atomic.AddInt32(&errorCount, 1)
ts.T().Errorf("Unexpected error from CreateIndexes: %v", err)
}
}()
}

// Wait for all workers to complete
wg.Wait()
close(results)

// Count how many workers acquired the lock
lockCount := 0
lockSkipCount := 0
for err := range results {
if err == nil {
lockCount++
} else if errors.Is(err, ErrAdvisoryLockAlreadyAcquired) {
lockSkipCount++
} else {
ts.T().Errorf("Unexpected error from CreateIndexes: %v", err)
}
}

// Only one worker should have acquired the lock and created indexes
assert.Equal(ts.T(), 1, lockCount, "Only one worker should acquire the lock and create indexes")
assert.Equal(ts.T(), numWorkers-1, lockSkipCount, "Other workers should skip due to lock")
assert.GreaterOrEqual(ts.T(), successCount, int32(1), "At least one worker should succeed")
assert.Equal(ts.T(), int32(0), errorCount, "No unexpected errors should occur")
assert.Equal(ts.T(), int32(numWorkers), successCount+lockSkipCount, "All workers should either succeed or skip due to lock")

// Verify all indexes were created successfully
// Verify indexes were created correctly regardless of which worker did it
indexes := getUsersIndexes(ts.namespace)
existingIndexes, err := getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes))
require.NoError(ts.T(), err)
Expand Down