diff --git a/README.md b/README.md index 7dd337e..163f55e 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,8 @@ bigArray.setAutoSync(false); * isEmpty() - check if array is empty. * isFull() - check if array is full. * isValidIndex(arrayIndex) - checks if given array index is valid. +* watch(listener:fn()) - starts watching for changes using index file +* unwatch(listener:fn()) - stops watching for changes using index file #### Big Queue @@ -285,16 +287,17 @@ console.log(bigQueue.dequeue()); // out: world * backlog: fn(ByteBuffer) - a function to be called when the array space is maxed out and the oldest entries will be auto-backlogged to free up the space. * backlogBatchSize: Number - a number of entries to auto backlog when max size of array is reached * enqueue(Buffer) - put buffer to the end of queue -* dequeue(): ByteBuffer - get element from head of the queue and removes it from the queue +* dequeue(): ByteBuffer - get element at the head of the queue and remove it from the queue * peek(): ByteBuffer - get element from head of the queue * each(fn(element: ByteBuffer, index, next)) - iterate through all queue elements asynchronously * eachSync(fn(element: ByteBuffer, index)) - iterate through all queue elements in sync mode * close() - close and gc queue from the memory * flush() - flush the queue * size(): Bignum - get size of the queue -* removeAll +* removeAll() - removes all elements +* watch(listener:fn()) - starts watching for changes using index file +* unwatch(listener:fn()) - stops watching for changes using index file ### To do: -* use/check watchFile to detect changes * PR to mmap.js to support windows, currently blocked by https://github.com/indutny/mmap.js/issues/3 # mmap-kit diff --git a/lib/big-array.js b/lib/big-array.js index f699018..83b32e6 100644 --- a/lib/big-array.js +++ b/lib/big-array.js @@ -88,6 +88,7 @@ module.exports.Long = Long; * @throws IOException exception throws during array initialization */ function BigArray(dir, name, dataPageSize, maxDataSize) { + var options = dir !== null && typeof dir === 'object' ? dir : { dir: dir, name: name, @@ -203,6 +204,20 @@ proto.init = function init() { this.sync(); }; +proto.watch = function watch(listener) { + var metaDataPage = this.metaPageFactory.acquirePage(META_DATA_PAGE_INDEX); + debug('watching', metaDataPage.pageFile); + this.watcher = this.watcher || Fs.watch(metaDataPage.pageFile, { + persistent: false + }); + + this.watcher.addListener('change', listener); +}; + +proto.unwatch = function unwatch(listener) { + this.watcher.removeListener('change', listener); +}; + proto.sync = function sync() { this.syncMeta(); this.syncData(); @@ -219,6 +234,11 @@ proto.syncData = function sync() { }; proto.isEmpty = function isEmpty() { + this.autoSync && this.sync(); + return this._isEmpty(); +}; + +proto._isEmpty = function _isEmpty() { return this.arrayHeadIndex.cmp(this.arrayTailIndex) === 0; }; @@ -237,7 +257,7 @@ proto.getIndexPageOffset = function getIndexPageOffset(index) { }; proto.initDataPageIndex = function initDataPageIndex() { - if (!this.isEmpty()) { + if (!this._isEmpty()) { debug('array is not empty'); var previousIndexPageIndex = Bignum(-1); var previousIndex = prevIndex(this.arrayHeadIndex, this.MAX_INDEX); @@ -265,7 +285,7 @@ proto.initDataPageIndex = function initDataPageIndex() { proto.syncTailDataIndexFromMemory = function syncTailDataIndexFromMemory() { debug('syncTailDataIndexFromMemory'); - if (!this.isEmpty()) { + if (!this._isEmpty()) { var tailIndex = this.arrayTailIndex; var tailIndexPageIndex = this.getPageIndex(tailIndex); var tailIndexPage = this.indexPageFactory.acquirePage(tailIndexPageIndex); @@ -429,6 +449,10 @@ proto.append = function append(data) { debug('tail data index is %s', this.tailDataPageIndex); debug('<-----------'); + if (typeof data === 'string') { + data = new Buffer(data); + } + // prepare the data pointer if (this.headDataItemOffset.add(data.length).gt(this.dataPageFactory.pageSize)) { // not enough space this.headDataPageIndex = nextIndex(this.headDataPageIndex, this.maxDataFiles); @@ -523,9 +547,9 @@ proto.syncTailIndexToMemory = function syncTailIndexToMemory() { }; proto.flush = function flush() { - this.metaPageFactory.flush(); this.indexPageFactory.flush(); this.dataPageFactory.flush(); + this.metaPageFactory.flush(); }; proto.shift = function shift() { diff --git a/lib/big-queue.js b/lib/big-queue.js index 93b4b77..765f6da 100644 --- a/lib/big-queue.js +++ b/lib/big-queue.js @@ -151,3 +151,11 @@ proto.flush = function flush() { proto.size = function size() { return this.innerArray.size(); }; + +proto.watch = function watch(listener) { + this.innerArray.watch(listener); +}; + +proto.unwatch = function unwatch(listener) { + this.innerArray.unwatch(listener); +}; diff --git a/test/big-array.js b/test/big-array.js index 68a7eaa..17f9148 100644 --- a/test/big-array.js +++ b/test/big-array.js @@ -713,6 +713,47 @@ Test(__filename, function (t) { }); }); + t.test('watching', function (t) { + t.timeoutAfter(10000); + + var bigArrayA = new BigArray(testDir, 'watch'); + var bigArrayB = new BigArray(testDir, 'watch'); + + var st; + var count = 0; + + bigArrayB.watch(function onchange() { + console.log('time to detect: ', Date.now() - st, arguments); + if (count < 3) { + t.equal(bigArrayB.shift().toString(), 'hello'); + } + count++; + if (count === 3) { + bigArrayB.unwatch(onchange); + // flush more + setTimeout(function wait() { + bigArrayA.append('hello'); + bigArrayA.flush(); + }, 200); + setTimeout(function wait() { + t.end(); + }, 500); + } + if (count > 4) { + t.fail('should have stopped watching'); + } + }); + + Async.timesSeries(3, function iter(n, next) { + setImmediate(function () { + st = Date.now(); + bigArrayA.append('hello'); + bigArrayA.flush(); + next(); + }); + }); + }); + t.test('after', function (t) { rm('-rf', testDir); t.end();