Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 34 additions & 25 deletions pybana/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,9 @@
}


def _fix_es(using):
if isinstance(using, ElasticsearchExtClient):
return using
using = using or "default"
es = elasticsearch_dsl.connections.get_connection(using)
if not isinstance(es, Elasticsearch):
return es
es_ext = ElasticsearchExtClient(es)
elasticsearch_dsl.connections.add_connection(using, es_ext)
return es_ext


class Kibana:
"""
Kibana client.
Kibana client (with support of multi elasticsearch).
"""

klasses = {
Expand All @@ -47,29 +35,45 @@ class Kibana:
"search": Search,
}

def __init__(self, index=".kibana"):
def get_es(self, using):
using = using or self._default
if isinstance(using, ElasticsearchExtClient):
return using
es = elasticsearch_dsl.connections.get_connection(using)
if not isinstance(es, Elasticsearch):
return es
es_ext = ElasticsearchExtClient(es)
if isinstance(using, str):
elasticsearch_dsl.connections.add_connection(using, es_ext)
return es_ext

def __init__(self, *, using, index=".kibana"):
"""
Initialize a client to kibana.

:param index string: Index used by kibana (default: .kibana).
"""
self._default = self.get_es(using)
self._index = index

@property
def using(self):
return self._default

def _search(self, type, using):
klass = self.klasses.get(type)
search = klass.search if klass else elasticsearch_dsl.Search
return search(index=self._index, using=_fix_es(using))
es = self.get_es(using)
return search(index=self._index, using=es)

def _get(self, klass, id, using):
ret = klass.get(index=self._index, id=id, using=_fix_es(using))
return ret
return klass.get(index=self._index, id=id, using=self.get_es(using))

def objects(self, type, using=None):
return self._search(type, using=using).filter("term", type=type)

def config_id(self, using=None):
elastic = _fix_es(using or "default")
# return "config:6.7.1"
elastic = self.get_es(using)
return "config:%s" % elastic.info()["version"]["number"]

def config(self, using=None):
Expand All @@ -78,11 +82,16 @@ def config(self, using=None):
"""
return self._get(Config, self.config_id(using), using=using)

def init_index(self):
def is_v8(self, using=None):
elastic = self.get_es(using)
version = elastic.info()["version"]["number"].split(".")[0]
return version >= "8"

def init_index(self, using=None):
"""
Create the elasticsearch index as kibana would do.
"""
elastic = _fix_es("default")
elastic = self.get_es(using)
mappingsfn = os.path.join(os.path.dirname(__file__), "mappings.json")
suffix = 1
while not elastic.indices.exists(self._index):
Expand All @@ -103,9 +112,9 @@ def init_config(self, using=None):
try:
self.config(using=using)
except NotFoundError:
Config(config=DEFAULT_CONFIG, meta={"id": self.config_id()}).save(
index=self._index, refresh="wait_for", using=using
)
Config(
config=DEFAULT_CONFIG, meta={"id": self.config_id(using=using)}
).save(index=self._index, refresh="wait_for", using=self.get_es(using))

def index_patterns(self, using=None):
"""
Expand Down Expand Up @@ -167,4 +176,4 @@ def update_or_create_default_index_pattern(self, index_pattern, using=None):
config = self.config(using)
if not config.config.to_dict().get("defaultIndex"):
config.config.defaultIndex = index_pattern.meta.id.split(":")[-1]
config.save(refresh="wait_for")
config.save(refresh="wait_for", using=using or self.using)
10 changes: 7 additions & 3 deletions pybana/elastic/fixes_for_v8.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,23 @@ def _is_calendar_interval(interval: str) -> bool:

class V6ToV8:
def fix_transport_instance(self, transport: Transport):
_perform_request = transport.perform_request
transport._perform_request = _perform_request # type: ignore
if hasattr(transport, "_perform_request_v8"):
print("Transport already fixed for v8")
return

def new_perform_request(method, url, headers=None, params=None, body=None):
try:
return transport._perform_request( # type: ignore
# print(f"new_perform_request: method={method}, url={url}, params:{params}, body:{body}")
return transport._perform_request_v8( # type: ignore
method=method, url=url, headers=headers, params=params, body=body
)
except TransportError as e:
if len(e.args) > 2:
e.args = v6_to_v8.fix_transport_error_args(e.args)
raise

_perform_request_v8 = transport.perform_request
transport._perform_request_v8 = _perform_request_v8 # type: ignore
transport.perform_request = new_perform_request

def fix_dynamic_template(self, dynamic: Dict) -> bool:
Expand Down
25 changes: 16 additions & 9 deletions pybana/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

class BaseDocument(Document):
type = Keyword()
# _using = None

# List of json attributes.
json_attrs = []
Expand Down Expand Up @@ -47,40 +48,46 @@ class IndexPattern(BaseDocument):
class Search(BaseDocument):
_type = "search"

def index(self):
def index(self, using):
"""
Returns the index-pattern associated to the visualization. Go through the
search if needed.
"""
search_source = self.search["kibanaSavedObjectMeta"]["searchSourceJSON"]
key = json.loads(search_source).get("index")
return IndexPattern.get(id=f"index-pattern:{key}", index=self.meta.index)
return IndexPattern.get(
id=f"index-pattern:{key}", index=self.meta.index, using=using
)


class Visualization(BaseDocument):
_type = "visualization"
json_attrs = ["visState", "uiStateJSON"]

def related_search(self):
def related_search(self, using):
"""
Returns the search associated to the visualization.

An error is raised if the visualization is not associated to any search.
"""
return Search.get(
id=f"search:{self.visualization.savedSearchId}", index=self.meta.index
id=f"search:{self.visualization.savedSearchId}",
index=self.meta.index,
using=using,
)

def index(self):
def index(self, using):
"""
Returns the index-pattern associated to the visualization. Go through the
search if needed.
"""
if hasattr(self.visualization, "savedSearchId"):
return self.related_search().index()
return self.related_search(using).index(using)
search_source = self.visualization.kibanaSavedObjectMeta.searchSourceJSON
key = json.loads(search_source).get("index")
return IndexPattern.get(id=f"index-pattern:{key}", index=self.meta.index)
return IndexPattern.get(
id=f"index-pattern:{key}", index=self.meta.index, using=using
)

def filters(self):
"""
Expand All @@ -98,7 +105,7 @@ class Dashboard(BaseDocument):
_type = "dashboard"
json_attrs = ["panelsJSON", "optionsJSON"]

def visualizations(self, missing="skip", using=None):
def visualizations(self, *, using, missing="skip"):
"""
Does the join automatically by parsing panelsJSON.

Expand All @@ -117,7 +124,7 @@ def visualizations(self, missing="skip", using=None):
else []
)

def searches(self, missing="skip", using=None):
def searches(self, *, using, missing="skip"):
"""
Does the join automatically by parsing panelsJSON.

Expand Down
11 changes: 8 additions & 3 deletions pybana/translators/elastic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@


class ElasticTranslator:
def __init__(self, using):
self._using = using

def translate_vega(self, visualization, scope):
def replace_magic_keywords(node):
if isinstance(node, list):
Expand Down Expand Up @@ -66,7 +69,9 @@ def translate_data_item(data):
index = data["url"]["index"]
body = replace_magic_keywords(data["url"]["body"])
body = add_time_zone(body)
search = elasticsearch_dsl.Search(index=index).update_from_dict(body)
search = elasticsearch_dsl.Search(
index=index, using=self._using
).update_from_dict(body)
if data["url"].get("%timefield%"):
ts = data["url"]["%timefield%"]
search = search.filter(
Expand All @@ -79,7 +84,7 @@ def translate_data_item(data):
}
)
else:
search = elasticsearch_dsl.Search()[:0]
search = elasticsearch_dsl.Search(using=self._using)[:0]
return search

spec = hjson.loads(visualization.visState["params"]["spec"])
Expand All @@ -91,7 +96,7 @@ def translate_data_item(data):
)

def translate_legacy(self, visualization, scope):
index_pattern = visualization.index()
index_pattern = visualization.index(using=self._using)
fields = {
field["name"]: field
for field in json.loads(index_pattern["index-pattern"]["fields"])
Expand Down
1 change: 1 addition & 0 deletions pybana/translators/vega/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def flatten(value):
values = [
value
for hit in bucket[agg["id"]]["hits"]["hits"]
if agg["params"]["field"] in hit["_source"]
for value in flatten(hit["_source"][agg["params"]["field"]])
if value is not None
]
Expand Down
7 changes: 6 additions & 1 deletion pybana/translators/vega/vega.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@


class VegaTranslator:
def __init__(self, using):
self._using = using

def conf(self, state):
return {
"$schema": "https://vega.github.io/schema/vega/v5.json",
Expand Down Expand Up @@ -998,7 +1001,9 @@ def marks_bar(self, conf, state, response):
return conf

def translate_legacy(self, visualization, response, scope):
state = ContextVisualization(visualization=visualization, config=scope.config)
state = ContextVisualization(
visualization=visualization, config=scope.config, using=self._using
)

ret = self.conf(state)
ret = self.data(ret, state, response, scope)
Expand Down
4 changes: 2 additions & 2 deletions pybana/translators/vega/visualization.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class ContextVisualization:
:param pybana.Config config: Config of the kibana instance.
"""

def __init__(self, visualization, config):
self._index_pattern = visualization.index()
def __init__(self, visualization, config, using=None):
self._index_pattern = visualization.index(using=using)
self._state = visualization.visState.to_dict()
self._ui_state = visualization.uiStateJSON.to_dict()
self._config = config
Expand Down
Loading