Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions server/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@ package cmd
import (
"fmt"

"github.com/pgEdge/control-plane/server/internal/cluster"
"github.com/pgEdge/control-plane/server/internal/election"
"github.com/rs/zerolog"
"github.com/samber/do"
"github.com/spf13/cobra"

"github.com/pgEdge/control-plane/server/internal/api"
"github.com/pgEdge/control-plane/server/internal/certificates"
"github.com/pgEdge/control-plane/server/internal/cluster"
"github.com/pgEdge/control-plane/server/internal/config"
"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/docker"
"github.com/pgEdge/control-plane/server/internal/election"
"github.com/pgEdge/control-plane/server/internal/etcd"
"github.com/pgEdge/control-plane/server/internal/filesystem"
"github.com/pgEdge/control-plane/server/internal/host"
"github.com/pgEdge/control-plane/server/internal/ipam"
"github.com/pgEdge/control-plane/server/internal/logging"
"github.com/pgEdge/control-plane/server/internal/migrate"
"github.com/pgEdge/control-plane/server/internal/monitor"
"github.com/pgEdge/control-plane/server/internal/orchestrator"
"github.com/pgEdge/control-plane/server/internal/orchestrator/swarm"
Expand Down Expand Up @@ -69,6 +70,7 @@ func newRootCmd(i *do.Injector) *cobra.Command {
host.Provide(i)
ipam.Provide(i)
logging.Provide(i)
migrate.Provide(i)
monitor.Provide(i)
resource.Provide(i)
scheduler.Provide(i)
Expand Down
13 changes: 13 additions & 0 deletions server/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/etcd"
"github.com/pgEdge/control-plane/server/internal/host"
"github.com/pgEdge/control-plane/server/internal/migrate"
"github.com/pgEdge/control-plane/server/internal/monitor"
"github.com/pgEdge/control-plane/server/internal/scheduler"
"github.com/pgEdge/control-plane/server/internal/workflows"
Expand Down Expand Up @@ -110,25 +111,37 @@ func (a *App) runInitialized(ctx context.Context) error {
return err
}

// Run migrations before starting other services
migrationRunner, err := do.Invoke[*migrate.Runner](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize migration runner: %w", err))
}
if err := migrationRunner.Run(ctx); err != nil {
return handleError(fmt.Errorf("failed to run migrations: %w", err))
}

certSvc, err := do.Invoke[*certificates.Service](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize certificate service: %w", err))
}
if err := certSvc.Start(ctx); err != nil {
return handleError(fmt.Errorf("failed to start certificate service: %w", err))
}

hostSvc, err := do.Invoke[*host.Service](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize host service: %w", err))
}
if err := hostSvc.UpdateHost(ctx); err != nil {
return handleError(fmt.Errorf("failed to update host: %w", err))
}

hostTicker, err := do.Invoke[*host.UpdateTicker](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize host ticker: %w", err))
}
hostTicker.Start(ctx)

monitorSvc, err := do.Invoke[*monitor.Service](a.i)
if err != nil {
return handleError(fmt.Errorf("failed to initialize monitor service: %w", err))
Expand Down
13 changes: 13 additions & 0 deletions server/internal/migrate/all_migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package migrate

// allMigrations returns the ordered list of migrations.
// Order matters - migrations are executed in slice order.
// Add new migrations to this list in chronological order.
func allMigrations() []Migration {
return []Migration{
// Add migrations here in chronological order
// Example:
// &AddHostMetadataField{},
// &RenameDatabaseStatus{},
}
}
5 changes: 5 additions & 0 deletions server/internal/migrate/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Package migrate provides a mechanism for arbitrary migration operations that
// should block startup, such as moving Etcd objects from one key to another.
// IMPORTANT: migrations _must_ be written to be idempotent, and we should
// prefer non-destructive updates in order to allow rollbacks.
package migrate
16 changes: 16 additions & 0 deletions server/internal/migrate/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package migrate

import (
"context"

"github.com/samber/do"
)

// Migration defines the interface for data migrations.
type Migration interface {
// Identifier returns a unique semantic name for this migration.
Identifier() string
// Run executes the migration using dependencies from the injector.
// The context should be used for cancellation and timeouts.
Run(ctx context.Context, i *do.Injector) error
}
65 changes: 65 additions & 0 deletions server/internal/migrate/provide.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package migrate

import (
"time"

"github.com/rs/zerolog"
"github.com/samber/do"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pgEdge/control-plane/server/internal/config"
"github.com/pgEdge/control-plane/server/internal/election"
)

const ElectionName = election.Name("migration_runner")
const LockTTL time.Duration = 30 * time.Second

// Provide registers migration dependencies with the injector.
func Provide(i *do.Injector) {
provideStore(i)
provideRunner(i)
}

func provideStore(i *do.Injector) {
do.Provide(i, func(i *do.Injector) (*Store, error) {
cfg, err := do.Invoke[config.Config](i)
if err != nil {
return nil, err
}
client, err := do.Invoke[*clientv3.Client](i)
if err != nil {
return nil, err
}
return NewStore(client, cfg.EtcdKeyRoot), nil
})
}

func provideRunner(i *do.Injector) {
do.Provide(i, func(i *do.Injector) (*Runner, error) {
store, err := do.Invoke[*Store](i)
if err != nil {
return nil, err
}
cfg, err := do.Invoke[config.Config](i)
if err != nil {
return nil, err
}
logger, err := do.Invoke[zerolog.Logger](i)
if err != nil {
return nil, err
}
electionSvc, err := do.Invoke[*election.Service](i)
if err != nil {
return nil, err
}
locker := electionSvc.NewCandidate(ElectionName, cfg.HostID, LockTTL)
return NewRunner(
cfg.HostID,
store,
i,
logger,
allMigrations(),
locker,
), nil
})
}
49 changes: 49 additions & 0 deletions server/internal/migrate/result_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package migrate

import (
"time"

"github.com/pgEdge/control-plane/server/internal/storage"
"github.com/pgEdge/control-plane/server/internal/version"
clientv3 "go.etcd.io/etcd/client/v3"
)

// StoredResult tracks the outcome of a specific migration.
type StoredResult struct {
storage.StoredValue
Identifier string `json:"identifier"`
Successful bool `json:"successful"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
RunByHostID string `json:"run_by_host_id"`
RunByVersionInfo *version.Info `json:"run_by_version_info"`
Error string `json:"error,omitempty"`
}

type ResultStore struct {
client *clientv3.Client
root string
}

func NewResultStore(client *clientv3.Client, root string) *ResultStore {
return &ResultStore{
client: client,
root: root,
}
}

func (s *ResultStore) Prefix() string {
return storage.Prefix(s.root, "migrations", "results")
}

func (s *ResultStore) Key(identifier string) string {
return storage.Key(s.Prefix(), identifier)
}

func (s *ResultStore) Get(identifier string) storage.GetOp[*StoredResult] {
return storage.NewGetOp[*StoredResult](s.client, s.Key(identifier))
}

func (s *ResultStore) Put(item *StoredResult) storage.PutOp[*StoredResult] {
return storage.NewPutOp(s.client, s.Key(item.Identifier), item)
}
44 changes: 44 additions & 0 deletions server/internal/migrate/revision_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package migrate

import (
"github.com/pgEdge/control-plane/server/internal/storage"
clientv3 "go.etcd.io/etcd/client/v3"
)

// StoredRevision tracks the most recently applied migration.
type StoredRevision struct {
storage.StoredValue
Identifier string `json:"identifier"`
}

type RevisionStore struct {
client *clientv3.Client
root string
}

func NewRevisionStore(client *clientv3.Client, root string) *RevisionStore {
return &RevisionStore{
client: client,
root: root,
}
}

func (s *RevisionStore) Key() string {
return storage.Key(s.root, "migrations", "revision")
}

func (s *RevisionStore) Get() storage.GetOp[*StoredRevision] {
return storage.NewGetOp[*StoredRevision](s.client, s.Key())
}

func (s *RevisionStore) Create(item *StoredRevision) storage.PutOp[*StoredRevision] {
return storage.NewCreateOp(s.client, s.Key(), item)
}

func (s *RevisionStore) Update(item *StoredRevision) storage.PutOp[*StoredRevision] {
return storage.NewUpdateOp(s.client, s.Key(), item)
}

func (s *RevisionStore) Watch() storage.WatchOp[*StoredRevision] {
return storage.NewWatchOp[*StoredRevision](s.client, s.Key())
}
Loading