Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions input_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
@author: cloudysunny14@gmail.com
'''
import logging

from mapreduce.lib import files
import cloudstorage
from mapreduce import input_readers
from mapreduce import errors

Expand Down Expand Up @@ -87,6 +86,7 @@ def validate(cls, mapper_spec):

params = _get_params(mapper_spec)

# import pdb; pdb.set_trace()
if cls.FILE_PATHS_PARAM not in params:
raise BadReaderParamsError("Must specify 'file_path' for mapper input")

Expand Down Expand Up @@ -126,9 +126,8 @@ def split_input(cls, mapper_spec):
file_sizes = {}

for file_path in file_paths:
fp = files.BufferedFile(file_path)
fp.seek(0, 2)
file_sizes[file_path] = fp.tell()
fstat = cloudstorage.stat(file_path)
file_sizes[file_path] = fstat.st_size

shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
shards_per_file = shard_count // len(file_paths)
Expand Down Expand Up @@ -157,7 +156,7 @@ def next(self):
self._has_iterated = True

if not self._filestream:
self._filestream = files.BufferedFile(self._file_path)
self._filestream = cloudstorage.open(self._file_path)
if self._start_position:
self._filestream.seek(self._start_position)
self._filestream.readline()
Expand Down