diff --git a/lib/eep.core.js b/lib/eep.core.js index ff0684c..057c7e3 100755 --- a/lib/eep.core.js +++ b/lib/eep.core.js @@ -48,8 +48,8 @@ Temporal.make = function(initialValue, instant) { }; // A (degenerated) temporal aggregate function. Useful for temporal stream operators -var TemporalFunction = function(aggFn,instant) { - var self = this; var inner = aggFn.make(); var t; +var TemporalFunction = function(aggFn,instant, win) { + var self = this; var inner = aggFn.make(win); var t; self.init = function() { inner.init(); t = Temporal.make(inner.emit(), instant); }; self.accumulate = function(value) { t = t.update(inner.accumulate(value.value())); }; self.emit = function() { return Temporal.make(inner.emit()); }; @@ -57,8 +57,8 @@ var TemporalFunction = function(aggFn,instant) { }; // Factory function for turning regular aggregate functions into temporal ones -TemporalFunction.make = function(aggFn, at) { - return new TemporalFunction(aggFn, at); +TemporalFunction.make = function(aggFn, at, win) { + return new TemporalFunction(aggFn, at, win); }; // Exports diff --git a/lib/eep.window_monotonic.js b/lib/eep.window_monotonic.js index 6e290d9..8eb506e 100755 --- a/lib/eep.window_monotonic.js +++ b/lib/eep.window_monotonic.js @@ -28,7 +28,7 @@ var TemporalFunction = require('./eep.core').TemporalFunction; var MonotonicWindow = function(aggFn, clock) { events.EventEmitter.call(this); var self = this; - self.fn = TemporalFunction.make(aggFn,clock.init()); + self.fn = TemporalFunction.make(aggFn,clock.init(),self); var idx = 0; self.fn.init(); diff --git a/lib/eep.window_periodic.js b/lib/eep.window_periodic.js index 670fc6f..8e92220 100755 --- a/lib/eep.window_periodic.js +++ b/lib/eep.window_periodic.js @@ -31,7 +31,7 @@ var TemporalFunction = require('./eep.core').TemporalFunction; // var PeriodicWindow = function(aggFn, intervalMillis) { events.EventEmitter.call(this); - var self = this, clock = new WallClock(intervalMillis), fn = TemporalFunction.make(aggFn,clock.init()); + var self = this, clock = new WallClock(intervalMillis), fn = TemporalFunction.make(aggFn,clock.init(),self); var idx = 0; fn.init(); diff --git a/lib/eep.window_sliding.js b/lib/eep.window_sliding.js index 44d77cf..e4e22fa 100755 --- a/lib/eep.window_sliding.js +++ b/lib/eep.window_sliding.js @@ -58,7 +58,7 @@ var SlidingWindow = function(aggFn, n) { self.min = function() { throw "Unordered sliding window"; }; self.max = function() { throw "Unordered sliding window"; }; - self.size = function() { return win.size; }; + self.size = function() { return n; }; self.enqueue = function(v) { self.fn.accumulate(v); diff --git a/test/test-windows.js b/test/test-windows.js index f549921..7a0b8ff 100755 --- a/test/test-windows.js +++ b/test/test-windows.js @@ -1,6 +1,7 @@ var eep = require("eep"); var events = require('events'); var testCase = require('nodeunit').testCase; +var util = require('util'); var floatEquals = function (a,b) { var a1 = Math.round(parseFloat(a)*100)/100; @@ -19,7 +20,7 @@ function AvgFunction() { self.make = function() { return new AvgFunction(); }; }; -CountFunction.protorype = new eep.AggregateFunction(); +CountFunction.prototype = new eep.AggregateFunction(); function CountFunction() { var self = this; var count = 0; @@ -49,8 +50,10 @@ exports.read = testCase({ cb(); }, 'tumbling window': function(assert) { - var tumbling = eep.EventWorld.make().windows().tumbling(new AvgFunction(), 2); + const windowSize = 2; + var tumbling = eep.EventWorld.make().windows().tumbling(new AvgFunction(), windowSize); assert.ok(tumbling != null); + assert.equal(tumbling.size(), windowSize); var results = new Array(); tumbling.on('emit', function(v) { @@ -66,8 +69,10 @@ exports.read = testCase({ assert.done(); }, 'sliding window': function(assert) { - var sliding = eep.EventWorld.make().windows().sliding(new AvgFunction(), 2); + const windowSize = 2; + var sliding = eep.EventWorld.make().windows().sliding(new AvgFunction(), windowSize); assert.ok(sliding != null); + assert.equal(sliding.size(), windowSize); var results = []; sliding.on('emit', function(v) { @@ -172,6 +177,7 @@ exports.read = testCase({ enqueueEvent(); setTimeout(enqueueEvent, 20); + setTimeout(triggerTick, 50); setTimeout(enqueueEvent, 80); setTimeout(triggerTick, 100); //3 setTimeout(triggerTick, 200); //empty window should be produced here @@ -187,5 +193,100 @@ exports.read = testCase({ setTimeout(function() { assert.done(); }, 450); + }, + 'monotonic window counter': function (assert) { + var monotonic = eep.EventWorld.make().windows().monotonic(new CountFunction(), new eep.CountingClock()); + + assert.ok(monotonic != null); + + var results = []; + monotonic.on('emit', function(v) { + results.push(v); + }); + + monotonic.enqueue(5); + assert.same([], results); + + monotonic.tick(); + assert.same([1], results); + + monotonic.enqueue(4); + monotonic.enqueue(10); + monotonic.tick(); + assert.same([1, 2], results); + + setTimeout(function() { + monotonic.enqueue(3); + monotonic.enqueue(9); + monotonic.enqueue(81); + }, 50); + + setTimeout(function() { + monotonic.tick(); + assert.same([1, 2, 3], results); + }, 60); + + setTimeout(function() { + monotonic.tick(); + assert.same([1, 2, 3, 0], results); + }, 80); + + setTimeout(function() { + assert.done(); + }, 100); + }, + 'monotonic window when right': function (assert) { + + function EmitWhenConditionsAreRight(optionsIn, win) { + const options = optionsIn; + this.win = win; + + this.init = function() { + options.whatsUp = false; + }; + + this.accumulate = function(whatsUpNew) { + this.whatsUp = whatsUpNew; + + if(this.whatsUp) { + win.tick(); + } + }; + + this.compensate = function() { + + }; + + this.emit = function() { + return !!this.whatsUp? 1:0; + }; + + this.make = function(win) { return new EmitWhenConditionsAreRight(options, win); }; + } + + + var monotonic = eep.EventWorld.make().windows().monotonic(new EmitWhenConditionsAreRight({}), new eep.CountingClock()); + + assert.ok(monotonic != null); + + var results = []; + monotonic.on('emit', function(v) { + results.push(v); + }); + + + setTimeout(() => {monotonic.enqueue(false);}, 10); + setTimeout(() => {monotonic.enqueue(true);}, 20); + setTimeout(() => {monotonic.tick();}, 100); + setTimeout(() => {monotonic.enqueue(false);}, 120); + setTimeout(() => {monotonic.tick();}, 200); + setTimeout(() => {monotonic.enqueue(true);}, 220); + + setTimeout(() => {assert.same([1,1,0,1], results);}, 300); + + + setTimeout(function() { + assert.done(); + }, 310); } });