Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Yajob.prototype.put = function (attrs, opts) {
opts = opts || {};
opts.schedule = opts.schedule || new Date(Date.now() + this._delay);
opts.priority = opts.priority || 0;
opts.meta = opts.meta || {};

if (!Array.isArray(attrs)) {
attrs = [attrs];
Expand All @@ -56,7 +57,8 @@ Yajob.prototype.put = function (attrs, opts) {
attempts: 0,
attrs,
scheduledAt: opts.schedule,
priority: opts.priority
priority: opts.priority,
meta: opts.meta
};
}

Expand All @@ -65,6 +67,28 @@ Yajob.prototype.put = function (attrs, opts) {
return jobs.then(c => c.insert(attrs.map(attrsToJob)));
};

Yajob.prototype.replace = function (attrs, opts) {
opts = opts || {};
opts.schedule = opts.schedule || new Date(Date.now() + this._delay);
opts.priority = opts.priority || 0;
opts.meta = opts.meta || {};

function attrsToJob(attrs) {
return {
status: Yajob.status.new,
attempts: 0,
attrs,
scheduledAt: opts.schedule,
priority: opts.priority,
meta: opts.meta
};
}

const jobs = this._db.then(db => db.collection(this._tag));

return jobs.then(c => c.update({status: Yajob.status.new, attrs}, attrsToJob(attrs), {upsert: true, w: 1}));
};

Yajob.prototype.take = function (count) {
count = count || 1;

Expand Down Expand Up @@ -111,7 +135,7 @@ Yajob.prototype.take = function (count) {
try {
for (let i = 0; i < batch.length; i++) {
const job = batch[i];
const done = yield job.attrs;
const done = yield Object.assign(job.attrs, job.meta);

if (done === false) {
const status = job.attempts < maxTrys ? Yajob.status.new : Yajob.status.failed;
Expand Down
75 changes: 70 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,54 @@ for (var mail of yield mails.take(100)) {

Processed jobs removed from queue, when for-loop is ended or broken (either with `break` or exception).

### Updating pending events with metadata

You may also attach metadata to future job and update as follows:

```js
const yajob = require('yajob');
const mails = yajob('localhost/queuedb')
.tag('mails');

var d = new Date();
d.setHours(24,0,0,0);

mails.put({
from: 'floatdrop@gmail.com',
to: 'nodejs-dev@dev-null.com'
}, {
meta: {body: 'You have 1 new notification'},
schedule: d
});
// => Promise

// Meanwhile, a new notification comes in

mails.replace({
from: 'floatdrop@gmail.com',
to: 'nodejs-dev@dev-null.com'
}, {
meta: {body: 'You have 2 new notification'},
schedule: d
});

// Now, when you take the job in the future:

let job = yield mails.take();
console.log(job);
```

This would print out:
```
{
from: 'floatdrop@gmail.com',
to: 'nodejs-dev@dev-null.com',
body: 'You have 2 new notification'
}
```

This will only send out a single email with the new body.

### Skip jobs

In some cases you will need to skip taken job. To do this pass into generator `false` value:
Expand Down Expand Up @@ -55,13 +103,13 @@ const important = queue.tag('mail').sort({priority: -1});

Returns instance of queue, that stores data in MongoDB.

##### uri
Type: `String`
##### uri
Type: `String`

MongoDB URI string.

##### options
Type: `Object`
##### options
Type: `Object`

MongoDB [MongoClient.connect options](http://mongodb.github.io/node-mongodb-native/2.1/api/MongoClient.html).

Expand All @@ -82,6 +130,23 @@ Type: `Object`

* `schedule` - `Date`, when job should be available to `take`
* `priority` - `Number`, that represents priority of job
* `meta` - `Object`, optional metadata attached to job and returned in taken object

### replace(attrs, [options])

Update a pending job in the queue. Returns `Promise`.

##### attrs
Type: `Object`

Data, that will be attached to job.

##### options
Type: `Object`

* `schedule` - `Date`, when job should be available to `take`
* `priority` - `Number`, that represents priority of job
* `meta` - `Object`, optional metadata attached to job and returned in taken object

### take([count])

Expand All @@ -90,7 +155,7 @@ Returns `Promise` that resolves to a `Generator`, that will emit jobs one by one
After all jobs are taken from batch - they are considered `done` and removed from queue.

##### count
Type: `Number`
Type: `Number`
Default: `1`

Maximum number of jobs to take from one batch request.
Expand Down
14 changes: 14 additions & 0 deletions test/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@ test('put should add job to queue', async t => {
}
});

test('put should add job to queue with meta', async t => {
const queueDb = await new QueueDb();
const queue = yajob(queueDb.uri);

try {
await queue.put({test: 'message'}, {meta: {param: 1}});
const job = await queueDb.db.collection('default').find().toArray();
t.same(job[0].attrs, {test: 'message'});
t.same(job[0].meta, {param: 1});
} finally {
await queueDb.close();
}
});

test('put take an Array as argument', async t => {
const queueDb = await new QueueDb();
const queue = yajob(queueDb.uri);
Expand Down
14 changes: 14 additions & 0 deletions test/remove.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,17 @@ test('removes job', async t => {
await queueDb.close();
}
});

test('removes job with meta', async t => {
const queueDb = await new QueueDb();
const queue = yajob(queueDb.uri);

try {
await queue.put({test: 'wow'}, {meta: {param: 1}});
await queue.remove({test: 'wow'});
const jobs = await queueDb.db.collection('default').find().toArray();
t.is(jobs.length, 0, 'should remove job from queue');
} finally {
await queueDb.close();
}
});
35 changes: 35 additions & 0 deletions test/replace.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import test from 'ava';
import yajob from '..';
import {QueueDb} from './_utils';

test('replace should add job to queue then update it', async t => {
const queueDb = await new QueueDb();
const queue = yajob(queueDb.uri);

try {
await queue.put({test: 'message'}, {meta: {param: 1}});
const job = await queueDb.db.collection('default').find().toArray();
t.same(job[0].attrs, {test: 'message'});
t.same(job[0].meta, {param: 1});
await queue.replace({test: 'message'}, {meta: {param: 2}});
const job2 = await queueDb.db.collection('default').find().toArray();
t.same(job2[0].attrs, {test: 'message'});
t.same(job2[0].meta, {param: 2});
} finally {
await queueDb.close();
}
});

test('replace should add job to queue if it does not exist', async t => {
const queueDb = await new QueueDb();
const queue = yajob(queueDb.uri);

try {
await queue.replace({test: 'message'}, {meta: {param: 2}});
const job2 = await queueDb.db.collection('default').find().toArray();
t.same(job2[0].attrs, {test: 'message'});
t.same(job2[0].meta, {param: 2});
} finally {
await queueDb.close();
}
});
20 changes: 20 additions & 0 deletions test/take.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,26 @@ test('take one', async t => {
}
});

test('take one with meta', async t => {
const queueDb = await new QueueDb();
const queue = yajob(queueDb.uri);

try {
await queue.put({test: 'wow'}, {meta: {param: 1}});

const promise = queue.take();
t.is(typeof promise.then, 'function', 'should return a Promise');

const taken = Array.from(await promise);
t.same(taken, [{test: 'wow', param: 1}]);

const jobs = Array.from(queue.take());
t.is(jobs.length, 0, 'should remove job from queue');
} finally {
await queueDb.close();
}
});

test('take two', async t => {
const queueDb = await new QueueDb();
const queue = yajob(queueDb.uri);
Expand Down