diff --git a/server/cmd/root.go b/server/cmd/root.go index 105b3496..cd12050b 100644 --- a/server/cmd/root.go +++ b/server/cmd/root.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/pgEdge/control-plane/server/internal/cluster" + "github.com/pgEdge/control-plane/server/internal/election" "github.com/rs/zerolog" "github.com/samber/do" "github.com/spf13/cobra" @@ -62,6 +63,7 @@ func newRootCmd(i *do.Injector) *cobra.Command { certificates.Provide(i) database.Provide(i) docker.Provide(i) + election.Provide(i) etcd.Provide(i) filesystem.Provide(i) host.Provide(i) diff --git a/server/internal/election/candidate.go b/server/internal/election/candidate.go new file mode 100644 index 00000000..2d2992c4 --- /dev/null +++ b/server/internal/election/candidate.go @@ -0,0 +1,290 @@ +package election + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/pgEdge/control-plane/server/internal/storage" + "github.com/rs/zerolog" +) + +// ClaimHandler is a callback function invoked when a candidate successfully +// claims leadership. Handlers are executed in separate goroutines. +type ClaimHandler func(ctx context.Context) + +// Candidate participates in a single named election. Once elected, candidate +// maintains leadership until its Stop method is called or until it fails to +// renew its leadership claim. +type Candidate struct { + mu sync.Mutex + running bool + store *ElectionStore + logger zerolog.Logger + electionName Name + candidateID string + isLeader atomic.Bool + ttl time.Duration + watchOp storage.WatchOp[*StoredElection] + ticker *time.Ticker + done chan struct{} + errCh chan error + onClaim []ClaimHandler +} + +// NewCandidate creates a new Candidate instance to participate in the specified +// election. The candidateID uniquely identifies this participant. The ttl +// determines how long a leadership claim remains valid without renewal. The +// onClaim handlers are invoked when this candidate successfully claims +// leadership. +func NewCandidate( + store *ElectionStore, + logger zerolog.Logger, + electionName Name, + candidateID string, + ttl time.Duration, + onClaim []ClaimHandler, +) *Candidate { + return &Candidate{ + store: store, + logger: logger.With(). + Str("component", "election_candidate"). + Stringer("election_name", electionName). + Str("candidate_id", candidateID). + Logger(), + electionName: electionName, + candidateID: candidateID, + ttl: ttl, + errCh: make(chan error, 1), + onClaim: onClaim, + } +} + +// Start begins participating in the election. It synchronously attempts to +// claim leadership and starts an asynchronous process to periodically refresh +// its claim or re-attempt to claim leadership. Start is idempotent. +func (c *Candidate) Start(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.running { + return nil + } + + if err := c.checkClaim(ctx); err != nil { + return err + } + + // we're intentionally not assigning e.done and e.ticker directly to capture + // these variables in this closure and avoid a data race if start is called + // again. 
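+	// The renewal ticker fires every ttl/3, giving a healthy leader a couple
+	// of chances to refresh its claim before the TTL lapses.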
+ done := make(chan struct{}, 1) + ticker := time.NewTicker(c.ttl / 3) + + go func() { + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-done: + return + case <-ticker.C: + if err := c.lockAndCheckClaim(ctx); err != nil { + c.errCh <- err + } + } + } + }() + + c.running = true + c.done = done + c.ticker = ticker + + go c.watch(ctx) + + if c.IsLeader() { + c.logger.Debug().Msg("i am the current leader") + } + + return nil +} + +// IsLeader returns true if this candidate currently holds leadership. +func (c *Candidate) IsLeader() bool { + return c.isLeader.Load() +} + +// Stop ceases participation in the election, releases leadership if held, and +// stops the asynchronous renewal process. Stop is idempotent. +func (c *Candidate) Stop(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.running { + return nil + } + + if c.watchOp != nil { + c.watchOp.Close() + c.watchOp = nil + } + + c.done <- struct{}{} + c.running = false + + return c.release(ctx) +} + +// Error returns a channel that receives errors encountered during election +// operations such as claim attempts, renewals, or watch failures. +func (c *Candidate) Error() <-chan error { + return c.errCh +} + +func (c *Candidate) AddHandlers(handlers ...ClaimHandler) { + c.mu.Lock() + defer c.mu.Unlock() + + c.onClaim = append(c.onClaim, handlers...) +} + +func (c *Candidate) lockAndCheckClaim(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + return c.checkClaim(ctx) +} + +func (c *Candidate) checkClaim(ctx context.Context) error { + const maxRetries = 3 + for range maxRetries { + curr, err := c.store.GetByKey(c.electionName).Exec(ctx) + switch { + case errors.Is(err, storage.ErrNotFound): + return c.attemptClaim(ctx) + case err != nil: + return fmt.Errorf("failed to check for existing leader: %w", err) + } + + c.isLeader.Store(curr.LeaderID == c.candidateID) + if curr.LeaderID != c.candidateID { + return nil + } + + err = c.store.Update(curr). + WithTTL(c.ttl). + Exec(ctx) + switch { + case errors.Is(err, storage.ErrValueVersionMismatch): + // Can happen if we caught the claim right before it expired. The + // continue will retry the operation. When we re-fetch the claim, it + // will either not exist or it will belong to someone else. + continue + case err != nil: + return fmt.Errorf("failed to refresh claim: %w", err) + } + + return nil + } + return fmt.Errorf("failed to refresh claim after %d retries", maxRetries) +} + +func (c *Candidate) attemptClaim(ctx context.Context) error { + c.logger.Debug().Msg("attempting to claim leadership") + + err := c.store. + Create(&StoredElection{ + Name: c.electionName, + LeaderID: c.candidateID, + CreatedAt: time.Now(), + }). + WithTTL(c.ttl). 
+ Exec(ctx) + switch { + case err == nil: + c.isLeader.Store(true) + c.logger.Debug().Msg("successfully claimed leadership") + for _, handler := range c.onClaim { + go handler(ctx) + } + case !errors.Is(err, storage.ErrAlreadyExists): + return fmt.Errorf("failed to claim leadership: %w", err) + default: + c.isLeader.Store(false) + } + + return nil +} + +func (c *Candidate) watch(ctx context.Context) { + c.mu.Lock() + defer c.mu.Unlock() + + c.logger.Debug().Msg("starting watch") + + c.watchOp = c.store.Watch(c.electionName) + err := c.watchOp.Watch(ctx, func(evt *storage.Event[*StoredElection]) { + switch evt.Type { + case storage.EventTypeDelete: + // The delete event will fire simultaneously with the ticker in some + // types of outages, so the claim might have already been created + // when this handler runs, even though its for a 'delete' event. + if err := c.lockAndCheckClaim(ctx); err != nil { + c.errCh <- err + } + case storage.EventTypeError: + c.logger.Debug().Err(evt.Err).Msg("encountered a watch error") + if errors.Is(evt.Err, storage.ErrWatchClosed) && c.running { + // Restart the watch if we're still running. + c.watch(ctx) + } + } + }) + if err != nil { + c.errCh <- fmt.Errorf("failed to start watch: %w", err) + } +} + +func (c *Candidate) release(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, c.ttl) + defer cancel() + + if !c.isLeader.Load() { + return nil + } + + curr, err := c.store.GetByKey(c.electionName).Exec(ctx) + switch { + case errors.Is(err, storage.ErrNotFound): + // Happens when the claim has expired since the last time we checked it + // and no one else has claimed it. + c.isLeader.Store(false) + return nil + case err != nil: + return fmt.Errorf("failed to fetch current leader: %w", err) + case curr.LeaderID != c.candidateID: + // Happens when the claim has expired since the last time we checked it + // and someone else has claimed it. + c.isLeader.Store(false) + return nil + } + + err = c.store.Delete(curr).Exec(ctx) + switch { + case errors.Is(err, storage.ErrValueVersionMismatch): + // Happens when the claim has expired after the above check and someone + // else has claimed it. 
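+		// Stepping down without deleting the key avoids clobbering a claim
+		// that now belongs to another candidate.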
+ c.isLeader.Store(false) + return nil + case err != nil: + return fmt.Errorf("failed to release leadership claim: %w", err) + } + + c.isLeader.Store(false) + return nil +} diff --git a/server/internal/election/candidate_test.go b/server/internal/election/candidate_test.go new file mode 100644 index 00000000..734d3d29 --- /dev/null +++ b/server/internal/election/candidate_test.go @@ -0,0 +1,101 @@ +package election_test + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/pgEdge/control-plane/server/internal/election" + "github.com/pgEdge/control-plane/server/internal/storage/storagetest" + "github.com/pgEdge/control-plane/server/internal/testutils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCandidate(t *testing.T) { + server := storagetest.NewEtcdTestServer(t) + client := server.Client(t) + logger := testutils.Logger(t) + store := election.NewElectionStore(client, uuid.NewString()) + electionSvc := election.NewService(store, logger) + + t.Run("basic functionality", func(t *testing.T) { + ctx := t.Context() + name := election.Name(uuid.NewString()) + candidate := electionSvc.NewCandidate(name, "host-1", time.Second) + + t.Cleanup(func() { + candidate.Stop(context.Background()) + }) + + // Calling Stop is fine when the candidate is not running + require.NoError(t, candidate.Stop(ctx)) + + require.NoError(t, candidate.Start(ctx)) + // running unopposed, should always be the leader + assert.True(t, candidate.IsLeader()) + + // Wait until we're sure the TTL has expired at least once and that the + // claim has been refreshed. + time.Sleep(2 * time.Second) + assert.True(t, candidate.IsLeader()) + + // No error from running start again + require.NoError(t, candidate.Start(ctx)) + assert.True(t, candidate.IsLeader()) + + require.NoError(t, candidate.Stop(ctx)) + assert.False(t, candidate.IsLeader()) + + // No error from running stop again + require.NoError(t, candidate.Stop(ctx)) + assert.False(t, candidate.IsLeader()) + + // can be started again after being stopped + require.NoError(t, candidate.Start(ctx)) + assert.True(t, candidate.IsLeader()) + + // Subsequent stop works as normal + require.NoError(t, candidate.Stop(ctx)) + assert.False(t, candidate.IsLeader()) + }) + + t.Run("multiple candidates", func(t *testing.T) { + bElected := make(chan struct{}, 1) + + ctx := t.Context() + name := election.Name(uuid.NewString()) + candidateA := electionSvc.NewCandidate(name, "host-1", 30*time.Second) + candidateB := electionSvc.NewCandidate(name, "host-2", 30*time.Second, func(ctx context.Context) { + bElected <- struct{}{} + }) + + t.Cleanup(func() { + candidateA.Stop(context.Background()) + candidateB.Stop(context.Background()) + }) + + require.NoError(t, candidateA.Start(ctx)) + assert.True(t, candidateA.IsLeader()) + + require.NoError(t, candidateB.Start(ctx)) + assert.False(t, candidateB.IsLeader()) + + // Candidate B should take over after we stop candidate A + require.NoError(t, candidateA.Stop(ctx)) + assert.False(t, candidateA.IsLeader()) + + // Block until B has claimed leadership or we time out + select { + case <-bElected: + // B claimed leadership + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for candidate B to claim leadership") + } + assert.True(t, candidateB.IsLeader()) + + require.NoError(t, candidateB.Stop(ctx)) + assert.False(t, candidateB.IsLeader()) + }) +} diff --git a/server/internal/election/provide.go b/server/internal/election/provide.go new file mode 100644 index 
00000000..5e3a5f74 --- /dev/null +++ b/server/internal/election/provide.go @@ -0,0 +1,41 @@ +package election + +import ( + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/rs/zerolog" + "github.com/samber/do" + clientv3 "go.etcd.io/etcd/client/v3" +) + +func Provide(i *do.Injector) { + provideElectionStore(i) + provideService(i) +} + +func provideElectionStore(i *do.Injector) { + do.Provide(i, func(i *do.Injector) (*ElectionStore, error) { + client, err := do.Invoke[*clientv3.Client](i) + if err != nil { + return nil, err + } + cfg, err := do.Invoke[config.Config](i) + if err != nil { + return nil, err + } + return NewElectionStore(client, cfg.EtcdKeyRoot), nil + }) +} + +func provideService(i *do.Injector) { + do.Provide(i, func(i *do.Injector) (*Service, error) { + store, err := do.Invoke[*ElectionStore](i) + if err != nil { + return nil, err + } + logger, err := do.Invoke[zerolog.Logger](i) + if err != nil { + return nil, err + } + return NewService(store, logger), nil + }) +} diff --git a/server/internal/election/service.go b/server/internal/election/service.go new file mode 100644 index 00000000..8c488a77 --- /dev/null +++ b/server/internal/election/service.go @@ -0,0 +1,30 @@ +package election + +import ( + "time" + + "github.com/rs/zerolog" +) + +// Service manages election operations. +type Service struct { + store *ElectionStore + logger zerolog.Logger +} + +// NewService returns a new Service. +func NewService( + store *ElectionStore, + logger zerolog.Logger, +) *Service { + return &Service{ + store: store, + logger: logger, + } +} + +// NewCandidate creates a new Candidate for the given election. candidateID must +// be unique amongst candidates. +func (s *Service) NewCandidate(electionName Name, candidateID string, ttl time.Duration, onClaim ...ClaimHandler) *Candidate { + return NewCandidate(s.store, s.logger, electionName, candidateID, ttl, onClaim) +} diff --git a/server/internal/election/store.go b/server/internal/election/store.go new file mode 100644 index 00000000..251122c1 --- /dev/null +++ b/server/internal/election/store.go @@ -0,0 +1,62 @@ +package election + +import ( + "time" + + "github.com/pgEdge/control-plane/server/internal/storage" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type Name string + +func (n Name) String() string { + return string(n) +} + +type StoredElection struct { + storage.StoredValue + Name Name `json:"name"` + LeaderID string `json:"leader_id"` + CreatedAt time.Time `json:"created_at"` +} + +type ElectionStore struct { + client *clientv3.Client + root string +} + +func NewElectionStore(client *clientv3.Client, root string) *ElectionStore { + return &ElectionStore{ + client: client, + root: root, + } +} + +func (s *ElectionStore) Key(name Name) string { + return storage.Key("/", s.root, "elections", name.String()) +} + +func (s *ElectionStore) GetByKey(name Name) storage.GetOp[*StoredElection] { + key := s.Key(name) + return storage.NewGetOp[*StoredElection](s.client, key) +} + +func (s *ElectionStore) Create(item *StoredElection) storage.PutOp[*StoredElection] { + key := s.Key(item.Name) + return storage.NewCreateOp(s.client, key, item) +} + +func (s *ElectionStore) Update(item *StoredElection) storage.PutOp[*StoredElection] { + key := s.Key(item.Name) + return storage.NewUpdateOp(s.client, key, item) +} + +func (s *ElectionStore) Delete(item *StoredElection) storage.DeleteValueOp[*StoredElection] { + key := s.Key(item.Name) + return storage.NewDeleteValueOp(s.client, key, item) +} + +func (s *ElectionStore) Watch(name 
Name) storage.WatchOp[*StoredElection] { + key := s.Key(name) + return storage.NewWatchOp[*StoredElection](s.client, key) +} diff --git a/server/internal/scheduler/elector.go b/server/internal/scheduler/elector.go index e206b1ae..7e8656aa 100644 --- a/server/internal/scheduler/elector.go +++ b/server/internal/scheduler/elector.go @@ -3,15 +3,11 @@ package scheduler import ( "context" "errors" - "fmt" - "sync" - "time" "github.com/go-co-op/gocron" - "github.com/rs/zerolog" "github.com/samber/do" - "github.com/pgEdge/control-plane/server/internal/storage" + "github.com/pgEdge/control-plane/server/internal/election" ) var ErrNonLeader = errors.New("the elector is not leader") @@ -20,71 +16,21 @@ var _ gocron.Elector = (*Elector)(nil) var _ do.Shutdownable = (*Elector)(nil) type Elector struct { - mu sync.Mutex - isLeader bool - hostID string - store *LeaderStore - logger zerolog.Logger - ttl time.Duration - watchOp storage.WatchOp[*StoredLeader] - ticker *time.Ticker - done chan struct{} - errCh chan error + candidate *election.Candidate } -func NewElector( - hostID string, - store *LeaderStore, - logger zerolog.Logger, - ttl time.Duration, -) *Elector { +func NewElector(candidate *election.Candidate) *Elector { return &Elector{ - hostID: hostID, - store: store, - logger: logger.With().Str("component", "scheduler_elector").Logger(), - ttl: ttl, - done: make(chan struct{}), - errCh: make(chan error, 1), + candidate: candidate, } } func (e *Elector) Start(ctx context.Context) error { - if err := e.checkClaim(ctx); err != nil { - return err - } - - e.ticker = time.NewTicker(e.ttl / 3) - go func() { - defer e.ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-e.done: - return - case <-e.ticker.C: - if err := e.checkClaim(ctx); err != nil { - e.errCh <- err - } - } - } - }() - - go e.watch(ctx) - - if err := e.IsLeader(ctx); err == nil { - e.logger.Debug().Msg("this host is the scheduler leader") - } - - return nil + return e.candidate.Start(ctx) } func (e *Elector) IsLeader(_ context.Context) error { - e.mu.Lock() - defer e.mu.Unlock() - - if !e.isLeader { + if !e.candidate.IsLeader() { return ErrNonLeader } @@ -92,88 +38,12 @@ func (e *Elector) IsLeader(_ context.Context) error { } func (e *Elector) Shutdown() error { - if e.watchOp != nil { - e.watchOp.Close() - } + ctx, cancel := context.WithTimeout(context.Background(), electionTTL/3) + defer cancel() - close(e.done) - - return nil + return e.candidate.Stop(ctx) } func (e *Elector) Error() <-chan error { - return e.errCh -} - -func (e *Elector) checkClaim(ctx context.Context) error { - e.mu.Lock() - defer e.mu.Unlock() - - leader, err := e.store.GetByKey().Exec(ctx) - switch { - case errors.Is(err, storage.ErrNotFound): - return e.attemptClaim(ctx) - case err != nil: - return fmt.Errorf("failed to check for existing leader: %w", err) - } - - e.isLeader = leader.HostID == e.hostID - if !e.isLeader { - return nil - } - - err = e.store.Update(leader). - WithTTL(e.ttl). - Exec(ctx) - if err != nil { - return fmt.Errorf("failed to refresh claim: %w", err) - } - - return nil -} - -func (e *Elector) attemptClaim(ctx context.Context) error { - e.logger.Debug().Msg("attempting to claim scheduler leadership") - - err := e.store. - Create(&StoredLeader{ - HostID: e.hostID, - CreatedAt: time.Now(), - }). - WithTTL(e.ttl). 
- Exec(ctx) - switch { - case err == nil: - e.isLeader = true - e.logger.Debug().Msg("successfully claimed scheduler leadership") - case !errors.Is(err, storage.ErrAlreadyExists): - return fmt.Errorf("failed to claim scheduler leadership: %w", err) - default: - e.isLeader = false - } - - return nil -} - -func (e *Elector) watch(ctx context.Context) { - e.watchOp = e.store.Watch() - err := e.watchOp.Watch(ctx, func(evt *storage.Event[*StoredLeader]) { - switch evt.Type { - case storage.EventTypeDelete: - // The delete event will fire simultaneously with the ticker in some - // types of outages, so the claim might have already been created - // when this handler runs, even though its for a 'delete' event. - if err := e.checkClaim(ctx); err != nil { - e.errCh <- err - } - case storage.EventTypeError: - e.logger.Debug().Err(evt.Err).Msg("encountered a watch error") - if errors.Is(evt.Err, storage.ErrWatchClosed) { - defer e.watch(ctx) - } - } - }) - if err != nil { - e.errCh <- fmt.Errorf("failed to start watch: %w", err) - } + return e.candidate.Error() } diff --git a/server/internal/scheduler/provide.go b/server/internal/scheduler/provide.go index f9c018dd..8229b728 100644 --- a/server/internal/scheduler/provide.go +++ b/server/internal/scheduler/provide.go @@ -3,39 +3,29 @@ package scheduler import ( "time" - "github.com/pgEdge/control-plane/server/internal/config" - "github.com/pgEdge/control-plane/server/internal/database" - "github.com/pgEdge/control-plane/server/internal/workflows" "github.com/rs/zerolog" "github.com/samber/do" clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/election" + "github.com/pgEdge/control-plane/server/internal/workflows" ) +const electionName election.Name = "scheduler" +const electionTTL time.Duration = 30 * time.Second + func Provide(i *do.Injector) { - provideLeaderStore(i) provideElector(i) provideScheduledJobStore(i) provideService(i) provideExecutor(i) } -func provideLeaderStore(i *do.Injector) { - do.Provide(i, func(i *do.Injector) (*LeaderStore, error) { - client, err := do.Invoke[*clientv3.Client](i) - if err != nil { - return nil, err - } - cfg, err := do.Invoke[config.Config](i) - if err != nil { - return nil, err - } - return NewLeaderStore(client, cfg.EtcdKeyRoot), nil - }) -} - func provideElector(i *do.Injector) { do.Provide(i, func(i *do.Injector) (*Elector, error) { - store, err := do.Invoke[*LeaderStore](i) + electionSvc, err := do.Invoke[*election.Service](i) if err != nil { return nil, err } @@ -43,11 +33,9 @@ func provideElector(i *do.Injector) { if err != nil { return nil, err } - logger, err := do.Invoke[zerolog.Logger](i) - if err != nil { - return nil, err - } - return NewElector(cfg.HostID, store, logger, 30*time.Second), nil + + candidate := electionSvc.NewCandidate(electionName, cfg.HostID, electionTTL) + return NewElector(candidate), nil }) }
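Usage sketch (not part of the change above): one way another component might consume the new election package, using only the API introduced here — Service.NewCandidate plus Candidate.Start, Stop, IsLeader, and Error. The janitor package, the "cleanup" election name, and the Run signature are illustrative assumptions.

package janitor

import (
	"context"
	"time"

	"github.com/pgEdge/control-plane/server/internal/election"
)

// cleanupElection is a hypothetical election name; any string that is unique
// per logical election works.
const cleanupElection election.Name = "cleanup"

// Run participates in the "cleanup" election and performs leader-only work
// while this host holds the claim.
func Run(ctx context.Context, svc *election.Service, hostID string) error {
	candidate := svc.NewCandidate(cleanupElection, hostID, 30*time.Second,
		func(ctx context.Context) {
			// Invoked in its own goroutine each time leadership is claimed.
		},
	)
	if err := candidate.Start(ctx); err != nil {
		return err
	}
	defer func() { _ = candidate.Stop(context.Background()) }()

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-candidate.Error():
			return err
		case <-ticker.C:
			if candidate.IsLeader() {
				// Leader-only work goes here.
			}
		}
	}
}

Wiring would mirror the scheduler: resolve *election.Service from the injector and pass cfg.HostID as the candidate ID.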