PBM-1561: Enable selective logical restore for sharded cluster #1235
Changes from all commits
@@ -17,7 +17,6 @@ import (
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/writeconcern"

	"github.com/percona/percona-backup-mongodb/pbm/archive"
	"github.com/percona/percona-backup-mongodb/pbm/backup"

@@ -38,6 +37,13 @@ import (
	"github.com/percona/percona-backup-mongodb/pbm/version"
)

// mDBCl represents mDB client interface for the DB related ops.
type mDBCl interface {
	runCmdShardsvrDropDatabase(ctx context.Context, db string, configDBDoc *configDatabasesDoc) error
	runCmdShardsvrDropCollection(ctx context.Context, db, coll string, configDBDoc *configDatabasesDoc) error
	getConfigDatabasesDoc(ctx context.Context, db string) (*configDatabasesDoc, error)
}

type Restore struct {
	name string
	leadConn connect.Client

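The concrete type behind newMDB is not included in this diff. As a rough sketch of what it plausibly looks like — the struct name mDB, its field names, and the *mongo.Client type for nodeConn are assumptions based on the constructor call in New() below and the connections used by the inline code this PR replaces:

	// Sketch only; the real definition lives outside this excerpt and may differ.
	type mDB struct {
		leadConn connect.Client // cluster connection, used for config.databases lookups
		nodeConn *mongo.Client  // direct node connection, used to run _shardsvr* commands
	}

	func newMDB(leadConn connect.Client, nodeConn *mongo.Client) *mDB {
		return &mDB{leadConn: leadConn, nodeConn: nodeConn}
	}
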
@@ -69,6 +75,8 @@ type Restore struct {
	opid string

	indexCatalog *idx.IndexCatalog

	db mDBCl
}

type oplogRange struct {

@@ -108,6 +116,7 @@ func New(
		rsMap: rsMap,

		cfg: cfg,
		db: newMDB(leadConn, nodeConn),

		numParallelColls: numParallelColls,
		numInsertionWorkersPerCol: numInsertionWorkersPerCol,

@@ -253,11 +262,18 @@ func (r *Restore) Snapshot(
		return err
	}

	// drop sharded dbs on sharded cluster, on each shard (not CSRS), only for full restore
	if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() && !util.IsSelective(nss) {
		err = r.dropShardedDBs(ctx, bcp)
		if err != nil {
			return err
	// drop databases on the sharded cluster as part of cleanup phase, on each shard (not CSRS)
	if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() {
		if util.IsSelective(nss) {
			err = r.selRestoreDBCleanup(ctx, nss)
			if err != nil {
				return errors.Wrap(err, "selective restore cleanup")
			}
		} else {
			err = r.fullRestoreDBCleanup(ctx, bcp)
			if err != nil {
				return errors.Wrap(err, "full restore cleanup")
			}
		}
	}

Comment on lines +265 to 278

Collaborator
I'd extract this into a standalone method. IMHO it's a complex enough set of conditions, so we don't want to maintain it in two different places exactly the same.

Member (Author)
I agree with that, but PBM has used that style from before across the whole code base: we do checks and branching within the main flow (the orchestrator) and not within the function or method. I wouldn't change that style now, but I agree with your comment.
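For reference, a rough sketch of the extraction suggested above; the method name shardedClusterDBCleanup is made up for illustration and is not part of this PR:

	// shardedClusterDBCleanup is a hypothetical helper encapsulating the branching
	// that Snapshot and PITR currently repeat: on each shard of a sharded cluster
	// (but not on the CSRS) run either the selective or the full cleanup.
	func (r *Restore) shardedClusterDBCleanup(ctx context.Context, bcp *backup.BackupMeta, nss []string) error {
		if !r.nodeInfo.IsSharded() || r.nodeInfo.IsConfigSrv() {
			return nil
		}
		if util.IsSelective(nss) {
			if err := r.selRestoreDBCleanup(ctx, nss); err != nil {
				return errors.Wrap(err, "selective restore cleanup")
			}
			return nil
		}
		if err := r.fullRestoreDBCleanup(ctx, bcp); err != nil {
			return errors.Wrap(err, "full restore cleanup")
		}
		return nil
	}

Both call sites in Snapshot and PITR would then reduce to a single call to this helper.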
@@ -441,11 +457,18 @@ func (r *Restore) PITR(
		return err
	}

	// drop sharded dbs on sharded cluster, on each shard (not CSRS), only for full restore
	if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() && !util.IsSelective(nss) {
		err = r.dropShardedDBs(ctx, bcp)
		if err != nil {
			return err
	// drop databases on the sharded cluster as part of cleanup phase, on each shard (not CSRS)
	if r.nodeInfo.IsSharded() && !r.nodeInfo.IsConfigSrv() {
		if util.IsSelective(nss) {
			err = r.selRestoreDBCleanup(ctx, nss)
			if err != nil {
				return errors.Wrap(err, "selective restore cleanup")
			}
		} else {
			err = r.fullRestoreDBCleanup(ctx, bcp)
			if err != nil {
				return errors.Wrap(err, "full restore cleanup")
			}
		}
	}

@@ -866,23 +889,19 @@ func (r *Restore) toState(ctx context.Context, status defs.Status, wait *time.Du
	return toState(ctx, r.leadConn, status, r.name, r.nodeInfo, r.reconcileStatus, wait)
}

// dropShardedDBs drop all sharded databases present in the backup.
// fullRestoreDBCleanup drops all databases present in the backup.
// Backup is specified with bcp parameter.
// For each sharded database present in the backup _shardsvrDropDatabase command
// For each database present in the backup _shardsvrDropDatabase command
// is used to drop the database from the config srv and all shards.
func (r *Restore) dropShardedDBs(ctx context.Context, bcp *backup.BackupMeta) error {
func (r *Restore) fullRestoreDBCleanup(ctx context.Context, bcp *backup.BackupMeta) error {

Collaborator
The previous name clearly communicated that this targets only the sharded environment. Maybe …
	dbsInBcp, err := r.getDBsFromBackup(bcp)
	if err != nil {
		return errors.Wrap(err, "get dbs from backup")
	}

	// make cluster-wide drop for each db from the backup
	for _, db := range dbsInBcp {
		var configDBDoc configDatabasesDoc
		err := r.leadConn.ConfigDatabase().
			Collection("databases").
			FindOne(ctx, bson.D{{"_id", db}}).
			Decode(&configDBDoc)
		configDBDoc, err := r.db.getConfigDatabasesDoc(ctx, db)
		if err != nil {
			if errors.Is(err, mongo.ErrNoDocuments) {
				continue

@@ -895,21 +914,67 @@ func (r *Restore) dropShardedDBs(ctx context.Context, bcp *backup.BackupMeta) er
			continue
		}

		cmd := bson.D{
			{"_shardsvrDropDatabase", 1},
			{"databaseVersion", configDBDoc.Version},
			{"writeConcern", writeconcern.Majority()},
		}
		res := r.nodeConn.Database(db).RunCommand(ctx, cmd)
		if err := res.Err(); err != nil {
			return errors.Wrapf(err, "_shardsvrDropDatabase for %q", db)
		err = r.db.runCmdShardsvrDropDatabase(ctx, db, configDBDoc)
		if err != nil {
			return errors.Wrap(err, "full restore cleanup")
		}
		r.log.Debug("drop %q", db)
	}

	return nil
}

// selRestoreDBCleanup drops all databases and/or collections specified in
// selective restore namespaces.
// For each namespace listed within selective restore _shardsvrDropDatabase or
// _shardsvrDropCollection is used for the purpose of cluster-wide cleanup.
func (r *Restore) selRestoreDBCleanup(ctx context.Context, nss []string) error {
	droppedDBs := []string{}
	for _, ns := range nss {
		db, coll := util.ParseNS(ns)
		if db == "" || db == defs.DB || db == defs.ConfigDB {
			// not allowed dbs for sel restore
			continue
		}
		if slices.Contains(droppedDBs, db) {
			// db is already dropped
			continue
		}

		configDBDoc, err := r.db.getConfigDatabasesDoc(ctx, db)
		if err != nil {
			if errors.Is(err, mongo.ErrNoDocuments) {
				continue
			}
			return errors.Wrapf(err, "get config.databases doc for %q", db)
		}

		if configDBDoc.Primary != r.nodeInfo.SetName {
			// this shard is not primary shard for this db, so ignore it
			continue
		}

		if coll == "" {
			// do db cleanup
			err = r.db.runCmdShardsvrDropDatabase(ctx, db, configDBDoc)
			if err != nil {
				return errors.Wrapf(err, "db cleanup %s", db)
			}
			droppedDBs = append(droppedDBs, db)
			r.log.Debug("drop db: %q", db)
		} else {
			// do collection cleanup
			err = r.db.runCmdShardsvrDropCollection(ctx, db, coll, configDBDoc)
			if err != nil {
				return errors.Wrapf(err, "collection cleanup %s", ns)
			}
			r.log.Debug("drop ns: %q", ns)
		}
	}

	return nil
}

// getDBsFromBackup returns all databases present in backup metadata file
// for each replicaset.
func (r *Restore) getDBsFromBackup(bcp *backup.BackupMeta) ([]string, error) {

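The bodies of the new db-layer methods are not part of this excerpt. Judging from the inline code they replace above, they presumably look roughly like the following (the receiver type mDB and its fields are assumptions; runCmdShardsvrDropCollection is left out because the command options it needs are not visible here):

	// Assumed implementations, reconstructed from the removed inline code; the
	// actual methods in the PR may differ.
	func (m *mDB) getConfigDatabasesDoc(ctx context.Context, db string) (*configDatabasesDoc, error) {
		var doc configDatabasesDoc
		err := m.leadConn.ConfigDatabase().
			Collection("databases").
			FindOne(ctx, bson.D{{"_id", db}}).
			Decode(&doc)
		if err != nil {
			// callers treat mongo.ErrNoDocuments as "nothing to drop"
			return nil, err
		}
		return &doc, nil
	}

	func (m *mDB) runCmdShardsvrDropDatabase(ctx context.Context, db string, configDBDoc *configDatabasesDoc) error {
		cmd := bson.D{
			{"_shardsvrDropDatabase", 1},
			{"databaseVersion", configDBDoc.Version},
			{"writeConcern", writeconcern.Majority()},
		}
		return m.nodeConn.Database(db).RunCommand(ctx, cmd).Err()
	}
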
Is this meant to represent a generic interface for mongo commands or just sharding operations specific to restore? If generic, then I am not sure whether this is the right location to define it. If specific, then we should find a better name.

That db layer is just for the restore, it's private, and I was forced to create it to be able to unit test it. So it's near the usage place. If we refactor the whole code in restore we can move it somewhere else.
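To illustrate the unit-testing point above: with the db layer behind mDBCl, a hand-rolled fake can record which drops the cleanup code issues and be injected through the Restore db field. All names below are illustrative and not taken from the PR; the fake would need to live in the restore package to see the unexported types.

	package restore

	import (
		"context"

		"go.mongodb.org/mongo-driver/mongo"
	)

	// fakeMDB is a hand-rolled fake of the mDBCl interface that records which
	// databases and collections the cleanup code asked to drop.
	type fakeMDB struct {
		docs         map[string]*configDatabasesDoc // config.databases docs by db name
		droppedDBs   []string
		droppedColls []string
	}

	func (f *fakeMDB) getConfigDatabasesDoc(_ context.Context, db string) (*configDatabasesDoc, error) {
		doc, ok := f.docs[db]
		if !ok {
			return nil, mongo.ErrNoDocuments
		}
		return doc, nil
	}

	func (f *fakeMDB) runCmdShardsvrDropDatabase(_ context.Context, db string, _ *configDatabasesDoc) error {
		f.droppedDBs = append(f.droppedDBs, db)
		return nil
	}

	func (f *fakeMDB) runCmdShardsvrDropCollection(_ context.Context, db, coll string, _ *configDatabasesDoc) error {
		f.droppedColls = append(f.droppedColls, db+"."+coll)
		return nil
	}

A table-driven test of selRestoreDBCleanup can then assert on droppedDBs and droppedColls instead of talking to a real cluster.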