diff --git a/pbm/defs/defs.go b/pbm/defs/defs.go index fead46438..3c7618941 100644 --- a/pbm/defs/defs.go +++ b/pbm/defs/defs.go @@ -38,6 +38,7 @@ const ( ) const ( + ConfigDB = "config" ConfigDatabasesNS = "config.databases" ConfigCollectionsNS = "config.collections" ConfigChunksNS = "config.chunks" diff --git a/pbm/restore/logical.go b/pbm/restore/logical.go index 8e890ba69..0ac97d44f 100644 --- a/pbm/restore/logical.go +++ b/pbm/restore/logical.go @@ -17,7 +17,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/writeconcern" "github.com/percona/percona-backup-mongodb/pbm/archive" "github.com/percona/percona-backup-mongodb/pbm/backup" @@ -38,6 +37,13 @@ import ( "github.com/percona/percona-backup-mongodb/pbm/version" ) +// mDBCl represents mDB client interface for the DB related ops. +type mDBCl interface { + runCmdShardsvrDropDatabase(ctx context.Context, db string, configDBDoc *configDatabasesDoc) error + runCmdShardsvrDropCollection(ctx context.Context, db, coll string, configDBDoc *configDatabasesDoc) error + getConfigDatabasesDoc(ctx context.Context, db string) (*configDatabasesDoc, error) +} + type Restore struct { name string leadConn connect.Client @@ -69,6 +75,8 @@ type Restore struct { opid string indexCatalog *idx.IndexCatalog + + db mDBCl } type oplogRange struct { @@ -108,6 +116,7 @@ func New( rsMap: rsMap, cfg: cfg, + db: newMDB(leadConn, nodeConn), numParallelColls: numParallelColls, numInsertionWorkersPerCol: numInsertionWorkersPerCol, @@ -253,11 +262,18 @@ func (r *Restore) Snapshot( return err } - // drop sharded dbs on sharded cluster, on each shard (not CSRS), only for full restore - if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() && !util.IsSelective(nss) { - err = r.dropShardedDBs(ctx, bcp) - if err != nil { - return err + // drop databases on the sharded cluster as part of cleanup phase, on each shard (not CSRS) + if 
r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() { + if util.IsSelective(nss) { + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + return errors.Wrap(err, "selective restore cleanup") + } + } else { + err = r.fullRestoreDBCleanup(ctx, bcp) + if err != nil { + return errors.Wrap(err, "full restore cleanup") + } } } @@ -441,11 +457,18 @@ func (r *Restore) PITR( return err } - // drop sharded dbs on sharded cluster, on each shard (not CSRS), only for full restore - if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() && !util.IsSelective(nss) { - err = r.dropShardedDBs(ctx, bcp) - if err != nil { - return err + // drop databases on the sharded cluster as part of cleanup phase, on each shard (not CSRS) + if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() { + if util.IsSelective(nss) { + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + return errors.Wrap(err, "selective restore cleanup") + } + } else { + err = r.fullRestoreDBCleanup(ctx, bcp) + if err != nil { + return errors.Wrap(err, "full restore cleanup") + } } } @@ -866,11 +889,11 @@ func (r *Restore) toState(ctx context.Context, status defs.Status, wait *time.Du return toState(ctx, r.leadConn, status, r.name, r.nodeInfo, r.reconcileStatus, wait) } -// dropShardedDBs drop all sharded databases present in the backup. +// fullRestoreDBCleanup drops all databases present in the backup. // Backup is specified with bcp parameter. -// For each sharded database present in the backup _shardsvrDropDatabase command +// For each database present in the backup _shardsvrDropDatabase command // is used to drop the database from the config srv and all shards. 
-func (r *Restore) dropShardedDBs(ctx context.Context, bcp *backup.BackupMeta) error { +func (r *Restore) fullRestoreDBCleanup(ctx context.Context, bcp *backup.BackupMeta) error { dbsInBcp, err := r.getDBsFromBackup(bcp) if err != nil { return errors.Wrap(err, "get dbs from backup") @@ -878,11 +901,7 @@ func (r *Restore) dropShardedDBs(ctx context.Context, bcp *backup.BackupMeta) er // make cluster-wide drop for each db from the backup for _, db := range dbsInBcp { - var configDBDoc configDatabasesDoc - err := r.leadConn.ConfigDatabase(). - Collection("databases"). - FindOne(ctx, bson.D{{"_id", db}}). - Decode(&configDBDoc) + configDBDoc, err := r.db.getConfigDatabasesDoc(ctx, db) if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { continue @@ -895,14 +914,9 @@ func (r *Restore) dropShardedDBs(ctx context.Context, bcp *backup.BackupMeta) er continue } - cmd := bson.D{ - {"_shardsvrDropDatabase", 1}, - {"databaseVersion", configDBDoc.Version}, - {"writeConcern", writeconcern.Majority()}, - } - res := r.nodeConn.Database(db).RunCommand(ctx, cmd) - if err := res.Err(); err != nil { - return errors.Wrapf(err, "_shardsvrDropDatabase for %q", db) + err = r.db.runCmdShardsvrDropDatabase(ctx, db, configDBDoc) + if err != nil { + return errors.Wrap(err, "full restore cleanup") } r.log.Debug("drop %q", db) } @@ -910,6 +924,57 @@ return nil } +// selRestoreDBCleanup drops all databases and/or collections specified in +// selective restore namespaces. +// For each namespace listed within selective restore _shardsvrDropDatabase or +// _shardsvrDropCollection is used for the purpose of cluster-wide cleanup. 
+func (r *Restore) selRestoreDBCleanup(ctx context.Context, nss []string) error { + droppedDBs := []string{} + for _, ns := range nss { + db, coll := util.ParseNS(ns) + if db == "" || db == defs.DB || db == defs.ConfigDB { + // not allowed dbs for sel restore + continue + } + if slices.Contains(droppedDBs, db) { + // db is already dropped + continue + } + + configDBDoc, err := r.db.getConfigDatabasesDoc(ctx, db) + if err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + continue + } + return errors.Wrapf(err, "get config.databases doc for %q", db) + } + + if configDBDoc.Primary != r.nodeInfo.SetName { + // this shard is not primary shard for this db, so ignore it + continue + } + + if coll == "" { + // do db cleanup + err = r.db.runCmdShardsvrDropDatabase(ctx, db, configDBDoc) + if err != nil { + return errors.Wrapf(err, "db cleanup %s", db) + } + droppedDBs = append(droppedDBs, db) + r.log.Debug("drop db: %q", db) + } else { + // do collection cleanup + err = r.db.runCmdShardsvrDropCollection(ctx, db, coll, configDBDoc) + if err != nil { + return errors.Wrapf(err, "collection cleanup %s", ns) + } + r.log.Debug("drop ns: %q", ns) + } + } + + return nil +} + // getDBsFromBackup returns all databases present in backup metadata file // for each replicaset. 
func (r *Restore) getDBsFromBackup(bcp *backup.BackupMeta) ([]string, error) { diff --git a/pbm/restore/logical_cfgsvr_full_test.go b/pbm/restore/logical_cfgsvr_full_test.go index 28f30a217..5a3bf60b0 100644 --- a/pbm/restore/logical_cfgsvr_full_test.go +++ b/pbm/restore/logical_cfgsvr_full_test.go @@ -6,19 +6,14 @@ import ( "encoding/hex" "fmt" "io" - "log" - "os" "testing" "time" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/modules/mongodb" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "github.com/percona/percona-backup-mongodb/pbm/backup" "github.com/percona/percona-backup-mongodb/pbm/compress" - "github.com/percona/percona-backup-mongodb/pbm/connect" "github.com/percona/percona-backup-mongodb/pbm/errors" pbmlog "github.com/percona/percona-backup-mongodb/pbm/log" "github.com/percona/percona-backup-mongodb/pbm/storage" @@ -26,39 +21,8 @@ import ( "github.com/percona/percona-backup-mongodb/pbm/util" ) -var leadConn connect.Client - const configSystemSessionsUUID = "92e8635902a743a09cf410d4a2a4a576" -func TestMain(m *testing.M) { - ctx := context.Background() - mongodbContainer, err := mongodb.Run(ctx, "perconalab/percona-server-mongodb:8.0.4-multi") - if err != nil { - log.Fatalf("error while creating mongo test container: %v", err) - } - connStr, err := mongodbContainer.ConnectionString(ctx) - if err != nil { - log.Fatalf("conn string error: %v", err) - } - - leadConn, err = connect.Connect(ctx, connStr, "restore-test") - if err != nil { - log.Fatalf("mongo client connect error: %v", err) - } - - code := m.Run() - - err = leadConn.Disconnect(ctx) - if err != nil { - log.Fatalf("mongo client disconnect error: %v", err) - } - if err := testcontainers.TerminateContainer(mongodbContainer); err != nil { - log.Fatalf("failed to terminate container: %s", err) - } - - os.Exit(code) -} - func TestFullRestoreConfigDatabases(t *testing.T) { ctx := context.Background() diff --git 
a/pbm/restore/logical_test.go b/pbm/restore/logical_test.go index cb38c7467..8dc03092b 100644 --- a/pbm/restore/logical_test.go +++ b/pbm/restore/logical_test.go @@ -1,10 +1,22 @@ package restore import ( + "context" + "io" "reflect" + "strings" "testing" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + + "github.com/percona/percona-backup-mongodb/pbm/backup" + "github.com/percona/percona-backup-mongodb/pbm/connect" + "github.com/percona/percona-backup-mongodb/pbm/defs" + pbmlog "github.com/percona/percona-backup-mongodb/pbm/log" "github.com/percona/percona-backup-mongodb/pbm/snapshot" + "github.com/percona/percona-backup-mongodb/pbm/storage" + "github.com/percona/percona-backup-mongodb/pbm/topo" ) func TestResolveNamespace(t *testing.T) { @@ -240,3 +252,590 @@ func TestShouldRestoreUsersAndRoles(t *testing.T) { } }) } + +func TestSelRestoreDBCleanup(t *testing.T) { + ctx := context.Background() + + t.Run("drop single database", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db", "rs0") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + nss := []string{"db.*"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 1 { + t.Fatalf("expected 1 dropDatabase call, got %d", len(db.dropDatabaseCalls)) + } + if db.dropDatabaseCalls[0] != "db" { + t.Fatalf("expected dropDatabase to be called with %q, got %q", "db", db.dropDatabaseCalls[0]) + } + if len(db.dropCollectionCalls) != 0 { + t.Fatalf("expected 0 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + }) + + t.Run("drop single collection", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db", "rs0") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + nss := []string{"db.coll"} + err = r.selRestoreDBCleanup(ctx, nss) + 
if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropCollectionCalls) != 1 { + t.Fatalf("expected 1 dropCollection call, got %d", len(db.dropCollectionCalls)) + } + if db.dropCollectionCalls[0].db != "db" { + t.Fatalf("expected dropCollection db to be %q, got %q", "db", db.dropCollectionCalls[0].db) + } + if db.dropCollectionCalls[0].coll != "coll" { + t.Fatalf("expected dropCollection coll to be %q, got %q", "coll", db.dropCollectionCalls[0].coll) + } + if len(db.dropDatabaseCalls) != 0 { + t.Fatalf("expected 0 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + }) + + t.Run("drop multiple databases", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db1", "rs0") + if err != nil { + t.Fatalf("create config.databases doc 1: %v", err) + } + err = createConfigDatabasesDoc(t, "db2", "rs0") + if err != nil { + t.Fatalf("create config.databases doc 2: %v", err) + } + + nss := []string{"db1.*", "db2.*"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 2 { + t.Fatalf("expected 2 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + if db.dropDatabaseCalls[0] != "db1" { + t.Fatalf("expected first dropDatabase call with %q, got %q", "db1", db.dropDatabaseCalls[0]) + } + if db.dropDatabaseCalls[1] != "db2" { + t.Fatalf("expected second dropDatabase call with %q, got %q", "db2", db.dropDatabaseCalls[1]) + } + if len(db.dropCollectionCalls) != 0 { + t.Fatalf("expected 0 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + }) + + t.Run("drop multiple databases and collections", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db1", "rs0") + if err != nil { + t.Fatalf("create config.databases doc 1: %v", err) + } + err = createConfigDatabasesDoc(t, "db2", "rs0") + if err != nil { + t.Fatalf("create config.databases 
doc 2: %v", err) + } + + nss := []string{"db1.c1", "db2.c2", "db2.c3", "db1.*"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 1 { + t.Fatalf("expected 1 dropDatabase call, got %d", len(db.dropDatabaseCalls)) + } + if db.dropDatabaseCalls[0] != "db1" { + t.Fatalf("expected first dropDatabase call with %q, got %q", "db1", db.dropDatabaseCalls[0]) + } + if len(db.dropCollectionCalls) != 3 { + t.Fatalf("expected 3 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + if db.dropCollectionCalls[0].coll != "c1" { + t.Fatalf("expected dropCollection coll to be c1, got %q", db.dropCollectionCalls[0].coll) + } + if db.dropCollectionCalls[1].coll != "c2" { + t.Fatalf("expected dropCollection coll to be c2, got %q", db.dropCollectionCalls[1].coll) + } + if db.dropCollectionCalls[2].coll != "c3" { + t.Fatalf("expected dropCollection coll to be c3, got %q", db.dropCollectionCalls[2].coll) + } + }) + + t.Run("skip admin and config databases", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + nss := []string{defs.DB + ".*", defs.ConfigDB + ".*"} + err := r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 0 { + t.Fatalf("expected 0 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + if len(db.dropCollectionCalls) != 0 { + t.Fatalf("expected 0 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + }) + + t.Run("skip database not in config.databases", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + nss := []string{"nonexistent.*"} + err := r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 0 { + t.Fatalf("expected 0 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + if len(db.dropCollectionCalls) != 0 { + t.Fatalf("expected 0 dropCollection 
calls, got %d", len(db.dropCollectionCalls)) + } + }) + + t.Run("skip database with different primary shard", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db", "rs1") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + nss := []string{"db.*"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 0 { + t.Fatalf("expected 0 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + if len(db.dropCollectionCalls) != 0 { + t.Fatalf("expected 0 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + }) + + t.Run("skip collection with different primary shard", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db", "rs1") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + nss := []string{"db.coll"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 0 { + t.Fatalf("expected 0 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + if len(db.dropCollectionCalls) != 0 { + t.Fatalf("expected 0 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + }) + + t.Run("drop database and collection in same namespace list", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db1", "rs0") + if err != nil { + t.Fatalf("create config.databases doc 1: %v", err) + } + err = createConfigDatabasesDoc(t, "db2", "rs0") + if err != nil { + t.Fatalf("create config.databases doc 2: %v", err) + } + + nss := []string{"db1.*", "db2.coll"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 1 { + t.Fatalf("expected 1 dropDatabase call, got %d", len(db.dropDatabaseCalls)) + } + if 
db.dropDatabaseCalls[0] != "db1" { + t.Fatalf("expected dropDatabase call with %q, got %q", "db1", db.dropDatabaseCalls[0]) + } + + if len(db.dropCollectionCalls) != 1 { + t.Fatalf("expected 1 dropCollection call, got %d", len(db.dropCollectionCalls)) + } + if db.dropCollectionCalls[0].db != "db2" { + t.Fatalf("expected dropCollection db to be %q, got %q", "db2", db.dropCollectionCalls[0].db) + } + if db.dropCollectionCalls[0].coll != "coll" { + t.Fatalf("expected dropCollection coll to be %q, got %q", "coll", db.dropCollectionCalls[0].coll) + } + }) + + t.Run("skip already dropped database", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db", "rs0") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + nss := []string{"db.*", "db.c"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 1 { + t.Fatalf("expected 1 dropDatabase call, got %d", len(db.dropDatabaseCalls)) + } + if db.dropDatabaseCalls[0] != "db" { + t.Fatalf("expected dropDatabase call with %q, got %q", "db", db.dropDatabaseCalls[0]) + } + if len(db.dropCollectionCalls) != 0 { + t.Fatalf("expected 0 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + }) + + t.Run("skip already dropped multiple databases", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db1", "rs0") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + err = createConfigDatabasesDoc(t, "db2", "rs0") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + nss := []string{"db1.*", "db2.*", "db1.c1", "db1.c2", "db2.c2"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 2 { + t.Fatalf("expected 2 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + if 
len(db.dropCollectionCalls) != 0 { + t.Fatalf("expected 0 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + }) + + t.Run("skip already dropped databases preserves order", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db1", "rs0") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + err = createConfigDatabasesDoc(t, "db2", "rs0") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + nss := []string{"db1.c1", "db2.c1", "db1.*", "db2.*", "db1.c2", "db2.c2"} + err = r.selRestoreDBCleanup(ctx, nss) + if err != nil { + t.Fatalf("selRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 2 { + t.Fatalf("expected 2 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + if len(db.dropCollectionCalls) != 2 { + t.Fatalf("expected 2 dropCollection calls, got %d", len(db.dropCollectionCalls)) + } + }) +} + +func TestFullRestoreDBCleanup(t *testing.T) { + ctx := context.Background() + + t.Run("drop single database", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db1", "rs0") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + bcp := &backup.BackupMeta{Name: "backup1"} + err = r.fullRestoreDBCleanup(ctx, bcp) + if err != nil { + t.Fatalf("fullRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 1 { + t.Fatalf("expected 1 dropDatabase call, got %d", len(db.dropDatabaseCalls)) + } + if db.dropDatabaseCalls[0] != "db1" { + t.Fatalf("expected dropDatabase to be called with %q, got %q", "db", db.dropDatabaseCalls[0]) + } + }) + + t.Run("drop multiple databases", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db1", "rs0") + if err != nil { + t.Fatalf("create config.databases doc 1: %v", err) + } + err = createConfigDatabasesDoc(t, "db2", "rs0") + if err != nil { + t.Fatalf("create 
config.databases doc 2: %v", err) + } + + bcp := &backup.BackupMeta{Name: "backup1"} + err = r.fullRestoreDBCleanup(ctx, bcp) + if err != nil { + t.Fatalf("fullRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 2 { + t.Fatalf("expected 2 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + if db.dropDatabaseCalls[0] != "db1" { + t.Fatalf("expected dropDatabase call with db1, got %q", db.dropDatabaseCalls[0]) + } + if db.dropDatabaseCalls[1] != "db2" { + t.Fatalf("expected dropDatabase call with db2, got %q", db.dropDatabaseCalls[1]) + } + }) + + t.Run("skip database not in config.databases", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + bcp := &backup.BackupMeta{Name: "backup1"} + err := r.fullRestoreDBCleanup(ctx, bcp) + if err != nil { + t.Fatalf("fullRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 0 { + t.Fatalf("expected 0 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + }) + + t.Run("skip database with different primary shard", func(t *testing.T) { + r, db := createCleanupRestoreTest(t, "rs0") + + err := createConfigDatabasesDoc(t, "db1", "rs1") + if err != nil { + t.Fatalf("create config.databases doc: %v", err) + } + + bcp := &backup.BackupMeta{Name: "backup1"} + err = r.fullRestoreDBCleanup(ctx, bcp) + if err != nil { + t.Fatalf("fullRestoreDBCleanup: %v", err) + } + + if len(db.dropDatabaseCalls) != 0 { + t.Fatalf("expected 0 dropDatabase calls, got %d", len(db.dropDatabaseCalls)) + } + }) +} + +func createCleanupRestoreTest(t *testing.T, setName string) (*Restore, *mockMDB) { + t.Helper() + + nodeConn := leadConn.MongoClient() + restore := New( + leadConn, + nodeConn, + topo.NodeBrief{ + SetName: setName, + Sharded: true, + }, + nil, nil, 1, 1) + restore.log = pbmlog.DiscardEvent + restore.nodeInfo = &topo.NodeInfo{ + SetName: setName, + ConfigServerState: &topo.ConfigServerState{}, // Makes it sharded + } + + dbMock := &mockMDB{ + leadConn: leadConn, + mDB: 
newMDB(leadConn, nodeConn), + dropDatabaseCalls: []string{}, + dropCollectionCalls: []struct { + db string + coll string + }{}, + } + restore.db = dbMock + + restore.bcpStg = &mockBcpStg{} + + t.Cleanup(func() { + cleanupTestData(t) + }) + + return restore, dbMock +} + +func createConfigDatabasesDoc(t *testing.T, dbName, primaryShard string) error { + t.Helper() + + ctx := context.Background() + doc := bson.D{ + {"_id", dbName}, + {"primary", primaryShard}, + {"version", bson.D{ + {"uuid", primitive.Binary{Subtype: 0x04, Data: []byte("test-uuid")}}, + {"timestamp", primitive.Timestamp{T: 1, I: 0}}, + {"lastMod", int64(1)}, + }}, + } + + _, err := leadConn.ConfigDatabase().Collection("databases").InsertOne(ctx, doc) + return err +} + +func cleanupTestData(t *testing.T) { + t.Helper() + + ctx := context.Background() + + _, err := leadConn.ConfigDatabase().Collection("databases").DeleteMany(ctx, bson.D{}) + if err != nil { + t.Fatalf("clean up config.databases: %v", err) + } +} + +type mockMDB struct { + leadConn connect.Client + mDB *mDB + dropDatabaseCalls []string + dropCollectionCalls []struct { + db string + coll string + } +} + +func (m *mockMDB) runCmdShardsvrDropDatabase( + ctx context.Context, + db string, + configDBDoc *configDatabasesDoc, +) error { + m.dropDatabaseCalls = append(m.dropDatabaseCalls, db) + return nil +} + +func (m *mockMDB) runCmdShardsvrDropCollection( + ctx context.Context, + db, coll string, + configDBDoc *configDatabasesDoc, +) error { + m.dropCollectionCalls = append(m.dropCollectionCalls, struct { + db string + coll string + }{db: db, coll: coll}) + return nil +} + +func (m *mockMDB) getConfigDatabasesDoc( + ctx context.Context, + db string, +) (*configDatabasesDoc, error) { + return m.mDB.getConfigDatabasesDoc(ctx, db) +} + +type mockBcpStg struct{} + +func (m *mockBcpStg) Type() storage.Type { + return storage.Filesystem +} + +func (m *mockBcpStg) Save(_ string, _ io.Reader, _ ...storage.Option) error { + return nil +} + +func (m 
*mockBcpStg) List(_, _ string) ([]storage.FileInfo, error) { + return nil, nil +} + +func (m *mockBcpStg) Delete(_ string) error { + return nil +} + +func (m *mockBcpStg) FileStat(_ string) (storage.FileInfo, error) { + return storage.FileInfo{}, nil +} + +func (m *mockBcpStg) Copy(_, _ string) error { + return nil +} + +func (m *mockBcpStg) DownloadStat() storage.DownloadStat { + return storage.DownloadStat{} +} + +func (m *mockBcpStg) SourceReader(filepath string) (io.ReadCloser, error) { + metaJson := ` + { + "namespaces": [ + { + "db": "admin", + "collection": "system.version", + "metadata": "", + "type": "collection", + "crc": { + "$numberLong": "7899326899661778306" + }, + "size": { + "$numberLong": "272" + } + }, + { + "db": "db1", + "collection": "c1", + "metadata": "", + "type": "collection", + "crc": { + "$numberLong": "-7684575339597196538" + }, + "size": { + "$numberLong": "24795" + } + }, + { + "db": "db2", + "collection": "c1", + "metadata": "", + "type": "collection", + "crc": { + "$numberLong": "-7684575339597196538" + }, + "size": { + "$numberLong": "24795" + } + } + ] + } + ` + return io.NopCloser(strings.NewReader(metaJson)), nil +} diff --git a/pbm/restore/main_test.go b/pbm/restore/main_test.go new file mode 100644 index 000000000..21958cced --- /dev/null +++ b/pbm/restore/main_test.go @@ -0,0 +1,44 @@ +package restore + +import ( + "context" + "log" + "os" + "testing" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/modules/mongodb" + + "github.com/percona/percona-backup-mongodb/pbm/connect" +) + +var leadConn connect.Client + +func TestMain(m *testing.M) { + ctx := context.Background() + mongodbContainer, err := mongodb.Run(ctx, "perconalab/percona-server-mongodb:8.0.4-multi") + if err != nil { + log.Fatalf("error while creating mongo test container: %v", err) + } + connStr, err := mongodbContainer.ConnectionString(ctx) + if err != nil { + log.Fatalf("conn string error: %v", err) + } + + 
leadConn, err = connect.Connect(ctx, connStr, "restore-test") + if err != nil { + log.Fatalf("mongo client connect error: %v", err) + } + + code := m.Run() + + err = leadConn.Disconnect(ctx) + if err != nil { + log.Fatalf("mongo client disconnect error: %v", err) + } + if err := testcontainers.TerminateContainer(mongodbContainer); err != nil { + log.Fatalf("failed to terminate container: %s", err) + } + + os.Exit(code) +} diff --git a/pbm/restore/query.go b/pbm/restore/query.go index 6a222bb2e..4703f6701 100644 --- a/pbm/restore/query.go +++ b/pbm/restore/query.go @@ -2,6 +2,7 @@ package restore import ( "context" + "fmt" "time" "github.com/mongodb/mongo-tools/common/db" @@ -9,6 +10,7 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/writeconcern" "github.com/percona/percona-backup-mongodb/pbm/connect" "github.com/percona/percona-backup-mongodb/pbm/defs" @@ -260,3 +262,74 @@ func RestoreList(ctx context.Context, m connect.Client, limit int64) ([]RestoreM err = cur.All(ctx, &restores) return restores, err } + +// mDB contains restore related DB operations. +type mDB struct { + leadConn connect.Client + nodeConn *mongo.Client +} + +func newMDB(leadConn connect.Client, nodeConn *mongo.Client) *mDB { + return &mDB{ + leadConn: leadConn, + nodeConn: nodeConn, + } +} + +// runCmdShardsvrDropDatabase executes command _shardsvrDropDatabase. +// The command does cluster-wide drop (from CSRS and all shards) of the whole +// database specified with db parameter. +// The command needs to be executed on the primary shard. 
+func (d *mDB) runCmdShardsvrDropDatabase( + ctx context.Context, + db string, + configDBDoc *configDatabasesDoc, +) error { + cmd := bson.D{ + {"_shardsvrDropDatabase", 1}, + {"databaseVersion", configDBDoc.Version}, + {"writeConcern", writeconcern.Majority()}, + } + res := d.nodeConn.Database(db).RunCommand(ctx, cmd) + return errors.Wrapf(res.Err(), "_shardsvrDropDatabase for %q", db) +} + +// runCmdShardsvrDropCollection executes command _shardsvrDropCollection. +// The command does cluster-wide drop (from CSRS and all shards) of the namespace +// specified with db and coll parameters. +// The command needs to be executed on the primary shard. +func (d *mDB) runCmdShardsvrDropCollection( + ctx context.Context, + db, coll string, + configDBDoc *configDatabasesDoc, +) error { + cmd := bson.D{ + {"_shardsvrDropCollection", coll}, + {"databaseVersion", configDBDoc.Version}, + {"writeConcern", writeconcern.Majority()}, + } + res := d.nodeConn.Database(db).RunCommand(ctx, cmd) + return errors.Wrapf( + res.Err(), + "_shardsvrDropCollection for %q", + fmt.Sprintf("%s.%s", db, coll), + ) +} + +// getConfigDatabasesDoc fetches database doc from config.databases collection +// based on db key. +func (d *mDB) getConfigDatabasesDoc( + ctx context.Context, + db string, +) (*configDatabasesDoc, error) { + var configDBDoc configDatabasesDoc + err := d.leadConn.ConfigDatabase(). + Collection("databases"). + FindOne(ctx, bson.D{{"_id", db}}). + Decode(&configDBDoc) + if err != nil { + return nil, err + } + + return &configDBDoc, nil +} diff --git a/pbm/topo/node.go b/pbm/topo/node.go index a8bffd9a3..2dd889dfa 100644 --- a/pbm/topo/node.go +++ b/pbm/topo/node.go @@ -127,7 +127,7 @@ func (i *NodeInfo) IsClusterLeader() bool { return i.IsPrimary && i.Me == i.Primary && i.IsLeader() } -// ReplsetRole returns replset role in sharded clister +// ReplsetRole returns replset role in sharded cluster func (i *NodeInfo) ReplsetRole() ReplsetRole { switch { case i.ConfigSvr == 2: