diff --git a/mongodump/mongodump.go b/mongodump/mongodump.go index ad4c1d899..5dc12a347 100644 --- a/mongodump/mongodump.go +++ b/mongodump/mongodump.go @@ -203,9 +203,43 @@ func (dump *MongoDump) verifyCollectionExists() (bool, error) { } // Dump handles some final options checking and executes MongoDump. -func (dump *MongoDump) Dump() (err error) { +func (dump *MongoDump) Dump() (dumpErr error) { defer dump.SessionProvider.Close() + defer dump.CloseMuxIfNeeded(dumpErr) + dumpErr = dump.DumpUntilOplog() + if dumpErr != nil { + return + } + + dumpErr = dump.DumpOplogAndAfter() + + return +} + +func (dump *MongoDump) CloseMuxIfNeeded(dumpErr error) { + if dump.archive == nil { + return + } + + // The Mux runs until its Control is closed + close(dump.archive.Mux.Control) + muxErr := <-dump.archive.Mux.Completed + dump.archive.Out.Close() + if muxErr != nil { + if dumpErr != nil { + dumpErr = fmt.Errorf("archive writer: %v / %v", dumpErr, muxErr) + } else { + dumpErr = fmt.Errorf("archive writer: %v", muxErr) + } + log.Logvf(log.DebugLow, "%v", dumpErr) + } else { + log.Logvf(log.DebugLow, "mux completed successfully") + } +} + +// Dump handles some final options checking and executes MongoDump. +func (dump *MongoDump) DumpUntilOplog() (err error) { if !dump.OutputOptions.Oplog && (dump.InputOptions.SourceWritesDoneBarrier != "") { // Wait for tests to stop writes before dumping any collections. // @@ -287,22 +321,6 @@ func (dump *MongoDump) Dump() (err error) { Mux: archive.NewMultiplexer(archiveOut, dump.shutdownIntentsNotifier), } go dump.archive.Mux.Run() - defer func() { - // The Mux runs until its Control is closed - close(dump.archive.Mux.Control) - muxErr := <-dump.archive.Mux.Completed - archiveOut.Close() - if muxErr != nil { - if err != nil { - err = fmt.Errorf("archive writer: %v / %v", err, muxErr) - } else { - err = fmt.Errorf("archive writer: %v", muxErr) - } - log.Logvf(log.DebugLow, "%v", err) - } else { - log.Logvf(log.DebugLow, "mux completed successfully") - } - }() } // Confirm connectivity @@ -432,6 +450,10 @@ func (dump *MongoDump) Dump() (err error) { return err } + return nil +} + +func (dump *MongoDump) DumpOplogAndAfter() (err error) { // IO Phase III // oplog diff --git a/mongorestore/mongorestore_test.go b/mongorestore/mongorestore_test.go index 7488e28dc..3148822aa 100644 --- a/mongorestore/mongorestore_test.go +++ b/mongorestore/mongorestore_test.go @@ -3280,17 +3280,116 @@ func uniqueDBName() string { return fmt.Sprintf("mongorestore_test_%d_%d", os.Getpid(), time.Now().UnixMilli()) } +type nopWriteCloser struct { + io.Writer +} + +func (nwc nopWriteCloser) Close() error { + return nil +} + +func TestNamespaceFilterWithBulkWriteAndTxn(t *testing.T) { + testtype.SkipUnlessTestType(t, testtype.IntegrationTestType) + + // TODO: Make this t.Context() once we move to Go 1.24. + ctx := context.Background() + + provider, _, err := testutil.GetBareSessionProvider() + require.NoError(t, err, "should get session provider") + + client, err := provider.GetSession() + require.NoError(t, err, "should get session") + + db := client.Database(uniqueDBName()) + require.NoError(t, db.Drop(ctx), "should pre-drop DB %#q", db.Name()) + + require.NoError(t, db.CreateCollection(ctx, "stuff")) + coll := db.Collection("stuff") + + backup := bytes.Buffer{} + + dump, err := GetArchiveMongoDump(nopWriteCloser{&backup}) + require.NoError(t, err, "should create dump") + + dump.OutputOptions.Oplog = true + + dumpErr := dump.DumpUntilOplog() + defer dump.CloseMuxIfNeeded(dumpErr) + + require.NoError(t, dumpErr, "dump until oplog should work") + + _, err = coll.InsertMany( + ctx, + lo.RepeatBy( + 1000, + func(index int) any { + return bson.D{{"num", index}} + }, + ), + ) + require.NoError(t, err, "should bulk-insert docs") + + require.NoError( + t, + client.UseSession( + ctx, + func(sc mongo.SessionContext) error { + _, err := sc.WithTransaction( + sc, + func(ctx mongo.SessionContext) (any, error) { + _, err = coll.InsertMany( + ctx, + lo.RepeatBy( + 1000, + func(index int) any { + return bson.D{{"num", index}} + }, + ), + ) + return nil, err + }, + ) + + return err + }, + ), + "should do txn", + ) + + dumpErr = dump.DumpOplogAndAfter() + require.NoError(t, dumpErr, "finishing dump should work") + dump.CloseMuxIfNeeded(dumpErr) + + // ------------------------------------------ + + require.NoError(t, db.Drop(ctx), "should drop database") + + restore, err := GetArchiveMongoRestore( + io.NopCloser(bytes.NewReader(backup.Bytes())), + ) + require.NoError(t, err, "should create restore") + + restore.NSOptions = &NSOptions{ + NSInclude: []string{db.Name()}, + } + + assert.NoError(t, restore.Restore().Err, "restore should work") + + docs, err := coll.CountDocuments(ctx, bson.D{}) + require.NoError(t, err, "should count documents") + + assert.Equal(t, 200, docs, "all inserted docs should be archived & restored") +} + func TestPipedDumpRestore(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.IntegrationTestType) - t.Logf("start %#q", t.Name()) // TODO: Make this t.Context() once we move to Go 1.24. ctx := context.Background() provider, _, err := testutil.GetBareSessionProvider() require.NoError(t, err, "should get session provider") - t.Logf("getting session") sess, err := provider.GetSession() require.NoError(t, err, "should get session") @@ -3299,8 +3398,6 @@ func TestPipedDumpRestore(t *testing.T) { db := sess.Database(uniqueDBName()) require.NoError(t, db.Drop(ctx), "should pre-drop DB %#q", db.Name()) - t.Logf("creating collections") - for _, collName := range srcCollNames { docs := lo.RepeatBy( 10_000,