Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## [1.1.0] - 2024-11-03
### Added
- Support for error handling
- attr_readers for `max_batch_size` and `max_interval_seconds`
- update to support more recent minitest

## [1.0.0] - 2019-09-01
### Added
- Initial release of the gem.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
batch_queue (1.0.0)
batch_queue (1.1.0)

GEM
remote: https://rubygems.org/
Expand Down
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,30 @@ or
bq << MyJob.new(...)

```
### 3. Error handling
You have two options for handling errors in `BatchQueue`:

* Rescue exceptions within the processing block:

```
bq = BatchQueue.new(max_batch_size: 20, max_interval_seconds: 60) do |batch_metric_data|
begin
# Put your code that you want to execute here.
rescue => e
# Handle the exception here.
end
end

```

* Set a global error handler:

```
bq.on_error = ->(e) { puts e.message }
```

If neither method is used, `BatchQueue` will catch the exception and print it to
the standard console output.

## Development

Expand Down
16 changes: 14 additions & 2 deletions lib/batch_queue/batch_queue.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
class BatchQueue
attr_reader :max_batch_size
attr_reader :max_interval_seconds

# starts the queue
# either max_batch_size or interval_milliseconds or both must be set
def initialize(max_batch_size: nil, max_interval_seconds: nil, &block)
Expand All @@ -13,6 +16,7 @@ def initialize(max_batch_size: nil, max_interval_seconds: nil, &block)
@mutex = Mutex.new
@cond_var = ConditionVariable.new
@runner = Thread.new { run }
@on_error_callback = nil

at_exit do
stop
Expand Down Expand Up @@ -44,6 +48,10 @@ def stop
@runner.join
end

def on_error(&block)
@on_error_callback = block
end

private

def run
Expand Down Expand Up @@ -81,8 +89,12 @@ def process_batch(arr)
@mutex.unlock
begin
@block.call(arr)
rescue StandardError => exc
puts "BatchQueue: Unhandled exception #{exc.inspect}"
rescue StandardError => e
if @on_error_callback
@on_error_callback.call(e)
else
puts "BatchQueue: Unhandled exception #{exc.inspect}"
end
ensure
@mutex.lock
end
Expand Down
2 changes: 1 addition & 1 deletion lib/batch_queue/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
class BatchQueue
VERSION = "1.0.0"
VERSION = "1.1.0"
end
21 changes: 21 additions & 0 deletions test/batch_queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,27 @@ def test_it_batches_at_max_interval
assert_equal 0, bq.size
end

def test_it_handles_error
is_error_handled = false

bq = BatchQueue.new(max_batch_size: 2, max_interval_seconds: 1) do |_arr|
raise 'an error'
end

bq.on_error do |err|
is_error_handled = true
assert_equal 'an error', err.message
end

bq << 'Yo'
sleep(0.2)
assert !is_error_handled

bq << 'Whatsup'
sleep(0.2)
assert is_error_handled
end

def test_it_handles_close
processed_count = 0
bq = BatchQueue.new(max_batch_size: 2, max_interval_seconds: 1) do |arr|
Expand Down
4 changes: 3 additions & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@

require "minitest/autorun"
require 'minitest/reporters'
MiniTest::Reporters.use!

# Only use Minitest::Reporters if not running in RubyMine
Minitest::Reporters.use! unless ENV['RM_INFO']
Loading