diff --git a/input_readers.py b/input_readers.py index aab06df..6dcc28d 100644 --- a/input_readers.py +++ b/input_readers.py @@ -22,8 +22,7 @@ @author: cloudysunny14@gmail.com ''' import logging - -from mapreduce.lib import files +import cloudstorage from mapreduce import input_readers from mapreduce import errors @@ -87,6 +86,7 @@ def validate(cls, mapper_spec): params = _get_params(mapper_spec) + # import pdb; pdb.set_trace() if cls.FILE_PATHS_PARAM not in params: raise BadReaderParamsError("Must specify 'file_path' for mapper input") @@ -126,9 +126,8 @@ def split_input(cls, mapper_spec): file_sizes = {} for file_path in file_paths: - fp = files.BufferedFile(file_path) - fp.seek(0, 2) - file_sizes[file_path] = fp.tell() + fstat = cloudstorage.stat(file_path) + file_sizes[file_path] = fstat.st_size shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count) shards_per_file = shard_count // len(file_paths) @@ -157,7 +156,7 @@ def next(self): self._has_iterated = True if not self._filestream: - self._filestream = files.BufferedFile(self._file_path) + self._filestream = cloudstorage.open(self._file_path) if self._start_position: self._filestream.seek(self._start_position) self._filestream.readline()