diff --git a/kafka_replayer.py b/kafka_replayer.py index ec3798e..5c48ade 100644 --- a/kafka_replayer.py +++ b/kafka_replayer.py @@ -101,12 +101,12 @@ def _binary_search(self, consumer, tp, start, end, target_time): return self._binary_search(consumer, tp, start, insertion_point, target_time) return start - def replay(self, start_time, end_time): + def replay(self, start_time, end_time=None): """Replay all specified partitions over the specified time range (inclusive). Args: start_time: The start timestamp in milliseconds - end_time: The end timestamp in milliseconds + end_time: The end timestamp in milliseconds or None for open ended range Yields: The next ConsumerRecord found within the given time range @@ -116,11 +116,11 @@ def replay(self, start_time, end_time): """ if start_time < 0: raise ValueError('start_time must be non-negative') - if end_time < 0: + if end_time and end_time < 0: raise ValueError('end_time must be non-negative') if start_time > self._get_time_millis(): raise ValueError('start_time must not be in the future') - if start_time > end_time: + if end_time and start_time > end_time: raise ValueError('end_time must be at least start_time') count = 0 last_timestamp = 0 @@ -140,7 +140,7 @@ def replay(self, start_time, end_time): partitions = set() else: last_timestamp = record.timestamp - if last_timestamp > end_time: + if end_time and last_timestamp > end_time: # Since partitions are ordered, if we see a too-new timestamp, mark the # partition complete. partitions.discard(record.partition) @@ -149,7 +149,7 @@ def replay(self, start_time, end_time): consumer.pause(tp) self._logger.debug('Completed partition {0}'.format(tp)) elif (record.partition in partitions and last_timestamp >= start_time - and last_timestamp <= end_time): + and (not end_time or last_timestamp <= end_time)): # Send the record to the client if it's within the specified time range yield record count += 1 @@ -162,4 +162,3 @@ def replay(self, start_time, end_time): self._logger.info('Processed {0} offsets, last timestamp: {1}'.format( count, last_timestamp)) consumer.close() -