diff --git a/lib/backend_message.js b/lib/backend_message.js index f6fb41b..ca336d7 100644 --- a/lib/backend_message.js +++ b/lib/backend_message.js @@ -20,6 +20,23 @@ BackendMessage = (function() { })(); +BackendMessage.ConnectionLoadBalanceResponse = (function(_super) { + __extends(ConnectionLoadBalanceResponse, _super); + + function ConnectionLoadBalanceResponse() { + return ConnectionLoadBalanceResponse.__super__.constructor.apply(this, arguments); + } + + ConnectionLoadBalanceResponse.prototype.typeId = 89; + + ConnectionLoadBalanceResponse.prototype.read = function(buffer) { + return this.host = buffer.toString('ascii', 4, buffer.length - 1); + }; + + return ConnectionLoadBalanceResponse; + +})(BackendMessage); + BackendMessage.Authentication = (function(_super) { __extends(Authentication, _super); diff --git a/lib/connection.js b/lib/connection.js index 7b0d65c..5d0a442 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -45,9 +45,8 @@ Connection = (function(_super) { } Connection.prototype.connect = function(callback) { - var initialErrorHandler; + var connect, initialErrorHandler; this.connectedCallback = callback; - this.connection = net.createConnection(this.connectionOptions.port, this.connectionOptions.host); initialErrorHandler = (function(_this) { return function(err) { if (_this.connectedCallback) { @@ -57,8 +56,48 @@ Connection = (function(_super) { } }; })(this); - this.connection.on('error', initialErrorHandler); - return this.connection.on('connect', (function(_this) { + connect = (function(_this) { + return function() { + var redirectConn; + redirectConn = function(msg) { + if (_this.debug) { + console.log('Redirect to = ', msg.host, _this.connectionOptions.port); + } + _this.connection.destroy(); + _this.redirctedHost = msg.host; + _this.connection = net.createConnection(_this.connectionOptions.port, msg.host); + _this.connection.on('error', initialErrorHandler); + return _this.connection.on('connect', function() { + return _this.emit('connect'); + }); + }; + _this.connection = net.createConnection(_this.connectionOptions.port, _this.connectionOptions.host); + _this.connection.on('error', initialErrorHandler); + return _this.connection.on('connect', function() { + if (_this.connectionOptions.loadBalance) { + _this.connection.removeListener('error', initialErrorHandler); + _this._bindEventListeners(); + _this.connection.once('data', function(buffer) { + var message, size; + if (buffer.toString('ascii') === 'N') { + if (_this.debug) { + console.log("Connection load balancing is turned OFF on server"); + } + return _this.emit('connect'); + } else { + size = buffer.readUInt32BE(1); + message = BackendMessage.fromBuffer(buffer.slice(0, size + 1)); + return redirectConn(message); + } + }); + return _this._writeMessage(new FrontendMessage.ConnectionLoadBalance()); + } else { + return _this.emit('connect'); + } + }); + }; + })(this); + this.on('connect', (function(_this) { return function() { _this.connection.removeListener('error', initialErrorHandler); _this.connected = true; @@ -109,6 +148,8 @@ Connection = (function(_super) { } }; })(this)); + connect(); + return this.connection; }; Connection.prototype._bindEventListeners = function() { diff --git a/lib/frontend_message.js b/lib/frontend_message.js index d58b777..22836b8 100644 --- a/lib/frontend_message.js +++ b/lib/frontend_message.js @@ -43,6 +43,26 @@ FrontendMessage = (function() { })(); +FrontendMessage.ConnectionLoadBalance = (function(_super) { + __extends(ConnectionLoadBalance, _super); + + function ConnectionLoadBalance() { + return ConnectionLoadBalance.__super__.constructor.apply(this, arguments); + } + + ConnectionLoadBalance.prototype.typeId = null; + + ConnectionLoadBalance.prototype.payload = function() { + var arr, pl; + arr = [0x0, 0x0, 0x0, 0x8, 0x4, 0xd3, 0x0, 0x0]; + pl = new Buffer(arr); + return pl.slice(4); + }; + + return ConnectionLoadBalance; + +})(FrontendMessage); + FrontendMessage.Startup = (function(_super) { __extends(Startup, _super); diff --git a/src/backend_message.coffee b/src/backend_message.coffee index bc577b9..f527922 100644 --- a/src/backend_message.coffee +++ b/src/backend_message.coffee @@ -10,6 +10,12 @@ class BackendMessage read: (buffer) -> # Implement me in subclass +class BackendMessage.ConnectionLoadBalanceResponse extends BackendMessage + typeId: 89 + + read: (buffer) -> + this.host = buffer.toString('ascii', 4, buffer.length - 1) + class BackendMessage.Authentication extends BackendMessage typeId: 82 # R diff --git a/src/connection.coffee b/src/connection.coffee index 99498d3..0844d75 100644 --- a/src/connection.coffee +++ b/src/connection.coffee @@ -28,14 +28,45 @@ class Connection extends EventEmitter connect: (callback) -> @connectedCallback = callback - @connection = net.createConnection @connectionOptions.port, @connectionOptions.host initialErrorHandler = (err) => if @connectedCallback then @connectedCallback(err.message) else @emit 'error', err - @connection.on 'error', initialErrorHandler + connect = () => + redirectConn = (msg) => + console.log 'Redirect to = ', msg.host, @connectionOptions.port if @debug + @connection.destroy() + @redirctedHost = msg.host + @connection = net.createConnection(@connectionOptions.port, msg.host) + @connection.on 'error', initialErrorHandler - @connection.on 'connect', => + @connection.on 'connect', () => + @emit 'connect' + + @connection = net.createConnection @connectionOptions.port, @connectionOptions.host + + @connection.on 'error', initialErrorHandler + + @connection.on 'connect', () => + if @connectionOptions.loadBalance + @connection.removeListener 'error', initialErrorHandler + @_bindEventListeners() + + @connection.once 'data', (buffer) => + if buffer.toString('ascii') == 'N' + console.log "Connection load balancing is turned OFF on server" if @debug + @emit 'connect' + else + size = buffer.readUInt32BE(1) + message = BackendMessage.fromBuffer(buffer.slice(0, size + 1)) + redirectConn message + + @_writeMessage new FrontendMessage.ConnectionLoadBalance() + + else + @emit 'connect' + + @on 'connect', => @connection.removeListener 'error', initialErrorHandler @connected = true @_bindEventListeners() @@ -65,6 +96,10 @@ class Connection extends EventEmitter else @_handshake() + connect() + + @connection + _bindEventListeners: -> @connection.once 'close', @_onClose.bind(this) @connection.once 'error', @_onError.bind(this) diff --git a/src/frontend_message.coffee b/src/frontend_message.coffee index 9be7bb2..3ba98e1 100644 --- a/src/frontend_message.coffee +++ b/src/frontend_message.coffee @@ -33,6 +33,13 @@ class FrontendMessage return messageBuffer +class FrontendMessage.ConnectionLoadBalance extends FrontendMessage + typeId: null + + payload: -> + arr = [0x0, 0x0, 0x0, 0x8, 0x4, 0xd3, 0x0, 0x0] + pl = new Buffer(arr) + pl.slice 4 class FrontendMessage.Startup extends FrontendMessage typeId: null diff --git a/test/functional/connection_test.coffee b/test/functional/connection_test.coffee index e778b13..2b4caae 100644 --- a/test/functional/connection_test.coffee +++ b/test/functional/connection_test.coffee @@ -71,6 +71,79 @@ describe 'Vertica.connect', -> assert.equal err.message, 'The connection was closed.' done() + describe 'Connection Load Balancing', -> + origLoadBalancingPolicy = '' + + beforeEach (done) -> + if !fs.existsSync('./test/connection.json') + throw new Error("Create test/connection.json to run functional tests") + else + connectionInfo = JSON.parse(fs.readFileSync('./test/connection.json')) + Vertica.connect connectionInfo, (err, connection) -> + assert.equal err, null + assert.ok !connection.busy + assert.ok connection.connected + + connection.query "SELECT GET_LOAD_BALANCE_POLICY()", (err, resultset) -> + assert.ok resultset instanceof Vertica.Resultset + assert.ok resultset.rows.length == 1 + origLoadBalancingPolicy = resultset.rows[0][0] + done() + + it "should connect even when load balancing is truned OFF", (done) -> + Vertica.connect connectionInfo, (err, connection) -> + assert.equal err, null + assert.ok !connection.busy + assert.ok connection.connected + + connection.query "SELECT SET_LOAD_BALANCE_POLICY('NONE')", (err, resultset) -> + assert.equal err, null + + connectionInfo.loadBalance = true + Vertica.connect connectionInfo, (err, conn) -> + assert.equal err, null + assert.ok !conn.busy + assert.ok conn.connected + + done() + + assert.ok connection.busy + assert.ok connection.connected + + it "should connect to different host at least once", (done) -> + Vertica.connect connectionInfo, (err, connection) -> + assert.equal err, null + assert.ok !connection.busy + assert.ok connection.connected + + connection.query "SELECT SET_LOAD_BALANCE_POLICY('ROUNDROBIN')", (err, resultset) -> + assert.equal err, null + + connectionInfo.loadBalance = true + Vertica.connect connectionInfo, (err, conn) -> + assert.equal err, null + assert.ok !conn.busy + assert.ok conn.connected + if connectionInfo.host == conn.redirctedHost + Vertica.connect connectionInfo, (err, conn) -> + assert.equal err, null + assert.ok !conn.busy + assert.ok conn.connected + assert.notEqual conn.redirctedHost, connectionInfo.host + + done() + + assert.ok connection.busy + assert.ok connection.connected + + afterEach (done) -> + Vertica.connect connectionInfo, (err, connection) -> + return done(err) if err? + + connection.query "SELECT SET_LOAD_BALANCE_POLICY('" + origLoadBalancingPolicy + "')", (err, rs) -> + return done(err) if err? + done() + describe 'Statement interruption', -> beforeEach (done) -> if !fs.existsSync('./test/connection.json') diff --git a/test/unit/backend_message_test.coffee b/test/unit/backend_message_test.coffee index db2ffc7..477b550 100644 --- a/test/unit/backend_message_test.coffee +++ b/test/unit/backend_message_test.coffee @@ -2,6 +2,12 @@ assert = require 'assert' BackendMessage = require('../../src/backend_message') Buffer = require('../../src/buffer').Buffer +describe 'BackendMessage.ConnectionLoadBalanceResponse', -> + it "should read a message correctly", -> + message = BackendMessage.fromBuffer(new Buffer([0x59, 0x00, 0x00, 0x00, 0x15, 0x00, 0x00, 0x15, 0x39, 0x31, 0x39, 0x32, 0x2e, 0x31, 0x36, 0x38, 0x2e, 0x31, 0x2e, 0x33, 0x39, 0x00])) + assert.ok message instanceof BackendMessage.ConnectionLoadBalanceResponse + assert.equal message.host, '192.168.1.39' + describe 'BackendMessage.Authentication', -> it "should read a message correctly", -> message = BackendMessage.fromBuffer(new Buffer([0x52, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00])) diff --git a/test/unit/frontend_message_test.coffee b/test/unit/frontend_message_test.coffee index cde2643..0399e65 100644 --- a/test/unit/frontend_message_test.coffee +++ b/test/unit/frontend_message_test.coffee @@ -4,6 +4,12 @@ Buffer = require('../../src/buffer').Buffer FrontendMessage = require('../../src/frontend_message') Authentication = require('../../src/authentication') +describe "FrontendMessage.ConnectionLoadBalance", -> + it "should encode the message correctly", -> + topic = new FrontendMessage.ConnectionLoadBalance + reference = new Buffer([0x0, 0x0, 0x0, 0x8, 0x4, 0xD3, 0x0, 0x0]) + assert.deepEqual topic.toBuffer(), reference + describe "FrontendMessage.Startup", -> it "should hold the message's information", -> topic = new FrontendMessage.Startup('username', 'database')