diff --git a/common/check_expired_dids b/common/check_expired_dids index dd8229b..885efb2 100755 --- a/common/check_expired_dids +++ b/common/check_expired_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,7 +8,7 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 -# - Eric Vaandering , 2020-2021 +# - Eric Vaandering , 2020-2022 """ Probe to check the backlog of expired dids. @@ -17,42 +17,44 @@ from __future__ import print_function import sys import traceback + + from datetime import datetime +from rucio.core import monitor from prometheus_client import CollectorRegistry, Gauge, push_to_gateway from rucio.common.config import config_get + from rucio.db.sqla import models from rucio.db.sqla.session import get_session from rucio.db.sqla.util import get_count + +from utils.common import PrometheusPusher + from utils.common import probe_metrics + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - query = session.query(models.DataIdentifier.scope).filter(models.DataIdentifier.expired_at.isnot(None), - models.DataIdentifier.expired_at < datetime.utcnow()) - result = get_count(query) - # Possible check against a threshold. 
If result > max_value then sys.exit(CRITICAL) - probe_metrics.gauge(name='undertaker.expired_dids').set(result) - Gauge('undertaker_expired_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_expired_dids', registry=registry) - except: - continue - - print(result) + with PrometheusPusher(registry, job_name='check_expired_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + query = (session.query(models.DataIdentifier.scope) + .filter(models.DataIdentifier.expired_at.isnot(None), + models.DataIdentifier.expired_at < datetime.utcnow())) + result = get_count(query) + # Possible check against a threshold. If result > max_value then sys.exit(CRITICAL) + + monitor.record_gauge('undertaker.expired_dids', value=result) + Gauge(prefix + 'undertaker_expired_dids', '', registry=registry).set(result) + + print(result) + except: print(traceback.format_exc()) sys.exit(UNKNOWN) diff --git a/common/check_fts_backlog b/common/check_fts_backlog index 5c85cd9..675a72f 100755 --- a/common/check_fts_backlog +++ b/common/check_fts_backlog @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ Copyright European Organization for Nuclear Research (CERN) 2013 @@ -9,27 +9,26 @@ Authors: - Cedric Serfon, , 2014-2018 - Mario Lassnig, , 2015 - - Eric Vaandering, , 2019-2021 + - Eric Vaandering, , 2019-2022 - Thomas Beermann, , 2019 """ -from __future__ import print_function import os import sys +from urllib.parse import urlparse import requests import urllib3 -try: - from urlparse import urlparse -except ImportError: - from urllib.parse import urlparse - -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway +from prometheus_client import CollectorRegistry, Gauge from rucio.common.config import config_get, config_get_bool +from rucio.core import monitor from rucio.core.distance import update_distances from rucio.db.sqla.session import BASE, 
get_session -from utils.common import probe_metrics +from utils import common + +PrometheusPusher = common.PrometheusPusher + OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -40,10 +39,6 @@ if BASE.metadata.schema: else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": se_matrix = {} @@ -83,86 +78,87 @@ if __name__ == "__main__": UPDATE_DIST = True registry = CollectorRegistry() - g = Gauge('fts_submitted', '', labelnames=('hostname',), registry=registry) - errmsg = '' - for ftshost in FTSHOSTS.split(','): - print("=== %s ===" % ftshost) - parsed_url = urlparse(ftshost) - scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port - retvalue = CRITICAL - url = '%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO) - busy_channels = [] - busylimit = 5000 - for attempt in range(0, 5): - result = None - try: - result = requests.get(url, verify=False, cert=(PROXY, PROXY)) - res = result.json() - for channel in res['overview']['items']: - src = channel['source_se'] - dst = channel['dest_se'] - if (src, dst) not in se_matrix: - se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0, - 'transfer_speed': 0, 'mbps_link': 0} - for state in ['submitted', 'active', 'finished', 'failed']: + with PrometheusPusher(registry, job_name='check_fts_backlog') as prometheus_config: + prefix: str = prometheus_config['prefix'] + extra_prom_labels = prometheus_config['labels'] + labelnames = ['hostname'] + labelnames.extend(extra_prom_labels.keys()) + g = Gauge(prefix + 'fts_submitted', '', labelnames=labelnames, registry=registry) + + errmsg = '' + for ftshost in FTSHOSTS.split(','): + print("=== %s ===" % ftshost) + parsed_url = urlparse(ftshost) + scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port + retvalue = CRITICAL + url = 
'%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO) + busy_channels = [] + busylimit = 5000 + for attempt in range(0, 5): + result = None + try: + result = requests.get(url, verify=False, cert=(PROXY, PROXY)) + res = result.json() + for channel in res['overview']['items']: + src = channel['source_se'] + dst = channel['dest_se'] + if (src, dst) not in se_matrix: + se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0, + 'transfer_speed': 0, 'mbps_link': 0} + for state in ['submitted', 'active', 'finished', 'failed']: + try: + se_matrix[(src, dst)][state] += channel[state] + except Exception: + pass try: - se_matrix[(src, dst)][state] += channel[state] + se_matrix[(src, dst)]['transfer_speed'] += channel['current'] + se_matrix[(src, dst)]['mbps_link'] += channel['current'] except Exception: pass - try: - se_matrix[(src, dst)]['transfer_speed'] += channel['current'] - se_matrix[(src, dst)]['mbps_link'] += channel['current'] - except Exception: - pass - if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit: - url_activities = '%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % (ftshost, VO, - src, dst) - activities = {} - try: - s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY)) - for key, val in s.json().items(): - activities[key] = val['SUBMITTED'] - except Exception as error: - pass - busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'], - 'activities': activities}) - summary = res['summary'] - hostname = hostname.replace('.', '_') - print('%s : Submitted : %s' % (hostname, summary['submitted'])) - print('%s : Active : %s' % (hostname, summary['active'])) - print('%s : Staging : %s' % (hostname, summary['staging'])) - print('%s : Started : %s' % (hostname, summary['started'])) - if busy_channels != []: - print('Busy channels (>%s submitted):' % busylimit) - for bc in busy_channels: - activities_str = ", ".join([("%s: %s" % (key, 
val)) for key, val in bc['activities'].items()]) - print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'], - str(activities_str))) - probe_metrics.gauge('fts3.{hostname}.submitted').labels(hostname=hostname).set(summary['submitted'] - + summary['active'] - + summary['staging'] - + summary['started']) - - g.labels(**{'hostname': hostname}).set((summary['submitted'] + summary['active'] + summary['staging'] + summary['started'])) - retvalue = OK - break - except Exception as error: - retvalue = CRITICAL - if result and result.status_code: - errmsg = 'Error when trying to get info from %s : HTTP status code %s. [%s]' % ( - ftshost, str(result.status_code), str(error)) - else: - errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error)) - if retvalue == CRITICAL: - print("All attempts failed. %s" % errmsg) - WORST_RETVALUE = max(retvalue, WORST_RETVALUE) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_fts_backlog', registry=registry) - except: - continue + + if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit: + url_activities = '%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % ( + ftshost, VO, + src, dst) + activities = {} + try: + s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY)) + for key, val in s.json().items(): + activities[key] = val['SUBMITTED'] + except Exception as error: + pass + busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'], + 'activities': activities}) + summary = res['summary'] + hostname = hostname.replace('.', '_') + print('%s : Submitted : %s' % (hostname, summary['submitted'])) + print('%s : Active : %s' % (hostname, summary['active'])) + print('%s : Staging : %s' % (hostname, summary['staging'])) + print('%s : Started : %s' % (hostname, summary['started'])) + if busy_channels != []: + print('Busy channels (>%s submitted):' % busylimit) + for bc in 
busy_channels: + activities_str = ", ".join([("%s: %s" % (key, val)) for key, val in bc['activities'].items()]) + print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'], + str(activities_str))) + monitor.record_gauge('fts3.%s.submitted' % hostname, + value=(summary['submitted'] + summary['active'] + + summary['staging'] + summary['started'])) + g.labels(**{'hostname': hostname}).set( + (summary['submitted'] + summary['active'] + summary['staging'] + summary['started'])) + retvalue = OK + break + except Exception as error: + retvalue = CRITICAL + if result and result.status_code: + errmsg = 'Error when trying to get info from %s : HTTP status code %s. [%s]' % ( + ftshost, str(result.status_code), str(error)) + else: + errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error)) + if retvalue == CRITICAL: + print("All attempts failed. %s" % errmsg) + WORST_RETVALUE = max(retvalue, WORST_RETVALUE) + if not UPDATE_DIST: sys.exit(WORST_RETVALUE) diff --git a/common/check_messages_to_submit b/common/check_messages_to_submit index 4400531..ccba18e 100755 --- a/common/check_messages_to_submit +++ b/common/check_messages_to_submit @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,19 +8,23 @@ # Authors: # - Mario Lassnig, , 2013-2014 # - Thomas Beermann, , 2019 +# - Eric Vaandering , 2022 """ Probe to check the queues of messages to submit by Hermes to the broker """ -from __future__ import print_function import sys -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get + +from prometheus_client import CollectorRegistry, Gauge +from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session -from utils.common import probe_metrics +from utils import common + +PrometheusPusher = common.PrometheusPusher 
+ # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -32,25 +36,17 @@ else: queue_sql = """SELECT COUNT(*) FROM {schema}messages""".format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - result = session.execute(queue_sql).fetchall() - print('queues.messages %s' % result[0][0]) - probe_metrics.gauge(name='queues.messages').set(result[0][0]) - Gauge('hermes_queues_messages', '', registry=registry).set(result[0][0]) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_messages_to_submit', registry=registry) - except: - continue + with PrometheusPusher(registry, job_name='check_messages_to_submit') as prometheus_config: + prefix: str = prometheus_config['prefix'] + result = session.execute(queue_sql).fetchall() + print('queues.messages %s' % result[0][0]) + monitor.record_gauge(stat='queues.messages', value=result[0][0]) + Gauge(prefix + 'hermes_queues_messages', '', registry=registry).set(result[0][0]) + if result[0][0] > 100000: sys.exit(WARNING) diff --git a/common/check_new_dids b/common/check_new_dids index 6f290cb..de90289 100755 --- a/common/check_new_dids +++ b/common/check_new_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,48 +8,45 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 -# - Eric Vaandering , 2020 +# - Eric Vaandering , 2020-2022 """ Probe to check the backlog of new dids. 
""" -from __future__ import print_function import sys import traceback -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get + +from prometheus_client import CollectorRegistry, Gauge +from rucio.core import monitor + from rucio.db.sqla import models from rucio.db.sqla.session import get_session from rucio.db.sqla.util import get_count -from utils.common import probe_metrics + +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - query = (session.query(models.DataIdentifier.scope) - .with_hint(models.DataIdentifier, "INDEX_FFS(DIDS DIDS_IS_NEW_IDX)", 'oracle') - .filter(models.DataIdentifier.is_new.isnot(None))) - result = get_count(query) - probe_metrics.gauge(name='transmogrifier.new_dids').set(result) - Gauge('transmogrifier_new_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_new_dids', registry=registry) - except: - continue + with PrometheusPusher(registry, job_name='check_new_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + query = (session.query(models.DataIdentifier.scope) + .with_hint(models.DataIdentifier, "INDEX_FFS(DIDS DIDS_IS_NEW_IDX)", 'oracle') + .filter(models.DataIdentifier.is_new.isnot(None))) + result = get_count(query) + monitor.record_gauge('transmogrifier.new_dids', value=result) + Gauge(prefix + 'transmogrifier_new_dids', '', registry=registry).set(result) + print(result) except: diff --git a/common/check_obsolete_replicas b/common/check_obsolete_replicas index efd8394..5883bae 100755 --- a/common/check_obsolete_replicas +++ 
b/common/check_obsolete_replicas @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,14 +8,22 @@ # Authors: # - Vincent Garonne, , 2015 # - Cedric Serfon, , 2018 +# - Maggie Voetberg, , 2024 ''' Probe to check the backlog of obsolete replicas. ''' import sys +import traceback +from sqlalchemy.sql import text +from rucio.db.sqla.session import BASE, get_session +from utils.common import PrometheusPusher -from rucio.db.sqla.session import get_session +if BASE.metadata.schema: + schema = BASE.metadata.schema + '.' +else: + schema = '' # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -23,62 +31,78 @@ OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 if __name__ == "__main__": try: - SESSION = get_session() - QUERY = '''BEGIN - FOR u in (SELECT - a.rse_id AS rse_id, - NVL(b.files, 0) AS files, - NVL(b.bytes, 0) AS bytes, - SYS_EXTRACT_UTC(localtimestamp) AS updated_at - FROM - ( - SELECT - id AS rse_id - FROM - atlas_rucio.rses - WHERE - deleted=0) a - LEFT OUTER JOIN - ( - SELECT - /*+ INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX) */ - rse_id, - COUNT(1) AS files, - SUM(bytes) AS bytes - FROM - atlas_rucio.replicas - WHERE - ( - CASE - WHEN tombstone IS NOT NULL - THEN rse_id - END) IS NOT NULL - AND tombstone=to_date('1-1-1970 00:00:00','MM-DD-YYYY HH24:Mi:SS') - GROUP BY - rse_id) b - ON - a.rse_id=b.rse_id) + session = get_session() + with PrometheusPusher() as manager: + query = '''BEGIN + FOR u in (SELECT + a.rse_id AS rse_id, + NVL(b.files, 0) AS files, + NVL(b.bytes, 0) AS bytes, + SYS_EXTRACT_UTC(localtimestamp) AS updated_at + FROM + ( + SELECT + id AS rse_id + FROM + {schema}rses + WHERE + deleted=0) a + LEFT OUTER JOIN + ( + SELECT + /*+ INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX) */ + rse_id, + COUNT(1) AS files, + SUM(bytes) AS bytes + FROM + {schema}replicas + WHERE + ( + CASE + WHEN tombstone IS NOT 
NULL + THEN rse_id + END) IS NOT NULL + AND tombstone=to_date('1-1-1970 00:00:00','MM-DD-YYYY HH24:Mi:SS') + GROUP BY + rse_id) b + ON + a.rse_id=b.rse_id) - LOOP - MERGE INTO atlas_rucio.RSE_USAGE - USING DUAL - ON (atlas_rucio.RSE_USAGE.rse_id = u.rse_id and source = 'obsolete') - WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) - VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at) - WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at; + LOOP + MERGE INTO {schema}RSE_USAGE + USING DUAL + ON ({schema}RSE_USAGE.rse_id = u.rse_id and source = 'obsolete') + WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) + VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at) + WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at; - MERGE INTO ATLAS_RUCIO.RSE_USAGE_HISTORY H - USING DUAL - ON (h.rse_id = u.rse_id and h.source = 'obsolete' and h.updated_at = u.updated_at) - WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) - VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at); + MERGE INTO {schema}RSE_USAGE_HISTORY H + USING DUAL + ON (h.rse_id = u.rse_id and h.source = 'obsolete' and h.updated_at = u.updated_at) + WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) + VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at); - COMMIT; - END LOOP; -END; -''' - SESSION.execute(QUERY) - except Exception as error: - print error + COMMIT; + END LOOP; + END;'''.format(schema=schema) + + for result in session.execute(text(query)): + print(result) + + rse_id = result[0] + bytes_sum = result[2] + files_count = result[1] + + manager.gauge(name="obsolete_replicas_files.{rse}", + documentation="Probe to check the backlog of obsolete replicas.").labels(rse=rse_id).set(files_count) + + 
manager.gauge(name="obsolete_replicas_bytes.{rse}", + documentation="Probe to check the backlog of obsolete replicas.").labels(rse=rse_id).set(bytes_sum) + + + except: + print(traceback.format_exc()) sys.exit(UNKNOWN) + finally: + session.remove() sys.exit(OK) diff --git a/common/check_stuck_rules b/common/check_stuck_rules index 84633ba..2761c50 100755 --- a/common/check_stuck_rules +++ b/common/check_stuck_rules @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -7,22 +7,23 @@ # # Authors: # - Martin Barisits, , 2014 -# - Eric Vaandering, , 2019-2021 +# - Eric Vaandering, , 2019-2022 # - Thomas Beermann, , 2019 """ Probe to check the backlog of stuck rules. """ -from __future__ import print_function import sys import traceback -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge +from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session -from utils.common import probe_metrics +from utils import common + +PrometheusPusher = common.PrometheusPusher # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -32,32 +33,24 @@ if BASE.metadata.schema: else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - 
with PrometheusPusher(registry, job_name='check_stuck_rules') as prometheus_config: + prefix: str = prometheus_config['prefix'] + + sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and (error !=\'MissingSourceReplica\' or error IS NULL)'.format(schema=schema) + result = session.execute(sql).fetchone()[0] + monitor.record_gauge('judge.stuck_rules_without_missing_source_replica', value=result) + Gauge(prefix + 'judge_stuck_rules_without_missing_source_replica', '', registry=registry).set(result) + + sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and error =\'MissingSourceReplica\''.format(schema=schema) + result = session.execute(sql).fetchone()[0] + monitor.record_gauge('judge.stuck_rules_with_missing_source_replica', value=result) + Gauge(prefix + 'judge_stuck_rules_with_missing_source_replica', '', registry=registry).set(result) - sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and error =\'MissingSourceReplica\''.format( - schema=schema) - result = session.execute(sql).fetchone()[0] - probe_metrics.gauge(name='judge.stuck_rules_with_missing_source_replica').set(result) - Gauge('judge_stuck_rules_with_missing_source_replica', '', registry=registry).set(result) - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_stuck_rules', registry=registry) - except: - continue except: print(traceback.format_exc()) sys.exit(UNKNOWN) diff --git a/common/check_transfer_queues_status b/common/check_transfer_queues_status index 15f94b9..b33434b 100755 --- a/common/check_transfer_queues_status +++ b/common/check_transfer_queues_status @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -10,19 +10,23 @@ # - Cedric Serfon, , 2014 # - Wen Guan, , 2015 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 """ Probe to check the queues of the 
transfer service """ -from __future__ import print_function import sys -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get + +from prometheus_client import CollectorRegistry, Gauge +from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session -from utils.common import probe_metrics +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -57,29 +61,27 @@ FROM {schema}requests GROUP BY state, activity, external_host )""".format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() - g = Gauge('conveyor_queues_requests', '', labelnames=('state', 'activity', 'external_host'), registry=registry) - session = get_session() - for k in session.execute(active_queue).fetchall(): - print(k[0], k[1], end=" ") - probe_metrics.gauge(name=k[0].replace('-', '_')).set(k[1]) - items = k[0].split('.') - state = items[2] - activity = items[3] - external_host = items[4].replace('-', '_') - g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k[1]) - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_transfer_queues_status', registry=registry) - except: - continue + + with PrometheusPusher(registry, job_name='check_transfer_queues_status') as prometheus_config: + prefix: str = prometheus_config['prefix'] + extra_prom_labels = prometheus_config['labels'] + labelnames = ['state', 'activity', 'external_host'] + labelnames.extend(extra_prom_labels.keys()) + + g = Gauge(prefix+'conveyor_queues_requests', '', labelnames=labelnames, registry=registry) + session = get_session() + for k in session.execute(active_queue).fetchall(): + print(k[0], k[1], end=" ") + 
monitor.record_gauge(stat=k[0], value=k[1]) + items = k[0].split('.') + state = items[2] + activity = items[3] + external_host = items[4] + g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k[1]) + except: sys.exit(UNKNOWN) sys.exit(OK) diff --git a/common/check_unevaluated_dids b/common/check_unevaluated_dids index d342885..f09da1d 100755 --- a/common/check_unevaluated_dids +++ b/common/check_unevaluated_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,19 +8,22 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 """ Probe to check the backlog of dids waiting for rule evaluation. """ -from __future__ import print_function import sys -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge +from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session -from utils.common import probe_metrics +from utils import common + +PrometheusPusher = common.PrometheusPusher +probe_metrics = common.probe_metrics # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -32,10 +35,6 @@ else: count_sql = 'SELECT COUNT(*) FROM {schema}updated_dids'.format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: session = get_session() @@ -43,14 +42,9 @@ if __name__ == "__main__": probe_metrics.gauge(name='judge.waiting_dids').set(result) registry = CollectorRegistry() - Gauge('judge_waiting_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_unevaluated_dids', registry=registry) - except: - 
continue + with PrometheusPusher(registry, job_name='check_unevaluated_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + Gauge(prefix + 'judge_waiting_dids', '', registry=registry).set(result) print(result) except: diff --git a/common/check_unlocked_replicas b/common/check_unlocked_replicas index 80276a9..36b72a3 100755 --- a/common/check_unlocked_replicas +++ b/common/check_unlocked_replicas @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,19 +8,22 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 """ Probe to check the backlog of unlocked replicas. """ -from __future__ import print_function import sys -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge +from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session -from utils.common import probe_metrics +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -40,30 +43,24 @@ if BASE.metadata.schema: else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - unlocked_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null'.format(schema=schema) - result = session.execute(unlocked_sql).fetchone()[0] - probe_metrics.gauge(name='reaper.unlocked_replicas').set(result) - Gauge('reaper_unlocked_replicas', '', registry=registry).set(result) - print(result) - expired_sql = 'select /*+ index_ffs(replicas 
REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null and tombstone < sysdate - 2/24'.format(schema=schema) - result = session.execute(expired_sql).fetchone()[0] - probe_metrics.gauge(name='reaper.unlocked_replicas').set(result) - Gauge('reaper_expired_replicas', '', registry=registry).set(result) + with PrometheusPusher(registry, job_name='check_unlocked_replicas') as prometheus_config: + prefix: str = prometheus_config['prefix'] + + unlocked_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null'.format(schema=schema) + result = session.execute(unlocked_sql).fetchone()[0] + monitor.record_gauge(stat='reaper.unlocked_replicas', value=result) + Gauge(prefix + 'reaper_unlocked_replicas', '', registry=registry).set(result) + print(result) + expired_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null and tombstone < sysdate - 2/24'.format(schema=schema) + result = session.execute(expired_sql).fetchone()[0] + monitor.record_gauge(stat='reaper.expired_replicas', value=result) + Gauge(prefix + 'reaper_expired_replicas', '', registry=registry).set(result) + - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_unlocked_replicas', registry=registry) - except: - continue except: sys.exit(UNKNOWN) sys.exit(OK) diff --git a/common/check_updated_dids b/common/check_updated_dids index 756aad8..877f926 100755 --- a/common/check_updated_dids +++ b/common/check_updated_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,46 +8,42 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 -# - Eric Vaandering , 2020-2021 +# - Eric Vaandering , 2020-2022 """ Probe to check the backlog of updated dids. 
""" -from __future__ import print_function import sys import traceback -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get + +from prometheus_client import CollectorRegistry, Gauge +from rucio.core import monitor + from rucio.db.sqla import models from rucio.db.sqla.session import get_session from rucio.db.sqla.util import get_count -from utils.common import probe_metrics +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - query = session.query(models.UpdatedDID) - result = get_count(query) - probe_metrics.gauge(name='judge.updated_dids').set(result) - Gauge('judge_updated_dids', '', registry=registry).set(result) + with PrometheusPusher(registry, job_name='check_updated_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + query = session.query(models.UpdatedDID) + result = get_count(query) + monitor.record_gauge('judge.updated_dids', value=result) + Gauge(prefix + 'judge_updated_dids', '', registry=registry).set(result) - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_updated_dids', registry=registry) - except: - continue # created_at, count, max, min, avg, stdev = 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 # result = session.execute('select * from atlas_rucio.concurency_stats where created_at > sysdate - 1/1440')