Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 37 additions & 51 deletions common/check_deletable_replicas
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,56 @@
# Authors:
# - Mario Lassnig, <mario.lassnig@cern.ch>, 2013-2014
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2018
# - Fernando Garzon. oscar.fernando.garzon.miguez@cern.ch, 2020

'''
Probe to check the queues of messages to submit by Hermes to the broker
Probe to check the number of deletable files per rse, as well as
the total deletable space per rse.
'''

from __future__ import print_function
import sys

from rucio.db.sqla.constants import ReplicaState
from sqlalchemy import or_
from datetime import datetime
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from rucio.common.config import config_get
from rucio.core import monitor
from rucio.db.sqla import models
from rucio.db.sqla.session import get_session
from sqlalchemy import func

# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')

if __name__ == "__main__":
try:
SESSION = get_session()
QUERY = '''BEGIN
FOR u in (SELECT
a.rse_id AS rse_id,
NVL(b.files, 0) AS files,
NVL(b.bytes, 0) AS bytes,
sys_extract_utc(localtimestamp) AS updated_at
FROM
(
SELECT
id AS rse_id
FROM
atlas_rucio.rses
WHERE
deleted=0) a
LEFT OUTER JOIN
(
SELECT
rse_id,
COUNT(1) AS files,
SUM(bytes) AS bytes
FROM
ATLAS_RUCIO.ADG_ONLY_SCAN_REPLICAS
WHERE
tombstone IS NOT NULL
AND tombstone < sys_extract_utc(localtimestamp) GROUP BY rse_id) b
ON
a.rse_id=b.rse_id)
LOOP
MERGE INTO atlas_rucio.RSE_USAGE
USING DUAL
ON (atlas_rucio.RSE_USAGE.rse_id = u.rse_id and source = 'expired')
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'expired', u.bytes, u.files, u.updated_at, u.updated_at)
WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at;
registry = CollectorRegistry()
session = get_session()
a = session.query(models.RSE.rse.label('Node'),func.count(models.RSEFileAssociation.name).label('Files'), func.sum(models.RSEFileAssociation.bytes).label('bytes')).\
filter( or_(models.RSEFileAssociation.state != ReplicaState.BAD,
models.RSEFileAssociation.state!=ReplicaState.UNAVAILABLE,
models.RSEFileAssociation.state!=ReplicaState.TEMPORARY_UNAVAILABLE,
models.RSEFileAssociation.tombstone < datetime.utcnow())).join(models.RSEFileAssociation.rse).group_by(models.RSE.rse)
print(a)
for result in a.all():
monitor.record_gauge(stat="rucio.judge.deletable_files.%s" % result[0], value=result[1])
#Gauge('rucio_judge_deletable_files', '', registry=registry).set(result[1])
for result in a.all():
monitor.record_gauge(stat="rucio.judge.deletable_space.%s" % result[0], value=result[2]/1000000000000.0)
#Gauge('rucio_judge_deletable_space', '', registry=registry).set(result[2]/1000000000000)

if len(PROM_SERVERS):
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_deletable_files', registry=registry)
push_to_gateway(server.strip(), job='check_deletable_space', registry=registry)
except:
continue

MERGE INTO ATLAS_RUCIO.RSE_USAGE_HISTORY H
USING DUAL
ON (h.rse_id = u.rse_id and h.source = 'expired' and h.updated_at = u.updated_at)
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'expired', u.bytes, u.files, u.updated_at, u.updated_at);

COMMIT;
END LOOP;
END;
'''
SESSION.execute(QUERY)
except Exception as error:
print(error)
print('THE ERROR IS ',error)
sys.exit(UNKNOWN)
sys.exit(OK)