
Conversation

@ducdigital

Please ignore the first 2 commits; I needed them to understand how this works, and I don't code in CoffeeScript.

This basically allows you to run multiple queue listeners in a single process and prevents a message from being processed more than once within that process.

This PR also fixes some typos and allows the CoffeeScript source to compile on the latest coffee-script npm release.

Some Mocha tests are fixed, and they run fine on my machine.

The reason I needed this is that I was creating queues that trigger AWS Lambda functions, and I ran into exactly the problem this package tries to solve. Triggering a Lambda function does not require many resources, but each operation needs a short timeout (30s–60s), so running multiple Node processes to achieve my result would not be feasible.

I also tried calling fairy.queue('worker_queue'); inside a loop, but this creates multiple connections to Redis, which in large quantities spikes CPU up to 200%.

So I added a fix that allows multiple regist calls and ensures each message is consumed only once across all workers in a queue, over a single Redis connection.

Here's an example of how it works:

sender.js

const fairy = require('fairy').connect();   // one shared Redis connection
const queue = fairy.queue('worker_queue');
const Random = require('random-js');        // random-js v1.x API
const random = new Random(Random.engines.mt19937().autoSeed());

const TASKS = [
  'TASK0',
  'TASK1',
  'TASK2',
  'TASK3',
];

let count = 0;

setInterval(function () {
  let group = random.integer(0, 100);                    // group key for the task
  let randomWaitTime = random.integer(5000, 10000);      // simulated processing time (ms)
  let task = TASKS[random.integer(0, TASKS.length - 1)];

  count++;

  console.log(count, ' - sent ', group, task, randomWaitTime);

  queue.enqueue(group, task, randomWaitTime);            // arguments are passed to the handler
}, 500);

receiver.js

require('events').EventEmitter.prototype._maxListeners = Infinity;  // silence the max-listeners warning for 500 registrations
let count = 0;
let Fairy = require('fairy');
let fairy = Fairy.connect().queue('worker_queue');

let jobFn = (group, task, waitTime, cb) => {
  count++;
  console.log(`${count} - GROUP: ${group} => ${task} => ${waitTime}`);
  setTimeout(() => {
    cb();          // signal completion so the next task in the group can run
  }, waitTime);    // simulate the work by waiting before calling back
};

for (let i = 0; i < 500; i++) {
  fairy.regist(jobFn);  // register the same handler 500 times
}

Thanks for the amazing package! I would prefer it if you could continue maintaining it, and maybe port it to ES6; I am sure there would be a lot more contributions.

@baoshan
Owner

baoshan commented Feb 5, 2017

Thanks @ducdigital for your contributions! And thank you again for reading the source.

If I understand correctly, we want multiple workers to be registered in a single process. I think this is because the actual workers' work involves a lot of I/O and asynchronous operations, so a single process can host hundreds of workers before reaching its processing capacity.

I do believe it's a good and universal idea. But after a short consideration, I think we should maybe use another mechanism to achieve the goal:

regist: (handler, {concurrency}) =>
  concurrency ?= 1   # default to a single concurrency slot

This is the outside interface I envision now. I prefer it because it's cleaner. From this interface, I guess the implementation may also be different: rather than having 500 workers race for one to successfully get each task, the logic may be: if there are available concurrency slots, we execute the task, as sketched below.
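
For illustration, here is a minimal JavaScript sketch of that slot-based logic, under stated assumptions: pendingTasks, onNewTask, and pushTask are hypothetical in-memory placeholders, not fairy's actual API.

const pendingTasks = [];                   // hypothetical in-memory task buffer
const listeners = [];
const onNewTask = (fn) => listeners.push(fn);
const pushTask = (...args) => {            // hypothetical: a new task arrives
  pendingTasks.push({ args });
  listeners.forEach((fn) => fn());
};

// Slot-based dispatcher sketch: rather than 500 workers racing for each task,
// one dispatcher runs a task whenever a concurrency slot is free.
function regist(handler, { concurrency = 1 } = {}) {
  let active = 0;                          // tasks currently running
  const tryDispatch = () => {
    while (active < concurrency) {         // a free slot is available
      const task = pendingTasks.shift();
      if (!task) return;                   // nothing waiting
      active++;
      handler(...task.args, () => {        // handler calls back when done
        active--;
        tryDispatch();                     // a slot freed up; check again
      });
    }
  };
  onNewTask(tryDispatch);                  // re-check whenever a task arrives
}

// Usage: at most 4 jobs in flight at once.
regist((group, task, wait, cb) => setTimeout(cb, wait), { concurrency: 4 });
pushTask('G1', 'TASK0', 1000);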

I'm open to discussing this more, and it's my intention to continue working on the project. But I'm currently busy with something else, so I cannot guarantee when I'll do it myself. I thank you in advance if you would like to discuss it with me further, or even open another PR.

Have a nice day!

@ducdigital
Author

Yes. Since Node.js is single-threaded (one core per process), doing this lets you make better use of that single thread. For instance, my code uses a lot of setTimeout calls to add a small random delay after a job is triggered, and my jobs are not heavy tasks; it's something like a timed FIFO queue.
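
As a standalone illustration (plain Node.js, not fairy code), hundreds of timer-based jobs can sit in flight on one thread, because setTimeout never blocks the event loop:

// Standalone demo: 500 concurrent timer-based "jobs" in one Node.js process.
let inFlight = 0;

for (let i = 0; i < 500; i++) {
  inFlight++;
  setTimeout(() => {
    inFlight--;
    if (inFlight === 0) console.log('all 500 jobs finished');
  }, 5000 + Math.random() * 5000);   // each job just waits 5–10 seconds
}

console.log(`${inFlight} jobs now in flight on a single thread`);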

The second thing is the connection to Redis. When the workers run in different processes, it is really difficult or impossible to aggregate them onto a single connection and a single pub/sub subscription.

When I ran a test with 1000 workers, Redis CPU rose to 200% (probably because of CPU-intensive pub/sub), and this does not count the workers' memory allocation (the Node.js runtime can be shared, but each Node.js app's memory is not), so there is too much overhead for what I tried to achieve.
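
A minimal sketch of the in-process alternative, assuming the classic node_redis (v3-style) pub/sub API; the workers array and round-robin dispatch here are hypothetical illustrations, not fairy internals:

const redis = require('redis');

// One subscriber connection shared by every in-process worker, instead of
// one connection (and one pub/sub subscription) per worker.
const sub = redis.createClient();
const workers = [];                  // in-process worker callbacks

for (let i = 0; i < 500; i++) {
  workers.push((message) => {
    // ...handle the message; only this one worker sees it
  });
}

sub.on('message', (channel, message) => {
  const worker = workers.shift();    // simple round-robin dispatch:
  workers.push(worker);              // exactly one worker gets each message
  worker(message);
});

sub.subscribe('worker_queue');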

I agree it is not the best solution, but it might be a head start. I was not able to come up with a better solution than using a temp variable and letting the workers race for the queue, but maybe you will. As for the interface, I agree that it can be put in the options object.

I am looking forward to any updates on this project :D
