Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 47 additions & 27 deletions src/fairy.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ shut_down = (signo = 'SIGTERM') ->
return workers
.filter(({task}) -> task)
.forEach((worker) -> worker.shut_down(on))

shutting_down = on

workers
Expand Down Expand Up @@ -234,8 +234,8 @@ class Fairy
# values are according objects of class `Queue`.
constructor: (@options) ->
@id = uuid.v4()
@redis = create_client options
@pubsub = create_client options, on
@redis = create_client @options
@pubsub = create_client @options, on
@queue_pool = {}


Expand Down Expand Up @@ -263,7 +263,7 @@ class Fairy


# ### Get All Queues Asynchronously

# Return named queues whose names are stored in the `QUEUES` set.
#
# queues = fairy.queues()
Expand All @@ -276,7 +276,7 @@ class Fairy


# ### Get Statistics for All Queues Asynchronously

# `statistics` is an asynchronous method. The only arg of the callback
# function is an array containing statistics of all queues. The actual dirty
# work is handed to objects of class `Queue`'s `statistics` method.
Expand Down Expand Up @@ -327,8 +327,8 @@ class Queue
# The constructor of class `Queue` stores the Redis connection and the name
# of the queue as instance properties.
constructor: (@fairy, @name) ->
{@redis, @pubsub} = fairy

{@redis, @pubsub} = @fairy
@workingTasks = {}

# ### Function to Resolve Key Name

Expand All @@ -349,12 +349,12 @@ class Queue
key: (key) -> "#{prefix}:#{key}:#{@name}"




# ### Placing Tasks

# Tasks will be pushed into `SOURCE` Redis lists:
#
#
# + `foo` tasks will be queued at `SOURCE:foo` list.
# + A callback is optional.
# + Arguments except the (optional) callback function will be serialized as
Expand All @@ -367,7 +367,7 @@ class Queue
# **Usage:**
#
# queue.enqueue 'group_id', 'param2', (err, res) -> # YOUR CODE
#
#
# A transaction ensures the atomicity.
enqueue: (group) =>
original_arguments = Array.prototype.slice.call(arguments)
Expand Down Expand Up @@ -465,7 +465,7 @@ class Queue
#
# Commit the transaction, re-initiate the transaction when concurrency
# occurred, otherwise the retry is finished.

client.watch groups.map((group) => "#{@key('QUEUED')}:#{group}")... if groups.length
start_transaction = =>
multi = client.multi()
Expand All @@ -485,7 +485,7 @@ class Queue
retry callback

# If there're blocked task groups, then:
#
#
# 1. Find all blocked tasks, and:
# 2. Push them into the temporary tasks array, finally:
# 3. Start the transaction when this is done for all blocked groups.
Expand All @@ -500,7 +500,7 @@ class Queue


# ### Clear A Queue

# Remove **all** tasks of the queue, and reset statistics. Set `TOTAL` to
# `PROCESSING` tasks to prevent negative pending tasks being calculated.
clear: (callback) =>
Expand All @@ -520,7 +520,7 @@ class Queue


# ### Ignore Failed Tasks

# Remove **all** tasks of the queue, and reset statistics. Set `TOTAL` to
# `PROCESSING` tasks to prevent negative pending tasks being calculated.
ignore_failed_tasks: (callback) =>
Expand Down Expand Up @@ -579,7 +579,7 @@ class Queue
retry callback

# If there're blocked task groups, then:
#
#
# 1. Find all blocked tasks, and:
# 2. Push them into the temporary tasks array, finally:
# 3. Start the transaction when this is done for all blocked groups.
Expand Down Expand Up @@ -667,12 +667,12 @@ class Queue


# ### Get Failed Tasks Asynchronously

# Failed tasks are stored in the `FAILED` list.
#
# `failed_tasks` is an asynchronous method. Arguments of the callback function
# follow node.js convention: `err` and `res`.
#
#
# Below is an example `res` array:
#
# [{ id: '8c0c3eab-8114-41d6-8808-2ae8615d38b4',
Expand Down Expand Up @@ -703,7 +703,7 @@ class Queue


# ### Get Blocked Groups Asynchronously

# Blocked groups' identifiers are stored in the `BLOCKED` set.
#
# `blocked_groups` is an asynchronous method. Arguments of the callback
Expand Down Expand Up @@ -762,7 +762,7 @@ class Queue


# ### Get Processing Tasks Asynchronously

# Currently processing tasks are tasks in the `PROCESSING` list.
#
# `processing_tasks` is an asynchronous method. Arguments of the callback
Expand Down Expand Up @@ -875,9 +875,9 @@ class Queue


