diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go
index 8a10d1a20..b9d49002c 100644
--- a/mongoimport/mongoimport.go
+++ b/mongoimport/mongoimport.go
@@ -153,6 +153,16 @@ func (imp *MongoImport) validateSettings() error {
 		}
 	}
 
+	// validate --dir flag early, before type-specific validations
+	if imp.InputOptions.Dir != "" {
+		if imp.InputOptions.Type != JSON {
+			return fmt.Errorf("--dir can only be used with JSON input type")
+		}
+		if imp.InputOptions.JSONArray {
+			return fmt.Errorf("cannot use --jsonArray with --dir")
+		}
+	}
+
 	// ensure headers are supplied for CSV/TSV
 	if imp.InputOptions.Type == CSV ||
 		imp.InputOptions.Type == TSV {
@@ -268,13 +278,20 @@ func (imp *MongoImport) validateSettings() error {
 	// ensure we have a valid string to use for the collection
 	if imp.ToolOptions.Collection == "" {
 		log.Logvf(log.Always, "no collection specified")
-		fileBaseName := filepath.Base(imp.InputOptions.File)
-		lastDotIndex := strings.LastIndex(fileBaseName, ".")
-		if lastDotIndex != -1 {
-			fileBaseName = fileBaseName[0:lastDotIndex]
+		// For directory imports, use the directory name as collection
+		if imp.InputOptions.Dir != "" {
+			dirBaseName := filepath.Base(imp.InputOptions.Dir)
+			log.Logvf(log.Always, "using directory name '%v' as collection", dirBaseName)
+			imp.ToolOptions.Collection = dirBaseName
+		} else {
+			fileBaseName := filepath.Base(imp.InputOptions.File)
+			lastDotIndex := strings.LastIndex(fileBaseName, ".")
+			if lastDotIndex != -1 {
+				fileBaseName = fileBaseName[0:lastDotIndex]
+			}
+			log.Logvf(log.Always, "using filename '%v' as collection", fileBaseName)
+			imp.ToolOptions.Collection = fileBaseName
 		}
-		log.Logvf(log.Always, "using filename '%v' as collection", fileBaseName)
-		imp.ToolOptions.Collection = fileBaseName
 	}
 	err = util.ValidateCollectionName(imp.ToolOptions.Collection)
 	if err != nil {
@@ -306,6 +323,36 @@ func (imp *MongoImport) getSourceReader() (io.ReadCloser, int64, error) {
 	return os.Stdin, 0, nil
 }
 
+// getJSONFilesFromDir returns a list of JSON files from the specified directory.
+func (imp *MongoImport) getJSONFilesFromDir(dirPath string) ([]string, error) {
+	var jsonFiles []string
+
+	universalDirPath := util.ToUniversalPath(dirPath)
+	entries, err := os.ReadDir(universalDirPath)
+	if err != nil {
+		return nil, fmt.Errorf("error reading directory: %v", err)
+	}
+
+	for _, entry := range entries {
+		if entry.IsDir() {
+			continue
+		}
+		name := entry.Name()
+		// Accept .json files only
+		if strings.HasSuffix(strings.ToLower(name), ".json") {
+			fullPath := filepath.Join(universalDirPath, name)
+			jsonFiles = append(jsonFiles, fullPath)
+		}
+	}
+
+	if len(jsonFiles) == 0 {
+		return nil, fmt.Errorf("no JSON files found in directory: %v", dirPath)
+	}
+
+	log.Logvf(log.Info, "found %v JSON file(s) in directory", len(jsonFiles))
+	return jsonFiles, nil
+}
+
 // fileSizeProgressor implements Progressor to allow a sizeTracker to hook up with a
 // progress.Bar instance, so that the progress bar can report the percentage of the file read.
 type fileSizeProgressor struct {
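Review note on `getJSONFilesFromDir` above: the scan is non-recursive, skips subdirectories, and matches the `.json` extension case-insensitively, so `DATA.JSON` qualifies while `data.jsonl` does not. Since `os.ReadDir` returns entries sorted by filename, files are imported in a deterministic order. A minimal standalone sketch of the same filtering rules; the `listJSONFiles` name is hypothetical and not part of the patch:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// listJSONFiles mirrors the filtering in getJSONFilesFromDir: non-recursive,
// case-insensitive on the ".json" extension, subdirectories skipped.
func listJSONFiles(dir string) ([]string, error) {
	entries, err := os.ReadDir(dir) // entries come back sorted by filename
	if err != nil {
		return nil, err
	}
	var files []string
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		if strings.HasSuffix(strings.ToLower(e.Name()), ".json") {
			files = append(files, filepath.Join(dir, e.Name()))
		}
	}
	return files, nil
}

func main() {
	files, err := listJSONFiles(".")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, f := range files {
		fmt.Println(f)
	}
}
```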
@@ -321,6 +368,12 @@ func (fsp *fileSizeProgressor) Progress() (int64, int64) {
 // number of documents successfully imported to the appropriate namespace,
 // the number of failures, and any error encountered in doing this.
 func (imp *MongoImport) ImportDocuments() (uint64, uint64, error) {
+	// Handle directory imports with optimized batch loading
+	if imp.InputOptions.Dir != "" {
+		return imp.importFromDirectoryBatched()
+	}
+
+	// Existing single-file import path (unchanged)
 	source, fileSize, err := imp.getSourceReader()
 	if err != nil {
 		return 0, 0, err
 	}
@@ -355,6 +408,236 @@ func (imp *MongoImport) ImportDocuments() (uint64, uint64, error) {
 	return imp.importDocuments(inputReader)
 }
 
+// importFromDirectoryBatched imports all JSON files from a directory using
+// true batch loading - documents from multiple files are batched together
+// in the same insertMany/bulkWrite operations for optimal performance.
+func (imp *MongoImport) importFromDirectoryBatched() (uint64, uint64, error) {
+	jsonFiles, err := imp.getJSONFilesFromDir(imp.InputOptions.Dir)
+	if err != nil {
+		return 0, 0, err
+	}
+
+	log.Logvf(
+		log.Always,
+		"importing %v JSON file(s) from directory: %v",
+		len(jsonFiles),
+		imp.InputOptions.Dir,
+	)
+
+	// Connect once for all files
+	session, err := imp.SessionProvider.GetSession()
+	if err != nil {
+		return 0, 0, err
+	}
+
+	log.Logvf(log.Always, "connected to: %v", util.SanitizeURI(imp.ToolOptions.ConnectionString))
+	log.Logvf(log.Info, "ns: %v.%v", imp.ToolOptions.DB, imp.ToolOptions.Collection)
+
+	// Check node type
+	imp.nodeType, err = imp.SessionProvider.GetNodeType()
+	if err != nil {
+		return 0, 0, fmt.Errorf("error checking connected node type: %v", err)
+	}
+	log.Logvf(log.Info, "connected to node type: %v", imp.nodeType)
+
+	// Drop collection if necessary
+	if imp.IngestOptions.Drop {
+		log.Logvf(log.Always, "dropping: %v.%v", imp.ToolOptions.DB, imp.ToolOptions.Collection)
+		collection := session.Database(imp.ToolOptions.DB).Collection(imp.ToolOptions.Collection)
+		if err := collection.Drop(context.TODO()); err != nil {
+			return 0, 0, err
+		}
+	}
+
+	// Single shared document channel for ALL files
+	readDocs := make(chan bson.D, workerBufferSize)
+	processingErrChan := make(chan error)
+
+	// Start ingestion workers ONCE for all files
+	// These will use insertMany/bulkWrite across ALL documents
+	go func() {
+		processingErrChan <- imp.ingestDocuments(readDocs)
+	}()
+
+	// Calculate total size for progress tracking
+	var totalSize int64
+	for _, filePath := range jsonFiles {
+		if stat, err := os.Stat(filePath); err == nil {
+			totalSize += stat.Size()
+		}
+	}
+
+	// Create a combined size tracker for all files.
+	// Note: InputReader.StreamDocument closes the output channel it is given.
+	// For directory imports we therefore stream each file into its own channel and
+	// forward into readDocs, which we control.
+	combinedTracker := &multiFileSizeTracker{}
+
+	// Progress bar for entire directory
+	bar := &progress.Bar{
+		Name: fmt.Sprintf(
+			"%v.%v [%v files]",
+			imp.ToolOptions.DB,
+			imp.ToolOptions.Collection,
+			len(jsonFiles),
+		),
+		Watching:  &fileSizeProgressor{totalSize, combinedTracker},
+		Writer:    log.Writer(0),
+		BarLength: progressBarLength,
+		IsBytes:   true,
+	}
+	bar.Start()
+	defer bar.Stop()
+
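The streaming goroutine that follows hinges on channel ownership: `InputReader.StreamDocument` closes whatever channel it is handed, and a Go channel may only be closed once, so each file gets a private channel whose contents are forwarded into the long-lived `readDocs`. A self-contained sketch of that fan-in pattern, with purely illustrative names:

```go
package main

import "fmt"

// produce simulates InputReader.StreamDocument: it owns `out` and closes it
// when done, just as StreamDocument closes the channel it is given.
func produce(docs []string, out chan<- string) {
	defer close(out)
	for _, d := range docs {
		out <- d
	}
}

func main() {
	sources := [][]string{{"a1", "a2"}, {"b1"}, {"c1", "c2", "c3"}}

	// The shared channel (analogous to readDocs) is owned here, not by any
	// producer, so it is closed exactly once after every source is drained.
	shared := make(chan string, 4)
	go func() {
		defer close(shared)
		for _, src := range sources {
			perFile := make(chan string, 4) // analogous to fileDocs
			go produce(src, perFile)
			for d := range perFile { // forward until the producer closes it
				shared <- d
			}
		}
	}()

	// The single consumer (analogous to the ingest workers) sees one
	// continuous stream spanning all sources.
	for d := range shared {
		fmt.Println(d)
	}
}
```

Because the forwarding goroutine is the sole owner of the shared channel, the ingest side sees one continuous document stream and can batch inserts across file boundaries.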
+	// Stream documents from all files into the shared channel.
+	// We cannot pass readDocs directly to StreamDocument because StreamDocument
+	// closes the channel it is given.
+	go func() {
+		defer close(readDocs)
+
+		for i, filePath := range jsonFiles {
+			log.Logvf(
+				log.Info,
+				"streaming file %v/%v: %v",
+				i+1,
+				len(jsonFiles),
+				filepath.Base(filePath),
+			)
+
+			file, err := os.Open(util.ToUniversalPath(filePath))
+			if err != nil {
+				log.Logvf(log.Always, "error opening file %v: %v", filePath, err)
+				// Ensure progress can still reach 100% when skipping files.
+				if st, stErr := os.Stat(util.ToUniversalPath(filePath)); stErr == nil {
+					combinedTracker.addCompleted(st.Size())
+				}
+				if imp.IngestOptions.StopOnError {
+					processingErrChan <- err
+					return
+				}
+				continue
+			}
+
+			fileStat, err := file.Stat()
+			if err != nil {
+				_ = file.Close()
+				log.Logvf(log.Always, "error stating file %v: %v", filePath, err)
+				// Best-effort progress accounting when file stat fails.
+				if st, stErr := os.Stat(util.ToUniversalPath(filePath)); stErr == nil {
+					combinedTracker.addCompleted(st.Size())
+				}
+				if imp.IngestOptions.StopOnError {
+					processingErrChan <- err
+					return
+				}
+				continue
+			}
+			combinedTracker.setCurrent(file, fileStat.Size())
+
+			inputReader, err := imp.getInputReader(file)
+			if err != nil {
+				combinedTracker.markCurrentDone()
+				_ = file.Close()
+				log.Logvf(log.Always, "error creating input reader for %v: %v", filePath, err)
+				if imp.IngestOptions.StopOnError {
+					processingErrChan <- err
+					return
+				}
+				continue
+			}
+
+			// Per-file channel that StreamDocument owns (it will close it).
+			fileDocs := make(chan bson.D, workerBufferSize)
+			streamErrChan := make(chan error, 1)
+			go func() {
+				// The `ordered` argument controls whether decoding preserves read order.
+				// Respect the user's --maintainInsertionOrder preference for directory imports.
+				streamErrChan <- inputReader.StreamDocument(imp.IngestOptions.MaintainInsertionOrder, fileDocs)
+			}()
+
+			for doc := range fileDocs {
+				readDocs <- doc
+			}
+			streamErr := <-streamErrChan
+
+			combinedTracker.markCurrentDone()
+			_ = file.Close()
+
+			if streamErr != nil {
+				log.Logvf(log.Always, "error streaming from file %v: %v", filePath, streamErr)
+				if imp.IngestOptions.StopOnError {
+					processingErrChan <- streamErr
+					return
+				}
+			}
+
+			log.Logvf(log.Info, "completed streaming file: %v", filepath.Base(filePath))
+		}
+		processingErrChan <- nil
+	}()
+
+	// Wait for completion
+	err = channelQuorumError(processingErrChan)
+	processedCount := atomic.LoadUint64(&imp.processedCount)
+	failureCount := atomic.LoadUint64(&imp.failureCount)
+
+	log.Logvf(
+		log.Always,
+		"directory import complete: %v documents imported, %v failed",
+		processedCount,
+		failureCount,
+	)
+	return processedCount, failureCount, err
+}
+
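The `multiFileSizeTracker` defined next estimates in-flight progress by asking the descriptor of the file currently being streamed for its current offset. A standalone sketch of that technique; `example.json` is a placeholder filename:

```go
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Open("example.json") // placeholder filename
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer f.Close()

	// Consume some of the file, as a streaming decoder would.
	buf := make([]byte, 1024)
	if _, err := f.Read(buf); err != nil && err != io.EOF {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Seeking zero bytes from the current position does not move the
	// cursor; it just reports where the next Read would start.
	pos, err := f.Seek(0, io.SeekCurrent)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("read up to byte offset %d\n", pos)
}
```

Since the input reader consumes the file through a buffer, the reported offset can run slightly ahead of what has actually been parsed, which is an acceptable error for a progress bar.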
+// multiFileSizeTracker tracks bytes read across multiple files for progress reporting.
+type multiFileSizeTracker struct {
+	mu              sync.Mutex
+	currentFile     *os.File
+	currentFileSize int64
+	completedBytes  int64
+}
+
+func (m *multiFileSizeTracker) setCurrent(file *os.File, size int64) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.currentFile = file
+	m.currentFileSize = size
+}
+
+func (m *multiFileSizeTracker) markCurrentDone() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if m.currentFile != nil {
+		m.completedBytes += m.currentFileSize
+		m.currentFile = nil
+		m.currentFileSize = 0
+	}
+}
+
+func (m *multiFileSizeTracker) addCompleted(bytes int64) {
+	if bytes <= 0 {
+		return
+	}
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.completedBytes += bytes
+}
+
+func (m *multiFileSizeTracker) Size() int64 {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	total := m.completedBytes
+	if m.currentFile == nil {
+		return total
+	}
+	if pos, err := m.currentFile.Seek(0, io.SeekCurrent); err == nil {
+		total += pos
+	}
+	return total
+}
+
 // importDocuments is a helper to ImportDocuments and does all the ingestion
 // work by taking data from the inputReader source and writing it to the
 // appropriate namespace. It returns the number of documents successfully
diff --git a/mongoimport/options.go b/mongoimport/options.go
index a75861f72..a827f188b 100644
--- a/mongoimport/options.go
+++ b/mongoimport/options.go
@@ -14,10 +14,12 @@ import (
 	"github.com/mongodb/mongo-tools/common/options"
 )
 
-var Usage = `<options> <connection-string> <file>
+var Usage = `<options> <connection-string> [<file>]
 
 Import CSV, TSV or JSON data into MongoDB. If no file is provided, mongoimport reads from stdin.
 
+To import all .json files in a directory (JSON input only), use --dir.
+
 Connection strings must begin with mongodb:// or mongodb+srv://.
 
 See http://docs.mongodb.com/database-tools/mongoimport/ for more information.`
@@ -33,6 +35,9 @@ type InputOptions struct {
 	// Specifies the location and name of a file containing the data to import.
 	File string `long:"file" value-name:"<filename>" description:"file to import from; if not specified, stdin is used"`
 
+	// Specifies a directory containing files to import.
+	Dir string `long:"dir" value-name:"<directory>" description:"directory to import all JSON files from (only works with JSON format)"`
+
 	// Treats the input source's first line as field list (csv and tsv only).
 	HeaderLine bool `long:"headerline" description:"use first line in input source as the field list (CSV and TSV only)"`
@@ -148,13 +153,25 @@ func ParseOptions(rawArgs []string, versionStr, gitCommit string) (Options, error) {
 	opts.WriteConcern = wc
 
 	// ensure either a positional argument is supplied or an argument is passed
-	// to the --file flag - and not both
+	// to the --file flag or the --dir flag - but not more than one
 	if inputOpts.File != "" && len(extraArgs) != 0 {
 		return Options{}, fmt.Errorf(
 			"error parsing positional arguments: cannot use both --file and a positional argument to set the input file",
 		)
 	}
 
+	if inputOpts.Dir != "" && inputOpts.File != "" {
+		return Options{}, fmt.Errorf(
+			"error parsing positional arguments: cannot use both --dir and --file",
+		)
+	}
+
+	if inputOpts.Dir != "" && len(extraArgs) != 0 {
+		return Options{}, fmt.Errorf(
+			"error parsing positional arguments: cannot use both --dir and a positional argument",
+		)
+	}
+
 	if inputOpts.File == "" {
 		if len(extraArgs) != 0 {
 			// if --file is not supplied, use the positional argument supplied
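Taken together, the three `ParseOptions` checks above make `--file`, `--dir`, and the positional file argument mutually exclusive. A standalone restatement of that rule; `validateInputSource` is a hypothetical helper, not part of the patch:

```go
package main

import (
	"errors"
	"fmt"
)

// validateInputSource enforces the same mutual exclusion as the ParseOptions
// checks: at most one of --file, --dir, or a positional file may be given.
func validateInputSource(file, dir string, positional []string) error {
	if file != "" && len(positional) != 0 {
		return errors.New("cannot use both --file and a positional argument")
	}
	if dir != "" && file != "" {
		return errors.New("cannot use both --dir and --file")
	}
	if dir != "" && len(positional) != 0 {
		return errors.New("cannot use both --dir and a positional argument")
	}
	return nil
}

func main() {
	fmt.Println(validateInputSource("a.json", "", []string{"b.json"})) // error
	fmt.Println(validateInputSource("", "dump/", nil))                 // <nil>
}
```

With these checks in place, `mongoimport --db test --dir ./users` parses cleanly (and, per the validateSettings change above, defaults the collection name to `users`), while combining `--dir` with `--file` or a positional argument fails fast during option parsing.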