295 changes: 289 additions & 6 deletions mongoimport/mongoimport.go
@@ -153,6 +153,16 @@ func (imp *MongoImport) validateSettings() error {
}
}

// validate --dir flag early, before type-specific validations
if imp.InputOptions.Dir != "" {
if imp.InputOptions.Type != JSON {
return fmt.Errorf("--dir can only be used with JSON input type")
}
if imp.InputOptions.JSONArray {
return fmt.Errorf("cannot use --jsonArray with --dir")
}
}

// ensure headers are supplied for CSV/TSV
if imp.InputOptions.Type == CSV ||
imp.InputOptions.Type == TSV {
@@ -268,13 +278,20 @@ func (imp *MongoImport) validateSettings() error {
// ensure we have a valid string to use for the collection
if imp.ToolOptions.Collection == "" {
log.Logvf(log.Always, "no collection specified")
// For directory imports, use the directory name as collection
if imp.InputOptions.Dir != "" {
dirBaseName := filepath.Base(imp.InputOptions.Dir)
log.Logvf(log.Always, "using directory name '%v' as collection", dirBaseName)
imp.ToolOptions.Collection = dirBaseName
} else {
fileBaseName := filepath.Base(imp.InputOptions.File)
lastDotIndex := strings.LastIndex(fileBaseName, ".")
if lastDotIndex != -1 {
fileBaseName = fileBaseName[0:lastDotIndex]
}
log.Logvf(log.Always, "using filename '%v' as collection", fileBaseName)
imp.ToolOptions.Collection = fileBaseName
}
}
err = util.ValidateCollectionName(imp.ToolOptions.Collection)
if err != nil {
@@ -306,6 +323,36 @@ func (imp *MongoImport) getSourceReader() (io.ReadCloser, int64, error) {
return os.Stdin, 0, nil
}

// getJSONFilesFromDir returns a list of JSON files from the specified directory.
func (imp *MongoImport) getJSONFilesFromDir(dirPath string) ([]string, error) {
var jsonFiles []string

universalDirPath := util.ToUniversalPath(dirPath)
entries, err := os.ReadDir(universalDirPath)
if err != nil {
return nil, fmt.Errorf("error reading directory: %v", err)
}

for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
// Accept .json files only
if strings.HasSuffix(strings.ToLower(name), ".json") {
fullPath := filepath.Join(universalDirPath, name)
jsonFiles = append(jsonFiles, fullPath)
}
}

if len(jsonFiles) == 0 {
return nil, fmt.Errorf("no JSON files found in directory: %v", dirPath)
}

log.Logvf(log.Info, "found %v JSON file(s) in directory", len(jsonFiles))
return jsonFiles, nil
}
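
The scan above is non-recursive and matches the .json suffix case-insensitively. For readers who want to try the selection logic outside the tool, here is a minimal self-contained sketch of the same pattern (the ./data directory is hypothetical):

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

func main() {
	entries, err := os.ReadDir("./data") // hypothetical directory
	if err != nil {
		panic(err)
	}
	for _, e := range entries {
		// Subdirectories are skipped; the scan is intentionally non-recursive.
		if e.IsDir() {
			continue
		}
		// Case-insensitive suffix match, so DATA.JSON is accepted as well.
		if strings.HasSuffix(strings.ToLower(e.Name()), ".json") {
			fmt.Println(filepath.Join("./data", e.Name()))
		}
	}
}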

// fileSizeProgressor implements Progressor to allow a sizeTracker to hook up with a
// progress.Bar instance, so that the progress bar can report the percentage of the file read.
type fileSizeProgressor struct {
@@ -321,6 +368,12 @@ func (fsp *fileSizeProgressor) Progress() (int64, int64) {
// number of documents successfully imported to the appropriate namespace,
// the number of failures, and any error encountered in doing this.
func (imp *MongoImport) ImportDocuments() (uint64, uint64, error) {
// Handle directory imports with optimized batch loading
if imp.InputOptions.Dir != "" {
return imp.importFromDirectoryBatched()
}

// Existing single-file import path (unchanged)
source, fileSize, err := imp.getSourceReader()
if err != nil {
return 0, 0, err
@@ -355,6 +408,236 @@ func (imp *MongoImport) ImportDocuments() (uint64, uint64, error) {
return imp.importDocuments(inputReader)
}

// importFromDirectoryBatched imports all JSON files from a directory using
// true batch loading - documents from multiple files are batched together
// in the same insertMany/bulkWrite operations for optimal performance.
func (imp *MongoImport) importFromDirectoryBatched() (uint64, uint64, error) {
jsonFiles, err := imp.getJSONFilesFromDir(imp.InputOptions.Dir)
if err != nil {
return 0, 0, err
}

log.Logvf(
log.Always,
"importing %v JSON file(s) from directory: %v",
len(jsonFiles),
imp.InputOptions.Dir,
)

// Connect once for all files
session, err := imp.SessionProvider.GetSession()
if err != nil {
return 0, 0, err
}

log.Logvf(log.Always, "connected to: %v", util.SanitizeURI(imp.ToolOptions.ConnectionString))
log.Logvf(log.Info, "ns: %v.%v", imp.ToolOptions.DB, imp.ToolOptions.Collection)

// Check node type
imp.nodeType, err = imp.SessionProvider.GetNodeType()
if err != nil {
return 0, 0, fmt.Errorf("error checking connected node type: %v", err)
}
log.Logvf(log.Info, "connected to node type: %v", imp.nodeType)

// Drop collection if necessary
if imp.IngestOptions.Drop {
log.Logvf(log.Always, "dropping: %v.%v", imp.ToolOptions.DB, imp.ToolOptions.Collection)
collection := session.Database(imp.ToolOptions.DB).Collection(imp.ToolOptions.Collection)
if err := collection.Drop(context.TODO()); err != nil {
return 0, 0, err
}
}

// Single shared document channel for ALL files
readDocs := make(chan bson.D, workerBufferSize)
processingErrChan := make(chan error)

// Start ingestion workers ONCE for all files
// These will use insertMany/bulkWrite across ALL documents
go func() {
processingErrChan <- imp.ingestDocuments(readDocs)
}()

// Calculate total size for progress tracking
var totalSize int64
for _, filePath := range jsonFiles {
if stat, err := os.Stat(filePath); err == nil {
totalSize += stat.Size()
}
}

// Create a combined size tracker for all files.
// Note: InputReader.StreamDocument closes the output channel it is given.
// For directory imports we therefore stream each file into its own channel and
// forward into readDocs, which we control.
combinedTracker := &multiFileSizeTracker{}

// Progress bar for entire directory
bar := &progress.Bar{
Name: fmt.Sprintf(
"%v.%v [%v files]",
imp.ToolOptions.DB,
imp.ToolOptions.Collection,
len(jsonFiles),
),
Watching: &fileSizeProgressor{totalSize, combinedTracker},
Writer: log.Writer(0),
BarLength: progressBarLength,
IsBytes: true,
}
bar.Start()
defer bar.Stop()

// Stream documents from all files into the shared channel.
// We cannot pass readDocs directly to StreamDocument because StreamDocument
// closes the channel it is given.
go func() {
defer close(readDocs)

for i, filePath := range jsonFiles {
log.Logvf(
log.Info,
"streaming file %v/%v: %v",
i+1,
len(jsonFiles),
filepath.Base(filePath),
)

file, err := os.Open(util.ToUniversalPath(filePath))
if err != nil {
log.Logvf(log.Always, "error opening file %v: %v", filePath, err)
// Ensure progress can still reach 100% when skipping files.
if st, stErr := os.Stat(util.ToUniversalPath(filePath)); stErr == nil {
combinedTracker.addCompleted(st.Size())
}
if imp.IngestOptions.StopOnError {
processingErrChan <- err
return
}
continue
}

fileStat, err := file.Stat()
if err != nil {
_ = file.Close()
log.Logvf(log.Always, "error stating file %v: %v", filePath, err)
// Best-effort progress accounting when file stat fails.
if st, stErr := os.Stat(util.ToUniversalPath(filePath)); stErr == nil {
combinedTracker.addCompleted(st.Size())
}
if imp.IngestOptions.StopOnError {
processingErrChan <- err
return
}
continue
}
combinedTracker.setCurrent(file, fileStat.Size())

inputReader, err := imp.getInputReader(file)
if err != nil {
combinedTracker.markCurrentDone()
_ = file.Close()
log.Logvf(log.Always, "error creating input reader for %v: %v", filePath, err)
if imp.IngestOptions.StopOnError {
processingErrChan <- err
return
}
continue
}

// Per-file channel that StreamDocument owns (it will close it).
fileDocs := make(chan bson.D, workerBufferSize)
streamErrChan := make(chan error, 1)
go func() {
// The `ordered` argument controls whether decoding preserves read order.
// Respect the user's --maintainInsertionOrder preference for directory imports.
streamErrChan <- inputReader.StreamDocument(imp.IngestOptions.MaintainInsertionOrder, fileDocs)
}()

for doc := range fileDocs {
readDocs <- doc
}
streamErr := <-streamErrChan

combinedTracker.markCurrentDone()
_ = file.Close()

if streamErr != nil {
log.Logvf(log.Always, "error streaming from file %v: %v", filePath, streamErr)
if imp.IngestOptions.StopOnError {
processingErrChan <- streamErr
return
}
}

log.Logvf(log.Info, "completed streaming file: %v", filepath.Base(filePath))
}
processingErrChan <- nil
}()

// Wait for completion
err = channelQuorumError(processingErrChan)
processedCount := atomic.LoadUint64(&imp.processedCount)
failureCount := atomic.LoadUint64(&imp.failureCount)

log.Logvf(
log.Always,
"directory import complete: %v documents imported, %v failed",
processedCount,
failureCount,
)
return processedCount, failureCount, err
}
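
Two design points above are worth restating: StreamDocument owns and closes the channel it receives, which is why each file gets its own fileDocs channel, and a single long-lived readDocs channel is what lets the ingest workers build insert batches that span file boundaries. A self-contained sketch of both behaviors, with a toy producer standing in for StreamDocument (none of these names come from mongoimport itself):

package main

import "fmt"

// producer stands in for InputReader.StreamDocument: it owns the
// channel it is handed and closes it when the file is exhausted.
func producer(docs []int, out chan<- int) {
	defer close(out)
	for _, d := range docs {
		out <- d
	}
}

func main() {
	files := [][]int{{1, 2}, {3, 4, 5}, {6, 7}} // stand-ins for JSON files
	shared := make(chan int)                    // long-lived, analogous to readDocs

	go func() {
		defer close(shared) // closed exactly once, after the last file
		for _, f := range files {
			perFile := make(chan int) // producer-owned, like fileDocs
			go producer(f, perFile)
			for v := range perFile {
				shared <- v // forward into the channel we control
			}
		}
	}()

	// A single consumer draining the shared channel can batch across
	// file boundaries - the property the PR relies on for insertMany.
	batch := make([]int, 0, 3)
	for v := range shared {
		batch = append(batch, v)
		if len(batch) == cap(batch) {
			fmt.Println("flush:", batch) // first flush [1 2 3] spans two files
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		fmt.Println("flush:", batch)
	}
}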

// multiFileSizeTracker tracks bytes read across multiple files for progress reporting.
type multiFileSizeTracker struct {
mu sync.Mutex
currentFile *os.File
currentFileSize int64
completedBytes int64
}

func (m *multiFileSizeTracker) setCurrent(file *os.File, size int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.currentFile = file
m.currentFileSize = size
}

func (m *multiFileSizeTracker) markCurrentDone() {
m.mu.Lock()
defer m.mu.Unlock()
if m.currentFile != nil {
m.completedBytes += m.currentFileSize
m.currentFile = nil
m.currentFileSize = 0
}
}

func (m *multiFileSizeTracker) addCompleted(bytes int64) {
if bytes <= 0 {
return
}
m.mu.Lock()
defer m.mu.Unlock()
m.completedBytes += bytes
}

func (m *multiFileSizeTracker) Size() int64 {
m.mu.Lock()
defer m.mu.Unlock()

total := m.completedBytes
if m.currentFile == nil {
return total
}
if pos, err := m.currentFile.Seek(0, io.SeekCurrent); err == nil {
total += pos
}
return total
}
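
Size() samples the current file's read offset with Seek(0, io.SeekCurrent), which reports the position without moving it. One caveat: if getInputReader wraps the file in a buffered reader, the sampled offset can run slightly ahead of the documents actually handed to the workers, which is acceptable for a progress bar. A small sketch of the idiom (sample.json is a placeholder):

package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Open("sample.json") // placeholder file name
	if err != nil {
		panic(err)
	}
	defer f.Close()

	buf := make([]byte, 64)
	n, _ := f.Read(buf) // consume some bytes

	// Seeking 0 bytes from the current position returns the offset
	// without moving it - the same sampling trick Size() uses.
	if pos, err := f.Seek(0, io.SeekCurrent); err == nil {
		fmt.Printf("read %d bytes, offset now %d\n", n, pos)
	}
}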

// importDocuments is a helper to ImportDocuments and does all the ingestion
// work by taking data from the inputReader source and writing it to the
// appropriate namespace. It returns the number of documents successfully
21 changes: 19 additions & 2 deletions mongoimport/options.go
@@ -14,10 +14,12 @@ import (
"github.com/mongodb/mongo-tools/common/options"
)

var Usage = `<options> <connection-string> [<file>]

Import CSV, TSV or JSON data into MongoDB. If no file is provided, mongoimport reads from stdin.

To import all .json files in a directory (JSON input only), use --dir.

Connection strings must begin with mongodb:// or mongodb+srv://.

See http://docs.mongodb.com/database-tools/mongoimport/ for more information.`
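
Putting the new flag together with mongoimport's existing options, a directory import would look something like this (host, database, and directory names are placeholders; every flag other than --dir is a pre-existing one):

mongoimport --uri mongodb://localhost:27017 --db test --dir ./dumps

With no --collection supplied, the validateSettings change in mongoimport.go falls back to the directory's base name, so the documents here would land in test.dumps.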
@@ -33,6 +35,9 @@ type InputOptions struct {
// Specifies the location and name of a file containing the data to import.
File string `long:"file" value-name:"<filename>" description:"file to import from; if not specified, stdin is used"`

// Specifies a directory containing files to import.
Dir string `long:"dir" value-name:"<directory>" description:"directory to import all JSON files from (only works with JSON format)"`

// Treats the input source's first line as field list (csv and tsv only).
HeaderLine bool `long:"headerline" description:"use first line in input source as the field list (CSV and TSV only)"`

@@ -148,13 +153,25 @@ func ParseOptions(rawArgs []string, versionStr, gitCommit string) (Options, error) {
opts.WriteConcern = wc

// ensure either a positional argument is supplied or an argument is passed
// to the --file flag or the --dir flag, but never more than one of these
if inputOpts.File != "" && len(extraArgs) != 0 {
return Options{}, fmt.Errorf(
"error parsing positional arguments: cannot use both --file and a positional argument to set the input file",
)
}

if inputOpts.Dir != "" && inputOpts.File != "" {
return Options{}, fmt.Errorf(
"error parsing positional arguments: cannot use both --dir and --file",
)
}

if inputOpts.Dir != "" && len(extraArgs) != 0 {
return Options{}, fmt.Errorf(
"error parsing positional arguments: cannot use both --dir and a positional argument",
)
}

if inputOpts.File == "" {
if len(extraArgs) != 0 {
// if --file is not supplied, use the positional argument supplied