diff --git a/lib/stalker.rb b/lib/stalker.rb index 55d6586..b6edbc3 100644 --- a/lib/stalker.rb +++ b/lib/stalker.rb @@ -21,6 +21,16 @@ def enqueue(job, args={}, opts={}) failed_connection(e) end + def enqueue_with_response(job, args={}, opts={}) + job_id = enqueue(job, {:beanstalk_respond_to => beanstalk_url}.merge(args), opts) + response_queue = "response.#{job}.#{job_id}" + beanstalk.watch response_queue + response = beanstalk.reserve(opts[:timeout]) + beanstalk.ignore response_queue + response.delete + JSON.parse(response.body).last + end + def job(j, &block) @@handlers ||= {} @@handlers[j] = block @@ -69,6 +79,7 @@ class JobTimeout < RuntimeError; end def work_one_job job = beanstalk.reserve name, args = JSON.parse job.body + beanstalk_respond_to = args.delete('beanstalk_respond_to') log_job_begin(name, args) handler = @@handlers[name] raise(NoSuchJob, name) unless handler @@ -80,12 +91,17 @@ def work_one_job block.call(name) end end - handler.call(args) + response = handler.call(args) + unless beanstalk_respond_to.nil? + response_queue = "response.#{name}.#{job.id}" + beanstalk.use response_queue + beanstalk.put [name, response].to_json + beanstalk.ignore response_queue + end end rescue Timeout::Error raise JobTimeout, "#{name} hit #{job.ttr-1}s timeout" end - job.delete log_job_end(name) rescue Beanstalk::NotConnected => e