From dea618f8a2eb2f5fe996ab9bfb0f7ff1908155a5 Mon Sep 17 00:00:00 2001 From: Fernando Garzon Date: Wed, 20 Jan 2021 10:14:25 -0600 Subject: [PATCH 1/2] check_deletable_replicas probe ported to SQLAlchemy --- common/check_deletable_replicas | 88 ++++++++++++++------------------- 1 file changed, 37 insertions(+), 51 deletions(-) diff --git a/common/check_deletable_replicas b/common/check_deletable_replicas index 9b69be1b..09315e21 100755 --- a/common/check_deletable_replicas +++ b/common/check_deletable_replicas @@ -8,70 +8,56 @@ # Authors: # - Mario Lassnig, , 2013-2014 # - Cedric Serfon, , 2018 +# - Fernando Garzon. oscar.fernando.garzon.miguez@cern.ch, 2020 ''' -Probe to check the queues of messages to submit by Hermes to the broker +Probe to check the number of deletable files per rse, as well as +the total deletable space per rse. ''' - from __future__ import print_function import sys - +from rucio.db.sqla.constants import ReplicaState +from sqlalchemy import or_ +from datetime import datetime +from prometheus_client import CollectorRegistry, Gauge, push_to_gateway +from rucio.common.config import config_get +from rucio.core import monitor +from rucio.db.sqla import models from rucio.db.sqla.session import get_session +from sqlalchemy import func # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 +PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') +if PROM_SERVERS != '': + PROM_SERVERS = PROM_SERVERS.split(',') if __name__ == "__main__": try: - SESSION = get_session() - QUERY = '''BEGIN - FOR u in (SELECT - a.rse_id AS rse_id, - NVL(b.files, 0) AS files, - NVL(b.bytes, 0) AS bytes, - sys_extract_utc(localtimestamp) AS updated_at - FROM - ( - SELECT - id AS rse_id - FROM - atlas_rucio.rses - WHERE - deleted=0) a - LEFT OUTER JOIN - ( - SELECT - rse_id, - COUNT(1) AS files, - SUM(bytes) AS bytes - FROM - ATLAS_RUCIO.ADG_ONLY_SCAN_REPLICAS - WHERE - tombstone IS NOT NULL - AND tombstone < sys_extract_utc(localtimestamp) GROUP BY rse_id) b - ON - a.rse_id=b.rse_id) - LOOP - MERGE INTO atlas_rucio.RSE_USAGE - USING DUAL - ON (atlas_rucio.RSE_USAGE.rse_id = u.rse_id and source = 'expired') - WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) - VALUES (u.rse_id, 'expired', u.bytes, u.files, u.updated_at, u.updated_at) - WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at; + registry = CollectorRegistry() + session = get_session() + a = session.query(models.RSE.rse.label('Node'),func.count(models.RSEFileAssociation.name).label('Files'), func.sum(models.RSEFileAssociation.bytes).label('bytes')).\ + filter( or_(models.RSEFileAssociation.state != ReplicaState.BAD, + models.RSEFileAssociation.state!=ReplicaState.UNAVAILABLE, + models.RSEFileAssociation.state!=ReplicaState.TEMPORARY_UNAVAILABLE, + models.RSEFileAssociation.tombstone < datetime.utcnow())).join(models.RSEFileAssociation.rse).group_by(models.RSE.rse) + print(a) + for result in a.all(): + monitor.record_gauge(stat="rucio.judge.deletable_files.%s" %result[0], value=result[1]) + #Gauge('rucio_judge_deletable_files', '', registry=registry).set(result[1]) + for result in a.all(): + monitor.record_gauge(stat="rucio.judge.deletable_space.%s" %result[0], value=result[2]/1000000000000.0) + #Gauge('rucio_judge_deletable_space', '', registry=registry).set(result[2]/1000000000000) + + if len(PROM_SERVERS): + for server in PROM_SERVERS: + try: + push_to_gateway(server.strip(), job='check_deletable_files', registry=registry) + push_to_gateway(server.strip(), job='check_deletable_space', registry=registry) + except: + continue - MERGE INTO ATLAS_RUCIO.RSE_USAGE_HISTORY H - USING DUAL - ON (h.rse_id = u.rse_id and h.source = 'expired' and h.updated_at = u.updated_at) - WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) - VALUES (u.rse_id, 'expired', u.bytes, u.files, u.updated_at, u.updated_at); - - COMMIT; - END LOOP; -END; -''' - SESSION.execute(QUERY) except Exception as error: - print(error) + print('THE ERROR IS ',error) sys.exit(UNKNOWN) - sys.exit(OK) From 6054eb6e503865ed2447e8dc7a26292f13362db3 Mon Sep 17 00:00:00 2001 From: FernandoGarzon <61252446+FernandoGarzon@users.noreply.github.com> Date: Mon, 25 Jan 2021 08:55:48 -0600 Subject: [PATCH 2/2] Update check_deletable_replicas --- common/check_deletable_replicas | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/check_deletable_replicas b/common/check_deletable_replicas index 09315e21..98d087b5 100755 --- a/common/check_deletable_replicas +++ b/common/check_deletable_replicas @@ -44,10 +44,10 @@ if __name__ == "__main__": models.RSEFileAssociation.tombstone < datetime.utcnow())).join(models.RSEFileAssociation.rse).group_by(models.RSE.rse) print(a) for result in a.all(): - monitor.record_gauge(stat="rucio.judge.deletable_files.%s" %result[0], value=result[1]) + monitor.record_gauge(stat="rucio.judge.deletable_files.%s" % result[0], value=result[1]) #Gauge('rucio_judge_deletable_files', '', registry=registry).set(result[1]) for result in a.all(): - monitor.record_gauge(stat="rucio.judge.deletable_space.%s" %result[0], value=result[2]/1000000000000.0) + monitor.record_gauge(stat="rucio.judge.deletable_space.%s" % result[0], value=result[2]/1000000000000.0) #Gauge('rucio_judge_deletable_space', '', registry=registry).set(result[2]/1000000000000) if len(PROM_SERVERS):