From 26e0e64b5e13287d23ba62ad4af5a2d2de94d1c6 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 17 Jul 2025 16:03:34 -0400 Subject: [PATCH 1/4] try this --- mongodump/mongodump.go | 14 +++++ mongorestore/mongorestore_test.go | 100 ++++++++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 4 deletions(-) diff --git a/mongodump/mongodump.go b/mongodump/mongodump.go index ad4c1d899..ab98b3781 100644 --- a/mongodump/mongodump.go +++ b/mongodump/mongodump.go @@ -206,6 +206,16 @@ func (dump *MongoDump) verifyCollectionExists() (bool, error) { func (dump *MongoDump) Dump() (err error) { defer dump.SessionProvider.Close() + err = dump.DumpUntilOplog() + if err != nil { + return + } + + return dump.DumpOplogAndAfter() +} + +// Dump handles some final options checking and executes MongoDump. +func (dump *MongoDump) DumpUntilOplog() (err error) { if !dump.OutputOptions.Oplog && (dump.InputOptions.SourceWritesDoneBarrier != "") { // Wait for tests to stop writes before dumping any collections. // @@ -432,6 +442,10 @@ func (dump *MongoDump) Dump() (err error) { return err } + return nil +} + +func (dump *MongoDump) DumpOplogAndAfter() (err error) { // IO Phase III // oplog diff --git a/mongorestore/mongorestore_test.go b/mongorestore/mongorestore_test.go index 7488e28dc..9579c5059 100644 --- a/mongorestore/mongorestore_test.go +++ b/mongorestore/mongorestore_test.go @@ -3280,17 +3280,111 @@ func uniqueDBName() string { return fmt.Sprintf("mongorestore_test_%d_%d", os.Getpid(), time.Now().UnixMilli()) } +type nopWriteCloser struct { + io.Writer +} + +func (nwc nopWriteCloser) Close() error { + return nil +} + +func TestNamespaceFilterWithBulkWriteAndTxn(t *testing.T) { + testtype.SkipUnlessTestType(t, testtype.IntegrationTestType) + + // TODO: Make this t.Context() once we move to Go 1.24. + ctx := context.Background() + + provider, _, err := testutil.GetBareSessionProvider() + require.NoError(t, err, "should get session provider") + + client, err := provider.GetSession() + require.NoError(t, err, "should get session") + + db := client.Database(uniqueDBName()) + require.NoError(t, db.Drop(ctx), "should pre-drop DB %#q", db.Name()) + + require.NoError(t, db.CreateCollection(ctx, "stuff")) + coll := db.Collection("stuff") + + backup := bytes.Buffer{} + + dump, err := GetArchiveMongoDump(nopWriteCloser{&backup}) + require.NoError(t, err, "should create dump") + + dump.OutputOptions.Oplog = true + + assert.NoError(t, dump.DumpUntilOplog(), "dump until oplog should work") + + _, err = coll.InsertMany( + ctx, + lo.RepeatBy( + 1000, + func(index int) any { + return bson.D{{"num", index}} + }, + ), + ) + require.NoError(t, err, "should bulk-insert docs") + + require.NoError( + t, + client.UseSession( + ctx, + func(sc mongo.SessionContext) error { + _, err := sc.WithTransaction( + sc, + func(ctx mongo.SessionContext) (interface{}, error) { + _, err = coll.InsertMany( + ctx, + lo.RepeatBy( + 1000, + func(index int) any { + return bson.D{{"num", index}} + }, + ), + ) + return nil, err + }, + ) + + return err + }, + ), + "should do txn", + ) + + assert.NoError(t, dump.DumpOplogAndAfter(), "finishing dump should work") + + // ------------------------------------------ + + require.NoError(t, db.Drop(ctx), "should drop database") + + restore, err := GetArchiveMongoRestore( + io.NopCloser(bytes.NewReader(backup.Bytes())), + ) + require.NoError(t, err, "should create restore") + + restore.NSOptions = &NSOptions{ + NSInclude: []string{db.Name()}, + } + + assert.NoError(t, restore.Restore().Err, "restore should work") + + docs, err := coll.CountDocuments(ctx, nil) + require.NoError(t, err, "should count documents") + + assert.Equal(t, 200, docs, "all inserted docs should be archived & restored") +} + func TestPipedDumpRestore(t *testing.T) { testtype.SkipUnlessTestType(t, testtype.IntegrationTestType) - t.Logf("start %#q", t.Name()) // TODO: Make this t.Context() once we move to Go 1.24. ctx := context.Background() provider, _, err := testutil.GetBareSessionProvider() require.NoError(t, err, "should get session provider") - t.Logf("getting session") sess, err := provider.GetSession() require.NoError(t, err, "should get session") @@ -3299,8 +3393,6 @@ func TestPipedDumpRestore(t *testing.T) { db := sess.Database(uniqueDBName()) require.NoError(t, db.Drop(ctx), "should pre-drop DB %#q", db.Name()) - t.Logf("creating collections") - for _, collName := range srcCollNames { docs := lo.RepeatBy( 10_000, From fef52bffdd5f7ad8608e4661d1ab062026c2ce50 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 17 Jul 2025 16:06:23 -0400 Subject: [PATCH 2/4] any --- mongorestore/mongorestore_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongorestore/mongorestore_test.go b/mongorestore/mongorestore_test.go index 9579c5059..cfe6aed5d 100644 --- a/mongorestore/mongorestore_test.go +++ b/mongorestore/mongorestore_test.go @@ -3333,7 +3333,7 @@ func TestNamespaceFilterWithBulkWriteAndTxn(t *testing.T) { func(sc mongo.SessionContext) error { _, err := sc.WithTransaction( sc, - func(ctx mongo.SessionContext) (interface{}, error) { + func(ctx mongo.SessionContext) (any, error) { _, err = coll.InsertMany( ctx, lo.RepeatBy( From 0585a7c54cfdf5a9d416bbbf973387d671804d95 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 17 Jul 2025 16:26:54 -0400 Subject: [PATCH 3/4] wonky --- mongodump/mongodump.go | 48 ++++++++++++++++++------------- mongorestore/mongorestore_test.go | 9 ++++-- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/mongodump/mongodump.go b/mongodump/mongodump.go index ab98b3781..5dc12a347 100644 --- a/mongodump/mongodump.go +++ b/mongodump/mongodump.go @@ -203,15 +203,39 @@ func (dump *MongoDump) verifyCollectionExists() (bool, error) { } // Dump handles some final options checking and executes MongoDump. -func (dump *MongoDump) Dump() (err error) { +func (dump *MongoDump) Dump() (dumpErr error) { defer dump.SessionProvider.Close() + defer dump.CloseMuxIfNeeded(dumpErr) - err = dump.DumpUntilOplog() - if err != nil { + dumpErr = dump.DumpUntilOplog() + if dumpErr != nil { return } - return dump.DumpOplogAndAfter() + dumpErr = dump.DumpOplogAndAfter() + + return +} + +func (dump *MongoDump) CloseMuxIfNeeded(dumpErr error) { + if dump.archive == nil { + return + } + + // The Mux runs until its Control is closed + close(dump.archive.Mux.Control) + muxErr := <-dump.archive.Mux.Completed + dump.archive.Out.Close() + if muxErr != nil { + if dumpErr != nil { + dumpErr = fmt.Errorf("archive writer: %v / %v", dumpErr, muxErr) + } else { + dumpErr = fmt.Errorf("archive writer: %v", muxErr) + } + log.Logvf(log.DebugLow, "%v", dumpErr) + } else { + log.Logvf(log.DebugLow, "mux completed successfully") + } } // Dump handles some final options checking and executes MongoDump. @@ -297,22 +321,6 @@ func (dump *MongoDump) DumpUntilOplog() (err error) { Mux: archive.NewMultiplexer(archiveOut, dump.shutdownIntentsNotifier), } go dump.archive.Mux.Run() - defer func() { - // The Mux runs until its Control is closed - close(dump.archive.Mux.Control) - muxErr := <-dump.archive.Mux.Completed - archiveOut.Close() - if muxErr != nil { - if err != nil { - err = fmt.Errorf("archive writer: %v / %v", err, muxErr) - } else { - err = fmt.Errorf("archive writer: %v", muxErr) - } - log.Logvf(log.DebugLow, "%v", err) - } else { - log.Logvf(log.DebugLow, "mux completed successfully") - } - }() } // Confirm connectivity diff --git a/mongorestore/mongorestore_test.go b/mongorestore/mongorestore_test.go index cfe6aed5d..bd569ac12 100644 --- a/mongorestore/mongorestore_test.go +++ b/mongorestore/mongorestore_test.go @@ -3313,7 +3313,10 @@ func TestNamespaceFilterWithBulkWriteAndTxn(t *testing.T) { dump.OutputOptions.Oplog = true - assert.NoError(t, dump.DumpUntilOplog(), "dump until oplog should work") + dumpErr := dump.DumpUntilOplog() + defer dump.CloseMuxIfNeeded(dumpErr) + + require.NoError(t, dumpErr, "dump until oplog should work") _, err = coll.InsertMany( ctx, @@ -3353,7 +3356,9 @@ func TestNamespaceFilterWithBulkWriteAndTxn(t *testing.T) { "should do txn", ) - assert.NoError(t, dump.DumpOplogAndAfter(), "finishing dump should work") + dumpErr = dump.DumpOplogAndAfter() + require.NoError(t, dumpErr, "finishing dump should work") + dump.CloseMuxIfNeeded(dumpErr) // ------------------------------------------ From 0705e0e1fc8553a527c290d8fd9950003e71480d Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 17 Jul 2025 16:33:46 -0400 Subject: [PATCH 4/4] grr nil doc --- mongorestore/mongorestore_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongorestore/mongorestore_test.go b/mongorestore/mongorestore_test.go index bd569ac12..3148822aa 100644 --- a/mongorestore/mongorestore_test.go +++ b/mongorestore/mongorestore_test.go @@ -3375,7 +3375,7 @@ func TestNamespaceFilterWithBulkWriteAndTxn(t *testing.T) { assert.NoError(t, restore.Restore().Err, "restore should work") - docs, err := coll.CountDocuments(ctx, nil) + docs, err := coll.CountDocuments(ctx, bson.D{}) require.NoError(t, err, "should count documents") assert.Equal(t, 200, docs, "all inserted docs should be archived & restored")