diff --git a/src/fairy.coffee b/src/fairy.coffee index 2de227e..bf4ab8d 100644 --- a/src/fairy.coffee +++ b/src/fairy.coffee @@ -125,7 +125,7 @@ shut_down = (signo = 'SIGTERM') -> return workers .filter(({task}) -> task) .forEach((worker) -> worker.shut_down(on)) - + shutting_down = on workers @@ -234,8 +234,8 @@ class Fairy # values are according objects of class `Queue`. constructor: (@options) -> @id = uuid.v4() - @redis = create_client options - @pubsub = create_client options, on + @redis = create_client @options + @pubsub = create_client @options, on @queue_pool = {} @@ -263,7 +263,7 @@ class Fairy # ### Get All Queues Asynchronously - + # Return named queues whose names are stored in the `QUEUES` set. # # queues = fairy.queues() @@ -276,7 +276,7 @@ class Fairy # ### Get Statistics for All Queues Asynchronously - + # `statistics` is an asynchronous method. The only arg of the callback # function is an array containing statistics of all queues. The actual dirty # work is handed to objects of class `Queue`'s `statistics` method. @@ -327,8 +327,8 @@ class Queue # The constructor of class `Queue` stores the Redis connection and the name # of the queue as instance properties. constructor: (@fairy, @name) -> - {@redis, @pubsub} = fairy - + {@redis, @pubsub} = @fairy + @workingTasks = {} # ### Function to Resolve Key Name @@ -349,12 +349,12 @@ class Queue key: (key) -> "#{prefix}:#{key}:#{@name}" - + # ### Placing Tasks # Tasks will be pushed into `SOURCE` Redis lists: - # + # # + `foo` tasks will be queued at `SOURCE:foo` list. # + A callback is optional. # + Arguments except the (optional) callback function will be serialized as @@ -367,7 +367,7 @@ class Queue # **Usage:** # # queue.enqueue 'group_id', 'param2', (err, res) -> # YOUR CODE - # + # # A transaction ensures the atomicity. enqueue: (group) => original_arguments = Array.prototype.slice.call(arguments) @@ -465,7 +465,7 @@ class Queue # # Commit the transaction, re-initiate the transaction when concurrency # occurred, otherwise the retry is finished. - + client.watch groups.map((group) => "#{@key('QUEUED')}:#{group}")... if groups.length start_transaction = => multi = client.multi() @@ -485,7 +485,7 @@ class Queue retry callback # If there're blocked task groups, then: - # + # # 1. Find all blocked tasks, and: # 2. Push them into the temporary tasks array, finally: # 3. Start the transaction when this is done for all blocked groups. @@ -500,7 +500,7 @@ class Queue # ### Clear A Queue - + # Remove **all** tasks of the queue, and reset statistics. Set `TOTAL` to # `PROCESSING` tasks to prevent negative pending tasks being calculated. clear: (callback) => @@ -520,7 +520,7 @@ class Queue # ### Ignore Failed Tasks - + # Remove **all** tasks of the queue, and reset statistics. Set `TOTAL` to # `PROCESSING` tasks to prevent negative pending tasks being calculated. ignore_failed_tasks: (callback) => @@ -579,7 +579,7 @@ class Queue retry callback # If there're blocked task groups, then: - # + # # 1. Find all blocked tasks, and: # 2. Push them into the temporary tasks array, finally: # 3. Start the transaction when this is done for all blocked groups. @@ -667,12 +667,12 @@ class Queue # ### Get Failed Tasks Asynchronously - + # Failed tasks are stored in the `FAILED` list. # # `failed_tasks` is an asynchronous method. Arguments of the callback function # follow node.js convention: `err` and `res`. - # + # # Below is an example `res` array: # # [{ id: '8c0c3eab-8114-41d6-8808-2ae8615d38b4', @@ -703,7 +703,7 @@ class Queue # ### Get Blocked Groups Asynchronously - + # Blocked groups' identifiers are stored in the `BLOCKED` set. # # `blocked_groups` is an asynchronous method. Arguments of the callback @@ -762,7 +762,7 @@ class Queue # ### Get Processing Tasks Asynchronously - + # Currently processing tasks are tasks in the `PROCESSING` list. # # `processing_tasks` is an asynchronous method. Arguments of the callback @@ -875,9 +875,9 @@ class Queue # ### Get Statistics of a Queue Asynchronously - + # Statistics of a queue include: - # + # # + `name`, name of the queue. # + `workers`, total live workers. # + `processing_tasks`, total processing tasks. @@ -977,7 +977,7 @@ class Queue result.averageprocess_time = '-' # Calculate blocked and pending tasks: - # + # # 1. Initiate another transaction to count all `BLOCKED` tasks. Blocked # tasks are tasks in the `QUEUED` lists whose group identifiers are in # the `BLOCKED` set. **Note:** The leftmost task of each `QUEUED` list @@ -1011,7 +1011,7 @@ class Worker # + Polling interval in milliseconds # + Maximum times of retries # + Retry interval in milliseconds - # + Storage capacity for newly finished tasks + # + Storage capacity for newly finished tasks # + Storage capacity for slowest tasks retry_limit : 2 retry_delay : 0 @@ -1019,19 +1019,33 @@ class Worker slowest_size : 10000 constructor: (@queue, @handler) -> - {@name, @fairy, @redis, @pubsub} = queue + {@name, @fairy, @redis, @pubsub} = @queue @id = uuid.v4() @redis.hset @key('WORKERS'), @id, "#{version}|#{os.hostname()}|#{server_ip()}|#{process.pid}|#{Date.now()}" @pubsub.subscribe @key('ENQUEUED') @pubsub.on 'message', (channel, message) => + if channel == 'FAIRY:COMPLETE' + return @removeLock(message); return unless channel in [@key('ENQUEUED')] @start() if @idle @idle = on @start() - + key: (key) -> "#{prefix}:#{key}:#{@name}" + # ### Remove worker lock + # **Private** method. Remove any lock we put on a worker. This is important. + # + # On each and every message, we add the messageId with a value = true to the + # temporary object call `workingTasks` in the parent `Queue` object. We can + # then check for every message if any other `Worker` has picked up this + # message and not process it, thus free the worker for other message + removeLock: (message) => + task = JSON.parse(message) + if task && Array.isArray(task) && task[0] + delete @queue.workingTasks[task[0]] + return # ### Poll New Task @@ -1048,12 +1062,18 @@ class Worker # immediately. # # If there's no tasks in the `SOURCE` list, take worker into `idle` state. + # + # In here we check if the taskId have not been added to `Queue.workingTasks` + # If not we set the value of `workingTasks[messageId]` to `true` + # If the value has been set, we process like normal and free the `Worker` start: => return @shut_down() if shutting_down or error @idle = off @redis.watch @key('SOURCE') @redis.lindex @key('SOURCE'), 0, (err, res) => - if task = JSON.parse(res) + task = JSON.parse(res) + if task && Array.isArray(task) && !@queue.workingTasks[task[0]] + @queue.workingTasks[task[0]] = true @redis.multi() .lpop(@key('SOURCE')) .rpush("#{@key('QUEUED')}:#{task[1]}", res) @@ -1079,7 +1099,7 @@ class Worker # # Calling the callback function is the responsibility of you. Otherwise # `Fairy` will stop dispatching tasks. - process: (@task) => + process: (task) => @redis.hset @key('PROCESSING'), @id, JSON.stringify([task..., task.start_time = Date.now()]) # Before Processing the Task: diff --git a/test/0.test.coffee b/test/0.test.coffee index 89c0716..d426fa8 100644 --- a/test/0.test.coffee +++ b/test/0.test.coffee @@ -10,7 +10,7 @@ total_groups = 5 total_tasks = 200 total_workers = require('os').cpus().length -describe ["Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_workers} Perfect Workers"], -> +describe "Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_workers} Perfect Workers", -> @timeout(200000) diff --git a/test/1.test.coffee b/test/1.test.coffee index a1dd1ae..5f7c0c5 100644 --- a/test/1.test.coffee +++ b/test/1.test.coffee @@ -10,7 +10,7 @@ total_groups = 5 total_tasks = 200 total_workers = require('os').cpus().length -describe ["Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_workers} Fail-n-Block Workers"], -> +describe "Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_workers} Fail-n-Block Workers", -> @timeout(200000) diff --git a/test/7.test.coffee b/test/7.test.coffee index 98887e9..ee88277 100644 --- a/test/7.test.coffee +++ b/test/7.test.coffee @@ -3,7 +3,7 @@ queue = fairy.queue('TEST7') uuid = require('node-uuid') should = require 'should' -describe ["Enqueuer should receive progress notification"], -> +describe "Enqueuer should receive progress notification", -> @timeout(200000) diff --git a/test/8.test.coffee b/test/8.test.coffee index f2bde9a..e30a3f4 100644 --- a/test/8.test.coffee +++ b/test/8.test.coffee @@ -3,7 +3,7 @@ queue = fairy.queue('TEST8') uuid = require('node-uuid') should = require 'should' -describe ["Enqueuer should receive progress notification"], -> +describe "Enqueuer should receive progress notification", -> @timeout(200000)