# ### Get Statistics of a Queue Asynchronously

# Statistics of a queue include:
#
#
# + `name`, name of the queue.
# + `workers`, total live workers.
# + `processing_tasks`, total processing tasks.
Expand Down Expand Up @@ -977,7 +977,7 @@ class Queue
result.averageprocess_time = '-'

# Calculate blocked and pending tasks:
#
#
# 1. Initiate another transaction to count all `BLOCKED` tasks. Blocked
# tasks are tasks in the `QUEUED` lists whose group identifiers are in
# the `BLOCKED` set. **Note:** The leftmost task of each `QUEUED` list
Expand Down Expand Up @@ -1011,27 +1011,41 @@ class Worker
# + Polling interval in milliseconds
# + Maximum times of retries
# + Retry interval in milliseconds
# + Storage capacity for newly finished tasks
# + Storage capacity for newly finished tasks
# + Storage capacity for slowest tasks
retry_limit : 2
retry_delay : 0
recent_size : 100000
slowest_size : 10000

constructor: (@queue, @handler) ->
{@name, @fairy, @redis, @pubsub} = queue
{@name, @fairy, @redis, @pubsub} = @queue
@id = uuid.v4()
@redis.hset @key('WORKERS'), @id, "#{version}|#{os.hostname()}|#{server_ip()}|#{process.pid}|#{Date.now()}"

@pubsub.subscribe @key('ENQUEUED')
@pubsub.on 'message', (channel, message) =>
if channel == 'FAIRY:COMPLETE'
return @removeLock(message);
return unless channel in [@key('ENQUEUED')]
@start() if @idle
@idle = on
@start()

key: (key) -> "#{prefix}:#{key}:#{@name}"

# ### Remove worker lock
# **Private** method. Remove any lock we put on a worker. This is important.
#
# On each and every message, we add the messageId with a value = true to the
# temporary object call `workingTasks` in the parent `Queue` object. We can
# then check for every message if any other `Worker` has picked up this
# message and not process it, thus free the worker for other message
removeLock: (message) =>
task = JSON.parse(message)
if task && Array.isArray(task) && task[0]
delete @queue.workingTasks[task[0]]
return

# ### Poll New Task

Expand All @@ -1048,12 +1062,18 @@ class Worker
# immediately.
#
# If there's no tasks in the `SOURCE` list, take worker into `idle` state.
#
# In here we check if the taskId have not been added to `Queue.workingTasks`
# If not we set the value of `workingTasks[messageId]` to `true`
# If the value has been set, we process like normal and free the `Worker`
start: =>
return @shut_down() if shutting_down or error
@idle = off
@redis.watch @key('SOURCE')
@redis.lindex @key('SOURCE'), 0, (err, res) =>
if task = JSON.parse(res)
task = JSON.parse(res)
if task && Array.isArray(task) && !@queue.workingTasks[task[0]]
@queue.workingTasks[task[0]] = true
@redis.multi()
.lpop(@key('SOURCE'))
.rpush("#{@key('QUEUED')}:#{task[1]}", res)
Expand All @@ -1079,7 +1099,7 @@ class Worker
#
# Calling the callback function is the responsibility of you. Otherwise
# `Fairy` will stop dispatching tasks.
process: (@task) =>
process: (task) =>
@redis.hset @key('PROCESSING'), @id, JSON.stringify([task..., task.start_time = Date.now()])

# Before Processing the Task:
Expand Down
2 changes: 1 addition & 1 deletion test/0.test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ total_groups = 5
total_tasks = 200
total_workers = require('os').cpus().length

describe ["Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_workers} Perfect Workers"], ->
describe "Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_workers} Perfect Workers", ->

@timeout(200000)

Expand Down
2 changes: 1 addition & 1 deletion test/1.test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ total_groups = 5
total_tasks = 200
total_workers = require('os').cpus().length

describe ["Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_workers} Fail-n-Block Workers"], ->
describe "Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_workers} Fail-n-Block Workers", ->

@timeout(200000)

Expand Down
2 changes: 1 addition & 1 deletion test/7.test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ queue = fairy.queue('TEST7')
uuid = require('node-uuid')
should = require 'should'

describe ["Enqueuer should receive progress notification"], ->
describe "Enqueuer should receive progress notification", ->

@timeout(200000)

Expand Down
2 changes: 1 addition & 1 deletion test/8.test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ queue = fairy.queue('TEST8')
uuid = require('node-uuid')
should = require 'should'

describe ["Enqueuer should receive progress notification"], ->
describe "Enqueuer should receive progress notification", ->

@timeout(200000)

Expand Down