19 changes: 19 additions & 0 deletions README.md
@@ -320,4 +320,23 @@ ElastAlert is licensed under the Apache License, Version 2.0: http://www.apache.

### Read the documentation at [Read the Docs](http://elastalert.readthedocs.org).

### BigData Boutique Additions
There is now a `must_not` field, which behaves like the `filter` field but excludes matching hits instead of requiring them.
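
For example, a rule that keeps error-level documents while excluding a noisy host might look like this (field names and values are illustrative):

```yaml
filter:
- term:
    level: "error"
must_not:
- term:
    hostname: "staging-01"
```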

A cron scheduler has also been added. This runs jobs on a cron schedule rather than
the default interval scheduling. To use it, define `cron_schedule` in the rule instead of
`run_every`. See the example below.

`cron_schedule: "0 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23 * * *"` (equivalent to `0 * * * *`, i.e. every hour on the hour)

The format is:
minute (0-59), hour (0-23), day of month (1-31), month (1-12), day of week (MON-SUN)

Fields are separated by spaces, multiple values within a field by commas, and `*` matches all values.

Note that `cron_schedule` is currently only supported for rules that use `use_count_query`.
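
A minimal sketch of a count-based rule using the cron scheduler (the name, index, and threshold are illustrative; other required fields depend on your rule type):

```yaml
name: hourly-error-count
type: frequency
index: logs-*
use_count_query: true
num_events: 100
timeframe:
  hours: 1
cron_schedule: "0 * * * *"   # fire at the top of every hour
filter:
- term:
    level: "error"
```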


### Questions? Drop by #elastalert on Freenode IRC.
### Questions about the BDB enhancements? Contact us at info@bigdataboutique.com

4 changes: 4 additions & 0 deletions config.yaml.example
@@ -53,6 +53,10 @@ es_port: 9200
#client_cert: /path/to/client_cert.pem
#client_key: /path/to/client_key.key

# Add custom http_headers to every request sent to Elasticsearch
#http_headers:
# Bearer: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

# The index on es_host which is used for metadata storage
# This can be an unmapped index, but it is recommended that you run
# elastalert-create-index to set a mapping
4 changes: 2 additions & 2 deletions docs/source/ruletypes.rst
@@ -553,9 +553,9 @@ The currently supported versions of Kibana Discover are:

- `5.6`
- `6.0`, `6.1`, `6.2`, `6.3`, `6.4`, `6.5`, `6.6`, `6.7`, `6.8`
- `7.0`, `7.1`, `7.2`, `7.3`
- `7.0`, `7.1`, `7.2`, `7.3`, `7.4`, `7.5`, `7.6`, `7.7`, `7.8`

``kibana_discover_version: '7.3'``
``kibana_discover_version: '7.8'``

kibana_discover_index_pattern_id
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1 change: 1 addition & 0 deletions elastalert/__init__.py
@@ -23,6 +23,7 @@ def __init__(self, conf):
verify_certs=conf['verify_certs'],
ca_certs=conf['ca_certs'],
connection_class=RequestsHttpConnection,
headers=conf['http_headers'],
http_auth=conf['http_auth'],
timeout=conf['es_conn_timeout'],
send_get_body_as=conf['send_get_body_as'],
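
For reference, a standalone sketch of the client construction this enables (host values are illustrative, and this assumes an elasticsearch-py version whose connection classes accept a `headers` kwarg):

```python
from elasticsearch import Elasticsearch, RequestsHttpConnection

es = Elasticsearch(
    host="localhost",  # illustrative
    port=9200,
    connection_class=RequestsHttpConnection,
    headers={"X-Custom-Header": "value"},  # sent with every request
)
```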
3 changes: 3 additions & 0 deletions elastalert/alerts.py
@@ -1962,9 +1962,12 @@ def __init__(self, rule):

def alert(self, matches):
""" Each match will trigger a POST to the specified endpoint(s). """
alert_subject = self.create_custom_title(matches)
for match in matches:
payload = match if self.post_all_values else {}
payload.update(self.post_static_payload)
payload["alert_subject"] = alert_subject
payload["alert_text"] = str(BasicMatchString(self.rule, match))
for post_key, es_key in list(self.post_payload.items()):
payload[post_key] = lookup_es_key(match, es_key)
headers = {
3 changes: 3 additions & 0 deletions elastalert/create_index.py
@@ -204,6 +204,7 @@ def main():
send_get_body_as = data.get('send_get_body_as', 'GET')
ca_certs = data.get('ca_certs')
client_cert = data.get('client_cert')
http_headers = data.get('http_headers')
client_key = data.get('client_key')
index = args.index if args.index is not None else data.get('writeback_index')
alias = args.alias if args.alias is not None else data.get('writeback_alias')
@@ -229,6 +230,7 @@ def main():
send_get_body_as = args.send_get_body_as
ca_certs = None
client_cert = None
http_headers = None
client_key = None
index = args.index if args.index is not None else input('New index name? (Default elastalert_status) ')
if not index:
@@ -254,6 +256,7 @@
use_ssl=use_ssl,
verify_certs=verify_certs,
connection_class=RequestsHttpConnection,
headers=http_headers,
http_auth=http_auth,
url_prefix=url_prefix,
send_get_body_as=send_get_body_as,
63 changes: 39 additions & 24 deletions elastalert/elastalert.py
@@ -20,6 +20,7 @@
import dateutil.tz
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from croniter import croniter
from elasticsearch.exceptions import ConnectionError
from elasticsearch.exceptions import ElasticsearchException
@@ -198,7 +199,7 @@ def get_index(rule, starttime=None, endtime=None):

@staticmethod
def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False,
five=False):
five=False, must_not=[]):
""" Returns a query dict that will apply a list of filters, filter by
start and end time, and sort results by timestamp.

@@ -211,14 +212,15 @@ def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field=
starttime = to_ts_func(starttime)
endtime = to_ts_func(endtime)
filters = copy.copy(filters)
es_filters = {'filter': {'bool': {'must': filters}}}
es_filters = {'filter': {'bool': {'filter': filters, 'must_not': must_not}}}
if starttime and endtime:
es_filters['filter']['bool']['must'].insert(0, {'range': {timestamp_field: {'gt': starttime,
'lte': endtime}}})
es_filters['filter']['bool']['filter'].insert(0, {'range': {timestamp_field: {'gt': starttime,
'lte': endtime}}})
if five:
query = {'query': {'bool': es_filters}}
else:
query = {'query': {'filtered': es_filters}}
raise Exception('Unsupported operation - Elasticsearch versions before 5 are no longer supported')

if sort:
query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}]
return query
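
For reference, a sketch of the body `get_query` now builds when `five=True` (filter and must_not values illustrative):

```python
# Resulting query structure (values illustrative):
query = {
    'query': {
        'bool': {
            'filter': {
                'bool': {
                    'filter': [
                        {'range': {'@timestamp': {'gt': '2020-01-01T00:00:00Z',
                                                  'lte': '2020-01-01T01:00:00Z'}}},
                        {'term': {'level': 'error'}},
                    ],
                    'must_not': [
                        {'term': {'hostname': 'staging-01'}},
                    ],
                }
            }
        }
    },
    'sort': [{'@timestamp': {'order': 'asc'}}],
}
```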
@@ -358,7 +360,7 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
endtime,
timestamp_field=rule['timestamp_field'],
to_ts_func=rule['dt_to_ts'],
five=rule['five'],
five=rule['five'], must_not=rule['must_not'],
)
if self.thread_data.current_es.is_atleastsixsix():
extra_args = {'_source_includes': rule['include']}
@@ -448,7 +450,7 @@ def get_hits_count(self, rule, starttime, endtime, index):
timestamp_field=rule['timestamp_field'],
sort=False,
to_ts_func=rule['dt_to_ts'],
five=rule['five']
five=rule['five'], must_not=rule['must_not'],
)

try:
@@ -499,7 +501,7 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non
timestamp_field=rule['timestamp_field'],
sort=False,
to_ts_func=rule['dt_to_ts'],
five=rule['five']
five=rule['five'], must_not=rule['must_not'],
)
if size is None:
size = rule.get('terms_size', 50)
@@ -547,7 +549,7 @@ def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_
timestamp_field=rule['timestamp_field'],
sort=False,
to_ts_func=rule['dt_to_ts'],
five=rule['five']
five=rule['five'], must_not=rule['must_not'],
)
if term_size is None:
term_size = rule.get('terms_size', 50)
@@ -719,7 +721,7 @@ def set_starttime(self, rule, endtime):

# Use buffer for normal queries, or run_every increments otherwise
# or, if scan_entire_timeframe, use timeframe

# TODO: handle cron scheduling for the non-use_count_query case
if not rule.get('use_count_query') and not rule.get('use_terms_query'):
if not rule.get('scan_entire_timeframe'):
buffer_time = rule.get('buffer_time', self.buffer_time)
@@ -739,11 +741,14 @@ def set_starttime(self, rule, endtime):
self.adjust_start_time_for_interval_sync(rule, endtime)

else:
if not rule.get('scan_entire_timeframe'):
# Query from the end of the last run, if it exists, otherwise a run_every sized window
rule['starttime'] = rule.get('previous_endtime', endtime - self.run_every)
if not rule.get('cron_schedule'):
if not rule.get('scan_entire_timeframe'):
# Query from the end of the last run, if it exists, otherwise a run_every sized window
rule['starttime'] = rule.get('previous_endtime', endtime - self.run_every)
else:
rule['starttime'] = rule.get('previous_endtime', endtime - rule['timeframe'])
else:
rule['starttime'] = rule.get('previous_endtime', endtime - rule['timeframe'])
rule['starttime'] = endtime - rule['timeframe']

def adjust_start_time_for_overlapping_agg_query(self, rule):
if rule.get('aggregation_query_element'):
@@ -871,15 +876,16 @@ def run_rule(self, rule, endtime, starttime=None):
rule['original_starttime'] = rule['starttime']
rule['scrolling_cycle'] = 0

self.thread_data.num_hits = 0
self.thread_data.num_dupes = 0
self.thread_data.cumulative_hits = 0

# Don't run if starttime was set to the future
if ts_now() <= rule['starttime']:
logging.warning("Attempted to use query start time in the future (%s), sleeping instead" % (starttime))
return 0

# Run the rule. If querying over a large time period, split it up into segments
self.thread_data.num_hits = 0
self.thread_data.num_dupes = 0
self.thread_data.cumulative_hits = 0
segment_size = self.get_segment_size(rule)

tmp_endtime = rule['starttime']
@@ -1030,13 +1036,22 @@ def init_rule(self, new_rule, new=True):
continue
new_rule[prop] = rule[prop]

job = self.scheduler.add_job(self.handle_rule_execution, 'interval',
args=[new_rule],
seconds=new_rule['run_every'].total_seconds(),
id=new_rule['name'],
max_instances=1,
jitter=5)
job.modify(next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=random.randint(0, 15)))
cron_schedule = new_rule.get('cron_schedule')
# Use a cron trigger when cron_schedule is set on the rule
if cron_schedule is not None:
job = self.scheduler.add_job(self.handle_rule_execution, CronTrigger.from_crontab(cron_schedule),
args=[new_rule],
id=new_rule['name'],
max_instances=1,
jitter=5)
else:
job = self.scheduler.add_job(self.handle_rule_execution, 'interval',
args=[new_rule],
seconds=new_rule['run_every'].total_seconds(),
id=new_rule['name'],
max_instances=1,
jitter=5)
job.modify(next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=random.randint(0, 15)))

return new_rule
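
As a standalone sketch of the scheduling behavior added here (the job function and crontab string are illustrative), APScheduler's `CronTrigger.from_crontab` drives a job directly from a `cron_schedule`-style expression:

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

def run_rule_job():
    # Stand-in for handle_rule_execution(rule)
    print("rule executed")

scheduler = BackgroundScheduler()
# Equivalent of cron_schedule: "0 * * * *" - fire at the top of every hour
scheduler.add_job(run_rule_job, CronTrigger.from_crontab("0 * * * *"),
                  id="example-rule", max_instances=1, jitter=5)
scheduler.start()
```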

24 changes: 24 additions & 0 deletions elastalert/enhancements.py
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
import datetime

from .util import pretty_ts


@@ -23,3 +25,25 @@ def process(self, match):
class DropMatchException(Exception):
""" ElastAlert will drop a match if this exception type is raised by an enhancement """
pass


class CopyFullRuleEnhancement(BaseEnhancement):
# The enhancement is run against every match
# The match is passed to the process function where it can be modified in any way
# ElastAlert will do this for each enhancement linked to a rule
def process(self, match):
rule_copy = dict(self.rule)
rule_copy.pop("type")
rule_copy.pop("match_enhancements")
rule_copy.pop("alert")
# Collect and drop values that won't serialize (timedeltas and functions)
keys = []
for key in list(rule_copy.keys()):
if isinstance(rule_copy[key], datetime.timedelta) or callable(rule_copy[key]):
keys.append(key)
for key in keys:
rule_copy.pop(key)
rule_copy["type"] = type(self.rule["type"]).__name__
match["rule"] = rule_copy


class CopyRuleTypeEnhancement(BaseEnhancement):
def process(self, match):
match["rule_type"] = type(self.rule["type"]).__name__
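
These enhancements are attached to a rule via `match_enhancements`, e.g. (assuming the classes are importable on ElastAlert's Python path):

```yaml
match_enhancements:
- "elastalert.enhancements.CopyRuleTypeEnhancement"
```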
2 changes: 1 addition & 1 deletion elastalert/kibana_discover.py
@@ -14,7 +14,7 @@
kibana_default_timedelta = datetime.timedelta(minutes=10)

kibana5_kibana6_versions = frozenset(['5.6', '6.0', '6.1', '6.2', '6.3', '6.4', '6.5', '6.6', '6.7', '6.8'])
kibana7_versions = frozenset(['7.0', '7.1', '7.2', '7.3'])
kibana7_versions = frozenset(['7.0', '7.1', '7.2', '7.3', '7.4', '7.5', '7.6', '7.7', '7.8', '7.9'])

def generate_kibana_discover_url(rule, match):
''' Creates a link for a kibana discover app. '''
8 changes: 8 additions & 0 deletions elastalert/loaders.py
@@ -199,6 +199,8 @@ def load_yaml(self, filename):
# Special case for merging filters - if both files specify a filter merge (AND) them
if 'filter' in rule and 'filter' in loaded:
rule['filter'] = loaded['filter'] + rule['filter']
if 'must_not' in rule and 'must_not' in loaded:
rule['must_not'] = loaded['must_not'] + rule['must_not']

loaded.update(rule)
rule = loaded
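
The merge behavior mirrors `filter`: when a rule imports a base file, `must_not` clauses from both files are combined. A sketch, with illustrative filenames:

```yaml
# base.yaml
must_not:
- term:
    environment: "dev"

# rule.yaml
import: "base.yaml"
must_not:
- term:
    hostname: "staging-01"
# Effective must_not: both term clauses above
```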
@@ -274,6 +276,7 @@ def load_options(self, rule, conf, filename, args=None):
rule.setdefault('query_delay', datetime.timedelta(seconds=0))
rule.setdefault('timestamp_field', '@timestamp')
rule.setdefault('filter', [])
rule.setdefault('must_not', [])
rule.setdefault('timestamp_type', 'iso')
rule.setdefault('timestamp_format', '%Y-%m-%dT%H:%M:%SZ')
rule.setdefault('_source_enabled', True)
@@ -315,6 +318,10 @@ def _dt_to_ts_with_format(dt):
rule.setdefault('client_cert', conf.get('client_cert'))
rule.setdefault('client_key', conf.get('client_key'))

# Add support for custom HTTP headers
if 'http_headers' in conf:
rule.setdefault('http_headers', conf.get('http_headers'))

# Set HipChat options from global config
rule.setdefault('hipchat_msg_color', 'red')
rule.setdefault('hipchat_domain', 'api.hipchat.com')
@@ -366,6 +373,7 @@ def _dt_to_ts_with_format(dt):
rule['include'] = list(set(include))

# Check that generate_kibana_url is compatible with the filters
# TODO: should must_not be checked here as well?
if rule.get('generate_kibana_link'):
for es_filter in rule.get('filter'):
if es_filter:
12 changes: 8 additions & 4 deletions elastalert/ruletypes.py
@@ -318,12 +318,14 @@ def append(self, event):
This will also pop the oldest events and call onRemoved on them until the
window size is less than timeframe. """
self.data.add(event)
self.running_count += event[1]
if event and event[1]:
self.running_count += event[1]

while self.duration() >= self.timeframe:
oldest = self.data[0]
self.data.remove(oldest)
self.running_count -= oldest[1]
if oldest and oldest[1]:
self.running_count -= oldest[1]
self.onRemoved and self.onRemoved(oldest)

def duration(self):
@@ -363,7 +365,8 @@ def append_middle(self, event):
# Append left if ts is earlier than first event
if self.get_ts(self.data[0]) > ts:
self.data.appendleft(event)
self.running_count += event[1]
if event and event[1]:
self.running_count += event[1]
return

# Rotate window until we can insert event
@@ -374,7 +377,8 @@
# This should never happen
return
self.data.append(event)
self.running_count += event[1]
if event and event[1]:
self.running_count += event[1]
self.data.rotate(-rotation)


2 changes: 1 addition & 1 deletion elastalert/schema.yaml
@@ -217,7 +217,7 @@ properties:
### Kibana Discover App Link
generate_kibana_discover_url: {type: boolean}
kibana_discover_app_url: {type: string, format: uri}
kibana_discover_version: {type: string, enum: ['7.3', '7.2', '7.1', '7.0', '6.8', '6.7', '6.6', '6.5', '6.4', '6.3', '6.2', '6.1', '6.0', '5.6']}
kibana_discover_version: {type: string, enum: ['7.9', '7.8', '7.7', '7.6', '7.5', '7.4', '7.3', '7.2', '7.1', '7.0', '6.8', '6.7', '6.6', '6.5', '6.4', '6.3', '6.2', '6.1', '6.0', '5.6']}
kibana_discover_index_pattern_id: {type: string, minLength: 1}
kibana_discover_columns: {type: array, items: {type: string, minLength: 1}, minItems: 1}
kibana_discover_from_timedelta: *timedelta
4 changes: 2 additions & 2 deletions elastalert/test_rule.py
@@ -77,7 +77,7 @@ def test_file(self, conf, args):
endtime=end_time,
timestamp_field=ts,
to_ts_func=conf['dt_to_ts'],
five=conf['five']
five=conf['five'], must_not=conf['must_not'],
)
index = ElastAlerter.get_index(conf, start_time, end_time)

@@ -106,7 +106,7 @@ def test_file(self, conf, args):
timestamp_field=ts,
to_ts_func=conf['dt_to_ts'],
sort=False,
five=conf['five']
five=conf['five'], must_not=conf['must_not'],
)
try:
res = es_client.count(index, doc_type=doc_type, body=count_query, ignore_unavailable=True)