From 2d97f2921839a4d4fa60b1b3b4aa13925d8f49d4 Mon Sep 17 00:00:00 2001 From: Gerben Date: Sat, 27 Dec 2014 15:36:50 +0100 Subject: [PATCH 1/7] Remove needless context --- run.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/run.py b/run.py index 3af8aad..faa26e2 100755 --- a/run.py +++ b/run.py @@ -60,20 +60,19 @@ def main(): if app.config.get('AUTHZ_ON') is not None: es.authorization_enabled = app.config['AUTHZ_ON'] - with app.test_request_context(): - try: - annotation.Annotation.create_all() - document.Document.create_all() - except elasticsearch.exceptions.RequestError as e: - if e.error.startswith('MergeMappingException'): - date = time.strftime('%Y-%m-%d') - log.fatal("Elasticsearch index mapping is incorrect! Please " - "reindex it. You can use reindex.py for this, e.g. " - "python reindex.py --host {0} {1} {1}-{2}".format( - es.host, - es.index, - date)) - raise + try: + annotation.Annotation.create_all() + document.Document.create_all() + except elasticsearch.exceptions.RequestError as e: + if e.error.startswith('MergeMappingException'): + date = time.strftime('%Y-%m-%d') + log.fatal("Elasticsearch index mapping is incorrect! Please " + "reindex it. You can use reindex.py for this, e.g. " + "python reindex.py --host {0} {1} {1}-{2}".format( + es.host, + es.index, + date)) + raise @app.before_request def before_request(): From 02b4484ce0aa2a841b3b285d986f548c4c6dfd97 Mon Sep 17 00:00:00 2001 From: Gerben Date: Sun, 28 Dec 2014 00:22:55 +0100 Subject: [PATCH 2/7] Support custom analysers, add es.create_models To support custom analysers, it seems best to abandon model.create_all in favour of an index-wide method (es.create_models) that sets all mappings (and custom analysis settings) at once. Code is largely transplanted from https://github.com/hypothesis/h/pull/1825 Details: Custom analysers(&filters&tokenisers) are shared in Elasticsearch among all document types (models). To update a model's mappings one needs to make sure the custom analysers are defined first, but updating those just for one model is inelegant; The index would have to be closed for each update, and one cannot check for duplicate definitions of analysers as it is not visible which model defined a particular analyser. Treating index-wide settings as per-model settings creates leaky abstractions. --- annotator/annotation.py | 2 + annotator/elasticsearch.py | 111 ++++++++++++++++++++++++++++++------- run.py | 3 +- tests/__init__.py | 3 +- 4 files changed, 95 insertions(+), 24 deletions(-) diff --git a/annotator/annotation.py b/annotator/annotation.py index ff26913..be74904 100644 --- a/annotator/annotation.py +++ b/annotator/annotation.py @@ -35,11 +35,13 @@ } } +ANALYSIS = {} class Annotation(es.Model): __type__ = TYPE __mapping__ = MAPPING + __analysis__ = ANALYSIS def save(self, *args, **kwargs): _add_default_permissions(self) diff --git a/annotator/elasticsearch.py b/annotator/elasticsearch.py index 077b7d4..bb1c6b2 100644 --- a/annotator/elasticsearch.py +++ b/annotator/elasticsearch.py @@ -66,6 +66,65 @@ def conn(self): self._connection = self._connect() return self._connection + def create_models(self, models): + mappings = _compile_mappings(models) + analysis = _compile_analysis(models) + + # If it does not yet exist, simply create the index + try: + response = self.conn.indices.create(self.index, ignore=400, body={ + 'mappings': mappings, + 'settings': {'analysis': analysis}, + }) + return + except elasticsearch.exceptions.ConnectionError as e: + msg = ('Can not access ElasticSearch at {0}! ' + 'Check to ensure it is running.').format(self.host) + raise elasticsearch.exceptions.ConnectionError('N/A', msg, e) + + # Bad request (400) is ignored above, to prevent warnings in the log + # when the index already exists, but the failure could be for other + # reasons. If so, raise the error here. + if 'error' in response and 'IndexAlreadyExists' not in response['error']: + raise elasticsearch.exceptions.RequestError(400, response['error']) + + # Update the mappings of the existing index + self._update_analysis(analysis) + self._update_mappings(mappings) + + def _update_analysis(self, analysis): + """Update analyzers and filters""" + settings = self.conn.indices.get_settings(index=self.index) + existing = settings[self.index]['settings']['index']['analysis'] + # Only bother if new settings would differ from existing settings + if not self._analysis_up_to_date(existing, analysis): + try: + self.conn.indices.close(index=self.index) + self.conn.indices.put_settings(index=self.index, + body={'analysis': analysis}) + finally: + self.conn.indices.open(index=self.index) + + def _update_mappings(self, mappings): + """Update mappings. + + Warning: can explode because of a MergeMappingError when mappings are + incompatible""" + for doc_type, body in mappings.items(): + self.conn.indices.put_mapping( + index=self.index, + doc_type=doc_type, + body=body + ) + + @staticmethod + def _analysis_up_to_date(existing, analysis): + """Tell whether existing settings are up to date""" + new_analysis = existing.copy() + for section, items in analysis.items(): + new_analysis.setdefault(section,{}).update(items) + return new_analysis == existing + class _Model(dict): """Base class that represents a document type in an ElasticSearch index. @@ -74,7 +133,7 @@ class _Model(dict): __type__ -- The name of the document type __mapping__ -- A mapping of the document's fields - Mapping: Calling create_all() will create the mapping in the index. + Mapping: One field, 'id', is treated specially. Its value will not be stored, but be used as the _id identifier of the document in Elasticsearch. If an item is indexed without providing an id, the _id is automatically @@ -87,25 +146,6 @@ class _Model(dict): with 'analyzer':'standard'. """ - @classmethod - def create_all(cls): - log.info("Creating index '%s'." % cls.es.index) - conn = cls.es.conn - try: - conn.indices.create(cls.es.index) - except elasticsearch.exceptions.RequestError as e: - # Reraise anything that isn't just a notification that the index - # already exists (either as index or as an alias). - if not (e.error.startswith('IndexAlreadyExistsException') - or e.error.startswith('InvalidIndexNameException')): - log.fatal("Failed to create an Elasticsearch index") - raise - log.warn("Index creation failed as index appears to already exist.") - mapping = cls.get_mapping() - conn.indices.put_mapping(index=cls.es.index, - doc_type=cls.__type__, - body=mapping) - @classmethod def get_mapping(cls): return { @@ -121,6 +161,10 @@ def get_mapping(cls): } } + @classmethod + def get_analysis(cls): + return getattr(cls, '__analysis__', {}) + @classmethod def drop_all(cls): if cls.es.conn.indices.exists(cls.es.index): @@ -215,6 +259,33 @@ def make_model(es): return type('Model', (_Model,), {'es': es}) +def _compile_mappings(models): + """Collect the mappings from the models""" + mappings = {} + for model in models: + mappings.update(model.get_mapping()) + return mappings + + +def _compile_analysis(models): + """Merge the custom analyzers and such from the models""" + analysis = {} + for model in models: + for section, items in model.get_analysis().items(): + existing_items = analysis.setdefault(section, {}) + for name in items: + if name in existing_items: + fmt = "Duplicate definition of 'index.analysis.{}.{}'." + msg = fmt.format(section, name) + raise RuntimeError(msg) + existing_items.update(items) + return analysis + + +def _csv_split(s, delimiter=','): + return [r for r in csv.reader([s], delimiter=delimiter)][0] + + def _build_query(query, offset, limit): # Create a match query for each keyword match_clauses = [{'match': {k: v}} for k, v in iteritems(query)] diff --git a/run.py b/run.py index faa26e2..cdc68ee 100755 --- a/run.py +++ b/run.py @@ -61,8 +61,7 @@ def main(): es.authorization_enabled = app.config['AUTHZ_ON'] try: - annotation.Annotation.create_all() - document.Document.create_all() + es.create_models([annotation.Annotation, document.Document]) except elasticsearch.exceptions.RequestError as e: if e.error.startswith('MergeMappingException'): date = time.strftime('%Y-%m-%d') diff --git a/tests/__init__.py b/tests/__init__.py index 9b37303..ead7f44 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -34,8 +34,7 @@ def setup_class(cls): document.Document.drop_all() def setup(self): - annotation.Annotation.create_all() - document.Document.create_all() + es.create_models([annotation.Annotation, document.Document]) es.conn.cluster.health(wait_for_status='yellow') self.cli = self.app.test_client() From 1fe0464c6dd572b45272187dffee5bf91ec78572 Mon Sep 17 00:00:00 2001 From: Gerben Date: Sun, 28 Dec 2014 00:45:06 +0100 Subject: [PATCH 3/7] Move Model.drop_all -> es.drop_all The function deletes the whole index, not just one type of documents, so this makes much more sense. --- annotator/elasticsearch.py | 12 ++++++------ tests/__init__.py | 6 ++---- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/annotator/elasticsearch.py b/annotator/elasticsearch.py index bb1c6b2..7af63fe 100644 --- a/annotator/elasticsearch.py +++ b/annotator/elasticsearch.py @@ -66,6 +66,12 @@ def conn(self): self._connection = self._connect() return self._connection + def drop_all(self): + """Delete the index and its contents""" + if self.conn.indices.exists(self.index): + self.conn.indices.close(self.index) + self.conn.indices.delete(self.index) + def create_models(self, models): mappings = _compile_mappings(models) analysis = _compile_analysis(models) @@ -165,12 +171,6 @@ def get_mapping(cls): def get_analysis(cls): return getattr(cls, '__analysis__', {}) - @classmethod - def drop_all(cls): - if cls.es.conn.indices.exists(cls.es.index): - cls.es.conn.indices.close(cls.es.index) - cls.es.conn.indices.delete(cls.es.index) - # It would be lovely if this were called 'get', but the dict semantics # already define that method name. @classmethod diff --git a/tests/__init__.py b/tests/__init__.py index ead7f44..9c7c352 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -30,8 +30,7 @@ class TestCase(object): @classmethod def setup_class(cls): cls.app = create_app() - annotation.Annotation.drop_all() - document.Document.drop_all() + es.drop_all() def setup(self): es.create_models([annotation.Annotation, document.Document]) @@ -39,5 +38,4 @@ def setup(self): self.cli = self.app.test_client() def teardown(self): - annotation.Annotation.drop_all() - document.Document.drop_all() + es.drop_all() From 36ec42fcbc509c30cd8a61e18acc4ca0c4f66c1e Mon Sep 17 00:00:00 2001 From: Gerben Date: Tue, 30 Dec 2014 12:58:36 +0100 Subject: [PATCH 4/7] Test for index existence before creation Note that also when the index is an alias index_exists will be true. --- annotator/elasticsearch.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/annotator/elasticsearch.py b/annotator/elasticsearch.py index 7af63fe..7409f01 100644 --- a/annotator/elasticsearch.py +++ b/annotator/elasticsearch.py @@ -76,27 +76,24 @@ def create_models(self, models): mappings = _compile_mappings(models) analysis = _compile_analysis(models) - # If it does not yet exist, simply create the index + # Test for index existence while also checking if connection works try: - response = self.conn.indices.create(self.index, ignore=400, body={ - 'mappings': mappings, - 'settings': {'analysis': analysis}, - }) - return + index_exists = self.conn.indices.exists(self.index) except elasticsearch.exceptions.ConnectionError as e: msg = ('Can not access ElasticSearch at {0}! ' 'Check to ensure it is running.').format(self.host) raise elasticsearch.exceptions.ConnectionError('N/A', msg, e) - # Bad request (400) is ignored above, to prevent warnings in the log - # when the index already exists, but the failure could be for other - # reasons. If so, raise the error here. - if 'error' in response and 'IndexAlreadyExists' not in response['error']: - raise elasticsearch.exceptions.RequestError(400, response['error']) - - # Update the mappings of the existing index - self._update_analysis(analysis) - self._update_mappings(mappings) + if not index_exists: + # If index does not yet exist, simply create the index + self.conn.indices.create(self.index, body={ + 'mappings': mappings, + 'settings': {'analysis': analysis}, + }) + else: + # Otherwise, update its settings and mappings + self._update_analysis(analysis) + self._update_mappings(mappings) def _update_analysis(self, analysis): """Update analyzers and filters""" From a41ab8915d91ac061eed3e648aba17772bf8abfb Mon Sep 17 00:00:00 2001 From: Gerben Date: Tue, 30 Dec 2014 14:29:00 +0100 Subject: [PATCH 5/7] Take analysis settings out of models The approach where each model can define custom analysers did not match Elasticsearch's structure well, and created more complexity than it was worth. --- annotator/annotation.py | 3 --- annotator/elasticsearch.py | 28 ++++++---------------------- annotator/elasticsearch_analyzers.py | 9 +++++++++ annotator/reindexer.py | 6 +++++- run.py | 6 ++++-- tests/__init__.py | 6 ++++-- 6 files changed, 28 insertions(+), 30 deletions(-) create mode 100644 annotator/elasticsearch_analyzers.py diff --git a/annotator/annotation.py b/annotator/annotation.py index be74904..eec5e05 100644 --- a/annotator/annotation.py +++ b/annotator/annotation.py @@ -35,13 +35,10 @@ } } -ANALYSIS = {} - class Annotation(es.Model): __type__ = TYPE __mapping__ = MAPPING - __analysis__ = ANALYSIS def save(self, *args, **kwargs): _add_default_permissions(self) diff --git a/annotator/elasticsearch.py b/annotator/elasticsearch.py index 7409f01..e8a7d81 100644 --- a/annotator/elasticsearch.py +++ b/annotator/elasticsearch.py @@ -72,9 +72,8 @@ def drop_all(self): self.conn.indices.close(self.index) self.conn.indices.delete(self.index) - def create_models(self, models): + def create_models(self, models, analysis_settings): mappings = _compile_mappings(models) - analysis = _compile_analysis(models) # Test for index existence while also checking if connection works try: @@ -88,17 +87,17 @@ def create_models(self, models): # If index does not yet exist, simply create the index self.conn.indices.create(self.index, body={ 'mappings': mappings, - 'settings': {'analysis': analysis}, + 'settings': {'analysis': analysis_settings}, }) else: # Otherwise, update its settings and mappings - self._update_analysis(analysis) + self._update_analysis(analysis_settings) self._update_mappings(mappings) def _update_analysis(self, analysis): """Update analyzers and filters""" - settings = self.conn.indices.get_settings(index=self.index) - existing = settings[self.index]['settings']['index']['analysis'] + settings = self.conn.indices.get_settings(index=self.index).values()[0] + existing = settings['settings']['index'].get('analysis', {}) # Only bother if new settings would differ from existing settings if not self._analysis_up_to_date(existing, analysis): try: @@ -122,7 +121,7 @@ def _update_mappings(self, mappings): @staticmethod def _analysis_up_to_date(existing, analysis): - """Tell whether existing settings are up to date""" + """Tell whether existing analysis settings are up to date""" new_analysis = existing.copy() for section, items in analysis.items(): new_analysis.setdefault(section,{}).update(items) @@ -264,21 +263,6 @@ def _compile_mappings(models): return mappings -def _compile_analysis(models): - """Merge the custom analyzers and such from the models""" - analysis = {} - for model in models: - for section, items in model.get_analysis().items(): - existing_items = analysis.setdefault(section, {}) - for name in items: - if name in existing_items: - fmt = "Duplicate definition of 'index.analysis.{}.{}'." - msg = fmt.format(section, name) - raise RuntimeError(msg) - existing_items.update(items) - return analysis - - def _csv_split(s, delimiter=','): return [r for r in csv.reader([s], delimiter=delimiter)][0] diff --git a/annotator/elasticsearch_analyzers.py b/annotator/elasticsearch_analyzers.py new file mode 100644 index 0000000..5318a09 --- /dev/null +++ b/annotator/elasticsearch_analyzers.py @@ -0,0 +1,9 @@ +"""Custom Elasticsearch analyzers that can be used for indexing fields in + models (Annotation, Document). +""" + +ANALYSIS = { + 'analyzer': {}, + 'filter': {}, + 'tokenizer': {}, +} diff --git a/annotator/reindexer.py b/annotator/reindexer.py index b4283e1..162cedd 100644 --- a/annotator/reindexer.py +++ b/annotator/reindexer.py @@ -4,11 +4,13 @@ from .annotation import Annotation from .document import Document +from .elasticsearch_analyzers import ANALYSIS class Reindexer(object): es_models = Annotation, Document + analysis_settings = ANALYSIS def __init__(self, conn, interactive=False): self.conn = conn @@ -60,7 +62,9 @@ def alias(self, index, alias): def get_index_config(self): # Configure index mappings - index_config = {'mappings': {}} + index_config = {'mappings': {}, + 'settings': {'analysis': self.analysis_settings}, + } for model in self.es_models: index_config['mappings'].update(model.get_mapping()) return index_config diff --git a/run.py b/run.py index cdc68ee..795d62b 100755 --- a/run.py +++ b/run.py @@ -20,7 +20,8 @@ from flask import Flask, g, current_app import elasticsearch -from annotator import es, annotation, auth, authz, document, store +from annotator import es, annotation, auth, authz, document, \ + elasticsearch_analyzers, store from tests.helpers import MockUser, MockConsumer, MockAuthenticator from tests.helpers import mock_authorizer @@ -61,7 +62,8 @@ def main(): es.authorization_enabled = app.config['AUTHZ_ON'] try: - es.create_models([annotation.Annotation, document.Document]) + es.create_models(models=[annotation.Annotation, document.Document], + analysis_settings=elasticsearch_analyzers.ANALYSIS) except elasticsearch.exceptions.RequestError as e: if e.error.startswith('MergeMappingException'): date = time.strftime('%Y-%m-%d') diff --git a/tests/__init__.py b/tests/__init__.py index 9c7c352..9d060df 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,7 +1,8 @@ import os from flask import Flask, g, request -from annotator import es, auth, authz, annotation, store, document +from annotator import es, auth, authz, annotation, document, \ + elasticsearch_analyzers, store from .helpers import MockUser, MockConsumer @@ -33,7 +34,8 @@ def setup_class(cls): es.drop_all() def setup(self): - es.create_models([annotation.Annotation, document.Document]) + es.create_models(models=[annotation.Annotation, document.Document], + analysis_settings=elasticsearch_analyzers.ANALYSIS) es.conn.cluster.health(wait_for_status='yellow') self.cli = self.app.test_client() From bbf71eb81327f02ae8183c98b50b45654a51a732 Mon Sep 17 00:00:00 2001 From: Gerben Date: Tue, 30 Dec 2014 14:34:10 +0100 Subject: [PATCH 6/7] Rename create_models -> create_all --- annotator/elasticsearch.py | 2 +- run.py | 2 +- tests/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/annotator/elasticsearch.py b/annotator/elasticsearch.py index e8a7d81..d3a65b9 100644 --- a/annotator/elasticsearch.py +++ b/annotator/elasticsearch.py @@ -72,7 +72,7 @@ def drop_all(self): self.conn.indices.close(self.index) self.conn.indices.delete(self.index) - def create_models(self, models, analysis_settings): + def create_all(self, models, analysis_settings): mappings = _compile_mappings(models) # Test for index existence while also checking if connection works diff --git a/run.py b/run.py index 795d62b..7d92f88 100755 --- a/run.py +++ b/run.py @@ -62,7 +62,7 @@ def main(): es.authorization_enabled = app.config['AUTHZ_ON'] try: - es.create_models(models=[annotation.Annotation, document.Document], + es.create_all(models=[annotation.Annotation, document.Document], analysis_settings=elasticsearch_analyzers.ANALYSIS) except elasticsearch.exceptions.RequestError as e: if e.error.startswith('MergeMappingException'): diff --git a/tests/__init__.py b/tests/__init__.py index 9d060df..6522f42 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -34,7 +34,7 @@ def setup_class(cls): es.drop_all() def setup(self): - es.create_models(models=[annotation.Annotation, document.Document], + es.create_all(models=[annotation.Annotation, document.Document], analysis_settings=elasticsearch_analyzers.ANALYSIS) es.conn.cluster.health(wait_for_status='yellow') self.cli = self.app.test_client() From 458789c0c4c3ce2839c15a9b278ec1e083b2a2db Mon Sep 17 00:00:00 2001 From: Gerben Date: Tue, 30 Dec 2014 14:43:58 +0100 Subject: [PATCH 7/7] Update changelog --- CHANGES.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.rst b/CHANGES.rst index f074283..3267a9e 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,7 @@ Next release ============ +- Support custom analyzers in Elasticsearch - Fix bug '_csv_split not found' 0.13.2