diff --git a/pybana/client.py b/pybana/client.py index 1fb662b..f76deb5 100644 --- a/pybana/client.py +++ b/pybana/client.py @@ -23,21 +23,9 @@ } -def _fix_es(using): - if isinstance(using, ElasticsearchExtClient): - return using - using = using or "default" - es = elasticsearch_dsl.connections.get_connection(using) - if not isinstance(es, Elasticsearch): - return es - es_ext = ElasticsearchExtClient(es) - elasticsearch_dsl.connections.add_connection(using, es_ext) - return es_ext - - class Kibana: """ - Kibana client. + Kibana client (with support of multi elasticsearch). """ klasses = { @@ -47,29 +35,45 @@ class Kibana: "search": Search, } - def __init__(self, index=".kibana"): + def get_es(self, using): + using = using or self._default + if isinstance(using, ElasticsearchExtClient): + return using + es = elasticsearch_dsl.connections.get_connection(using) + if not isinstance(es, Elasticsearch): + return es + es_ext = ElasticsearchExtClient(es) + if isinstance(using, str): + elasticsearch_dsl.connections.add_connection(using, es_ext) + return es_ext + + def __init__(self, *, using, index=".kibana"): """ Initialize a client to kibana. :param index string: Index used by kibana (default: .kibana). """ + self._default = self.get_es(using) self._index = index + @property + def using(self): + return self._default + def _search(self, type, using): klass = self.klasses.get(type) search = klass.search if klass else elasticsearch_dsl.Search - return search(index=self._index, using=_fix_es(using)) + es = self.get_es(using) + return search(index=self._index, using=es) def _get(self, klass, id, using): - ret = klass.get(index=self._index, id=id, using=_fix_es(using)) - return ret + return klass.get(index=self._index, id=id, using=self.get_es(using)) def objects(self, type, using=None): return self._search(type, using=using).filter("term", type=type) def config_id(self, using=None): - elastic = _fix_es(using or "default") - # return "config:6.7.1" + elastic = self.get_es(using) return "config:%s" % elastic.info()["version"]["number"] def config(self, using=None): @@ -78,11 +82,16 @@ def config(self, using=None): """ return self._get(Config, self.config_id(using), using=using) - def init_index(self): + def is_v8(self, using=None): + elastic = self.get_es(using) + version = elastic.info()["version"]["number"].split(".")[0] + return version >= "8" + + def init_index(self, using=None): """ Create the elasticsearch index as kibana would do. """ - elastic = _fix_es("default") + elastic = self.get_es(using) mappingsfn = os.path.join(os.path.dirname(__file__), "mappings.json") suffix = 1 while not elastic.indices.exists(self._index): @@ -103,9 +112,9 @@ def init_config(self, using=None): try: self.config(using=using) except NotFoundError: - Config(config=DEFAULT_CONFIG, meta={"id": self.config_id()}).save( - index=self._index, refresh="wait_for", using=using - ) + Config( + config=DEFAULT_CONFIG, meta={"id": self.config_id(using=using)} + ).save(index=self._index, refresh="wait_for", using=self.get_es(using)) def index_patterns(self, using=None): """ @@ -167,4 +176,4 @@ def update_or_create_default_index_pattern(self, index_pattern, using=None): config = self.config(using) if not config.config.to_dict().get("defaultIndex"): config.config.defaultIndex = index_pattern.meta.id.split(":")[-1] - config.save(refresh="wait_for") + config.save(refresh="wait_for", using=using or self.using) diff --git a/pybana/elastic/fixes_for_v8.py b/pybana/elastic/fixes_for_v8.py index 68f87d6..5248f15 100644 --- a/pybana/elastic/fixes_for_v8.py +++ b/pybana/elastic/fixes_for_v8.py @@ -142,12 +142,14 @@ def _is_calendar_interval(interval: str) -> bool: class V6ToV8: def fix_transport_instance(self, transport: Transport): - _perform_request = transport.perform_request - transport._perform_request = _perform_request # type: ignore + if hasattr(transport, "_perform_request_v8"): + print("Transport already fixed for v8") + return def new_perform_request(method, url, headers=None, params=None, body=None): try: - return transport._perform_request( # type: ignore + # print(f"new_perform_request: method={method}, url={url}, params:{params}, body:{body}") + return transport._perform_request_v8( # type: ignore method=method, url=url, headers=headers, params=params, body=body ) except TransportError as e: @@ -155,6 +157,8 @@ def new_perform_request(method, url, headers=None, params=None, body=None): e.args = v6_to_v8.fix_transport_error_args(e.args) raise + _perform_request_v8 = transport.perform_request + transport._perform_request_v8 = _perform_request_v8 # type: ignore transport.perform_request = new_perform_request def fix_dynamic_template(self, dynamic: Dict) -> bool: diff --git a/pybana/models.py b/pybana/models.py index f9a907e..ed240a8 100644 --- a/pybana/models.py +++ b/pybana/models.py @@ -9,6 +9,7 @@ class BaseDocument(Document): type = Keyword() + # _using = None # List of json attributes. json_attrs = [] @@ -47,40 +48,46 @@ class IndexPattern(BaseDocument): class Search(BaseDocument): _type = "search" - def index(self): + def index(self, using): """ Returns the index-pattern associated to the visualization. Go through the search if needed. """ search_source = self.search["kibanaSavedObjectMeta"]["searchSourceJSON"] key = json.loads(search_source).get("index") - return IndexPattern.get(id=f"index-pattern:{key}", index=self.meta.index) + return IndexPattern.get( + id=f"index-pattern:{key}", index=self.meta.index, using=using + ) class Visualization(BaseDocument): _type = "visualization" json_attrs = ["visState", "uiStateJSON"] - def related_search(self): + def related_search(self, using): """ Returns the search associated to the visualization. An error is raised if the visualization is not associated to any search. """ return Search.get( - id=f"search:{self.visualization.savedSearchId}", index=self.meta.index + id=f"search:{self.visualization.savedSearchId}", + index=self.meta.index, + using=using, ) - def index(self): + def index(self, using): """ Returns the index-pattern associated to the visualization. Go through the search if needed. """ if hasattr(self.visualization, "savedSearchId"): - return self.related_search().index() + return self.related_search(using).index(using) search_source = self.visualization.kibanaSavedObjectMeta.searchSourceJSON key = json.loads(search_source).get("index") - return IndexPattern.get(id=f"index-pattern:{key}", index=self.meta.index) + return IndexPattern.get( + id=f"index-pattern:{key}", index=self.meta.index, using=using + ) def filters(self): """ @@ -98,7 +105,7 @@ class Dashboard(BaseDocument): _type = "dashboard" json_attrs = ["panelsJSON", "optionsJSON"] - def visualizations(self, missing="skip", using=None): + def visualizations(self, *, using, missing="skip"): """ Does the join automatically by parsing panelsJSON. @@ -117,7 +124,7 @@ def visualizations(self, missing="skip", using=None): else [] ) - def searches(self, missing="skip", using=None): + def searches(self, *, using, missing="skip"): """ Does the join automatically by parsing panelsJSON. diff --git a/pybana/translators/elastic/__init__.py b/pybana/translators/elastic/__init__.py index 33bed9e..c97e21f 100644 --- a/pybana/translators/elastic/__init__.py +++ b/pybana/translators/elastic/__init__.py @@ -13,6 +13,9 @@ class ElasticTranslator: + def __init__(self, using): + self._using = using + def translate_vega(self, visualization, scope): def replace_magic_keywords(node): if isinstance(node, list): @@ -66,7 +69,9 @@ def translate_data_item(data): index = data["url"]["index"] body = replace_magic_keywords(data["url"]["body"]) body = add_time_zone(body) - search = elasticsearch_dsl.Search(index=index).update_from_dict(body) + search = elasticsearch_dsl.Search( + index=index, using=self._using + ).update_from_dict(body) if data["url"].get("%timefield%"): ts = data["url"]["%timefield%"] search = search.filter( @@ -79,7 +84,7 @@ def translate_data_item(data): } ) else: - search = elasticsearch_dsl.Search()[:0] + search = elasticsearch_dsl.Search(using=self._using)[:0] return search spec = hjson.loads(visualization.visState["params"]["spec"]) @@ -91,7 +96,7 @@ def translate_data_item(data): ) def translate_legacy(self, visualization, scope): - index_pattern = visualization.index() + index_pattern = visualization.index(using=self._using) fields = { field["name"]: field for field in json.loads(index_pattern["index-pattern"]["fields"]) diff --git a/pybana/translators/vega/metrics.py b/pybana/translators/vega/metrics.py index 0f7b2a4..0b93cc8 100644 --- a/pybana/translators/vega/metrics.py +++ b/pybana/translators/vega/metrics.py @@ -93,6 +93,7 @@ def flatten(value): values = [ value for hit in bucket[agg["id"]]["hits"]["hits"] + if agg["params"]["field"] in hit["_source"] for value in flatten(hit["_source"][agg["params"]["field"]]) if value is not None ] diff --git a/pybana/translators/vega/vega.py b/pybana/translators/vega/vega.py index 74481c6..0b06ff2 100644 --- a/pybana/translators/vega/vega.py +++ b/pybana/translators/vega/vega.py @@ -25,6 +25,9 @@ class VegaTranslator: + def __init__(self, using): + self._using = using + def conf(self, state): return { "$schema": "https://vega.github.io/schema/vega/v5.json", @@ -998,7 +1001,9 @@ def marks_bar(self, conf, state, response): return conf def translate_legacy(self, visualization, response, scope): - state = ContextVisualization(visualization=visualization, config=scope.config) + state = ContextVisualization( + visualization=visualization, config=scope.config, using=self._using + ) ret = self.conf(state) ret = self.data(ret, state, response, scope) diff --git a/pybana/translators/vega/visualization.py b/pybana/translators/vega/visualization.py index b2761ec..e7c31c7 100644 --- a/pybana/translators/vega/visualization.py +++ b/pybana/translators/vega/visualization.py @@ -14,8 +14,8 @@ class ContextVisualization: :param pybana.Config config: Config of the kibana instance. """ - def __init__(self, visualization, config): - self._index_pattern = visualization.index() + def __init__(self, visualization, config, using=None): + self._index_pattern = visualization.index(using=using) self._state = visualization.visState.to_dict() self._ui_state = visualization.uiStateJSON.to_dict() self._config = config diff --git a/tests/test_pybana.py b/tests/test_pybana.py index e8c3a72..fbd0418 100644 --- a/tests/test_pybana.py +++ b/tests/test_pybana.py @@ -29,9 +29,14 @@ PYBANA_INDEX = ".kibana_pybana_test" -ELASTIC1 = elasticsearch.Elasticsearch() -ELASTIC = ElasticsearchExtClient() -elasticsearch_dsl.connections.add_connection("default", ELASTIC1) +ELASTICSEARCH_V6 = elasticsearch.Elasticsearch() +ELASTICSEARCH_V8 = elasticsearch.Elasticsearch(["http://localhost:9201"]) +ELASTIC_V6 = ElasticsearchExtClient() +ELASTIC_V8 = ElasticsearchExtClient(ELASTICSEARCH_V8) +ELASTICS = {"default": ELASTIC_V6, "v6": ELASTIC_V6, "v8": ELASTIC_V8} +elasticsearch_dsl.connections.add_connection("default", ELASTIC_V6) +elasticsearch_dsl.connections.add_connection("v6", ELASTIC_V6) +elasticsearch_dsl.connections.add_connection("v8", ELASTIC_V8) def load_fixtures(elastic, kibana, index): @@ -95,11 +100,20 @@ def actions(): elasticsearch.helpers.bulk(elastic, actions(), refresh="wait_for") -def test_client(): - kibana = Kibana(PYBANA_INDEX) - ELASTIC.indices.delete(f"{PYBANA_INDEX}*") - ELASTIC.indices.create(f"{PYBANA_INDEX}_1") - load_fixtures(ELASTIC, kibana, PYBANA_INDEX) +def test_client_v6(): + client_test("v6") + + +def test_client_v8(): + client_test("v8") + + +def client_test(version): + kibana = Kibana(index=PYBANA_INDEX, using=version) + elastic = ELASTICS[version] + elastic.indices.delete(f"{PYBANA_INDEX}*") + elastic.indices.create(f"{PYBANA_INDEX}_1") + load_fixtures(elastic, kibana, PYBANA_INDEX) kibana.init_config() kibana.init_config() assert kibana.config() @@ -115,27 +129,44 @@ def test_client(): visualization = kibana.visualization("6eab7cb0-fb18-11e9-84e4-078763638bf3") visualization.visState visualization.uiStateJSON - assert visualization.index().meta.id == index_pattern.meta.id + assert visualization.index(using=elastic).meta.id == index_pattern.meta.id dashboards = list(kibana.dashboards()) assert len(dashboards) == 1 dashboard = kibana.dashboard("f57a7160-fb18-11e9-84e4-078763638bf3") dashboard.panelsJSON dashboard.optionsJSON - assert len(dashboard.visualizations()) == 2 + assert len(dashboard.visualizations(using=elastic)) == 2 visualization = kibana.visualization("f4a09a00-fe77-11e9-8c18-250a1adff826") - search = visualization.related_search() + search = visualization.related_search(using=elastic) assert search.meta.id == "search:2139a4e0-fe77-11e9-833a-0fef2d7dd143" assert len(list(kibana.searches())) == 1 search = kibana.search("2139a4e0-fe77-11e9-833a-0fef2d7dd143") - assert visualization.index().meta.id == index_pattern.meta.id + assert visualization.index(using=elastic).meta.id == index_pattern.meta.id + + +def test_translators_v6(): + translators_test("v6") + + +def test_translators_v8(): + translators_test("v8") + +def translators_test(version): + # elasticsearch_dsl.connections.add_connection("default", ELASTIC_V8) + + kibana = Kibana( + index=PYBANA_INDEX, using=elasticsearch_dsl.connections.get_connection(version) + ) + elastic = ELASTICS[version] + load_fixtures(elastic, kibana, PYBANA_INDEX) + load_data(elastic, "pybana") + assert isinstance(elastic, ElasticsearchExtClient) -def test_translators(): - kibana = Kibana(PYBANA_INDEX) - load_fixtures(ELASTIC, kibana, PYBANA_INDEX) - load_data(ELASTIC, "pybana") kibana.init_config() - translator = ElasticTranslator() + # assert False + + translator = ElasticTranslator(using=elastic) scope = Scope( datetime.datetime(2019, 1, 1, tzinfo=pytz.utc), datetime.datetime(2019, 1, 3, tzinfo=pytz.utc), @@ -173,7 +204,7 @@ def test_translators(): "96645fc0-d636-11ea-8206-6f7030d7dd42", ): response = search.execute() - VegaTranslator().translate(visualization, response, scope) + VegaTranslator(using=elastic).translate(visualization, response, scope) if visualization_id in ("d6c8b900-eea7-11eb-8e30-87c8d06ba6ff",): response = search.execute() metric = VEGA_METRICS["top_hits"]() @@ -191,12 +222,22 @@ def test_translators(): assert ret == results[agg["id"]] -def test_vega_visualization(): - kibana = Kibana(PYBANA_INDEX) - load_fixtures(ELASTIC, kibana, PYBANA_INDEX) - load_data(ELASTIC, "pybana") +def test_vega_visualization_v6(): + vega_visualization_test("v6") + + +def test_vega_visualization_v8(): + vega_visualization_test("v8") + + +def vega_visualization_test(version): + kibana = Kibana(index=PYBANA_INDEX, using=version) + elastic = ELASTICS[version] + + load_fixtures(elastic, kibana, PYBANA_INDEX) + load_data(elastic, "pybana") kibana.init_config() - translator = ElasticTranslator() + translator = ElasticTranslator(using=elastic) scope = Scope( datetime.datetime(2019, 1, 1, tzinfo=pytz.utc), datetime.datetime(2019, 1, 3, tzinfo=pytz.utc), @@ -216,7 +257,7 @@ def test_vega_visualization(): assert search_data["aggs"]["category"]["date_histogram"]["interval"] == "1h" response = search.execute() - VegaTranslator().translate(visualization, response, scope) + VegaTranslator(using=elastic).translate(visualization, response, scope) def test_elastic_translator_helpers():