Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ bigArray.setAutoSync(false);
* isEmpty() - check if array is empty.
* isFull() - check if array is full.
* isValidIndex(arrayIndex) - checks if given array index is valid.
* watch(listener:fn()) - starts watching for changes using index file
* unwatch(listener:fn()) - stops watching for changes using index file

#### Big Queue

Expand Down Expand Up @@ -285,16 +287,17 @@ console.log(bigQueue.dequeue()); // out: world
* backlog: fn(ByteBuffer) - a function to be called when the array space is maxed out and the oldest entries will be auto-backlogged to free up the space.
* backlogBatchSize: Number - a number of entries to auto backlog when max size of array is reached
* enqueue(Buffer) - put buffer to the end of queue
* dequeue(): ByteBuffer - get element from head of the queue and removes it from the queue
* dequeue(): ByteBuffer - get element at the head of the queue and remove it from the queue
* peek(): ByteBuffer - get element from head of the queue
* each(fn(element: ByteBuffer, index, next)) - iterate through all queue elements asynchronously
* eachSync(fn(element: ByteBuffer, index)) - iterate through all queue elements in sync mode
* close() - close and gc queue from the memory
* flush() - flush the queue
* size(): Bignum - get size of the queue
* removeAll
* removeAll() - removes all elements
* watch(listener:fn()) - starts watching for changes using index file
* unwatch(listener:fn()) - stops watching for changes using index file

### To do:
* use/check watchFile to detect changes
* PR to mmap.js to support windows, currently blocked by https://github.com/indutny/mmap.js/issues/3
# mmap-kit
30 changes: 27 additions & 3 deletions lib/big-array.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ module.exports.Long = Long;
* @throws IOException exception throws during array initialization
*/
function BigArray(dir, name, dataPageSize, maxDataSize) {

var options = dir !== null && typeof dir === 'object' ? dir : {
dir: dir,
name: name,
Expand Down Expand Up @@ -203,6 +204,20 @@ proto.init = function init() {
this.sync();
};

proto.watch = function watch(listener) {
var metaDataPage = this.metaPageFactory.acquirePage(META_DATA_PAGE_INDEX);
debug('watching', metaDataPage.pageFile);
this.watcher = this.watcher || Fs.watch(metaDataPage.pageFile, {
persistent: false
});

this.watcher.addListener('change', listener);
};

proto.unwatch = function unwatch(listener) {
this.watcher.removeListener('change', listener);
};

proto.sync = function sync() {
this.syncMeta();
this.syncData();
Expand All @@ -219,6 +234,11 @@ proto.syncData = function sync() {
};

proto.isEmpty = function isEmpty() {
this.autoSync && this.sync();
return this._isEmpty();
};

proto._isEmpty = function _isEmpty() {
return this.arrayHeadIndex.cmp(this.arrayTailIndex) === 0;
};

Expand All @@ -237,7 +257,7 @@ proto.getIndexPageOffset = function getIndexPageOffset(index) {
};

proto.initDataPageIndex = function initDataPageIndex() {
if (!this.isEmpty()) {
if (!this._isEmpty()) {
debug('array is not empty');
var previousIndexPageIndex = Bignum(-1);
var previousIndex = prevIndex(this.arrayHeadIndex, this.MAX_INDEX);
Expand Down Expand Up @@ -265,7 +285,7 @@ proto.initDataPageIndex = function initDataPageIndex() {
proto.syncTailDataIndexFromMemory = function syncTailDataIndexFromMemory() {
debug('syncTailDataIndexFromMemory');

if (!this.isEmpty()) {
if (!this._isEmpty()) {
var tailIndex = this.arrayTailIndex;
var tailIndexPageIndex = this.getPageIndex(tailIndex);
var tailIndexPage = this.indexPageFactory.acquirePage(tailIndexPageIndex);
Expand Down Expand Up @@ -429,6 +449,10 @@ proto.append = function append(data) {
debug('tail data index is %s', this.tailDataPageIndex);
debug('<-----------');

if (typeof data === 'string') {
data = new Buffer(data);
}

// prepare the data pointer
if (this.headDataItemOffset.add(data.length).gt(this.dataPageFactory.pageSize)) { // not enough space
this.headDataPageIndex = nextIndex(this.headDataPageIndex, this.maxDataFiles);
Expand Down Expand Up @@ -523,9 +547,9 @@ proto.syncTailIndexToMemory = function syncTailIndexToMemory() {
};

proto.flush = function flush() {
this.metaPageFactory.flush();
this.indexPageFactory.flush();
this.dataPageFactory.flush();
this.metaPageFactory.flush();
};

proto.shift = function shift() {
Expand Down
8 changes: 8 additions & 0 deletions lib/big-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,11 @@ proto.flush = function flush() {
proto.size = function size() {
return this.innerArray.size();
};

proto.watch = function watch(listener) {
this.innerArray.watch(listener);
};

proto.unwatch = function unwatch(listener) {
this.innerArray.unwatch(listener);
};
41 changes: 41 additions & 0 deletions test/big-array.js
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,47 @@ Test(__filename, function (t) {
});
});

t.test('watching', function (t) {
t.timeoutAfter(10000);

var bigArrayA = new BigArray(testDir, 'watch');
var bigArrayB = new BigArray(testDir, 'watch');

var st;
var count = 0;

bigArrayB.watch(function onchange() {
console.log('time to detect: ', Date.now() - st, arguments);
if (count < 3) {
t.equal(bigArrayB.shift().toString(), 'hello');
}
count++;
if (count === 3) {
bigArrayB.unwatch(onchange);
// flush more
setTimeout(function wait() {
bigArrayA.append('hello');
bigArrayA.flush();
}, 200);
setTimeout(function wait() {
t.end();
}, 500);
}
if (count > 4) {
t.fail('should have stopped watching');
}
});

Async.timesSeries(3, function iter(n, next) {
setImmediate(function () {
st = Date.now();
bigArrayA.append('hello');
bigArrayA.flush();
next();
});
});
});

t.test('after', function (t) {
rm('-rf', testDir);
t.end();
Expand Down