diff --git a/server/cmd/root.go b/server/cmd/root.go index cd12050b..b5ff0a7e 100644 --- a/server/cmd/root.go +++ b/server/cmd/root.go @@ -3,22 +3,23 @@ package cmd import ( "fmt" - "github.com/pgEdge/control-plane/server/internal/cluster" - "github.com/pgEdge/control-plane/server/internal/election" "github.com/rs/zerolog" "github.com/samber/do" "github.com/spf13/cobra" "github.com/pgEdge/control-plane/server/internal/api" "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/cluster" "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/docker" + "github.com/pgEdge/control-plane/server/internal/election" "github.com/pgEdge/control-plane/server/internal/etcd" "github.com/pgEdge/control-plane/server/internal/filesystem" "github.com/pgEdge/control-plane/server/internal/host" "github.com/pgEdge/control-plane/server/internal/ipam" "github.com/pgEdge/control-plane/server/internal/logging" + "github.com/pgEdge/control-plane/server/internal/migrate" "github.com/pgEdge/control-plane/server/internal/monitor" "github.com/pgEdge/control-plane/server/internal/orchestrator" "github.com/pgEdge/control-plane/server/internal/orchestrator/swarm" @@ -69,6 +70,7 @@ func newRootCmd(i *do.Injector) *cobra.Command { host.Provide(i) ipam.Provide(i) logging.Provide(i) + migrate.Provide(i) monitor.Provide(i) resource.Provide(i) scheduler.Provide(i) diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 02a9f7f1..51b3ca69 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -15,6 +15,7 @@ import ( "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/etcd" "github.com/pgEdge/control-plane/server/internal/host" + "github.com/pgEdge/control-plane/server/internal/migrate" "github.com/pgEdge/control-plane/server/internal/monitor" 
"github.com/pgEdge/control-plane/server/internal/scheduler" "github.com/pgEdge/control-plane/server/internal/workflows" @@ -110,6 +111,15 @@ func (a *App) runInitialized(ctx context.Context) error { return err } + // Run migrations before starting other services + migrationRunner, err := do.Invoke[*migrate.Runner](a.i) + if err != nil { + return handleError(fmt.Errorf("failed to initialize migration runner: %w", err)) + } + if err := migrationRunner.Run(ctx); err != nil { + return handleError(fmt.Errorf("failed to run migrations: %w", err)) + } + certSvc, err := do.Invoke[*certificates.Service](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize certificate service: %w", err)) @@ -117,6 +127,7 @@ func (a *App) runInitialized(ctx context.Context) error { if err := certSvc.Start(ctx); err != nil { return handleError(fmt.Errorf("failed to start certificate service: %w", err)) } + hostSvc, err := do.Invoke[*host.Service](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize host service: %w", err)) @@ -124,11 +135,13 @@ func (a *App) runInitialized(ctx context.Context) error { if err := hostSvc.UpdateHost(ctx); err != nil { return handleError(fmt.Errorf("failed to update host: %w", err)) } + hostTicker, err := do.Invoke[*host.UpdateTicker](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize host ticker: %w", err)) } hostTicker.Start(ctx) + monitorSvc, err := do.Invoke[*monitor.Service](a.i) if err != nil { return handleError(fmt.Errorf("failed to initialize monitor service: %w", err)) diff --git a/server/internal/migrate/all_migrations.go b/server/internal/migrate/all_migrations.go new file mode 100644 index 00000000..2a5222be --- /dev/null +++ b/server/internal/migrate/all_migrations.go @@ -0,0 +1,13 @@ +package migrate + +// allMigrations returns the ordered list of migrations. +// Order matters - migrations are executed in slice order. +// Add new migrations to this list in chronological order. 
+func allMigrations() []Migration {
+	registered := []Migration{
+		// Register new migrations at the end of this list so that the
+		// slice stays in chronological order, e.g.:
+		//
+		//	&AddHostMetadataField{},
+		//	&RenameDatabaseStatus{},
+	}
+	return registered
+}
diff --git a/server/internal/migrate/doc.go b/server/internal/migrate/doc.go
new file mode 100644
index 00000000..8161c277
--- /dev/null
+++ b/server/internal/migrate/doc.go
@@ -0,0 +1,5 @@
+// Package migrate runs arbitrary migration operations that have to finish
+// before the rest of the server starts, for example moving Etcd objects from
+// one key to another.
+//
+// IMPORTANT: every migration _must_ be idempotent, and non-destructive
+// updates are preferred so that a rollback stays possible.
+package migrate
diff --git a/server/internal/migrate/migration.go b/server/internal/migrate/migration.go
new file mode 100644
index 00000000..048ebb82
--- /dev/null
+++ b/server/internal/migrate/migration.go
@@ -0,0 +1,16 @@
+package migrate
+
+import (
+	"context"
+
+	"github.com/samber/do"
+)
+
+// Migration is a single data migration step that the runner can execute.
+type Migration interface {
+	// Identifier reports the unique, semantic name of this migration. The
+	// runner stores it as the current revision once the migration succeeds.
+	Identifier() string
+	// Run performs the migration, resolving whatever it needs from the
+	// injector. Implementations should honor ctx for cancellation and
+	// timeouts.
+	Run(ctx context.Context, i *do.Injector) error
+}
diff --git a/server/internal/migrate/provide.go b/server/internal/migrate/provide.go
new file mode 100644
index 00000000..e3945821
--- /dev/null
+++ b/server/internal/migrate/provide.go
@@ -0,0 +1,65 @@
+package migrate
+
+import (
+	"time"
+
+	"github.com/rs/zerolog"
+	"github.com/samber/do"
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	"github.com/pgEdge/control-plane/server/internal/config"
+	"github.com/pgEdge/control-plane/server/internal/election"
+)
+
+const (
+	// ElectionName is the election name given to the candidate that
+	// provideRunner creates for the migration runner.
+	ElectionName = election.Name("migration_runner")
+	// LockTTL is the duration handed to NewCandidate for that candidate.
+	// NOTE(review): presumably the TTL of the election lock - confirm against
+	// the election package.
+	LockTTL time.Duration = 30 * time.Second
+)
+
+// Provide registers migration dependencies with the injector.
+func Provide(i *do.Injector) {
+	for _, register := range []func(*do.Injector){
+		provideStore,
+		provideRunner,
+	} {
+		register(i)
+	}
+}
+
+// provideStore registers the provider for the composite migration *Store.
+// The provider resolves the config and the Etcd client from the injector and
+// roots the store at the configured Etcd key root.
+func provideStore(i *do.Injector) {
+	buildStore := func(inj *do.Injector) (*Store, error) {
+		conf, err := do.Invoke[config.Config](inj)
+		if err != nil {
+			return nil, err
+		}
+		etcdClient, err := do.Invoke[*clientv3.Client](inj)
+		if err != nil {
+			return nil, err
+		}
+		return NewStore(etcdClient, conf.EtcdKeyRoot), nil
+	}
+	do.Provide(i, buildStore)
+}
+
+// provideRunner registers the provider for the migration *Runner. The
+// provider resolves the store, config, logger and election service, creates
+// an election candidate for this host and wires everything into NewRunner
+// together with the list returned by allMigrations.
+func provideRunner(i *do.Injector) {
+	buildRunner := func(inj *do.Injector) (*Runner, error) {
+		migrationStore, err := do.Invoke[*Store](inj)
+		if err != nil {
+			return nil, err
+		}
+		conf, err := do.Invoke[config.Config](inj)
+		if err != nil {
+			return nil, err
+		}
+		log, err := do.Invoke[zerolog.Logger](inj)
+		if err != nil {
+			return nil, err
+		}
+		elections, err := do.Invoke[*election.Service](inj)
+		if err != nil {
+			return nil, err
+		}
+
+		candidate := elections.NewCandidate(ElectionName, conf.HostID, LockTTL)
+		runner := NewRunner(conf.HostID, migrationStore, inj, log, allMigrations(), candidate)
+		return runner, nil
+	}
+	do.Provide(i, buildRunner)
+}
diff --git a/server/internal/migrate/result_store.go b/server/internal/migrate/result_store.go
new file mode 100644
index 00000000..89f1524a
--- /dev/null
+++ b/server/internal/migrate/result_store.go
@@ -0,0 +1,49 @@
+package migrate
+
+import (
+	"time"
+
+	"github.com/pgEdge/control-plane/server/internal/storage"
+	"github.com/pgEdge/control-plane/server/internal/version"
+	clientv3 "go.etcd.io/etcd/client/v3"
+)
+
+// StoredResult tracks the outcome of a specific migration.
+type StoredResult struct {
+	storage.StoredValue
+	// Identifier is the identifier of the migration this result belongs to;
+	// ResultStore.Put uses it to build the storage key.
+	Identifier string `json:"identifier"`
+	// Successful is set by the runner when the migration returned no error.
+	Successful bool `json:"successful"`
+	// StartedAt is taken by the runner right before the migration runs.
+	StartedAt time.Time `json:"started_at"`
+	// CompletedAt is taken by the runner right after the migration returns.
+	CompletedAt time.Time `json:"completed_at"`
+	// RunByHostID is the host ID of the runner that executed the migration.
+	RunByHostID string `json:"run_by_host_id"`
+	// RunByVersionInfo is the version info of the executing runner. The
+	// runner ignores errors from version.GetInfo, so this is presumably nil
+	// when the lookup fails - confirm against the version package.
+	RunByVersionInfo *version.Info `json:"run_by_version_info"`
+	// Error holds the migration's error message; it is left empty (and
+	// omitted from the JSON) when the migration succeeded.
+	Error string `json:"error,omitempty"`
+}
+
+// ResultStore builds storage operations for per-migration results, kept
+// under "<root>/migrations/results".
+type ResultStore struct {
+	client *clientv3.Client
+	root   string
+}
+
+// NewResultStore returns a ResultStore that uses client and places its keys
+// under root.
+func NewResultStore(client *clientv3.Client, root string) *ResultStore {
+	rs := new(ResultStore)
+	rs.client = client
+	rs.root = root
+	return rs
+}
+
+// Prefix returns the storage prefix shared by all migration results.
+func (rs *ResultStore) Prefix() string {
+	prefix := storage.Prefix(rs.root, "migrations", "results")
+	return prefix
+}
+
+// Key returns the storage key of the result for the given migration
+// identifier.
+func (rs *ResultStore) Key(identifier string) string {
+	return storage.Key(storage.Prefix(rs.root, "migrations", "results"), identifier)
+}
+
+// Get returns an operation that fetches the result stored for identifier.
+func (rs *ResultStore) Get(identifier string) storage.GetOp[*StoredResult] {
+	key := rs.Key(identifier)
+	return storage.NewGetOp[*StoredResult](rs.client, key)
+}
+
+// Put returns an operation that writes item under the key derived from
+// item.Identifier.
+func (rs *ResultStore) Put(item *StoredResult) storage.PutOp[*StoredResult] {
+	key := rs.Key(item.Identifier)
+	return storage.NewPutOp(rs.client, key, item)
+}
diff --git a/server/internal/migrate/revision_store.go b/server/internal/migrate/revision_store.go
new file mode 100644
index 00000000..c988f46c
--- /dev/null
+++ b/server/internal/migrate/revision_store.go
@@ -0,0 +1,44 @@
+package migrate
+
+import (
+	"github.com/pgEdge/control-plane/server/internal/storage"
+	clientv3 "go.etcd.io/etcd/client/v3"
+)
+
+// StoredRevision tracks the most recently applied migration.
+type StoredRevision struct {
+	storage.StoredValue
+	// Identifier is the identifier of the last migration whose revision
+	// update was written by the runner.
+	Identifier string `json:"identifier"`
+}
+
+// RevisionStore builds storage operations for the single revision record,
+// kept under "<root>/migrations/revision".
+type RevisionStore struct {
+	client *clientv3.Client
+	root   string
+}
+
+// NewRevisionStore returns a RevisionStore that uses client and places its
+// key under root.
+func NewRevisionStore(client *clientv3.Client, root string) *RevisionStore {
+	rs := new(RevisionStore)
+	rs.client = client
+	rs.root = root
+	return rs
+}
+
+// Key returns the storage key of the revision record.
+func (rs *RevisionStore) Key() string {
+	key := storage.Key(rs.root, "migrations", "revision")
+	return key
+}
+
+// Get returns an operation that fetches the current revision record.
+func (rs *RevisionStore) Get() storage.GetOp[*StoredRevision] {
+	key := rs.Key()
+	return storage.NewGetOp[*StoredRevision](rs.client, key)
+}
+
+// Create returns an operation built with storage.NewCreateOp for item.
+// NOTE(review): presumably this only succeeds while the key does not exist
+// yet - confirm against the storage package.
+func (rs *RevisionStore) Create(item *StoredRevision) storage.PutOp[*StoredRevision] {
+	key := rs.Key()
+	return storage.NewCreateOp(rs.client, key, item)
+}
+
+// Update returns an operation built with storage.NewUpdateOp for item.
+// NOTE(review): presumably this replaces an already existing record - confirm
+// against the storage package.
+func (rs *RevisionStore) Update(item *StoredRevision) storage.PutOp[*StoredRevision] {
+	key := rs.Key()
+	return storage.NewUpdateOp(rs.client, key, item)
+}
+
+// Watch returns an operation that watches the revision record's key.
+func (rs *RevisionStore) Watch() storage.WatchOp[*StoredRevision] {
+	key := rs.Key()
+	return storage.NewWatchOp[*StoredRevision](rs.client, key)
+}
diff --git a/server/internal/migrate/runner.go b/server/internal/migrate/runner.go
new file mode 100644
index 00000000..4ed4339e
--- /dev/null
+++ b/server/internal/migrate/runner.go
@@ -0,0 +1,259 @@
+package migrate
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/rs/zerolog"
+	"github.com/samber/do"
+
+	"github.com/pgEdge/control-plane/server/internal/election"
+	"github.com/pgEdge/control-plane/server/internal/storage"
+	"github.com/pgEdge/control-plane/server/internal/version"
+)
+
+// migrations should take on the order of seconds at a maximum, but we're going
+// to be overly cautious just in case since this can prevent startup.
+const migrationTimeout = 5 * time.Minute
+
+// Runner orchestrates migration execution with distributed locking.
+type Runner struct {
+	hostID     string
+	store      *Store
+	injector   *do.Injector
+	logger     zerolog.Logger
+	migrations []Migration
+	candidate  *election.Candidate
+	// watchMu guards watchOp, which is replaced from the watch callback when
+	// the watch is restarted and read by Run when it shuts the watch down.
+	watchMu sync.Mutex
+	watchOp storage.WatchOp[*StoredRevision]
+	// errCh carries a fatal error to Run. It has a buffer of one and is only
+	// written through reportError, which never blocks.
+	errCh chan error
+	// doneCh is closed (once, via doneOnce) when the stored revision reaches
+	// the identifier of the last migration.
+	doneCh      chan struct{}
+	doneOnce    sync.Once
+	versionInfo *version.Info
+}
+
+// NewRunner creates a new migration runner.
+//
+// hostID is recorded on every stored result, store holds the revision and
+// result records, injector is handed to each migration, migrations is the
+// ordered list to apply and candidate is the election candidate used to pick
+// the single runner that executes them. A Runner is single-use: its done
+// channel is closed at most once.
+func NewRunner(
+	hostID string,
+	store *Store,
+	injector *do.Injector,
+	logger zerolog.Logger,
+	migrations []Migration,
+	candidate *election.Candidate,
+) *Runner {
+	return &Runner{
+		hostID:   hostID,
+		store:    store,
+		injector: injector,
+		logger: logger.With().
+			Str("component", "migration_runner").
+			Logger(),
+		migrations: migrations,
+		candidate:  candidate,
+		errCh:      make(chan error, 1),
+		doneCh:     make(chan struct{}),
+	}
+}
+
+// Run executes any pending migrations if this runner wins the election,
+// otherwise waits until the current revision reaches its target.
+//
+// It returns nil immediately when there is nothing to do. Otherwise it blocks
+// until the stored revision equals the last migration's identifier, until an
+// error is reported, or until ctx is done or migrationTimeout elapses.
+func (r *Runner) Run(ctx context.Context) error {
+	hasPendingMigrations, err := r.hasPendingMigrations(ctx)
+	if err != nil {
+		return err
+	}
+	if !hasPendingMigrations {
+		return nil
+	}
+
+	// failure to get version info is non-fatal
+	versionInfo, _ := version.GetInfo()
+	r.versionInfo = versionInfo
+
+	ctx, cancel := context.WithTimeout(ctx, migrationTimeout)
+	defer cancel()
+
+	r.watch(ctx)
+	// Deferred through a method so that whichever watch is current at return
+	// time gets closed, including one that was restarted by the callback.
+	defer r.closeWatch()
+
+	r.candidate.AddHandlers(func(_ context.Context) {
+		if err := r.runMigrations(ctx); err != nil {
+			r.reportError(err)
+		}
+	})
+	if err := r.candidate.Start(ctx); err != nil {
+		return fmt.Errorf("failed to initialize locker: %w", err)
+	}
+	defer r.stopCandidate(ctx)
+
+	// Block until either the migrations complete, we timeout, or we encounter
+	// an error.
+	select {
+	case <-r.doneCh:
+		return nil
+	case <-ctx.Done():
+		return fmt.Errorf("context cancelled while waiting for migrations: %w", ctx.Err())
+	case err := <-r.errCh:
+		return err
+	}
+}
+
+// candidateStopTimeout bounds how long Run waits for the election candidate
+// to stop during cleanup.
+const candidateStopTimeout = 10 * time.Second
+
+// stopCandidate stops the election candidate using a context that is detached
+// from ctx's cancellation. On the timeout and cancellation paths ctx is
+// already done when Run cleans up.
+// NOTE(review): assumes Candidate.Stop needs a live context to release the
+// election - confirm against the election package.
+func (r *Runner) stopCandidate(ctx context.Context) {
+	stopCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), candidateStopTimeout)
+	defer cancel()
+	r.candidate.Stop(stopCtx)
+}
+
+// reportError hands err to Run without ever blocking. errCh holds a single
+// error; if one is already pending, or Run has already returned and nobody
+// will drain the channel, err is logged instead so the calling goroutine is
+// not leaked on a blocked send.
+func (r *Runner) reportError(err error) {
+	select {
+	case r.errCh <- err:
+	default:
+		r.logger.Err(err).Msg("dropping migration runner error, another error is already pending")
+	}
+}
+
+// closeWatch closes the currently installed watch, if any.
+func (r *Runner) closeWatch() {
+	r.watchMu.Lock()
+	op := r.watchOp
+	r.watchMu.Unlock()
+
+	if op != nil {
+		op.Close()
+	}
+}
+
+// watch (re)starts the watch on the revision record and closes doneCh once
+// the revision equals the identifier of the last migration. When the watch
+// reports storage.ErrWatchClosed it is restarted, unless ctx is already done.
+func (r *Runner) watch(ctx context.Context) {
+	r.logger.Debug().Msg("starting watch")
+
+	if len(r.migrations) == 0 {
+		r.reportError(errors.New("watch called with empty migrations list"))
+		return
+	}
+	targetRevision := r.migrations[len(r.migrations)-1].Identifier()
+
+	// Install the new watch and make sure that any previous watch is closed.
+	// Close thread-safe and idempotent. The mutex is only held for the swap,
+	// never while calling into the watch itself.
+	op := r.store.Revision.Watch()
+	r.watchMu.Lock()
+	previous := r.watchOp
+	r.watchOp = op
+	r.watchMu.Unlock()
+	if previous != nil {
+		previous.Close()
+	}
+
+	// Since we're not specifying a start version on the watch, this will always
+	// fire for the current revision.
+	err := op.Watch(ctx, func(evt *storage.Event[*StoredRevision]) {
+		switch evt.Type {
+		case storage.EventTypePut:
+			if evt.Value.Identifier == targetRevision {
+				r.doneOnce.Do(func() {
+					close(r.doneCh)
+				})
+			}
+		case storage.EventTypeError:
+			r.logger.Debug().Err(evt.Err).Msg("encountered a watch error")
+			// Restarting is pointless once ctx is done: Run is returning.
+			if errors.Is(evt.Err, storage.ErrWatchClosed) && ctx.Err() == nil {
+				r.watch(ctx)
+			}
+		}
+	})
+	if err != nil {
+		r.reportError(fmt.Errorf("failed to start watch: %w", err))
+	}
+}
+
+// runMigrations applies every migration after the stored revision, in slice
+// order, updating the stored revision after each success. It stops at the
+// first failure and returns that error.
+func (r *Runner) runMigrations(ctx context.Context) error {
+	currentRevision, err := r.getCurrentRevision(ctx)
+	if err != nil {
+		return err
+	}
+
+	startIndex := r.findStartIndex(currentRevision)
+	if startIndex >= len(r.migrations) {
+		r.logger.Info().Msg("control-plane db is up to date, no migrations to run")
+		return nil
+	}
+
+	for i := startIndex; i < len(r.migrations); i++ {
+		migration := r.migrations[i]
+		identifier := migration.Identifier()
+
+		if err := r.runMigration(ctx, migration); err != nil {
+			r.logger.Err(err).
+				Str("migration", identifier).
+				Msg("run migrations error, stopping migrations")
+			return err
+		}
+
+		if err := r.updateRevision(ctx, identifier); err != nil {
+			return fmt.Errorf("failed to update revision: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// getCurrentRevision returns the stored revision identifier, or "" when no
+// revision has been stored yet.
+func (r *Runner) getCurrentRevision(ctx context.Context) (string, error) {
+	rev, err := r.store.Revision.Get().Exec(ctx)
+	if errors.Is(err, storage.ErrNotFound) {
+		return "", nil
+	}
+	if err != nil {
+		return "", fmt.Errorf("failed to get current revision: %w", err)
+	}
+	return rev.Identifier, nil
+}
+
+// findStartIndex returns the index of the first migration that still has to
+// run: 0 when there is no revision, the index after the migration matching
+// currentRevision otherwise. A revision that is not in the list is logged and
+// treated like no revision at all, which relies on migrations being
+// idempotent.
+func (r *Runner) findStartIndex(currentRevision string) int {
+	if currentRevision == "" {
+		return 0
+	}
+
+	for i := len(r.migrations) - 1; i >= 0; i-- {
+		if r.migrations[i].Identifier() == currentRevision {
+			return i + 1
+		}
+	}
+
+	r.logger.Warn().
+		Str("revision", currentRevision).
+		Msg("current revision not found in migrations list, starting from beginning")
+	return 0
+}
+
+// hasPendingMigrations reports whether at least one migration comes after the
+// stored revision.
+func (r *Runner) hasPendingMigrations(ctx context.Context) (bool, error) {
+	if len(r.migrations) == 0 {
+		r.logger.Info().Msg("no migrations to run")
+		return false, nil
+	}
+
+	currentRevision, err := r.getCurrentRevision(ctx)
+	if err != nil {
+		return false, err
+	}
+
+	startIndex := r.findStartIndex(currentRevision)
+	if startIndex >= len(r.migrations) {
+		r.logger.Info().Msg("control-plane db is up to date, no migrations to run")
+		return false, nil
+	}
+
+	return true, nil
+}
+
+// runMigration runs a single migration and stores its result, whether it
+// succeeded or failed. If the migration fails and the result also cannot be
+// stored, both errors are returned joined, so the migration failure is not
+// hidden behind the storage failure.
+func (r *Runner) runMigration(ctx context.Context, migration Migration) error {
+	identifier := migration.Identifier()
+	r.logger.Info().Str("migration", identifier).Msg("running migration")
+
+	stored := &StoredResult{
+		Identifier: identifier,
+		StartedAt:  time.Now(),
+	}
+	runErr := migration.Run(ctx, r.injector)
+	if runErr != nil {
+		stored.Error = runErr.Error()
+	} else {
+		stored.Successful = true
+	}
+	stored.CompletedAt = time.Now()
+	stored.RunByHostID = r.hostID
+	stored.RunByVersionInfo = r.versionInfo
+
+	storeErr := r.store.Result.Put(stored).Exec(ctx)
+	if storeErr != nil {
+		storeErr = fmt.Errorf("failed to store migration result: %w", storeErr)
+	}
+
+	if runErr != nil {
+		runErr = fmt.Errorf("migration failed: %w", runErr)
+		if storeErr != nil {
+			// Migration error first: it is the primary failure.
+			return errors.Join(runErr, storeErr)
+		}
+		return runErr
+	}
+
+	return storeErr
+}
+
+// updateRevision stores identifier as the current revision, creating the
+// revision record when it does not exist yet and updating it otherwise.
+func (r *Runner) updateRevision(ctx context.Context, identifier string) error {
+	rev, err := r.store.Revision.Get().Exec(ctx)
+	if errors.Is(err, storage.ErrNotFound) {
+		return r.store.Revision.Create(&StoredRevision{Identifier: identifier}).Exec(ctx)
+	}
+	if err != nil {
+		return err
+	}
+	rev.Identifier = identifier
+	return r.store.Revision.Update(rev).Exec(ctx)
+}
diff --git a/server/internal/migrate/runner_test.go b/server/internal/migrate/runner_test.go
new file mode 100644
index 00000000..603a7419
--- /dev/null
+++ b/server/internal/migrate/runner_test.go
@@ -0,0 +1,320 @@
+package migrate_test
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/rs/zerolog"
+	"github.com/samber/do"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/pgEdge/control-plane/server/internal/election"
+	"github.com/pgEdge/control-plane/server/internal/migrate"
+	"github.com/pgEdge/control-plane/server/internal/storage/storagetest"
+	"github.com/pgEdge/control-plane/server/internal/testutils"
+)
+
+func TestRunner(t *testing.T) {
+	server := storagetest.NewEtcdTestServer(t)
+	client := server.Client(t)
+	logger := testutils.Logger(t)
+
+	t.Run("acquires lock and runs migrations", func(t *testing.T) {
+		root := uuid.NewString()
+		electionSvc := election.NewService(election.NewElectionStore(client, root), logger)
+		store := migrate.NewStore(client, root)
+		i := do.New()
+
+		var ran bool
+		m := &runnerMockMigration{
+			id: "test-migration",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				ran = true
+				return nil
+			},
+		}
+
+		candidate := testCandidate(t, electionSvc, "host-1")
+		runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m}, candidate)
+		err := runner.Run(t.Context())
+		require.NoError(t, err)
+		assert.True(t, ran, "migration should have run")
+	})
+
+	t.Run("multiple runners", func(t *testing.T) {
+		// Starts two concurrent runners and asserts that they both exit
+		// successfully and that the migration is only run once.
+
+		root := uuid.NewString()
+		electionSvc := election.NewService(election.NewElectionStore(client, root), logger)
+		store := migrate.NewStore(client, root)
+		i := do.New()
+
+		var ranOnce atomic.Bool
+		var ranTwice atomic.Bool
+		m := &runnerMockMigration{
+			id: "test-migration",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				if ranOnce.Load() {
+					ranTwice.Store(true)
+				} else {
+					ranOnce.Store(true)
+				}
+				return nil
+			},
+		}
+
+		var wg sync.WaitGroup
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			candidate := testCandidate(t, electionSvc, "host-1")
+			runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m}, candidate)
+			assert.NoError(t, runner.Run(t.Context())) // assert, not require: FailNow must not be called off the test goroutine
+		}()
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			candidate := testCandidate(t, electionSvc, "host-2")
+			runner := migrate.NewRunner("host-2", store, i, logger, []migrate.Migration{m}, candidate)
+			assert.NoError(t, runner.Run(t.Context())) // assert, not require: FailNow must not be called off the test goroutine
+		}()
+
+		wg.Wait()
+
+		assert.True(t, ranOnce.Load())
+		assert.False(t, ranTwice.Load())
+	})
+}
+
+func TestRunnerMigrationOrdering(t *testing.T) {
+	server := storagetest.NewEtcdTestServer(t)
+	client := server.Client(t)
+	logger := zerolog.Nop()
+
+	t.Run("runs migrations in order", func(t *testing.T) {
+		root := uuid.NewString()
+		electionSvc := election.NewService(election.NewElectionStore(client, root), logger)
+		store := migrate.NewStore(client, root)
+		i := do.New()
+
+		var order []string
+		m1 := &runnerMockMigration{
+			id: "migration-1",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-1")
+				return nil
+			},
+		}
+		m2 := &runnerMockMigration{
+			id: "migration-2",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-2")
+				return nil
+			},
+		}
+		m3 := &runnerMockMigration{
+			id: "migration-3",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-3")
+				return nil
+			},
+		}
+
+		candidate := testCandidate(t, electionSvc, "host-1")
+		runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2, m3}, candidate)
+		err := runner.Run(t.Context())
+		require.NoError(t, err)
+
+		assert.Equal(t, []string{"migration-1", "migration-2", "migration-3"}, order)
+	})
+
+	t.Run("starts from current revision", func(t *testing.T) {
+		root := uuid.NewString()
+		electionSvc := election.NewService(election.NewElectionStore(client, root), logger)
+		store := migrate.NewStore(client, root)
+		i := do.New()
+
+		// Pre-set revision to migration-2
+		err := store.Revision.Create(&migrate.StoredRevision{Identifier: "migration-2"}).Exec(t.Context())
+		require.NoError(t, err)
+
+		var order []string
+		m1 := &runnerMockMigration{
+			id: "migration-1",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-1")
+				return nil
+			},
+		}
+		m2 := &runnerMockMigration{
+			id: "migration-2",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-2")
+				return nil
+			},
+		}
+		m3 := &runnerMockMigration{
+			id: "migration-3",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-3")
+				return nil
+			},
+		}
+
+		candidate := testCandidate(t, electionSvc, "host-1")
+		runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2, m3}, candidate)
+		err = runner.Run(t.Context())
+		require.NoError(t, err)
+
+		// Should only run migration-3
+		assert.Equal(t, []string{"migration-3"}, order)
+	})
+
+	t.Run("stops on first failure", func(t *testing.T) {
+		root := uuid.NewString()
+		electionSvc := election.NewService(election.NewElectionStore(client, root), logger)
+		store := migrate.NewStore(client, root)
+		i := do.New()
+
+		var order []string
+		m1 := &runnerMockMigration{
+			id: "migration-1",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-1")
+				return nil
+			},
+		}
+		m2 := &runnerMockMigration{
+			id: "migration-2",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-2")
+				return errors.New("migration failed")
+			},
+		}
+		m3 := &runnerMockMigration{
+			id: "migration-3",
+			runFunc: func(_ context.Context, _ *do.Injector) error {
+				order = append(order, "migration-3")
+				return nil
+			},
+		}
+
+		candidate := testCandidate(t, electionSvc, "host-1")
+		runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2, m3}, candidate)
+		err := runner.Run(t.Context())
+		assert.ErrorContains(t, err, "migration failed")
+
+		// Should stop after migration-2 fails
+		assert.Equal(t, []string{"migration-1", "migration-2"}, order)
+	})
+
+	t.Run("records status for each migration", func(t *testing.T) {
+		root := uuid.NewString()
+		electionSvc := election.NewService(election.NewElectionStore(client, root), logger)
+		store := migrate.NewStore(client, root)
+		i := do.New()
+
+		m1 := &runnerMockMigration{id: "migration-1"}
+		m2 := &runnerMockMigration{id: "migration-2", err: errors.New("failed")}
+
+		candidate := testCandidate(t, electionSvc, "host-1")
+		runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2}, candidate)
+		err := runner.Run(t.Context())
+		assert.ErrorContains(t, err, "failed")
+
+		status1, err := store.Result.Get("migration-1").Exec(t.Context())
+		require.NoError(t, err)
+		assert.True(t, status1.Successful)
+		assert.Equal(t, "host-1", status1.RunByHostID)
+		assert.NotEmpty(t, status1.StartedAt)
+		assert.NotEmpty(t, status1.CompletedAt)
+		assert.NotNil(t, status1.RunByVersionInfo)
+
+		status2, err := store.Result.Get("migration-2").Exec(t.Context())
+		require.NoError(t, err)
+		assert.False(t, status2.Successful)
+		assert.Equal(t, "failed", status2.Error)
+		assert.Equal(t, "host-1", status2.RunByHostID)
+		assert.NotEmpty(t, status2.StartedAt)
+		assert.NotEmpty(t, status2.CompletedAt)
+		assert.NotNil(t, status2.RunByVersionInfo)
+	})
+
+	t.Run("updates revision after each successful migration", func(t *testing.T) {
+		root := uuid.NewString()
+		electionSvc := election.NewService(election.NewElectionStore(client, root), logger)
+		store := migrate.NewStore(client, root)
+		i := do.New()
+
+		m1 := &runnerMockMigration{id: "migration-1"}
+		m2 := &runnerMockMigration{id: "migration-2"}
+
+		candidate := testCandidate(t, electionSvc, "host-1")
+		runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2}, candidate)
+		err := runner.Run(t.Context())
+		require.NoError(t, err)
+
+		rev, err := store.Revision.Get().Exec(t.Context())
+		require.NoError(t, err)
+		assert.Equal(t, "migration-2", rev.Identifier)
+	})
+
+	t.Run("does not update revision after failed migration", func(t *testing.T) {
+		root := uuid.NewString()
+		electionSvc := election.NewService(election.NewElectionStore(client, root), logger)
+		store := migrate.NewStore(client, root)
+		i := do.New()
+
+		m1 := &runnerMockMigration{id: "migration-1"}
+		m2 := &runnerMockMigration{id: "migration-2", err: errors.New("failed")}
+
+		candidate := testCandidate(t, electionSvc, "host-1")
+		runner := migrate.NewRunner("host-1", store, i, logger, []migrate.Migration{m1, m2}, candidate)
+		err := runner.Run(t.Context())
+		assert.ErrorContains(t, err, "failed")
+
+		rev, err := store.Revision.Get().Exec(t.Context())
+		require.NoError(t, err)
+		assert.Equal(t, "migration-1", rev.Identifier)
+	})
+}
+
+type runnerMockMigration struct {
+	id      string
+	err     error
+	runFunc func(context.Context, *do.Injector) error
+}
+
+func (m *runnerMockMigration) Identifier() string {
+	return m.id
+}
+
+func (m *runnerMockMigration) Run(ctx context.Context, i *do.Injector) error {
+	if m.runFunc != nil {
+		return m.runFunc(ctx, i)
+	}
+	return m.err
+}
+
+func testCandidate(t *testing.T, electionSvc *election.Service, holderID string) *election.Candidate {
+	t.Helper()
+
+	candidate := electionSvc.NewCandidate(migrate.ElectionName, holderID, 30*time.Second)
+
+	t.Cleanup(func() {
+		candidate.Stop(context.Background())
+	})
+
+	return candidate
+}
diff --git a/server/internal/migrate/store.go b/server/internal/migrate/store.go
new file mode 100644
index 00000000..0f2d5e04
--- /dev/null
+++ b/server/internal/migrate/store.go
@@ -0,0 +1,20 @@
+// server/internal/migrate/store.go
+package migrate
+
+import (
+	clientv3 "go.etcd.io/etcd/client/v3"
+)
+
+// Store wraps all migration-related stores.
+type Store struct {
+	Revision *RevisionStore
+	Result   *ResultStore
+}
+
+// NewStore creates a new composite migration store.
+func NewStore(client *clientv3.Client, root string) *Store {
+	return &Store{
+		Revision: NewRevisionStore(client, root),
+		Result:   NewResultStore(client, root),
+	}
+}