From b51cb39f1038aecc05327c374e885ca34e2aa500 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Thu, 19 Jun 2025 16:26:51 +0200 Subject: [PATCH] feat(client): support modern and legacy elasticsearch clients --- configMap.js | 42 ++++++++++++++++++++++++++++++++++++++++++ index.js | 11 ++++++++++- package.json | 2 ++ src/client.js | 18 ++++++++++++++++-- test/index.js | 11 ++++++----- test/stream.js | 2 +- v2/client.js | 20 ++++++++++++++++++++ v2/sink.js | 18 ++++++++++++++++++ 8 files changed, 115 insertions(+), 9 deletions(-) create mode 100644 configMap.js create mode 100644 v2/client.js create mode 100644 v2/sink.js diff --git a/configMap.js b/configMap.js new file mode 100644 index 0000000..e252b70 --- /dev/null +++ b/configMap.js @@ -0,0 +1,42 @@ +const _ = require('lodash'); + +/** + * This module is responsible for mapping betwen the legacy `npm elasticsearch` + * config format and the modern `npm @elastic/elasticsearch` format. + * + * legacy: https://www.elastic.co/guide/en/elasticsearch/client/elasticsearch-js/16.x/config-options.html + * modern: https://www.elastic.co/docs/reference/elasticsearch/clients/javascript/basic-config#maxretries + */ + +const modernToLegacy = (modern) => { + const legacy = {}; + + // modern config allows for *either* the key 'node' (string) or 'nodes' ([]string) + const nodes = _.has(modern, 'nodes') ? _.get(modern, 'nodes') : [_.get(modern, 'node', {})]; + _.set(legacy, 'hosts', nodes); + + // map basic configuration options + _.set(legacy, 'maxRetries', _.get(modern, 'maxRetries', undefined)); + _.set(legacy, 'requestTimeout', _.get(modern, 'requestTimeout', undefined)); + + return legacy; +}; + +const legacyToModern = (legacy) => { + const modern = {}; + + // legacy config allows for *either* the key 'host' (string) or 'hosts' ([]string|[]Object) + const hosts = _.has(legacy, 'hosts') ? _.get(legacy, 'hosts') : [_.get(legacy, 'host', {})]; + _.set(modern, 'nodes', hosts.map(host => { + if (_.isObject(host)) { host = `${host.protocol}://${host.host}:${host.port}`; } + return host; + })); + + // map basic configuration options + _.set(modern, 'maxRetries', _.get(legacy, 'maxRetries', undefined)); + _.set(modern, 'requestTimeout', _.get(legacy, 'requestTimeout', undefined)); + + return modern; +}; + +module.exports = { modernToLegacy, legacyToModern }; \ No newline at end of file diff --git a/index.js b/index.js index 20fb93f..84e8d55 100644 --- a/index.js +++ b/index.js @@ -2,4 +2,13 @@ if (process.env.NODE_ENV !== 'test') { require('./src/configValidation').validate(require('pelias-config').generate()); } -module.exports = require('./src/sink'); +module.exports = { + v1: { + client: require('./src/client'), + createWriteStream: require('./src/sink') + }, + v2: { + client: require('./v2/client'), + createWriteStream: require('./v2/sink') + } +}; diff --git a/package.json b/package.json index 7c68415..4bde4fd 100644 --- a/package.json +++ b/package.json @@ -36,8 +36,10 @@ "tape": "^5.0.0" }, "dependencies": { + "@elastic/elasticsearch": "^9.0.2", "@hapi/joi": "^16.0.0", "elasticsearch": "^16.0.0", + "lodash": "^4.17.21", "pelias-config": "^6.0.0", "pelias-logger": "^1.2.1", "through2": "^3.0.0" diff --git a/src/client.js b/src/client.js index 38b61f0..e1a0e44 100644 --- a/src/client.js +++ b/src/client.js @@ -1,6 +1,20 @@ +const _ = require('lodash'); const elasticsearch = require('elasticsearch'); -const settings = require('pelias-config').generate(); +const config = require('pelias-config').generate(); +const { modernToLegacy } = require('../configMap'); module.exports = function(){ - return new elasticsearch.Client( settings.esclient || {} ); + + // use legacy config + if (_.has(config, 'esclient')) { + return new elasticsearch.Client(config.esclient); + } + + // check for modern config + if (_.has(config, 'elasticsearch.client')) { + return new elasticsearch.Client(modernToLegacy(config.elasticsearch.client)); + } + + // no config specified + return new elasticsearch.Client({}); }; diff --git a/test/index.js b/test/index.js index 5a4204a..f2ba5f0 100644 --- a/test/index.js +++ b/test/index.js @@ -6,13 +6,15 @@ module.exports.tests = {}; module.exports.tests.interface = function(test) { test('configValidation not throwing error should return a function', function(t) { - const factory = proxyquire('../index', { + const index = proxyquire('../index', { './src/configValidation': { validate: () => {} } }); - t.equal(typeof factory, 'function', 'stream factory'); + t.equal(typeof index, 'object', 'exports'); + t.equal(typeof index.v1.client, 'function', 'client factory'); + t.equal(typeof index.v1.createWriteStream, 'function', 'stream factory'); t.end(); }); @@ -25,7 +27,7 @@ module.exports.tests.interface = function(test) { './src/configValidation': { validate: () => {} } - })(opts); + }).v1.createWriteStream(opts); t.equal(typeof stream, 'object', 'valid stream'); t.equal(typeof stream._read, 'function', 'valid readable'); @@ -49,8 +51,7 @@ module.exports.tests.invalidConfig = function(test) { throw Error('config is not valid'); } } - }); - + }).v1.createWriteStream(); }, /config is not valid/); process.env.NODE_ENV = env; diff --git a/test/stream.js b/test/stream.js index 64ad301..c587423 100644 --- a/test/stream.js +++ b/test/stream.js @@ -10,7 +10,7 @@ module.exports.tests.functional_example = function(test, common) { './src/configValidation': { validate: () => {} } - }); + }).v1.createWriteStream; test('functional example', function(t) { diff --git a/v2/client.js b/v2/client.js new file mode 100644 index 0000000..133c4f7 --- /dev/null +++ b/v2/client.js @@ -0,0 +1,20 @@ +const _ = require('lodash'); +const { Client } = require('@elastic/elasticsearch'); +const config = require('pelias-config').generate(); +const { legacyToModern } = require('../configMap'); + +module.exports = function(){ + + // use modern config + if (_.has(config, 'elasticsearch.client')) { + return new Client(config.elasticsearch.client); + } + + // check for legacy config + if (_.has(config, 'esclient')) { + return new Client(legacyToModern(config.esclient)); + } + + // no config specified + return new Client({}); +}; diff --git a/v2/sink.js b/v2/sink.js new file mode 100644 index 0000000..2903818 --- /dev/null +++ b/v2/sink.js @@ -0,0 +1,18 @@ +const through = require('through2'); + +function streamFactory( opts ){ + opts = opts || {}; + if( !opts.client ){ opts.client = require('./client')(); } + + // passthrough stream + const stream = through.obj(); + + stream.bulk = opts.client.helpers.bulk({ + datasource: stream, + onDocument (doc) { return doc; } + }); + + return stream; +} + +module.exports = streamFactory;