diff --git a/plugins/__init__.py b/plugins/__init__.py
new file mode 100644
index 0000000..af9356e
--- /dev/null
+++ b/plugins/__init__.py
@@ -0,0 +1 @@
+__author__ = 'andrii'
diff --git a/plugins/elasticsearch/__init__.py b/plugins/elasticsearch/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/plugins/elasticsearch/es_gc_count b/plugins/elasticsearch/es_gc_count
new file mode 100755
index 0000000..7644511
--- /dev/null
+++ b/plugins/elasticsearch/es_gc_count
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##############################################
+# Munin plugin for Elasticsearch monitoring #
+# by Andrii Gakhov #
+##############################################
+
+import json
+import urllib3
+
+from munin import MuninPlugin
+
+
+class ESGarbageCollectionCountPlugin(MuninPlugin):
+    category = 'Elasticsearch'
+    args = '--base 1000 --lower-limit 0'
+    vlabel = 'Number of garbage collections'
+    info = 'Show garbage collection counts'
+
+    @property
+    def title(self):
+        return 'Elasticsearch Garbage Collection count'
+
+    @property
+    def fields(self):
+        fields = [
+            ('total', dict(
+                label='total',
+                type='GAUGE',
+            )),
+            ('young', dict(
+                label='young generation',
+                type='GAUGE',
+            )),
+            ('old', dict(
+                label='old generation',
+                type='GAUGE',
+            ))
+        ]
+        return fields
+
+    def __init__(self):
+        super(ESGarbageCollectionCountPlugin, self).__init__()
+        self.es_host = 'http://localhost:9200'
+        self.http = urllib3.PoolManager()
+
+    def execute(self):
+        return self._get_gc_stats()
+
+    def _get_gc_stats(self):
+        url = '{}/_nodes/_local/jvm/stats'.format(self.es_host)
+        response = self.http.request('GET', url)
+
+        if response.status != 200:
+            return None
+
+        data = json.loads(response.data)
+        if not data.get('nodes', {}).values():
+            return None
+
+        stats = list(data['nodes'].values())[0].get('jvm', {}).get('gc', {})  # list(): dict views are not indexable on py3
+        return {
+            'total': stats.get('collection_count'),
+            'young': stats.get('collectors', {}).get(  # key was 'new', but the declared field is 'young'
+                'ParNew', {}).get('collection_count'),
+            'old': stats.get('collectors', {}).get(
+                'ConcurrentMarkSweep', {}).get('collection_count'),
+        }
+
+if __name__ == '__main__':
+    ESGarbageCollectionCountPlugin().run()
diff --git a/plugins/elasticsearch/es_gc_time b/plugins/elasticsearch/es_gc_time
new file mode 100755
index 0000000..b4c3078
--- /dev/null
+++ b/plugins/elasticsearch/es_gc_time
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##############################################
+# Munin plugin for Elasticsearch monitoring #
+# by Andrii Gakhov #
+##############################################
+
+import json
+import urllib3
+
+from munin import MuninPlugin
+
+
+class ESGarbageCollectionTimePlugin(MuninPlugin):
+    category = 'Elasticsearch'
+    args = '--base 1000 --lower-limit 0'
+    vlabel = 'GC time / milliseconds'
+    info = 'Show garbage collection time'
+
+    @property
+    def title(self):
+        return 'Elasticsearch Garbage Collection time'
+
+    @property
+    def fields(self):
+        fields = [
+            ('total', dict(
+                label='total',
+                type='GAUGE',
+            )),
+            ('young', dict(
+                label='young generation',
+                type='GAUGE',
+            )),
+            ('old', dict(
+                label='old generation',
+                type='GAUGE',
+            ))
+        ]
+        return fields
+
+    def __init__(self):
+        super(ESGarbageCollectionTimePlugin, self).__init__()
+        self.es_host = 'http://localhost:9200'
+        self.http = urllib3.PoolManager()
+
+    def execute(self):
+        return self._get_gc_stats()
+
+    def _get_gc_stats(self):
+        url = '{}/_nodes/_local/jvm/stats'.format(self.es_host)
+        response = self.http.request('GET', url)
+
+        if response.status != 200:
+            return None
+
+        data = json.loads(response.data)
+        if not data.get('nodes', {}).values():
+            return None
+
+        stats = list(data['nodes'].values())[0].get('jvm', {}).get('gc', {})  # list(): py3-safe indexing
+        return {
+            'total': stats.get('collection_time_in_millis'),
+            'young': stats.get('collectors', {}).get(
+                'ParNew', {}).get('collection_time_in_millis'),
+            'old': stats.get('collectors', {}).get(
+                'ConcurrentMarkSweep', {}).get('collection_time_in_millis'),
+        }
+
+if __name__ == '__main__':
+    ESGarbageCollectionTimePlugin().run()
diff --git a/plugins/elasticsearch/es_http_connections b/plugins/elasticsearch/es_http_connections
new file mode 100755
index 0000000..151e4dc
--- /dev/null
+++ b/plugins/elasticsearch/es_http_connections
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##############################################
+# Munin plugin for Elasticsearch monitoring #
+# by Andrii Gakhov #
+##############################################
+
+import json
+import urllib3
+
+from munin import MuninPlugin
+
+
+class ESHttpOpenConnectionsCountPlugin(MuninPlugin):
+    category = 'Elasticsearch'
+    args = '--base 1000 --lower-limit 0'
+    vlabel = 'Number of HTTP open connections'
+    info = 'Show HTTP open connections count'
+
+    @property
+    def title(self):
+        return 'Elasticsearch HTTP open connections count'
+
+    @property
+    def fields(self):
+        fields = [
+            ('total', dict(
+                label='total',
+                type='GAUGE',
+            ))
+        ]
+        return fields
+
+    def __init__(self):
+        super(ESHttpOpenConnectionsCountPlugin, self).__init__()
+        self.es_host = 'http://localhost:9200'
+        self.http = urllib3.PoolManager()
+
+    def execute(self):
+        return self._get_http_stats()
+
+    def _get_http_stats(self):
+        url = '{}/_nodes/_local/http/stats'.format(self.es_host)
+        response = self.http.request('GET', url)
+
+        if response.status != 200:
+            return None
+
+        data = json.loads(response.data)
+        if not data.get('nodes', {}).values():
+            return None
+
+        stats = list(data['nodes'].values())[0].get('http', {})  # list(): py3-safe indexing
+        return {
+            'total': stats.get('current_open'),
+        }
+
+if __name__ == '__main__':
+    ESHttpOpenConnectionsCountPlugin().run()
diff --git a/plugins/elasticsearch/es_jvm_pools b/plugins/elasticsearch/es_jvm_pools
new file mode 100755
index 0000000..f02d5d0
--- /dev/null
+++ b/plugins/elasticsearch/es_jvm_pools
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##############################################
+# Munin plugin for Elasticsearch monitoring #
+# by Andrii Gakhov #
+##############################################
+
+import json
+import urllib3
+
+from munin import MuninPlugin
+
+
+class ESJvmMemPoolSizePlugin(MuninPlugin):
+    category = 'Elasticsearch'
+    args = '--base 1000 --lower-limit 0'
+    vlabel = 'JVM memory pools size / bytes'
+    info = 'Show JVM memory pools size'
+
+    @property
+    def title(self):
+        return 'Elasticsearch JVM Memory Pools size'
+
+    @property
+    def fields(self):
+        fields = [
+            ('young', dict(
+                label='young generation',
+                type='GAUGE',
+            )),
+            ('old', dict(
+                label='old generation',
+                type='GAUGE',
+            ))
+        ]
+        return fields
+
+    def __init__(self):
+        super(ESJvmMemPoolSizePlugin, self).__init__()
+        self.es_host = 'http://localhost:9200'
+        self.http = urllib3.PoolManager()
+
+    def execute(self):
+        return self._get_cache_stats()
+
+    def _get_cache_stats(self):
+        url = '{}/_nodes/_local/jvm/stats'.format(self.es_host)
+        response = self.http.request('GET', url)
+
+        if response.status != 200:
+            return None
+
+        data = json.loads(response.data)
+        if not data.get('nodes', {}).values():
+            return None
+
+        stats = list(data['nodes'].values())[0].get('jvm', {}).get('mem', {}).get('pools', {})  # list(): py3-safe
+        return {
+            'young': stats.get('Par Eden Space', {}).get('used_in_bytes'),
+            'old': stats.get('CMS Old Gen', {}).get('used_in_bytes')
+        }
+
+if __name__ == '__main__':
+    ESJvmMemPoolSizePlugin().run()
diff --git a/plugins/elasticsearch/es_open_files b/plugins/elasticsearch/es_open_files
new file mode 100755
index 0000000..3892ca3
--- /dev/null
+++ b/plugins/elasticsearch/es_open_files
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##############################################
+# Munin plugin for Elasticsearch monitoring #
+# by Andrii Gakhov #
+##############################################
+
+import json
+import urllib3
+
+from munin import MuninPlugin
+
+
+class ESOpenFilesCountPlugin(MuninPlugin):
+    category = 'Elasticsearch'
+    args = '--base 1000 --lower-limit 0'
+    vlabel = 'Number of open file descriptors'
+    info = 'Show open file descriptors'
+
+    @property
+    def title(self):
+        return 'Elasticsearch open files count'
+
+    @property
+    def fields(self):
+        fields = [
+            ('total', dict(
+                label='total',
+                type='GAUGE',
+            ))
+        ]
+        return fields
+
+    def __init__(self):
+        super(ESOpenFilesCountPlugin, self).__init__()
+        self.es_host = 'http://localhost:9200'
+        self.http = urllib3.PoolManager()
+
+    def execute(self):
+        return self._get_files_stats()
+
+    def _get_files_stats(self):
+        url = '{}/_nodes/_local/process/stats'.format(self.es_host)
+        response = self.http.request('GET', url)
+
+        if response.status != 200:
+            return None
+
+        data = json.loads(response.data)
+        if not data.get('nodes', {}).values():
+            return None
+
+        stats = list(data['nodes'].values())[0].get('process', {})  # list(): py3-safe indexing
+        return {
+            'total': stats.get('open_file_descriptors'),
+        }
+
+if __name__ == '__main__':
+    ESOpenFilesCountPlugin().run()
diff --git a/plugins/elasticsearch/es_os_memory b/plugins/elasticsearch/es_os_memory
new file mode 100755
index 0000000..76af96f
--- /dev/null
+++ b/plugins/elasticsearch/es_os_memory
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##############################################
+# Munin plugin for Elasticsearch monitoring #
+# by Andrii Gakhov #
+##############################################
+
+import json
+import urllib3
+
+from munin import MuninPlugin
+
+
+class ESOSMemoryStatsPlugin(MuninPlugin):
+    category = 'Elasticsearch'
+    args = '--base 1000 --lower-limit 0'
+    vlabel = 'Memory / bytes'
+    info = 'Show OS memory stats'
+
+    @property
+    def title(self):
+        return 'Elasticsearch OS memory stats'
+
+    @property
+    def fields(self):
+        fields = [
+            ('mem_free', dict(
+                label='mem free',
+                type='GAUGE',
+            )),
+            ('mem_actual_free', dict(
+                label='mem actual free',
+                type='GAUGE',
+            )),
+            ('mem_used', dict(
+                label='mem used',
+                type='GAUGE',
+            )),
+            ('mem_actual_used', dict(
+                label='mem actual used',
+                type='GAUGE',
+            )),
+            ('swap_free', dict(
+                label='swap free',
+                type='GAUGE',
+            )),
+            ('swap_used', dict(
+                label='swap used',
+                type='GAUGE',
+            )),
+        ]
+        return fields
+
+    def __init__(self):
+        super(ESOSMemoryStatsPlugin, self).__init__()
+        self.es_host = 'http://localhost:9200'
+        self.http = urllib3.PoolManager()
+
+    def execute(self):
+        return self._get_os_stats()
+
+    def _get_os_stats(self):
+        url = '{}/_nodes/_local/os/stats'.format(self.es_host)
+        response = self.http.request('GET', url)
+
+        if response.status != 200:
+            return None
+
+        data = json.loads(response.data)
+        if not data.get('nodes', {}).values():
+            return None
+
+        stats = list(data['nodes'].values())[0].get('os', {})  # list(): py3-safe indexing
+        return {
+            'mem_free': stats.get('mem', {}).get('free_in_bytes'),
+            'mem_actual_free': stats.get('mem', {}).get('actual_free_in_bytes'),
+            'mem_used': stats.get('mem', {}).get('used_in_bytes'),
+            'mem_actual_used': stats.get('mem', {}).get('actual_used_in_bytes'),
+            'swap_free': stats.get('swap', {}).get('free_in_bytes'),
+            'swap_used': stats.get('swap', {}).get('used_in_bytes'),
+        }
+
+if __name__ == '__main__':
+    ESOSMemoryStatsPlugin().run()
diff --git a/plugins/elasticsearch/es_query_time b/plugins/elasticsearch/es_query_time
new file mode 100755
index 0000000..24eab23
--- /dev/null
+++ b/plugins/elasticsearch/es_query_time
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+
+##############################################
+# Munin plugin for Elasticsearch monitoring #
+# by Andrii Gakhov #
+##############################################
+
+import json
+import os
+import urllib3
+
+from munin import MuninPlugin
+
+
+class ESQueryTimePlugin(MuninPlugin):
+    args = "--base 1000 --lower-limit 0"
+    category = "Elasticsearch"
+    title = "Elasticsearch Query Time"
+    vlabel = "Query time / milliseconds"
+    info = "Show execution time for queries"
+
+    @property
+    def fields(self):
+        fields = []
+        for index in self.es_indices:
+            fields.append(
+                (index, dict(
+                    label="%s index" % index,
+                    type="GAUGE",
+                )),
+            )
+        return fields
+
+    def __init__(self):
+        super(ESQueryTimePlugin, self).__init__()
+        self.es_host = 'http://localhost:9200'
+        self.es_indices = os.environ['ES_INDICES_LIST'].split(',')  # split: iterating the raw string yields characters, not index names
+        self.query = '{"query":{"match_all":{}}}'
+        self.http = urllib3.PoolManager()
+
+    def execute(self):
+        results = {}
+        for index in self.es_indices:
+            results[index] = self._get_query_time(index)
+        return results
+
+    def _get_query_time(self, index):
+        url = "%s/%s/_search" % (self.es_host, index)
+        headers = {'content-type': 'application/json'}
+        response = self.http.urlopen('POST', url,
+                                     body=self.query, headers=headers)
+
+        if response.status != 200:
+            return None
+        data = json.loads(response.data)
+        if data.get('timed_out', True):
+            return None
+        return data['took']
+
+if __name__ == '__main__':
+    ESQueryTimePlugin().run()
diff --git a/plugins/storm_topology_items b/plugins/storm_topology_items
new file mode 100755
index 0000000..a837ccb
--- /dev/null
+++ b/plugins/storm_topology_items
@@ -0,0 +1,119 @@
+#!/usr/bin/env python
+
+##############################################
+# Munin plugin for Storm monitoring #
+# by Andrii Gakhov #
+##############################################
+
+import datetime
+import json
+import os
+import re
+import urllib3
+
+from munin import MuninPlugin
+
+TABLE_ROWS_PATTERN = re.compile(r'<tr[^>]*>(?P<row>.+?)</tr>',
+                                re.DOTALL | re.IGNORECASE | re.MULTILINE)
+TABLE_COLLS_PATTERN = re.compile(r'<td[^>]*>(?P<coll>.+?)</td>',
+                                 re.DOTALL | re.IGNORECASE | re.MULTILINE)
+
+TOPOLOGY_ID_POSITION = 1
+ACKED_POSITION = 4
+FAILED_POSITION = 5
+
+
+class StormTopologyPlugin(MuninPlugin):
+    args = "--base 1000"
+    category = 'Storm'
+    vlabel = "Number of failed (-) / acked (+) items"
+    info = "Show social topology processing statistics"
+
+    @property
+    def title(self):
+        return "Items processed by %s" % self.topology
+
+    @property
+    def fields(self):
+        fields = [
+            ("acked", dict(
+                label="ACK",
+                type="GAUGE",
+            )),
+            ("failed", dict(
+                label="FAIL",
+                type="GAUGE",
+            )),
+        ]
+        return fields
+
+    def __init__(self):
+        self.host = 'http://localhost:8080'
+        self.topology = os.environ['STORM_TOPOLOGY_NAME']
+        self.http = urllib3.PoolManager()
+
+    def execute(self):
+        stats = self._get_topology_processing_stats()
+        return {
+            'acked': stats['acked'] if stats else None,
+            'failed': -stats['failed'] if stats else None
+        }
+
+    def _get_topology_processing_stats(self):
+        # 1st step: get id for the requested topology
+        response = self.http.request('GET', self.host)
+        if response.status != 200:
+            return None
+
+        html = response.data
+        parsed = self._find_and_parse_table_row(
+            html,
+            contains=('a', None, self.topology))
+        if not parsed:
+            return None
+
+        topology_id = parsed[TOPOLOGY_ID_POSITION]
+
+        # 2nd step: get stats for selected topology
+        url = "%s/topology/%s" % (self.host, topology_id)
+        response = self.http.request('GET', url)
+        if response.status != 200:
+            return None
+
+        html = response.data
+        parsed = self._find_and_parse_table_row(
+            html,
+            contains=('a', 'href', 'window=600'))
+        if not parsed:
+            return None
+
+        return {
+            'acked': int(parsed[ACKED_POSITION]),
+            'failed': int(parsed[FAILED_POSITION])
+        }
+
+    def _find_and_parse_table_row(self, html, contains):
+        if contains == (None, None, None):
+            return []
+
+        (element, attribute, text) = contains
+        if element and attribute and text:
+            pattern = r'<%s[^>]+%s=[^\s>]*%s[^>]*?>' % (
+                element, attribute, text)
+        elif element and text:
+            pattern = '<%s[^>]*?>.*?(%s).*?</%s>' % (element, text, element)  # closing tag: 3 args need 3 placeholders
+        elif text:
+            pattern = r'>.*?%s.*?<' % text
+        SEARCH_PATTERN = re.compile(pattern)
+
+        rows = TABLE_ROWS_PATTERN.findall(html)
+        for row in rows:
+            if not SEARCH_PATTERN.search(row):
+                continue
+            return TABLE_COLLS_PATTERN.findall(row)
+
+        return None
+
+
+if __name__ == '__main__':
+    StormTopologyPlugin().run()