From f1a0e4eb8b9a1e9afdfbf4faf94e8069d1835860 Mon Sep 17 00:00:00 2001 From: "Semenov, Dmytro()" Date: Sun, 15 May 2016 02:54:17 -0700 Subject: [PATCH 1/2] added change watcher --- README.md | 9 ++++++--- lib/big-array.js | 30 +++++++++++++++++++++++++++--- lib/big-queue.js | 8 ++++++++ test/big-array.js | 39 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 80 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 7dd337e..163f55e 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,8 @@ bigArray.setAutoSync(false); * isEmpty() - check if array is empty. * isFull() - check if array is full. * isValidIndex(arrayIndex) - checks if given array index is valid. +* watch(listener:fn()) - starts watching for changes using index file +* unwatch(listener:fn()) - stops watching for changes using index file #### Big Queue @@ -285,16 +287,17 @@ console.log(bigQueue.dequeue()); // out: world * backlog: fn(ByteBuffer) - a function to be called when the array space is maxed out and the oldest entries will be auto-backlogged to free up the space. * backlogBatchSize: Number - a number of entries to auto backlog when max size of array is reached * enqueue(Buffer) - put buffer to the end of queue -* dequeue(): ByteBuffer - get element from head of the queue and removes it from the queue +* dequeue(): ByteBuffer - get element at the head of the queue and remove it from the queue * peek(): ByteBuffer - get element from head of the queue * each(fn(element: ByteBuffer, index, next)) - iterate through all queue elements asynchronously * eachSync(fn(element: ByteBuffer, index)) - iterate through all queue elements in sync mode * close() - close and gc queue from the memory * flush() - flush the queue * size(): Bignum - get size of the queue -* removeAll +* removeAll() - removes all elements +* watch(listener:fn()) - starts watching for changes using index file +* unwatch(listener:fn()) - stops watching for changes using index file ### To do: -* use/check watchFile to detect changes * PR to mmap.js to support windows, currently blocked by https://github.com/indutny/mmap.js/issues/3 # mmap-kit diff --git a/lib/big-array.js b/lib/big-array.js index f699018..83b32e6 100644 --- a/lib/big-array.js +++ b/lib/big-array.js @@ -88,6 +88,7 @@ module.exports.Long = Long; * @throws IOException exception throws during array initialization */ function BigArray(dir, name, dataPageSize, maxDataSize) { + var options = dir !== null && typeof dir === 'object' ? dir : { dir: dir, name: name, @@ -203,6 +204,20 @@ proto.init = function init() { this.sync(); }; +proto.watch = function watch(listener) { + var metaDataPage = this.metaPageFactory.acquirePage(META_DATA_PAGE_INDEX); + debug('watching', metaDataPage.pageFile); + this.watcher = this.watcher || Fs.watch(metaDataPage.pageFile, { + persistent: false + }); + + this.watcher.addListener('change', listener); +}; + +proto.unwatch = function unwatch(listener) { + this.watcher.removeListener('change', listener); +}; + proto.sync = function sync() { this.syncMeta(); this.syncData(); @@ -219,6 +234,11 @@ proto.syncData = function sync() { }; proto.isEmpty = function isEmpty() { + this.autoSync && this.sync(); + return this._isEmpty(); +}; + +proto._isEmpty = function _isEmpty() { return this.arrayHeadIndex.cmp(this.arrayTailIndex) === 0; }; @@ -237,7 +257,7 @@ proto.getIndexPageOffset = function getIndexPageOffset(index) { }; proto.initDataPageIndex = function initDataPageIndex() { - if (!this.isEmpty()) { + if (!this._isEmpty()) { debug('array is not empty'); var previousIndexPageIndex = Bignum(-1); var previousIndex = prevIndex(this.arrayHeadIndex, this.MAX_INDEX); @@ -265,7 +285,7 @@ proto.initDataPageIndex = function initDataPageIndex() { proto.syncTailDataIndexFromMemory = function syncTailDataIndexFromMemory() { debug('syncTailDataIndexFromMemory'); - if (!this.isEmpty()) { + if (!this._isEmpty()) { var tailIndex = this.arrayTailIndex; var tailIndexPageIndex = this.getPageIndex(tailIndex); var tailIndexPage = this.indexPageFactory.acquirePage(tailIndexPageIndex); @@ -429,6 +449,10 @@ proto.append = function append(data) { debug('tail data index is %s', this.tailDataPageIndex); debug('<-----------'); + if (typeof data === 'string') { + data = new Buffer(data); + } + // prepare the data pointer if (this.headDataItemOffset.add(data.length).gt(this.dataPageFactory.pageSize)) { // not enough space this.headDataPageIndex = nextIndex(this.headDataPageIndex, this.maxDataFiles); @@ -523,9 +547,9 @@ proto.syncTailIndexToMemory = function syncTailIndexToMemory() { }; proto.flush = function flush() { - this.metaPageFactory.flush(); this.indexPageFactory.flush(); this.dataPageFactory.flush(); + this.metaPageFactory.flush(); }; proto.shift = function shift() { diff --git a/lib/big-queue.js b/lib/big-queue.js index 93b4b77..765f6da 100644 --- a/lib/big-queue.js +++ b/lib/big-queue.js @@ -151,3 +151,11 @@ proto.flush = function flush() { proto.size = function size() { return this.innerArray.size(); }; + +proto.watch = function watch(listener) { + this.innerArray.watch(listener); +}; + +proto.unwatch = function unwatch(listener) { + this.innerArray.unwatch(listener); +}; diff --git a/test/big-array.js b/test/big-array.js index 68a7eaa..596b192 100644 --- a/test/big-array.js +++ b/test/big-array.js @@ -713,6 +713,45 @@ Test(__filename, function (t) { }); }); + t.test('watching', function (t) { + t.timeoutAfter(10000); + + var bigArrayA = new BigArray(testDir, 'watch'); + var bigArrayB = new BigArray(testDir, 'watch'); + + var st; + var count = 0; + + bigArrayB.watch(function onchange() { + console.log('time to detect: ', Date.now() - st, arguments); + t.equal(bigArrayB.shift().toString(), 'hello'); + count++; + if (count === 3) { + bigArrayB.unwatch(onchange); + // flush more + setTimeout(function wait() { + bigArrayA.append('hello'); + bigArrayA.flush(); + }, 200); + setTimeout(function wait() { + t.end(); + }, 500); + } + if (count > 4) { + t.fail('should have stopped watching'); + } + }); + + Async.timesSeries(3, function iter(n, next) { + setImmediate(function () { + st = Date.now(); + bigArrayA.append('hello'); + bigArrayA.flush(); + next(); + }); + }); + }); + t.test('after', function (t) { rm('-rf', testDir); t.end(); From 3f4afae361a638b3ffbedfbbca504cc909e7001f Mon Sep 17 00:00:00 2001 From: "Semenov, Dmytro()" Date: Sun, 15 May 2016 03:09:17 -0700 Subject: [PATCH 2/2] test update --- test/big-array.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/big-array.js b/test/big-array.js index 596b192..17f9148 100644 --- a/test/big-array.js +++ b/test/big-array.js @@ -724,7 +724,9 @@ Test(__filename, function (t) { bigArrayB.watch(function onchange() { console.log('time to detect: ', Date.now() - st, arguments); - t.equal(bigArrayB.shift().toString(), 'hello'); + if (count < 3) { + t.equal(bigArrayB.shift().toString(), 'hello'); + } count++; if (count === 3) { bigArrayB.unwatch(onchange);