From afaf16cf74908a3d15c043fb054f23aff066defd Mon Sep 17 00:00:00 2001 From: Kimmo Lehto Date: Tue, 14 Aug 2012 17:03:15 +0300 Subject: [PATCH 1/2] Added method enqueue_with_response that uses a temporary response tube to get result from a worker --- lib/stalker.rb | 20 ++++++++++++++++++-- test/stalker_test.rb | 12 ++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/lib/stalker.rb b/lib/stalker.rb index 55d6586..b6edbc3 100644 --- a/lib/stalker.rb +++ b/lib/stalker.rb @@ -21,6 +21,16 @@ def enqueue(job, args={}, opts={}) failed_connection(e) end + def enqueue_with_response(job, args={}, opts={}) + job_id = enqueue(job, {:beanstalk_respond_to => beanstalk_url}.merge(args), opts) + response_queue = "response.#{job}.#{job_id}" + beanstalk.watch response_queue + response = beanstalk.reserve(opts[:timeout]) + beanstalk.ignore response_queue + response.delete + JSON.parse(response.body).last + end + def job(j, &block) @@handlers ||= {} @@handlers[j] = block @@ -69,6 +79,7 @@ class JobTimeout < RuntimeError; end def work_one_job job = beanstalk.reserve name, args = JSON.parse job.body + beanstalk_respond_to = args.delete('beanstalk_respond_to') log_job_begin(name, args) handler = @@handlers[name] raise(NoSuchJob, name) unless handler @@ -80,12 +91,17 @@ def work_one_job block.call(name) end end - handler.call(args) + response = handler.call(args) + unless beanstalk_respond_to.nil? + response_queue = "response.#{name}.#{job.id}" + beanstalk.use response_queue + beanstalk.put [name, response].to_json + beanstalk.ignore response_queue + end end rescue Timeout::Error raise JobTimeout, "#{name} hit #{job.ttr-1}s timeout" end - job.delete log_job_end(name) rescue Beanstalk::NotConnected => e diff --git a/test/stalker_test.rb b/test/stalker_test.rb index 8d64740..8bac4a3 100644 --- a/test/stalker_test.rb +++ b/test/stalker_test.rb @@ -31,6 +31,18 @@ def with_an_error_handler assert_equal val, $result end + test "enqueue and work a job with response" do + val = rand(999999) + worker = Thread.new { + Stalker.job('my.job') { |args| args['val'] } + Stalker.prep + Stalker.work_one_job + } + result = Stalker.enqueue_with_response('my.job', {:val => val}, :timeout => 10) + worker.kill + assert_equal val, result['val'] + end + test "invoke error handler when defined" do with_an_error_handler Stalker.job('my.job') { |args| fail } From 8f7775a8a6fbc8e630e03d7d80071b87f660aea5 Mon Sep 17 00:00:00 2001 From: Kimmo Lehto Date: Tue, 14 Aug 2012 17:46:29 +0300 Subject: [PATCH 2/2] realized it's not testable with single thread --- test/stalker_test.rb | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/test/stalker_test.rb b/test/stalker_test.rb index 8bac4a3..8d64740 100644 --- a/test/stalker_test.rb +++ b/test/stalker_test.rb @@ -31,18 +31,6 @@ def with_an_error_handler assert_equal val, $result end - test "enqueue and work a job with response" do - val = rand(999999) - worker = Thread.new { - Stalker.job('my.job') { |args| args['val'] } - Stalker.prep - Stalker.work_one_job - } - result = Stalker.enqueue_with_response('my.job', {:val => val}, :timeout => 10) - worker.kill - assert_equal val, result['val'] - end - test "invoke error handler when defined" do with_an_error_handler Stalker.job('my.job') { |args| fail }