Skip to content

Commit

Permalink
Log each step in the pipeline cleaning up RSE
Browse files Browse the repository at this point in the history
Log error whenever the list of files from RucioConMon is empty
  • Loading branch information
amaltaro committed Jul 29, 2024
1 parent bd337e7 commit e2d5bb8
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def cleanRSE(self, rse):
:param rse: MSUnmergedRSE object to be cleaned
:return: The MSUnmergedRSE object
"""
self.logger.info("Start cleaning files for RSE: %s.", rse['name'])

# Create the gfal2 context object:
try:
Expand Down Expand Up @@ -538,6 +539,7 @@ def consRecordAge(self, rse):
:return: rse or raises MSUnmergedPlineExit
"""
rseName = rse['name']
self.logger.info("Evaluating consistency record agent for RSE: %s.", rse['name'])

if rseName not in self.rseConsStats:
msg = "RSE: %s Missing in stats records at Rucio Consistency Monitor. " % rseName
Expand Down Expand Up @@ -615,7 +617,10 @@ def getUnmergedFiles(self, rse):
:param rse: The RSE to work on
:return: rse
"""
self.logger.info("Fetching data from Rucio ConMon for RSE: %s.", rse['name'])
rse['files']['allUnmerged'] = self.rucioConMon.getRSEUnmerged(rse['name'])
if not rse['files']['allUnmerged']:
self.logger.error("RSE: %s has an empty list of unmerged files in Rucio ConMon.", rse['name'])
for filePath in rse['files']['allUnmerged']:
# Check if what we start with is under /store/unmerged/*
if self.regStoreUnmergedLfn.match(filePath):
Expand Down Expand Up @@ -669,8 +674,11 @@ def filterUnmergedFiles(self, rse):
:param rse: The RSE to work on
:return: rse
"""
self.logger.info("Filtering unmerged files for RSE: %s.", rse['name'])
rse['dirs']['toDelete'] = rse['dirs']['allUnmerged'] - self.protectedLFNs
rse['dirs']['protected'] = rse['dirs']['allUnmerged'] & self.protectedLFNs
self.logger.info("Pre-filter counts for allUnmerged: %s, toDelete: %s, protected: %s.",
len(rse['dirs']['allUnmerged']), len(rse['dirs']['toDelete']), len(rse['dirs']['protected']))

# The following check may seem redundant, but better stay safe than sorry
if not (rse['dirs']['toDelete'] | rse['dirs']['protected']) == rse['dirs']['allUnmerged']:
Expand Down Expand Up @@ -733,6 +741,9 @@ def genFunc(pattern, iterable):
# Now apply the filters back to the set in rse['dirs']['toDelete']
rse['dirs']['toDelete'] = set(rse['files']['toDelete'].keys())

self.logger.info("Post-filter counts for allUnmerged: %s, toDelete: %s, protected: %s.",
len(rse['dirs']['allUnmerged']), len(rse['dirs']['toDelete']), len(rse['dirs']['protected']))

# Update the counters:
rse['counters']['dirsToDelete'] = len(rse['files']['toDelete'])
self.logger.info("RSE: %s: %s", rse['name'], twFormat(rse, maxLength=8))
Expand All @@ -747,6 +758,7 @@ def getPfn(self, rse):
:param rse: The RSE to be checked
:return: rse
"""
self.logger.info("Fetching PFN map for RSE: %s.", rse['name'])
# NOTE: pfnPrefix here is considered the full part of the pfn up to the
# beginning of the lfn part rather than just the protocol prefix
if rse['files']['allUnmerged']:
Expand All @@ -771,6 +783,7 @@ def purgeRseObj(self, rse, dumpRSEtoLog=False):
:param dumpRSEToLog: Dump the whole RSEobject into the service log.
:return: rse
"""
self.logger.info("Purging RSE in-memory information for RSE: %s.", rse['name'])
msg = "\n----------------------------------------------------------"
msg += "\nMSUnmergedRSE: \n%s"
msg += "\n----------------------------------------------------------"
Expand All @@ -788,6 +801,8 @@ def updateRSETimestamps(self, rse, start=True, end=True):
:param rse: The RSE to work on
:return: rse
"""
self.logger.info("Updating timestamps for RSE: %s. With start: %s, end: %s.", rse['name'], start, end)

rseName = rse['name']
currTime = time()

Expand Down Expand Up @@ -820,6 +835,8 @@ def updateServiceCounters(self, rse):
:param pName: The pipeline name whose counters to be updated
:return: rse
"""
self.logger.info("Updating service counters for RSE: %s.", rse['name'])

pName = self.plineUnmerged.name
self.plineCounters[pName]['totalNumFiles'] += rse['counters']['totalNumFiles']
self.plineCounters[pName]['totalNumDirs'] += rse['counters']['totalNumDirs']
Expand Down Expand Up @@ -869,7 +886,7 @@ def uploadRSEToMongoDB(self, rse, fullRSEToDB=False, overwrite=True):
:return: rse
"""
try:
self.logger.info("RSE: %s Writing rse data to MongoDB.", rse['name'])
self.logger.info("Uploading RSE information to MongoDB for RSE: %s.", rse['name'])
rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite, retryCount=self.msConfig['mongoDBRetryCount'])
except NotPrimaryError:
msg = "Could not write RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount']
Expand Down

0 comments on commit e2d5bb8

Please sign in to comment.