From d35513b5c47476f1b77cd7304857320a5269cb96 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Tue, 13 Jan 2026 09:10:01 -0500 Subject: [PATCH] feat: add migrate package Adds a `migrate` package with a mechanism to run an ordered set of migrations on startup, such as Etcd data migrations. The migrations runner uses the `election` package to ensure that only one server instance is running the migrations at a time. Other instances will block startup until the migrations are complete, so we should aim to keep migrations as lightweight as possible. As stated in the package documentation, these migrations should be idempotent and, where possible, non-destructive. PLAT-347 --- server/cmd/root.go | 6 +- server/internal/app/app.go | 13 + server/internal/migrate/all_migrations.go | 13 + server/internal/migrate/doc.go | 5 + server/internal/migrate/migration.go | 16 ++ server/internal/migrate/provide.go | 65 +++++ server/internal/migrate/result_store.go | 49 ++++ server/internal/migrate/revision_store.go | 44 +++ server/internal/migrate/runner.go | 259 +++++++++++++++++ server/internal/migrate/runner_test.go | 320 ++++++++++++++++++++++ server/internal/migrate/store.go | 20 ++ 11 files changed, 808 insertions(+), 2 deletions(-) create mode 100644 server/internal/migrate/all_migrations.go create mode 100644 server/internal/migrate/doc.go create mode 100644 server/internal/migrate/migration.go create mode 100644 server/internal/migrate/provide.go create mode 100644 server/internal/migrate/result_store.go create mode 100644 server/internal/migrate/revision_store.go create mode 100644 server/internal/migrate/runner.go create mode 100644 server/internal/migrate/runner_test.go create mode 100644 server/internal/migrate/store.go diff --git a/server/cmd/root.go b/server/cmd/root.go index cd12050b..b5ff0a7e 100644 --- a/server/cmd/root.go +++ b/server/cmd/root.go @@ -3,22 +3,23 @@ package cmd import ( "fmt" - "github.com/pgEdge/control-plane/server/internal/cluster" - "github.com/pgEdge/control-plane/server/internal/election" "github.com/rs/zerolog" "github.com/samber/do" "github.com/spf13/cobra" "github.com/pgEdge/control-plane/server/internal/api" "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/cluster" "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/docker" + "github.com/pgEdge/control-plane/server/internal/election" "github.com/pgEdge/control-plane/server/internal/etcd" "github.com/pgEdge/control-plane/server/internal/filesystem" "github.com/pgEdge/control-plane/server/internal/host" "github.com/pgEdge/control-plane/server/internal/ipam" "github.com/pgEdge/control-plane/server/internal/logging" + "github.com/pgEdge/control-plane/server/internal/migrate" "github.com/pgEdge/control-plane/server/internal/monitor" "github.com/pgEdge/control-plane/server/internal/orchestrator" "github.com/pgEdge/control-plane/server/internal/orchestrator/swarm" @@ -69,6 +70,7 @@ func newRootCmd(i *do.Injector) *cobra.Command { host.Provide(i) ipam.Provide(i) logging.Provide(i) + migrate.Provide(i) monitor.Provide(i) resource.Provide(i) scheduler.Provide(i) diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 02a9f7f1..51b3ca69 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -15,6 +15,7 @@ import ( "github.com/pgEdge/control-plane/server/internal/database" 
"github.com/pgEdge/control-plane/server/internal/etcd" "github.com/pgEdge/control-plane/server/internal/host" + "github.com/pgEdge/control-plane/server/internal/migrate" "github.com/pgEdge/control-plane/server/internal/monitor" "github.com/pgEdge/control-plane/server/internal/scheduler" "github.com/pgEdge/control-plane/server/internal/workflows" @@ -110,6 +111,15 @@ func (a *App) runInitialized(ctx context.Context) error { return err } + // Run migrations before starting other services + migrationRunner, err := do.Invoke[*migrate.Runner](a.i) + if err != nil { + return handleError(fmt.Errorf("failed to initialize migration runner: %w", err)) + } + if err := migrationRunner.Run(ctx); err != nil { + return handleError(fmt.Errorf("failed to run migrations: %w", err)) + } + certSvc, err := do.Invoke[*certificates.Service](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize certificate service: %w", err)) @@ -117,6 +127,7 @@ func (a *App) runInitialized(ctx context.Context) error { if err := certSvc.Start(ctx); err != nil { return handleError(fmt.Errorf("failed to start certificate service: %w", err)) } + hostSvc, err := do.Invoke[*host.Service](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize host service: %w", err)) @@ -124,11 +135,13 @@ func (a *App) runInitialized(ctx context.Context) error { if err := hostSvc.UpdateHost(ctx); err != nil { return handleError(fmt.Errorf("failed to update host: %w", err)) } + hostTicker, err := do.Invoke[*host.UpdateTicker](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize host ticker: %w", err)) } hostTicker.Start(ctx) + monitorSvc, err := do.Invoke[*monitor.Service](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize monitor service: %w", err)) diff --git a/server/internal/migrate/all_migrations.go b/server/internal/migrate/all_migrations.go new file mode 100644 index 00000000..2a5222be --- /dev/null +++ b/server/internal/migrate/all_migrations.go @@ -0,0 +1,13 @@ +package migrate + +// allMigrations returns the ordered list of migrations. +// Order matters - migrations are executed in slice order. +// Add new migrations to this list in chronological order. +func allMigrations() []Migration { + return []Migration{ + // Add migrations here in chronological order + // Example: + // &AddHostMetadataField{}, + // &RenameDatabaseStatus{}, + } +} diff --git a/server/internal/migrate/doc.go b/server/internal/migrate/doc.go new file mode 100644 index 00000000..8161c277 --- /dev/null +++ b/server/internal/migrate/doc.go @@ -0,0 +1,5 @@ +// Package migrate provides a mechanism for arbitrary migration operations that +// should block startup, such as moving Etcd objects from one key to another. +// IMPORTANT: migrations _must_ be written to be idempotent, and we should +// prefer non-destructive updates in order to allow rollbacks. +package migrate diff --git a/server/internal/migrate/migration.go b/server/internal/migrate/migration.go new file mode 100644 index 00000000..048ebb82 --- /dev/null +++ b/server/internal/migrate/migration.go @@ -0,0 +1,16 @@ +package migrate + +import ( + "context" + + "github.com/samber/do" +) + +// Migration defines the interface for data migrations. +type Migration interface { + // Identifier returns a unique semantic name for this migration. + Identifier() string + // Run executes the migration using dependencies from the injector. + // The context should be used for cancellation and timeouts. 
+ Run(ctx context.Context, i *do.Injector) error +} diff --git a/server/internal/migrate/provide.go b/server/internal/migrate/provide.go new file mode 100644 index 00000000..e3945821 --- /dev/null +++ b/server/internal/migrate/provide.go @@ -0,0 +1,65 @@ +package migrate + +import ( + "time" + + "github.com/rs/zerolog" + "github.com/samber/do" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/election" +) + +const ElectionName = election.Name("migration_runner") +const LockTTL time.Duration = 30 * time.Second + +// Provide registers migration dependencies with the injector. +func Provide(i *do.Injector) { + provideStore(i) + provideRunner(i) +} + +func provideStore(i *do.Injector) { + do.Provide(i, func(i *do.Injector) (*Store, error) { + cfg, err := do.Invoke[config.Config](i) + if err != nil { + return nil, err + } + client, err := do.Invoke[*clientv3.Client](i) + if err != nil { + return nil, err + } + return NewStore(client, cfg.EtcdKeyRoot), nil + }) +} + +func provideRunner(i *do.Injector) { + do.Provide(i, func(i *do.Injector) (*Runner, error) { + store, err := do.Invoke[*Store](i) + if err != nil { + return nil, err + } + cfg, err := do.Invoke[config.Config](i) + if err != nil { + return nil, err + } + logger, err := do.Invoke[zerolog.Logger](i) + if err != nil { + return nil, err + } + electionSvc, err := do.Invoke[*election.Service](i) + if err != nil { + return nil, err + } + locker := electionSvc.NewCandidate(ElectionName, cfg.HostID, LockTTL) + return NewRunner( + cfg.HostID, + store, + i, + logger, + allMigrations(), + locker, + ), nil + }) +} diff --git a/server/internal/migrate/result_store.go b/server/internal/migrate/result_store.go new file mode 100644 index 00000000..89f1524a --- /dev/null +++ b/server/internal/migrate/result_store.go @@ -0,0 +1,49 @@ +package migrate + +import ( + "time" + + "github.com/pgEdge/control-plane/server/internal/storage" + "github.com/pgEdge/control-plane/server/internal/version" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// StoredResult tracks the outcome of a specific migration. 
+type StoredResult struct { + storage.StoredValue + Identifier string `json:"identifier"` + Successful bool `json:"successful"` + StartedAt time.Time `json:"started_at"` + CompletedAt time.Time `json:"completed_at"` + RunByHostID string `json:"run_by_host_id"` + RunByVersionInfo *version.Info `json:"run_by_version_info"` + Error string `json:"error,omitempty"` +} + +type ResultStore struct { + client *clientv3.Client + root string +} + +func NewResultStore(client *clientv3.Client, root string) *ResultStore { + return &ResultStore{ + client: client, + root: root, + } +} + +func (s *ResultStore) Prefix() string { + return storage.Prefix(s.root, "migrations", "results") +} + +func (s *ResultStore) Key(identifier string) string { + return storage.Key(s.Prefix(), identifier) +} + +func (s *ResultStore) Get(identifier string) storage.GetOp[*StoredResult] { + return storage.NewGetOp[*StoredResult](s.client, s.Key(identifier)) +} + +func (s *ResultStore) Put(item *StoredResult) storage.PutOp[*StoredResult] { + return storage.NewPutOp(s.client, s.Key(item.Identifier), item) +} diff --git a/server/internal/migrate/revision_store.go b/server/internal/migrate/revision_store.go new file mode 100644 index 00000000..c988f46c --- /dev/null +++ b/server/internal/migrate/revision_store.go @@ -0,0 +1,44 @@ +package migrate + +import ( + "github.com/pgEdge/control-plane/server/internal/storage" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// StoredRevision tracks the most recently applied migration. +type StoredRevision struct { + storage.StoredValue + Identifier string `json:"identifier"` +} + +type RevisionStore struct { + client *clientv3.Client + root string +} + +func NewRevisionStore(client *clientv3.Client, root string) *RevisionStore { + return &RevisionStore{ + client: client, + root: root, + } +} + +func (s *RevisionStore) Key() string { + return storage.Key(s.root, "migrations", "revision") +} + +func (s *RevisionStore) Get() storage.GetOp[*StoredRevision] { + return storage.NewGetOp[*StoredRevision](s.client, s.Key()) +} + +func (s *RevisionStore) Create(item *StoredRevision) storage.PutOp[*StoredRevision] { + return storage.NewCreateOp(s.client, s.Key(), item) +} + +func (s *RevisionStore) Update(item *StoredRevision) storage.PutOp[*StoredRevision] { + return storage.NewUpdateOp(s.client, s.Key(), item) +} + +func (s *RevisionStore) Watch() storage.WatchOp[*StoredRevision] { + return storage.NewWatchOp[*StoredRevision](s.client, s.Key()) +} diff --git a/server/internal/migrate/runner.go b/server/internal/migrate/runner.go new file mode 100644 index 00000000..4ed4339e --- /dev/null +++ b/server/internal/migrate/runner.go @@ -0,0 +1,259 @@ +package migrate + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/rs/zerolog" + "github.com/samber/do" + + "github.com/pgEdge/control-plane/server/internal/election" + "github.com/pgEdge/control-plane/server/internal/storage" + "github.com/pgEdge/control-plane/server/internal/version" +) + +// migrations should take on the order of seconds at a maximum, but we're going +// to be overly cautious just in case since this can prevent startup. +const migrationTimeout = 5 * time.Minute + +// Runner orchestrates migration execution with distributed locking. 
+type Runner struct { + hostID string + store *Store + injector *do.Injector + logger zerolog.Logger + migrations []Migration + candidate *election.Candidate + watchOp storage.WatchOp[*StoredRevision] + errCh chan error + doneCh chan struct{} + doneOnce sync.Once + versionInfo *version.Info +} + +// NewRunner creates a new migration runner. +func NewRunner( + hostID string, + store *Store, + injector *do.Injector, + logger zerolog.Logger, + migrations []Migration, + candidate *election.Candidate, +) *Runner { + return &Runner{ + hostID: hostID, + store: store, + injector: injector, + logger: logger.With(). + Str("component", "migration_runner"). + Logger(), + migrations: migrations, + candidate: candidate, + errCh: make(chan error, 1), + doneCh: make(chan struct{}), + } +} + +// Run executes any pending migrations if this runner wins the election, +// otherwise waits until the current revision reaches its target. +func (r *Runner) Run(ctx context.Context) error { + hasPendingMigrations, err := r.hasPendingMigrations(ctx) + if err != nil { + return err + } + if !hasPendingMigrations { + return nil + } + + // failure to get version info is non-fatal + versionInfo, _ := version.GetInfo() + r.versionInfo = versionInfo + + ctx, cancel := context.WithTimeout(ctx, migrationTimeout) + defer cancel() + + r.watch(ctx) + defer r.watchOp.Close() + + r.candidate.AddHandlers(func(_ context.Context) { + if err := r.runMigrations(ctx); err != nil { + r.errCh <- err + } + }) + if err := r.candidate.Start(ctx); err != nil { + return fmt.Errorf("failed to initialize locker: %w", err) + } + defer r.candidate.Stop(ctx) + + // Block until either the migrations complete, we timeout, or we encounter + // an error. + select { + case <-r.doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("context cancelled while waiting for migrations: %w", ctx.Err()) + case err := <-r.errCh: + return err + } +} + +func (r *Runner) watch(ctx context.Context) { + r.logger.Debug().Msg("starting watch") + + if len(r.migrations) == 0 { + r.errCh <- errors.New("watch called with empty migrations list") + return + } + targetRevision := r.migrations[len(r.migrations)-1].Identifier() + + // Ensure that any previous watches were closed. Close thread-safe and + // idempotent. + if r.watchOp != nil { + r.watchOp.Close() + } + + // Since we're not specifying a start version on the watch, this will always + // fire for the current revision. + r.watchOp = r.store.Revision.Watch() + err := r.watchOp.Watch(ctx, func(evt *storage.Event[*StoredRevision]) { + switch evt.Type { + case storage.EventTypePut: + if evt.Value.Identifier == targetRevision { + r.doneOnce.Do(func() { + close(r.doneCh) + }) + } + case storage.EventTypeError: + r.logger.Debug().Err(evt.Err).Msg("encountered a watch error") + if errors.Is(evt.Err, storage.ErrWatchClosed) { + r.watch(ctx) + } + } + }) + if err != nil { + r.errCh <- fmt.Errorf("failed to start watch: %w", err) + } +} + +func (r *Runner) runMigrations(ctx context.Context) error { + currentRevision, err := r.getCurrentRevision(ctx) + if err != nil { + return err + } + + startIndex := r.findStartIndex(currentRevision) + if startIndex >= len(r.migrations) { + r.logger.Info().Msg("control-plane db is up to date, no migrations to run") + return nil + } + + for i := startIndex; i < len(r.migrations); i++ { + migration := r.migrations[i] + identifier := migration.Identifier() + + if err := r.runMigration(ctx, migration); err != nil { + r.logger.Err(err). + Str("migration", identifier). 
+ Msg("run migrations error, stopping migrations") + return err + } + + if err := r.updateRevision(ctx, identifier); err != nil { + return fmt.Errorf("failed to update revision: %w", err) + } + } + + return nil +} + +func (r *Runner) getCurrentRevision(ctx context.Context) (string, error) { + rev, err := r.store.Revision.Get().Exec(ctx) + if errors.Is(err, storage.ErrNotFound) { + return "", nil + } + if err != nil { + return "", fmt.Errorf("failed to get current revision: %w", err) + } + return rev.Identifier, nil +} + +func (r *Runner) findStartIndex(currentRevision string) int { + if currentRevision == "" { + return 0 + } + + for i := len(r.migrations) - 1; i >= 0; i-- { + if r.migrations[i].Identifier() == currentRevision { + return i + 1 + } + } + + r.logger.Warn(). + Str("revision", currentRevision). + Msg("current revision not found in migrations list, starting from beginning") + return 0 +} + +func (r *Runner) hasPendingMigrations(ctx context.Context) (bool, error) { + if len(r.migrations) == 0 { + r.logger.Info().Msg("no migrations to run") + return false, nil + } + + currentRevision, err := r.getCurrentRevision(ctx) + if err != nil { + return false, err + } + + startIndex := r.findStartIndex(currentRevision) + if startIndex >= len(r.migrations) { + r.logger.Info().Msg("control-plane db is up to date, no migrations to run") + return false, nil + } + + return true, nil +} + +func (r *Runner) runMigration(ctx context.Context, migration Migration) error { + identifier := migration.Identifier() + r.logger.Info().Str("migration", identifier).Msg("running migration") + + stored := &StoredResult{ + Identifier: identifier, + StartedAt: time.Now(), + } + err := migration.Run(ctx, r.injector) + if err != nil { + stored.Error = err.Error() + } else { + stored.Successful = true + } + stored.CompletedAt = time.Now() + stored.RunByHostID = r.hostID + stored.RunByVersionInfo = r.versionInfo + + if storeErr := r.store.Result.Put(stored).Exec(ctx); storeErr != nil { + return fmt.Errorf("failed to store migration result: %w", storeErr) + } + + if err != nil { + return fmt.Errorf("migration failed: %w", err) + } + + return nil +} + +func (r *Runner) updateRevision(ctx context.Context, identifier string) error { + rev, err := r.store.Revision.Get().Exec(ctx) + if errors.Is(err, storage.ErrNotFound) { + return r.store.Revision.Create(&StoredRevision{Identifier: identifier}).Exec(ctx) + } + if err != nil { + return err + } + rev.Identifier = identifier + return r.store.Revision.Update(rev).Exec(ctx) +} diff --git a/server/internal/migrate/runner_test.go b/server/internal/migrate/runner_test.go new file mode 100644 index 00000000..603a7419 --- /dev/null +++ b/server/internal/migrate/runner_test.go @@ -0,0 +1,320 @@ +package migrate_test + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/uuid" + "github.com/rs/zerolog" + "github.com/samber/do" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pgEdge/control-plane/server/internal/election" + "github.com/pgEdge/control-plane/server/internal/migrate" + "github.com/pgEdge/control-plane/server/internal/storage/storagetest" + "github.com/pgEdge/control-plane/server/internal/testutils" +) + +func TestRunner(t *testing.T) { + server := storagetest.NewEtcdTestServer(t) + client := server.Client(t) + logger := testutils.Logger(t) + + t.Run("acquires lock and runs migrations", func(t *testing.T) { + root := uuid.NewString() + electionSvc := 
election.NewService(election.NewElectionStore(client, root), logger) + store := migrate.NewStore(client, root) + i := do.New() + + var ran bool + m := &runnerMockMigration{ + id: "test-migration", + runFunc: func(_ context.Context, _ *do.Injector) error { + ran = true + return nil + }, + } + + candidate := testCandidate(t, electionSvc, "host-1") + runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m}, candidate) + err := runner.Run(t.Context()) + require.NoError(t, err) + assert.True(t, ran, "migration should have run") + }) + + t.Run("multiple runners", func(t *testing.T) { + // Starts two concurrent runners and asserts that they both exit + // successfully and that the migration is only run once. + + root := uuid.NewString() + electionSvc := election.NewService(election.NewElectionStore(client, root), logger) + store := migrate.NewStore(client, root) + i := do.New() + + var ranOnce atomic.Bool + var ranTwice atomic.Bool + m := &runnerMockMigration{ + id: "test-migration", + runFunc: func(_ context.Context, _ *do.Injector) error { + if ranOnce.Load() { + ranTwice.Store(true) + } else { + ranOnce.Store(true) + } + return nil + }, + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + candidate := testCandidate(t, electionSvc, "host-1") + runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m}, candidate) + require.NoError(t, runner.Run(t.Context())) + }() + + wg.Add(1) + go func() { + defer wg.Done() + + candidate := testCandidate(t, electionSvc, "host-2") + runner := migrate.NewRunner("host-2", store, i, logger, []migrate.Migration{m}, candidate) + require.NoError(t, runner.Run(t.Context())) + }() + + wg.Wait() + + assert.True(t, ranOnce.Load()) + assert.False(t, ranTwice.Load()) + }) +} + +func TestRunnerMigrationOrdering(t *testing.T) { + server := storagetest.NewEtcdTestServer(t) + client := server.Client(t) + logger := zerolog.Nop() + + t.Run("runs migrations in order", func(t *testing.T) { + root := uuid.NewString() + electionSvc := election.NewService(election.NewElectionStore(client, root), logger) + store := migrate.NewStore(client, root) + i := do.New() + + var order []string + m1 := &runnerMockMigration{ + id: "migration-1", + runFunc: func(_ context.Context, _ *do.Injector) error { + order = append(order, "migration-1") + return nil + }, + } + m2 := &runnerMockMigration{ + id: "migration-2", + runFunc: func(_ context.Context, _ *do.Injector) error { + order = append(order, "migration-2") + return nil + }, + } + m3 := &runnerMockMigration{ + id: "migration-3", + runFunc: func(_ context.Context, _ *do.Injector) error { + order = append(order, "migration-3") + return nil + }, + } + + candidate := testCandidate(t, electionSvc, "host-1") + runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2, m3}, candidate) + err := runner.Run(t.Context()) + require.NoError(t, err) + + assert.Equal(t, []string{"migration-1", "migration-2", "migration-3"}, order) + }) + + t.Run("starts from current revision", func(t *testing.T) { + root := uuid.NewString() + electionSvc := election.NewService(election.NewElectionStore(client, root), logger) + store := migrate.NewStore(client, root) + i := do.New() + + // Pre-set revision to migration-2 + err := store.Revision.Create(&migrate.StoredRevision{Identifier: "migration-2"}).Exec(t.Context()) + require.NoError(t, err) + + var order []string + m1 := &runnerMockMigration{ + id: "migration-1", + runFunc: func(_ context.Context, _ *do.Injector) error { + order 
= append(order, "migration-1") + return nil + }, + } + m2 := &runnerMockMigration{ + id: "migration-2", + runFunc: func(_ context.Context, _ *do.Injector) error { + order = append(order, "migration-2") + return nil + }, + } + m3 := &runnerMockMigration{ + id: "migration-3", + runFunc: func(_ context.Context, _ *do.Injector) error { + order = append(order, "migration-3") + return nil + }, + } + + candidate := testCandidate(t, electionSvc, "host-1") + runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2, m3}, candidate) + err = runner.Run(t.Context()) + require.NoError(t, err) + + // Should only run migration-3 + assert.Equal(t, []string{"migration-3"}, order) + }) + + t.Run("stops on first failure", func(t *testing.T) { + root := uuid.NewString() + electionSvc := election.NewService(election.NewElectionStore(client, root), logger) + store := migrate.NewStore(client, root) + i := do.New() + + var order []string + m1 := &runnerMockMigration{ + id: "migration-1", + runFunc: func(_ context.Context, _ *do.Injector) error { + order = append(order, "migration-1") + return nil + }, + } + m2 := &runnerMockMigration{ + id: "migration-2", + runFunc: func(_ context.Context, _ *do.Injector) error { + order = append(order, "migration-2") + return errors.New("migration failed") + }, + } + m3 := &runnerMockMigration{ + id: "migration-3", + runFunc: func(_ context.Context, _ *do.Injector) error { + order = append(order, "migration-3") + return nil + }, + } + + candidate := testCandidate(t, electionSvc, "host-1") + runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2, m3}, candidate) + err := runner.Run(t.Context()) + assert.ErrorContains(t, err, "migration failed") + + // Should stop after migration-2 fails + assert.Equal(t, []string{"migration-1", "migration-2"}, order) + }) + + t.Run("records status for each migration", func(t *testing.T) { + root := uuid.NewString() + electionSvc := election.NewService(election.NewElectionStore(client, root), logger) + store := migrate.NewStore(client, root) + i := do.New() + + m1 := &runnerMockMigration{id: "migration-1"} + m2 := &runnerMockMigration{id: "migration-2", err: errors.New("failed")} + + candidate := testCandidate(t, electionSvc, "host-1") + runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2}, candidate) + err := runner.Run(t.Context()) + assert.ErrorContains(t, err, "failed") + + status1, err := store.Result.Get("migration-1").Exec(t.Context()) + require.NoError(t, err) + assert.True(t, status1.Successful) + assert.Equal(t, "host-1", status1.RunByHostID) + assert.NotEmpty(t, status1.StartedAt) + assert.NotEmpty(t, status1.CompletedAt) + assert.NotNil(t, status1.RunByVersionInfo) + + status2, err := store.Result.Get("migration-2").Exec(t.Context()) + require.NoError(t, err) + assert.False(t, status2.Successful) + assert.Equal(t, "failed", status2.Error) + assert.Equal(t, "host-1", status2.RunByHostID) + assert.NotEmpty(t, status2.StartedAt) + assert.NotEmpty(t, status2.CompletedAt) + assert.NotNil(t, status2.RunByVersionInfo) + }) + + t.Run("updates revision after each successful migration", func(t *testing.T) { + root := uuid.NewString() + electionSvc := election.NewService(election.NewElectionStore(client, root), logger) + store := migrate.NewStore(client, root) + i := do.New() + + m1 := &runnerMockMigration{id: "migration-1"} + m2 := &runnerMockMigration{id: "migration-2"} + + candidate := testCandidate(t, electionSvc, "host-1") + runner := 
migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2}, candidate) + err := runner.Run(t.Context()) + require.NoError(t, err) + + rev, err := store.Revision.Get().Exec(t.Context()) + require.NoError(t, err) + assert.Equal(t, "migration-2", rev.Identifier) + }) + + t.Run("does not update revision after failed migration", func(t *testing.T) { + root := uuid.NewString() + electionSvc := election.NewService(election.NewElectionStore(client, root), logger) + store := migrate.NewStore(client, root) + i := do.New() + + m1 := &runnerMockMigration{id: "migration-1"} + m2 := &runnerMockMigration{id: "migration-2", err: errors.New("failed")} + + candidate := testCandidate(t, electionSvc, "host-1") + runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2}, candidate) + err := runner.Run(t.Context()) + assert.ErrorContains(t, err, "failed") + + rev, err := store.Revision.Get().Exec(t.Context()) + require.NoError(t, err) + assert.Equal(t, "migration-1", rev.Identifier) + }) +} + +type runnerMockMigration struct { + id string + err error + runFunc func(context.Context, *do.Injector) error +} + +func (m *runnerMockMigration) Identifier() string { + return m.id +} + +func (m *runnerMockMigration) Run(ctx context.Context, i *do.Injector) error { + if m.runFunc != nil { + return m.runFunc(ctx, i) + } + return m.err +} + +func testCandidate(t *testing.T, electionSvc *election.Service, holderID string) *election.Candidate { + t.Helper() + + candidate := electionSvc.NewCandidate(migrate.ElectionName, holderID, 30*time.Second) + + t.Cleanup(func() { + candidate.Stop(context.Background()) + }) + + return candidate +} diff --git a/server/internal/migrate/store.go b/server/internal/migrate/store.go new file mode 100644 index 00000000..0f2d5e04 --- /dev/null +++ b/server/internal/migrate/store.go @@ -0,0 +1,20 @@ +// server/internal/migrate/store.go +package migrate + +import ( + clientv3 "go.etcd.io/etcd/client/v3" +) + +// Store wraps all migration-related stores. +type Store struct { + Revision *RevisionStore + Result *ResultStore +} + +// NewStore creates a new composite migration store. +func NewStore(client *clientv3.Client, root string) *Store { + return &Store{ + Revision: NewRevisionStore(client, root), + Result: NewResultStore(client, root), + } +}