From d323c31b0cbc03ac362534f6fbb43de5c56ba2ad Mon Sep 17 00:00:00 2001 From: Sanjay Sanghavi Date: Mon, 15 Feb 2016 12:36:13 +0530 Subject: [PATCH] Support connection load balancing Client can pass 'loadBalance' parameter to enable or disable connection load balancing. Load balancing is provided by vertica server. Even if client enables and if it is turned OFF on server, driver should not fail. --- lib/backend_message.js | 17 ++++++ lib/connection.js | 49 +++++++++++++++-- lib/frontend_message.js | 20 +++++++ src/backend_message.coffee | 6 +++ src/connection.coffee | 41 +++++++++++++-- src/frontend_message.coffee | 7 +++ test/functional/connection_test.coffee | 73 ++++++++++++++++++++++++++ test/unit/backend_message_test.coffee | 6 +++ test/unit/frontend_message_test.coffee | 6 +++ 9 files changed, 218 insertions(+), 7 deletions(-) diff --git a/lib/backend_message.js b/lib/backend_message.js index f6fb41b..ca336d7 100644 --- a/lib/backend_message.js +++ b/lib/backend_message.js @@ -20,6 +20,23 @@ BackendMessage = (function() { })(); +BackendMessage.ConnectionLoadBalanceResponse = (function(_super) { + __extends(ConnectionLoadBalanceResponse, _super); + + function ConnectionLoadBalanceResponse() { + return ConnectionLoadBalanceResponse.__super__.constructor.apply(this, arguments); + } + + ConnectionLoadBalanceResponse.prototype.typeId = 89; + + ConnectionLoadBalanceResponse.prototype.read = function(buffer) { + return this.host = buffer.toString('ascii', 4, buffer.length - 1); + }; + + return ConnectionLoadBalanceResponse; + +})(BackendMessage); + BackendMessage.Authentication = (function(_super) { __extends(Authentication, _super); diff --git a/lib/connection.js b/lib/connection.js index 7b0d65c..5d0a442 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -45,9 +45,8 @@ Connection = (function(_super) { } Connection.prototype.connect = function(callback) { - var initialErrorHandler; + var connect, initialErrorHandler; this.connectedCallback = callback; - this.connection = net.createConnection(this.connectionOptions.port, this.connectionOptions.host); initialErrorHandler = (function(_this) { return function(err) { if (_this.connectedCallback) { @@ -57,8 +56,48 @@ Connection = (function(_super) { } }; })(this); - this.connection.on('error', initialErrorHandler); - return this.connection.on('connect', (function(_this) { + connect = (function(_this) { + return function() { + var redirectConn; + redirectConn = function(msg) { + if (_this.debug) { + console.log('Redirect to = ', msg.host, _this.connectionOptions.port); + } + _this.connection.destroy(); + _this.redirctedHost = msg.host; + _this.connection = net.createConnection(_this.connectionOptions.port, msg.host); + _this.connection.on('error', initialErrorHandler); + return _this.connection.on('connect', function() { + return _this.emit('connect'); + }); + }; + _this.connection = net.createConnection(_this.connectionOptions.port, _this.connectionOptions.host); + _this.connection.on('error', initialErrorHandler); + return _this.connection.on('connect', function() { + if (_this.connectionOptions.loadBalance) { + _this.connection.removeListener('error', initialErrorHandler); + _this._bindEventListeners(); + _this.connection.once('data', function(buffer) { + var message, size; + if (buffer.toString('ascii') === 'N') { + if (_this.debug) { + console.log("Connection load balancing is turned OFF on server"); + } + return _this.emit('connect'); + } else { + size = buffer.readUInt32BE(1); + message = BackendMessage.fromBuffer(buffer.slice(0, size + 1)); + return redirectConn(message); + } + }); + return _this._writeMessage(new FrontendMessage.ConnectionLoadBalance()); + } else { + return _this.emit('connect'); + } + }); + }; + })(this); + this.on('connect', (function(_this) { return function() { _this.connection.removeListener('error', initialErrorHandler); _this.connected = true; @@ -109,6 +148,8 @@ Connection = (function(_super) { } }; })(this)); + connect(); + return this.connection; }; Connection.prototype._bindEventListeners = function() { diff --git a/lib/frontend_message.js b/lib/frontend_message.js index d58b777..22836b8 100644 --- a/lib/frontend_message.js +++ b/lib/frontend_message.js @@ -43,6 +43,26 @@ FrontendMessage = (function() { })(); +FrontendMessage.ConnectionLoadBalance = (function(_super) { + __extends(ConnectionLoadBalance, _super); + + function ConnectionLoadBalance() { + return ConnectionLoadBalance.__super__.constructor.apply(this, arguments); + } + + ConnectionLoadBalance.prototype.typeId = null; + + ConnectionLoadBalance.prototype.payload = function() { + var arr, pl; + arr = [0x0, 0x0, 0x0, 0x8, 0x4, 0xd3, 0x0, 0x0]; + pl = new Buffer(arr); + return pl.slice(4); + }; + + return ConnectionLoadBalance; + +})(FrontendMessage); + FrontendMessage.Startup = (function(_super) { __extends(Startup, _super); diff --git a/src/backend_message.coffee b/src/backend_message.coffee index bc577b9..f527922 100644 --- a/src/backend_message.coffee +++ b/src/backend_message.coffee @@ -10,6 +10,12 @@ class BackendMessage read: (buffer) -> # Implement me in subclass +class BackendMessage.ConnectionLoadBalanceResponse extends BackendMessage + typeId: 89 + + read: (buffer) -> + this.host = buffer.toString('ascii', 4, buffer.length - 1) + class BackendMessage.Authentication extends BackendMessage typeId: 82 # R diff --git a/src/connection.coffee b/src/connection.coffee index 99498d3..0844d75 100644 --- a/src/connection.coffee +++ b/src/connection.coffee @@ -28,14 +28,45 @@ class Connection extends EventEmitter connect: (callback) -> @connectedCallback = callback - @connection = net.createConnection @connectionOptions.port, @connectionOptions.host initialErrorHandler = (err) => if @connectedCallback then @connectedCallback(err.message) else @emit 'error', err - @connection.on 'error', initialErrorHandler + connect = () => + redirectConn = (msg) => + console.log 'Redirect to = ', msg.host, @connectionOptions.port if @debug + @connection.destroy() + @redirctedHost = msg.host + @connection = net.createConnection(@connectionOptions.port, msg.host) + @connection.on 'error', initialErrorHandler - @connection.on 'connect', => + @connection.on 'connect', () => + @emit 'connect' + + @connection = net.createConnection @connectionOptions.port, @connectionOptions.host + + @connection.on 'error', initialErrorHandler + + @connection.on 'connect', () => + if @connectionOptions.loadBalance + @connection.removeListener 'error', initialErrorHandler + @_bindEventListeners() + + @connection.once 'data', (buffer) => + if buffer.toString('ascii') == 'N' + console.log "Connection load balancing is turned OFF on server" if @debug + @emit 'connect' + else + size = buffer.readUInt32BE(1) + message = BackendMessage.fromBuffer(buffer.slice(0, size + 1)) + redirectConn message + + @_writeMessage new FrontendMessage.ConnectionLoadBalance() + + else + @emit 'connect' + + @on 'connect', => @connection.removeListener 'error', initialErrorHandler @connected = true @_bindEventListeners() @@ -65,6 +96,10 @@ class Connection extends EventEmitter else @_handshake() + connect() + + @connection + _bindEventListeners: -> @connection.once 'close', @_onClose.bind(this) @connection.once 'error', @_onError.bind(this) diff --git a/src/frontend_message.coffee b/src/frontend_message.coffee index 9be7bb2..3ba98e1 100644 --- a/src/frontend_message.coffee +++ b/src/frontend_message.coffee @@ -33,6 +33,13 @@ class FrontendMessage return messageBuffer +class FrontendMessage.ConnectionLoadBalance extends FrontendMessage + typeId: null + + payload: -> + arr = [0x0, 0x0, 0x0, 0x8, 0x4, 0xd3, 0x0, 0x0] + pl = new Buffer(arr) + pl.slice 4 class FrontendMessage.Startup extends FrontendMessage typeId: null diff --git a/test/functional/connection_test.coffee b/test/functional/connection_test.coffee index e778b13..2b4caae 100644 --- a/test/functional/connection_test.coffee +++ b/test/functional/connection_test.coffee @@ -71,6 +71,79 @@ describe 'Vertica.connect', -> assert.equal err.message, 'The connection was closed.' done() + describe 'Connection Load Balancing', -> + origLoadBalancingPolicy = '' + + beforeEach (done) -> + if !fs.existsSync('./test/connection.json') + throw new Error("Create test/connection.json to run functional tests") + else + connectionInfo = JSON.parse(fs.readFileSync('./test/connection.json')) + Vertica.connect connectionInfo, (err, connection) -> + assert.equal err, null + assert.ok !connection.busy + assert.ok connection.connected + + connection.query "SELECT GET_LOAD_BALANCE_POLICY()", (err, resultset) -> + assert.ok resultset instanceof Vertica.Resultset + assert.ok resultset.rows.length == 1 + origLoadBalancingPolicy = resultset.rows[0][0] + done() + + it "should connect even when load balancing is truned OFF", (done) -> + Vertica.connect connectionInfo, (err, connection) -> + assert.equal err, null + assert.ok !connection.busy + assert.ok connection.connected + + connection.query "SELECT SET_LOAD_BALANCE_POLICY('NONE')", (err, resultset) -> + assert.equal err, null + + connectionInfo.loadBalance = true + Vertica.connect connectionInfo, (err, conn) -> + assert.equal err, null + assert.ok !conn.busy + assert.ok conn.connected + + done() + + assert.ok connection.busy + assert.ok connection.connected + + it "should connect to different host at least once", (done) -> + Vertica.connect connectionInfo, (err, connection) -> + assert.equal err, null + assert.ok !connection.busy + assert.ok connection.connected + + connection.query "SELECT SET_LOAD_BALANCE_POLICY('ROUNDROBIN')", (err, resultset) -> + assert.equal err, null + + connectionInfo.loadBalance = true + Vertica.connect connectionInfo, (err, conn) -> + assert.equal err, null + assert.ok !conn.busy + assert.ok conn.connected + if connectionInfo.host == conn.redirctedHost + Vertica.connect connectionInfo, (err, conn) -> + assert.equal err, null + assert.ok !conn.busy + assert.ok conn.connected + assert.notEqual conn.redirctedHost, connectionInfo.host + + done() + + assert.ok connection.busy + assert.ok connection.connected + + afterEach (done) -> + Vertica.connect connectionInfo, (err, connection) -> + return done(err) if err? + + connection.query "SELECT SET_LOAD_BALANCE_POLICY('" + origLoadBalancingPolicy + "')", (err, rs) -> + return done(err) if err? + done() + describe 'Statement interruption', -> beforeEach (done) -> if !fs.existsSync('./test/connection.json') diff --git a/test/unit/backend_message_test.coffee b/test/unit/backend_message_test.coffee index db2ffc7..477b550 100644 --- a/test/unit/backend_message_test.coffee +++ b/test/unit/backend_message_test.coffee @@ -2,6 +2,12 @@ assert = require 'assert' BackendMessage = require('../../src/backend_message') Buffer = require('../../src/buffer').Buffer +describe 'BackendMessage.ConnectionLoadBalanceResponse', -> + it "should read a message correctly", -> + message = BackendMessage.fromBuffer(new Buffer([0x59, 0x00, 0x00, 0x00, 0x15, 0x00, 0x00, 0x15, 0x39, 0x31, 0x39, 0x32, 0x2e, 0x31, 0x36, 0x38, 0x2e, 0x31, 0x2e, 0x33, 0x39, 0x00])) + assert.ok message instanceof BackendMessage.ConnectionLoadBalanceResponse + assert.equal message.host, '192.168.1.39' + describe 'BackendMessage.Authentication', -> it "should read a message correctly", -> message = BackendMessage.fromBuffer(new Buffer([0x52, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00])) diff --git a/test/unit/frontend_message_test.coffee b/test/unit/frontend_message_test.coffee index cde2643..0399e65 100644 --- a/test/unit/frontend_message_test.coffee +++ b/test/unit/frontend_message_test.coffee @@ -4,6 +4,12 @@ Buffer = require('../../src/buffer').Buffer FrontendMessage = require('../../src/frontend_message') Authentication = require('../../src/authentication') +describe "FrontendMessage.ConnectionLoadBalance", -> + it "should encode the message correctly", -> + topic = new FrontendMessage.ConnectionLoadBalance + reference = new Buffer([0x0, 0x0, 0x0, 0x8, 0x4, 0xD3, 0x0, 0x0]) + assert.deepEqual topic.toBuffer(), reference + describe "FrontendMessage.Startup", -> it "should hold the message's information", -> topic = new FrontendMessage.Startup('username', 'database')