diff --git a/README.md b/README.md index 99acc02e7..831494ca5 100644 --- a/README.md +++ b/README.md @@ -320,4 +320,23 @@ ElastAlert is licensed under the Apache License, Version 2.0: http://www.apache. ### Read the documentation at [Read the Docs](http://elastalert.readthedocs.org). +### Bigdata Boutique Additions +There is now a `must_not` field which behaves similarly to the filter field, only excludes hits. + +A cron scheduler is added. This works by running jobs based on a cron scheduler rather than +the default interval scheduling. To use this, instead of defining "run_every" in the rule, +use cron_schedule. See the below example. + +`cron_schedule: "0 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23 * * *"` + +Format is: +minute (0-59), hour (0-23), day of month (1-31), month (1-12), day of week (MON-SUN) + +Fields are separated by space, values separated by comma, * for all values + +notice that cron_schedule is currently only supported for use_count_query uses. + + ### Questions? Drop by #elastalert on Freenode IRC. +### BDB enhancements Questions? 
Contact us at info@bigdataboutique.com + diff --git a/config.yaml.example b/config.yaml.example index 9d9176382..572dbf5a6 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -53,6 +53,10 @@ es_port: 9200 #client_cert: /path/to/client_cert.pem #client_key: /path/to/client_key.key +# Add custom http_headers to the request to elasticsearch +#http_headers: +# Bearer: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c + # The index on es_host which is used for metadata storage # This can be a unmapped index, but it is recommended that you run # elastalert-create-index to set a mapping diff --git a/docs/source/ruletypes.rst b/docs/source/ruletypes.rst index ff3763712..e3731d170 100644 --- a/docs/source/ruletypes.rst +++ b/docs/source/ruletypes.rst @@ -553,9 +553,9 @@ The currently supported versions of Kibana Discover are: - `5.6` - `6.0`, `6.1`, `6.2`, `6.3`, `6.4`, `6.5`, `6.6`, `6.7`, `6.8` -- `7.0`, `7.1`, `7.2`, `7.3` +- `7.0`, `7.1`, `7.2`, `7.3`, `7.4`, `7.5`, `7.6`, `7.7`, `7.8` -``kibana_discover_version: '7.3'`` +``kibana_discover_version: '7.8'`` kibana_discover_index_pattern_id ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/elastalert/__init__.py b/elastalert/__init__.py index 55bfdb32f..6dc3f82e9 100644 --- a/elastalert/__init__.py +++ b/elastalert/__init__.py @@ -23,6 +23,7 @@ def __init__(self, conf): verify_certs=conf['verify_certs'], ca_certs=conf['ca_certs'], connection_class=RequestsHttpConnection, + headers=conf['http_headers'], http_auth=conf['http_auth'], timeout=conf['es_conn_timeout'], send_get_body_as=conf['send_get_body_as'], diff --git a/elastalert/alerts.py b/elastalert/alerts.py index f2f31853f..51b55fdf5 100644 --- a/elastalert/alerts.py +++ b/elastalert/alerts.py @@ -1962,9 +1962,12 @@ def __init__(self, rule): def alert(self, matches): """ Each match will trigger a POST to the specified endpoint(s). 
""" + alert_subject = self.create_custom_title(matches) for match in matches: payload = match if self.post_all_values else {} payload.update(self.post_static_payload) + payload["alert_subject"] = alert_subject + payload["alert_text"] = str(BasicMatchString(self.rule, match)) for post_key, es_key in list(self.post_payload.items()): payload[post_key] = lookup_es_key(match, es_key) headers = { diff --git a/elastalert/create_index.py b/elastalert/create_index.py index a0858da70..639b3f8fb 100644 --- a/elastalert/create_index.py +++ b/elastalert/create_index.py @@ -204,6 +204,7 @@ def main(): send_get_body_as = data.get('send_get_body_as', 'GET') ca_certs = data.get('ca_certs') client_cert = data.get('client_cert') + http_headers = data.get('http_headers') client_key = data.get('client_key') index = args.index if args.index is not None else data.get('writeback_index') alias = args.alias if args.alias is not None else data.get('writeback_alias') @@ -229,6 +230,7 @@ def main(): send_get_body_as = args.send_get_body_as ca_certs = None client_cert = None + http_headers = None client_key = None index = args.index if args.index is not None else input('New index name? 
(Default elastalert_status) ') if not index: @@ -254,6 +256,7 @@ def main(): use_ssl=use_ssl, verify_certs=verify_certs, connection_class=RequestsHttpConnection, + headers=http_headers, http_auth=http_auth, url_prefix=url_prefix, send_get_body_as=send_get_body_as, diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index b078c86db..5f54b115b 100755 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -20,6 +20,7 @@ import dateutil.tz import pytz from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger from croniter import croniter from elasticsearch.exceptions import ConnectionError from elasticsearch.exceptions import ElasticsearchException @@ -198,7 +199,7 @@ def get_index(rule, starttime=None, endtime=None): @staticmethod def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False, - five=False): + five=False, must_not=[]): """ Returns a query dict that will apply a list of filters, filter by start and end time, and sort results by timestamp. 
@@ -211,14 +212,15 @@ def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field= starttime = to_ts_func(starttime) endtime = to_ts_func(endtime) filters = copy.copy(filters) - es_filters = {'filter': {'bool': {'must': filters}}} + es_filters = {'filter': {'bool': {'filter': filters, 'must_not': must_not}}} if starttime and endtime: - es_filters['filter']['bool']['must'].insert(0, {'range': {timestamp_field: {'gt': starttime, - 'lte': endtime}}}) + es_filters['filter']['bool']['filter'].insert(0, {'range': {timestamp_field: {'gt': starttime, + 'lte': endtime}}}) if five: query = {'query': {'bool': es_filters}} else: - query = {'query': {'filtered': es_filters}} + raise Exception('Unsupported operation - the five parameter is not supported') + if sort: query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}] return query @@ -358,7 +360,7 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False): endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts'], - five=rule['five'], + five=rule['five'], must_not=rule['must_not'], ) if self.thread_data.current_es.is_atleastsixsix(): extra_args = {'_source_includes': rule['include']} @@ -448,7 +450,7 @@ def get_hits_count(self, rule, starttime, endtime, index): timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'], - five=rule['five'] + five=rule['five'], must_not=rule['must_not'], ) try: @@ -499,7 +501,7 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'], - five=rule['five'] + five=rule['five'], must_not=rule['must_not'], ) if size is None: size = rule.get('terms_size', 50) @@ -547,7 +549,7 @@ def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_ timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'], - five=rule['five'] + five=rule['five'], must_not=rule['must_not'], 
) if term_size is None: term_size = rule.get('terms_size', 50) @@ -719,7 +721,7 @@ def set_starttime(self, rule, endtime): # Use buffer for normal queries, or run_every increments otherwise # or, if scan_entire_timeframe, use timeframe - + #TODO handle cron interval for non "use_count_query" case if not rule.get('use_count_query') and not rule.get('use_terms_query'): if not rule.get('scan_entire_timeframe'): buffer_time = rule.get('buffer_time', self.buffer_time) @@ -739,11 +741,14 @@ def set_starttime(self, rule, endtime): self.adjust_start_time_for_interval_sync(rule, endtime) else: - if not rule.get('scan_entire_timeframe'): - # Query from the end of the last run, if it exists, otherwise a run_every sized window - rule['starttime'] = rule.get('previous_endtime', endtime - self.run_every) + if not rule.get('cron_schedule'): + if not rule.get('scan_entire_timeframe'): + # Query from the end of the last run, if it exists, otherwise a run_every sized window + rule['starttime'] = rule.get('previous_endtime', endtime - self.run_every) + else: + rule['starttime'] = rule.get('previous_endtime', endtime - rule['timeframe']) else: - rule['starttime'] = rule.get('previous_endtime', endtime - rule['timeframe']) + rule['starttime'] = endtime - rule['timeframe'] def adjust_start_time_for_overlapping_agg_query(self, rule): if rule.get('aggregation_query_element'): @@ -871,15 +876,16 @@ def run_rule(self, rule, endtime, starttime=None): rule['original_starttime'] = rule['starttime'] rule['scrolling_cycle'] = 0 + self.thread_data.num_hits = 0 + self.thread_data.num_dupes = 0 + self.thread_data.cumulative_hits = 0 + # Don't run if starttime was set to the future if ts_now() <= rule['starttime']: logging.warning("Attempted to use query start time in the future (%s), sleeping instead" % (starttime)) return 0 # Run the rule. 
If querying over a large time period, split it up into segments - self.thread_data.num_hits = 0 - self.thread_data.num_dupes = 0 - self.thread_data.cumulative_hits = 0 segment_size = self.get_segment_size(rule) tmp_endtime = rule['starttime'] @@ -1030,13 +1036,22 @@ def init_rule(self, new_rule, new=True): continue new_rule[prop] = rule[prop] - job = self.scheduler.add_job(self.handle_rule_execution, 'interval', - args=[new_rule], - seconds=new_rule['run_every'].total_seconds(), - id=new_rule['name'], - max_instances=1, - jitter=5) - job.modify(next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=random.randint(0, 15))) + cron_schedule = new_rule.get('cron_schedule') + # Cron trigger - shaig 16.3 + if cron_schedule is not None: + job = self.scheduler.add_job(self.handle_rule_execution, CronTrigger.from_crontab(cron_schedule), + args=[new_rule], + id=new_rule['name'], + max_instances=1, + jitter=5) + else: + job = self.scheduler.add_job(self.handle_rule_execution, 'interval', + args=[new_rule], + seconds=new_rule['run_every'].total_seconds(), + id=new_rule['name'], + max_instances=1, + jitter=5) + job.modify(next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=random.randint(0, 15))) return new_rule diff --git a/elastalert/enhancements.py b/elastalert/enhancements.py index 6cc1cdd57..e70570b1d 100644 --- a/elastalert/enhancements.py +++ b/elastalert/enhancements.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +import datetime + from .util import pretty_ts @@ -23,3 +25,25 @@ def process(self, match): class DropMatchException(Exception): """ ElastAlert will drop a match if this exception type is raised by an enhancement """ pass + + +class CopyFullRuleEnhancement(BaseEnhancement): + # The enhancement is run against every match + # The match is passed to the process function where it can be modified in any way + # ElastAlert will do this for each enhancement linked to a rule + def process(self, match): + rule_copy = dict(self.rule) + 
rule_copy.pop("type") + rule_copy.pop("match_enhancements") + rule_copy.pop("alert") + keys = [] + for key in list(rule_copy.keys()): + if isinstance(rule_copy[key], datetime.timedelta) or callable(rule_copy[key]): + keys.append(key) + rule_copy["type"] = type(self.rule["type"]).__name__ + match["rule"] = rule_copy + + +class CopyRuleTypeEnhancement(BaseEnhancement): + def process(self, match): + match["rule_type"] = type(self.rule["type"]).__name__ diff --git a/elastalert/kibana_discover.py b/elastalert/kibana_discover.py index 7e4dbb5d1..b332facce 100644 --- a/elastalert/kibana_discover.py +++ b/elastalert/kibana_discover.py @@ -14,7 +14,7 @@ kibana_default_timedelta = datetime.timedelta(minutes=10) kibana5_kibana6_versions = frozenset(['5.6', '6.0', '6.1', '6.2', '6.3', '6.4', '6.5', '6.6', '6.7', '6.8']) -kibana7_versions = frozenset(['7.0', '7.1', '7.2', '7.3']) +kibana7_versions = frozenset(['7.0', '7.1', '7.2', '7.3', '7.4', '7.5', '7.6', '7.7', '7.8', '7.9']) def generate_kibana_discover_url(rule, match): ''' Creates a link for a kibana discover app. 
''' diff --git a/elastalert/loaders.py b/elastalert/loaders.py index 771194768..5db008f90 100644 --- a/elastalert/loaders.py +++ b/elastalert/loaders.py @@ -199,6 +199,8 @@ def load_yaml(self, filename): # Special case for merging filters - if both files specify a filter merge (AND) them if 'filter' in rule and 'filter' in loaded: rule['filter'] = loaded['filter'] + rule['filter'] + if 'must_not' in rule and 'must_not' in loaded: + rule['must_not'] = loaded['must_not'] + rule['must_not'] loaded.update(rule) rule = loaded @@ -274,6 +276,7 @@ def load_options(self, rule, conf, filename, args=None): rule.setdefault('query_delay', datetime.timedelta(seconds=0)) rule.setdefault('timestamp_field', '@timestamp') rule.setdefault('filter', []) + rule.setdefault('must_not', []) rule.setdefault('timestamp_type', 'iso') rule.setdefault('timestamp_format', '%Y-%m-%dT%H:%M:%SZ') rule.setdefault('_source_enabled', True) @@ -315,6 +318,10 @@ def _dt_to_ts_with_format(dt): rule.setdefault('client_cert', conf.get('client_cert')) rule.setdefault('client_key', conf.get('client_key')) + # Add support for custom http headers + if 'http_headers' in conf: + rule.setdefault('http_headers', conf.get('http_headers')) + # Set HipChat options from global config rule.setdefault('hipchat_msg_color', 'red') rule.setdefault('hipchat_domain', 'api.hipchat.com') @@ -366,6 +373,7 @@ def _dt_to_ts_with_format(dt): rule['include'] = list(set(include)) # Check that generate_kibana_url is compatible with the filters + # TODO must_not support ? if rule.get('generate_kibana_link'): for es_filter in rule.get('filter'): if es_filter: diff --git a/elastalert/ruletypes.py b/elastalert/ruletypes.py index 2f1d2f82c..00ac4c621 100644 --- a/elastalert/ruletypes.py +++ b/elastalert/ruletypes.py @@ -318,12 +318,14 @@ def append(self, event): This will also pop the oldest events and call onRemoved on them until the window size is less than timeframe. 
""" self.data.add(event) - self.running_count += event[1] + if event and event[1]: + self.running_count += event[1] while self.duration() >= self.timeframe: oldest = self.data[0] self.data.remove(oldest) - self.running_count -= oldest[1] + if oldest and oldest[1]: + self.running_count -= oldest[1] self.onRemoved and self.onRemoved(oldest) def duration(self): @@ -363,7 +365,8 @@ def append_middle(self, event): # Append left if ts is earlier than first event if self.get_ts(self.data[0]) > ts: self.data.appendleft(event) - self.running_count += event[1] + if event and event[1]: + self.running_count += event[1] return # Rotate window until we can insert event @@ -374,7 +377,8 @@ def append_middle(self, event): # This should never happen return self.data.append(event) - self.running_count += event[1] + if event and event[1]: + self.running_count += event[1] self.data.rotate(-rotation) diff --git a/elastalert/schema.yaml b/elastalert/schema.yaml index 1241315dc..10cf4ebcd 100644 --- a/elastalert/schema.yaml +++ b/elastalert/schema.yaml @@ -217,7 +217,7 @@ properties: ### Kibana Discover App Link generate_kibana_discover_url: {type: boolean} kibana_discover_app_url: {type: string, format: uri} - kibana_discover_version: {type: string, enum: ['7.3', '7.2', '7.1', '7.0', '6.8', '6.7', '6.6', '6.5', '6.4', '6.3', '6.2', '6.1', '6.0', '5.6']} + kibana_discover_version: {type: string, enum: ['7.9', '7.8', '7.7', '7.6', '7.5', '7.4', '7.3', '7.2', '7.1', '7.0', '6.8', '6.7', '6.6', '6.5', '6.4', '6.3', '6.2', '6.1', '6.0', '5.6']} kibana_discover_index_pattern_id: {type: string, minLength: 1} kibana_discover_columns: {type: array, items: {type: string, minLength: 1}, minItems: 1} kibana_discover_from_timedelta: *timedelta diff --git a/elastalert/test_rule.py b/elastalert/test_rule.py index 06100aa0f..61c9707e8 100644 --- a/elastalert/test_rule.py +++ b/elastalert/test_rule.py @@ -77,7 +77,7 @@ def test_file(self, conf, args): endtime=end_time, timestamp_field=ts, 
to_ts_func=conf['dt_to_ts'], - five=conf['five'] + five=conf['five'], must_not=conf['must_not'], ) index = ElastAlerter.get_index(conf, start_time, end_time) @@ -106,7 +106,7 @@ def test_file(self, conf, args): timestamp_field=ts, to_ts_func=conf['dt_to_ts'], sort=False, - five=conf['five'] + five=conf['five'], must_not=conf['must_not'], ) try: res = es_client.count(index, doc_type=doc_type, body=count_query, ignore_unavailable=True) diff --git a/elastalert/util.py b/elastalert/util.py index bbb0600ff..426377115 100644 --- a/elastalert/util.py +++ b/elastalert/util.py @@ -338,6 +338,7 @@ def build_es_conn_config(conf): parsed_conf['ca_certs'] = None parsed_conf['client_cert'] = None parsed_conf['client_key'] = None + parsed_conf['http_headers'] = None parsed_conf['http_auth'] = None parsed_conf['es_username'] = None parsed_conf['es_password'] = None @@ -379,6 +380,9 @@ def build_es_conn_config(conf): if 'client_cert' in conf: parsed_conf['client_cert'] = conf['client_cert'] + if 'http_headers' in conf: + parsed_conf['http_headers'] = conf['http_headers'] + if 'client_key' in conf: parsed_conf['client_key'] = conf['client_key'] diff --git a/tests/kibana_discover_test.py b/tests/kibana_discover_test.py index f06fe4e0c..ae5c8bca1 100644 --- a/tests/kibana_discover_test.py +++ b/tests/kibana_discover_test.py @@ -38,7 +38,7 @@ def test_generate_kibana_discover_url_with_kibana_5x_and_6x(kibana_version): assert url == expectedUrl -@pytest.mark.parametrize("kibana_version", ['7.0', '7.1', '7.2', '7.3']) +@pytest.mark.parametrize("kibana_version", ['7.0', '7.1', '7.2', '7.3', '7.4', '7.5', '7.6', '7.7', '7.8', '7.9']) def test_generate_kibana_discover_url_with_kibana_7x(kibana_version): url = generate_kibana_discover_url( rule={ @@ -171,7 +171,7 @@ def test_generate_kibana_discover_url_with_from_timedelta(): url = generate_kibana_discover_url( rule={ 'kibana_discover_app_url': 'http://kibana:5601/#/discover', - 'kibana_discover_version': '7.3', + 'kibana_discover_version': 
'7.9', 'kibana_discover_index_pattern_id': 'd6cabfb6-aaef-44ea-89c5-600e9a76991a', 'kibana_discover_from_timedelta': timedelta(hours=1), 'timestamp_field': 'timestamp' @@ -204,7 +204,7 @@ def test_generate_kibana_discover_url_with_from_timedelta_and_timeframe(): url = generate_kibana_discover_url( rule={ 'kibana_discover_app_url': 'http://kibana:5601/#/discover', - 'kibana_discover_version': '7.3', + 'kibana_discover_version': '7.9', 'kibana_discover_index_pattern_id': 'd6cabfb6-aaef-44ea-89c5-600e9a76991a', 'kibana_discover_from_timedelta': timedelta(hours=1), 'timeframe': timedelta(minutes=20), @@ -238,7 +238,7 @@ def test_generate_kibana_discover_url_with_to_timedelta(): url = generate_kibana_discover_url( rule={ 'kibana_discover_app_url': 'http://kibana:5601/#/discover', - 'kibana_discover_version': '7.3', + 'kibana_discover_version': '7.9', 'kibana_discover_index_pattern_id': 'd6cabfb6-aaef-44ea-89c5-600e9a76991a', 'kibana_discover_to_timedelta': timedelta(hours=1), 'timestamp_field': 'timestamp' @@ -271,7 +271,7 @@ def test_generate_kibana_discover_url_with_to_timedelta_and_timeframe(): url = generate_kibana_discover_url( rule={ 'kibana_discover_app_url': 'http://kibana:5601/#/discover', - 'kibana_discover_version': '7.3', + 'kibana_discover_version': '7.9', 'kibana_discover_index_pattern_id': 'd6cabfb6-aaef-44ea-89c5-600e9a76991a', 'kibana_discover_to_timedelta': timedelta(hours=1), 'timeframe': timedelta(minutes=20), @@ -305,7 +305,7 @@ def test_generate_kibana_discover_url_with_timeframe(): url = generate_kibana_discover_url( rule={ 'kibana_discover_app_url': 'http://kibana:5601/#/discover', - 'kibana_discover_version': '7.3', + 'kibana_discover_version': '7.9', 'kibana_discover_index_pattern_id': 'd6cabfb6-aaef-44ea-89c5-600e9a76991a', 'timeframe': timedelta(minutes=20), 'timestamp_field': 'timestamp'