diff --git a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
index 5e797c08eb..1590bf2366 100644
--- a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
+++ b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
@@ -20,7 +20,6 @@
 import re
 import os
 import errno
-import stat
 try:
     import gfal2
 except ImportError:
@@ -152,7 +151,6 @@ def __init__(self, msConfig, logger=None):
                            Functor(self.updateRSETimestamps, start=True, end=False),
                            Functor(self.consRecordAge),
                            Functor(self.getUnmergedFiles),
-                           Functor(self.filterUnmergedFiles),
                            Functor(self.getPfn),
                            Functor(self.cleanRSE),
                            Functor(self.updateServiceCounters),
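For reviewers unfamiliar with this pipeline: each Functor binds one step and the MSUnmergedRSE object is threaded through the whole chain, so dropping filterUnmergedFiles simply removes one link. A minimal sketch of that call pattern follows (illustrative only; the real Pipeline and Functor classes live elsewhere in WMCore and carry more bookkeeping):

    class Functor:
        """Bind a function and its extra arguments for later, repeated calls."""
        def __init__(self, func, *args, **kwargs):
            self.func = func
            self.args = args
            self.kwargs = kwargs

        def __call__(self, obj):
            # every step receives the rse object and must return it
            return self.func(obj, *self.args, **self.kwargs)

    class Pipeline:
        """Run a list of functors, feeding each one the previous result."""
        def __init__(self, funcLine):
            self.funcLine = funcLine

        def run(self, obj):
            for functor in self.funcLine:
                obj = functor(obj)
            return obj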
@@ -306,9 +304,24 @@ def _execute(self, rseList):
     def cleanRSE(self, rse):
         """
         The method to implement the actual deletion of files for an RSE.
+        Order of deletion attempts is:
+          1. top directory
+          2. sub-directories
+          3. list sub-directories and files
+          4. remove each file (unlink)
+          5. remove the (now) empty sub-directories
+          6. try to remove the top directory again
         :param rse: MSUnmergedRSE object to be cleaned
         :return: The MSUnmergedRSE object
         """
+        # reset dirs counters
+        rse['dirs']['deletedSuccess'] = set()
+        rse['dirs']['deletedFail'] = set()
+        self.logger.info("Start cleaning files for RSE: %s.", rse['name'])
+        if not rse['dirs']['toDelete']:
+            self.logger.info("There is nothing to delete for RSE: %s.", rse['name'])
+            rse['isClean'] = self._checkClean(rse)
+            return rse
         # Create the gfal2 context object:
         try:
@@ -319,120 +332,92 @@ def cleanRSE(self, rse):
             self.logger.exception(msg)
             raise MSUnmergedPlineExit(msg) from ex
 
-        filesToDeleteCurrRSE = 0
-        # Start cleaning one directory at a time:
-        for dirLfn, fileLfnGen in rse['files']['toDelete'].items():
-            if dirLfn in rse['dirs']['deletedSuccess']:
-                self.logger.info("RSE: %s, dir: %s already successfully deleted.", rse['name'], dirLfn)
-                continue
+        for idx, dirLfn in enumerate(rse['dirs']['toDelete']):
+            self.logger.info("Processing directory index %s out of %s", idx, len(rse['dirs']['toDelete']))
+            # figure out the PFN prefix
+            dirPfn = rse['pfnPrefix'] + dirLfn
+            if not self.msConfig['enableRealMode']:
+                self.logger.info("DRY-RUN: would delete directory PFN: %s for RSE: %s", dirPfn, rse['name'])
+            else:
+                # Initially try to delete the whole directory even before emptying its content:
+                self.logger.info("Trying to remove the whole directory: %s", dirPfn)
+                rmdirSuccess = self._rmDir(ctx, dirPfn)
+                if rmdirSuccess:
+                    self.logger.info("Directory successfully removed: %s", dirPfn)
+                    rse['counters']['dirsDeletedSuccess'] += 1
+                    rse['dirs']['deletedSuccess'].add(dirLfn)
+                    continue
 
-            if self.msConfig['limitFilesPerRSE'] < 0 or \
-               filesToDeleteCurrRSE < self.msConfig['limitFilesPerRSE']:
-
-                # Now we consume the rse['files']['toDelete'][dirLfn] generator
-                # upon that no values will be left in it. In case we need it again
-                # we will have to recreate the filter as we did in self.filterUnmergedFiles()
-                pfnList = []
-                if not rse['pfnPrefix']:
-                    # Fall back to calling Rucio on a per directory basis for
-                    # resolving the lfn to pfn mapping
-                    dirPfn = self.rucio.getPFN(rse['name'], dirLfn, operation='delete')[dirLfn]
-                    for fileLfn in fileLfnGen:
-                        fileLfnSuffix = fileLfn.split(dirLfn)[1]
-                        filePfn = dirPfn + fileLfnSuffix
-                        pfnList.append(filePfn)
-                else:
-                    # Proceed with assembling the full filePfn out of the rse['pfnPrefix'] and the fileLfn
-                    dirPfn = rse['pfnPrefix'] + dirLfn
-                    for fileLfn in fileLfnGen:
-                        filePfn = rse['pfnPrefix'] + fileLfn
-                        pfnList.append(filePfn)
-
-                filesToDeleteCurrRSE += len(pfnList)
-                msg = "\nRSE: %s \nDELETING: %s."
-                msg += "\nPFN list with: %s entries: \n%s"
-                self.logger.debug(msg, rse['name'], dirLfn, len(pfnList), twFormat(pfnList, maxLength=4))
-
-                if self.msConfig['enableRealMode']:
-                    # The following two bool flags are to track the success for directory removal
-                    # during all consecutive attempts/steps of cleaning the current branch.
-                    rmdirSuccess = False
-                    purgeSuccess = False
-                    filesDeletedSuccess = 0
-                    filesDeletedFail = 0
-
-                    # Initially try to delete the whole directory even before emptying its content:
-                    self.logger.info("Trying to remove nonempty directory: %s", dirLfn)
-                    rmdirSuccess = self._rmDir(ctx, dirPfn)
-
-                    # If the directory was considered successfully removed, update the file counters with the length of the directory contents
-                    # If the above operation fails try to execute the directory contents deletion in bulk - full list of files per directory
-                    if rmdirSuccess:
-                        filesDeletedSuccess = len(pfnList)
-                    else:
-                        msg = "Trying to clean the contents of nonempty directory: %s "
-                        msg += "in slices of: %s files"
-                        self.logger.info(msg, dirLfn, self.msConfig["filesToDeleteSliceSize"])
-                        for pfnSlice in list(grouper(pfnList, self.msConfig["filesToDeleteSliceSize"])):
-                            try:
-                                delResult = ctx.unlink(pfnSlice)
-                                # Count all the successfully deleted files (if a deletion was
-                                # successful a value of None is put in the delResult list):
-                                self.logger.debug("RSE: %s, Dir: %s, delResult: %s",
-                                                  rse['name'], dirLfn, pformat(delResult))
-                                for gfalErr in delResult:
-                                    if gfalErr is None:
-                                        filesDeletedSuccess += 1
-                                    else:
-                                        filesDeletedFail += 1
-                                        errMessage = os.strerror(gfalErr.code)
-                                        rse['counters']['gfalErrors'].setdefault(errMessage, 0)
-                                        rse['counters']['gfalErrors'][errMessage] += 1
-                            except Exception as ex:
-                                msg = "Error while cleaning RSE: %s. "
-                                msg += "Will retry in the next cycle. Err: %s"
-                                self.logger.exception(msg, rse['name'], str(ex))
-
-                        self.logger.info("RSE: %s, Dir: %s, filesDeletedSuccess: %s",
-                                         rse['name'], dirLfn, filesDeletedSuccess)
-
-                    # Now delete the whole branch, which was previously cleaned file by file
-                    # First try to delete the base directory:
-                    rmdirSuccess = self._rmDir(ctx, dirPfn)
-
-                    # Then if unable to delete the base directory due to nonEmpty err or similar, try with _purgeTree() recursively
-                    if not rmdirSuccess:
-                        self.logger.info("Trying to recursively purge directory: %s:\n", dirLfn)
-                        purgeSuccess = self._purgeTree(ctx, dirPfn)
-
-                    # Updating the RSE counters with the newly successfully deleted files
-                    rse['counters']['filesDeletedSuccess'] += filesDeletedSuccess
-                    rse['counters']['filesDeletedFail'] += filesDeletedFail
-
-                    if purgeSuccess or rmdirSuccess:
+            # Next, try to delete each sub-directory
+            self.logger.info("Then trying to list and remove all sub-directories for: %s", dirPfn)
+            rmdirSuccess = self._rmSubDir(ctx, dirPfn)
+            if rmdirSuccess:
+                self.logger.info("ALL sub-directories successfully removed. Now deleting the parent: %s", dirPfn)
+                if self._rmDir(ctx, dirPfn) is True:
+                    rse['counters']['dirsDeletedSuccess'] += 1
                     rse['dirs']['deletedSuccess'].add(dirLfn)
-                        rse['counters']['dirsDeletedSuccess'] = len(rse['dirs']['deletedSuccess'])
-                        # if dirLfn in rse['dirs']['toDelete']:
-                        #     rse['dirs']['toDelete'].remove(dirLfn)
-                        if dirLfn in rse['dirs']['deletedFail']:
-                            rse['dirs']['deletedFail'].remove(dirLfn)
-                        msg = "RSE: %s Success deleting directory: %s"
-                        self.logger.info(msg, rse['name'], dirLfn)
+                    continue
+
+            # If we are here, that means we did not manage to delete some/all of the directories
+            msg = "Failed to delete the parent and/or its sub-directories. "
+            msg += "Going to retrieve a list of files and remove them in bulk operations."
+            self.logger.warning(msg)
+            listFiles = self._listDir(ctx, dirPfn)
+            # The following two counters track the per-file deletion outcome
+            # during all consecutive attempts/steps of cleaning the current branch.
+            filesDeletedSuccess = 0
+            filesDeletedFail = 0
+            self.logger.info("Starting deletion of %s files:", len(listFiles))
+            for pfnSlice in list(grouper(listFiles, self.msConfig["filesToDeleteSliceSize"])):
+                self.logger.info("Executing file slice removal for %s files...", len(pfnSlice))
+                try:
+                    # unlink returns one entry per file; None means that file was deleted successfully
+                    for resp in ctx.unlink(pfnSlice):
+                        if resp is None:
+                            filesDeletedSuccess += 1
+                        else:
+                            filesDeletedFail += 1
+                            errMessage = os.strerror(resp.code)
+                            rse['counters']['gfalErrors'].setdefault(errMessage, 0)
+                            rse['counters']['gfalErrors'][errMessage] += 1
+                except Exception as ex:
+                    msg = "Error while cleaning RSE: %s. "
+                    msg += "Will retry in the next cycle. Err: %s"
+                    self.logger.exception(msg, rse['name'], str(ex))
+
+            self.logger.info("RSE: %s, Dir: %s, filesDeletedSuccess: %s, filesDeletedFail: %s",
+                             rse['name'], dirLfn, filesDeletedSuccess, filesDeletedFail)
+
+            # Now reverse engineer the deepest directory names and delete each one of them, likely all empty dirs
+            setDirPfn = set()
+            for item in listFiles:
+                setDirPfn.add(os.path.dirname(item))
+            for subDir in setDirPfn:
+                rmdirSuccess = self._rmDir(ctx, subDir)
+                if rmdirSuccess:
+                    self.logger.info("Empty sub-directory successfully removed: %s", subDir)
                 else:
-                        rse['dirs']['deletedFail'].add(dirLfn)
-                        rse['counters']['dirsDeletedFail'] = len(rse['dirs']['deletedFail'])
-                        msg = "RSE: %s Failed to purge directory: %s"
-                        self.logger.error(msg, rse['name'], dirLfn)
-            else:
-                msg = "RSE: %s reached limit of files per RSE to be deleted. Skipping directory: %s. It will be retried on the next cycle."
-                self.logger.warning(msg, rse['name'], dirLfn)
+                    self.logger.info("Empty sub-directory failed to be removed: %s", subDir)
+
+            # lastly, try to delete the original directory
+            if self._rmDir(ctx, dirPfn):
+                self.logger.info("Finally, directory successfully removed: %s", dirPfn)
+                rse['counters']['dirsDeletedSuccess'] += 1
+                rse['dirs']['deletedSuccess'].add(dirLfn)
+            else:
+                self.logger.info("Directory still fails to be removed: %s", dirPfn)
+                rse['counters']['dirsDeletedFail'] += 1
+                rse['dirs']['deletedFail'].add(dirLfn)
+
+            # Updating the RSE counters with the newly successfully deleted files
+            rse['counters']['filesDeletedSuccess'] += filesDeletedSuccess
+            rse['counters']['filesDeletedFail'] += filesDeletedFail
 
         rse['isClean'] = self._checkClean(rse)
         # Explicitly release all internal resources used by the gfal2 context instance
         if ctx:
             ctx.free()
-
         return rse
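Condensed, the per-directory logic above boils down to the following cascade. This is a reviewer-oriented sketch with counters, logging and error handling elided, not code from the patch itself:

    def cleanDirectory(self, ctx, dirPfn):
        if self._rmDir(ctx, dirPfn):                        # 1. top directory
            return True
        if self._rmSubDir(ctx, dirPfn) and self._rmDir(ctx, dirPfn):
            return True                                     # 2. sub-directories, then parent
        listFiles = self._listDir(ctx, dirPfn)              # 3. recursive file listing
        for pfnSlice in grouper(listFiles, self.msConfig["filesToDeleteSliceSize"]):
            ctx.unlink(pfnSlice)                            # 4. bulk unlink, slice by slice
        for subDir in {os.path.dirname(pfn) for pfn in listFiles}:
            self._rmDir(ctx, subDir)                        # 5. now-empty leaf directories
        return self._rmDir(ctx, dirPfn)                     # 6. top directory, again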
 
     def _rmDir(self, ctx, dirPfn):
@@ -446,6 +431,7 @@
         """
         try:
             # NOTE: For gfal2 rmdir() exit status of 0 is success
+            self.logger.info("Deleting directory: %s", dirPfn)
             rmdirSuccess = ctx.rmdir(dirPfn) == 0
         except gfal2.GError as gfalExc:
             if gfalExc.code == errno.ENOENT:
@@ -456,71 +442,55 @@
                 rmdirSuccess = False
         return rmdirSuccess
 
-
-    def _purgeTree(self, ctx, baseDirPfn, isDirEntry=False):
+    def _rmSubDir(self, ctx, dirPfn):
         """
-        A method to be used for purging the tree bellow a specific branch.
-        It deletes every empty directory bellow that branch + the origin at the end.
-        :param ctx: The gfal2 context object
-        :param baseDirPfn: The base entry for starting the recursion
-        :param isDirEntry: Bool flag to avoid extra `stat` operations
-                           NOTE: When called from inside a recursion, we have already checked if the entry point is a directory
-        :return: Bool: True if it managed to purge everything, False otherwise
+        Similar to the _rmDir method, but it first lists the content of a given
+        directory, identifies sub-directories with a cheap heuristic and tries
+        to delete each of them.
+        :param ctx: Gfal Context Manager object.
+        :param dirPfn: The Pfn of the directory to be removed
+        :return: Bool: True if the removal was successful, False otherwise
         """
-        # NOTE: It deletes only directories and does not try to unlink any file.
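Both new helpers classify a directory entry purely by its name: anything not ending in ".root" is treated as a sub-directory, which avoids one stat() call per entry. A toy illustration of the heuristic, and of its known blind spot, follows (entry names invented; a non-ROOT file such as a log tarball is mis-classified as a directory, in which case the rmdir() attempt on it simply fails and is counted as such):

    entries = ["1A2B3C.root", "log6.tar", "100000"]
    files = [e for e in entries if e.endswith(".root")]
    subDirs = [e for e in entries if not e.endswith(".root")]
    assert files == ["1A2B3C.root"]
    assert subDirs == ["log6.tar", "100000"]  # the tarball lands here too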
-
-        # First, test if baseDirPfn is actually a directory entry:
-        if not isDirEntry:
-            try:
-                entryStat = ctx.stat(baseDirPfn)
-                if not stat.S_ISDIR(entryStat.st_mode):
-                    self.logger.error("The base pfn: %s is not a directory entry.", baseDirPfn)
-                    return False
-            except gfal2.GError as gfalExc:
-                if gfalExc.code == errno.ENOENT:
-                    self.logger.warning("MISSING baseDir: %s", baseDirPfn)
-                    return True
-                else:
-                    self.logger.error("FAILED to open baseDir: %s: gfalException: %s, gfalErrorCode: %s", baseDirPfn, str(gfalExc), gfalExc.code)
-                    return False
-
-        # Second, recursively iterate down the tree:
-        successList = []
+        # list with status of the directory removal (True is success, False otherwise)
+        rm_status = []
         try:
-            dirEntryList = ctx.listdir(baseDirPfn)
+            self.logger.info("Listing content for parent directory: %s", dirPfn)
+            for entry in ctx.listdir(dirPfn):
+                if not entry.endswith(".root"):
+                    # then assume it is a directory
+                    subDir = os.path.join(dirPfn, entry)
+                    rm_status.append(self._rmDir(ctx, subDir))
         except gfal2.GError as gfalExc:
-            if gfalExc.code == errno.ENOENT:
-                self.logger.warning("MISSING baseDir: %s", baseDirPfn)
-                return True
-            else:
-                self.logger.error("FAILED to list dirEntry: %s: gfalException: %s, gfalErrorCode: %s", baseDirPfn, str(gfalExc), gfalExc.code)
-                return False
+            self.logger.error("FAILED to list directory: %s: gfalException: %s, gfalErrorCode: %s",
+                              dirPfn, str(gfalExc), gfalExc.code)
+            return False
 
-        for dirEntry in dirEntryList:
-            if dirEntry in ['.', '..']:
-                continue
-            dirEntryPfn = baseDirPfn + dirEntry
-            entryStat = None
-            try:
-                entryStat = ctx.stat(dirEntryPfn)
-            except gfal2.GError as gfalExc:
-                if gfalExc.code == errno.ENOENT:
-                    self.logger.warning("MISSING dirEntry: %s", dirEntryPfn)
-                    successList.append(True)
-                else:
-                    self.logger.error("FAILED to open dirEntry: %s: gfalException: %s, gfalErrorCode: %s", dirEntryPfn, str(gfalExc), gfalExc.code)
-                    successList.append(False)
-                continue
+        self.logger.info("Sub-directory deletion succeeded for %s and failed for %s directories.",
+                         rm_status.count(True), rm_status.count(False))
+        # return True (success) only if all sub-directories were deleted
+        return (rm_status.count(True) == len(rm_status))
 
-            if entryStat and stat.S_ISDIR(entryStat.st_mode):
-                successList.append(self._purgeTree(ctx, dirEntryPfn, isDirEntry=True))
+    def _listDir(self, ctx, dirPfn):
+        """
+        Recursively lists all files in the given directory and its subdirectories.
 
-        # Finally, remove the baseDir:
-        self.logger.debug("RM baseDir: %s", baseDirPfn)
-        success = self._rmDir(ctx, baseDirPfn)
-        successList.append(success)
+        :param ctx: Gfal context manager object
+        :param dirPfn: string with a directory pfn
+        """
+        files = []
+        try:
+            for entry in ctx.listdir(dirPfn):
+                # entries ending in .root are assumed to be files
+                if entry.endswith(".root"):
+                    files.append(os.path.join(dirPfn, entry))
+                else:
+                    # it is a directory. Go deeper another directory level
+                    files.extend(self._listDir(ctx, os.path.join(dirPfn, entry)))
+        except gfal2.GError as gfalExc:
+            self.logger.warning("Failed to list directory: %s, gfal code: %s", dirPfn, gfalExc.code)
 
-        return all(successList)
+        self.logger.info("Number of files under directory: %s is: %s", dirPfn, len(files))
+        return files
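getUnmergedFiles() below now cuts each streamed LFN down to its directory and classifies it on the fly, without ever materialising the full file list. A self-contained sketch of that flow, with cutPath and isDeletable standing in for the instance methods and an LFN shaped like the ones in the unit-test data further down:

    # hypothetical LFN; the directory granularity matches the test data below
    lfn = "/store/unmerged/Run2016G/DoubleEG/MINIAOD/UL2016_MiniAODv2-v1/280000/AAAA.root"
    toDelete, protected = set(), set()
    dirPath = cutPath(lfn)  # "/store/unmerged/Run2016G/DoubleEG/MINIAOD/UL2016_MiniAODv2-v1"
    if isDeletable(dirPath):
        toDelete.add(dirPath)
    else:
        protected.add(dirPath)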
 
     def _checkClean(self, rse):
         """
@@ -538,6 +508,7 @@ def consRecordAge(self, rse):
         :return: rse or raises MSUnmergedPlineExit
         """
         rseName = rse['name']
+        self.logger.info("Evaluating consistency record age for RSE: %s.", rse['name'])
 
         if rseName not in self.rseConsStats:
             msg = "RSE: %s Missing in stats records at Rucio Consistency Monitor. " % rseName
@@ -615,19 +586,40 @@ def getUnmergedFiles(self, rse):
         :param rse: The RSE to work on
         :return: rse
         """
-        rse['files']['allUnmerged'] = self.rucioConMon.getRSEUnmerged(rse['name'])
-        for filePath in rse['files']['allUnmerged']:
-            # Check if what we start with is under /store/unmerged/*
-            if self.regStoreUnmergedLfn.match(filePath):
-                # Cut the path to the deepest level known to WMStats protected LFNs
-                dirPath = self._cutPath(filePath)
-                # Check if what is left is still under /store/unmerged/*
-                if self.regStoreUnmergedLfn.match(dirPath):
-                    # Add it to the set of allUnmerged
-                    rse['dirs']['allUnmerged'].add(dirPath)
-
-        rse['counters']['totalNumFiles'] = len(rse['files']['allUnmerged'])
-        rse['counters']['totalNumDirs'] = len(rse['dirs']['allUnmerged'])
+        rse['counters']['totalNumFiles'] = 0
+        rse['counters']['totalNumDirs'] = 0
+        rse['counters']['dirsToDelete'] = 0
+        rse['counters']['filesToDelete'] = 0
+        rse['dirs']['allUnmerged'] = []  # TODO FIXME: this is supposed to be the union of toDelete and protected
+        rse['dirs']['toDelete'] = set()
+        rse['dirs']['protected'] = set()
+
+        self.logger.info("Fetching data from Rucio ConMon for RSE: %s.", rse['name'])
+        for lfn in self.rucioConMon.getRSEUnmerged(rse['name'], zipped=True):
+            dirPath = self._cutPath(lfn)
+            # Check if what is left is still under /store/unmerged/*
+            if not self.regStoreUnmergedLfn.match(dirPath):
+                msg = f"Retrieved file from RucioConMon that does not belong to the unmerged area: {lfn}"
+                self.logger.critical(msg)
+
+            # general counter for possible files and unique directories
+            rse['counters']['totalNumFiles'] += 1
+
+            # now evaluate whether it is deletable or not, and persist it under the right field
+            if self._isDeletable(dirPath):
+                rse['dirs']['toDelete'].add(dirPath)
+                rse['counters']['filesToDelete'] += 1
+            else:
+                rse['dirs']['protected'].add(dirPath)
+
+        if not rse['counters']['totalNumFiles']:
+            self.logger.error("RSE: %s has an empty list of unmerged files in Rucio ConMon.", rse['name'])
+
+        rse['counters']['totalNumDirs'] = len(rse['dirs']['toDelete']) + len(rse['dirs']['protected'])
+        rse['counters']['dirsToDelete'] = len(rse['dirs']['toDelete'])
+
+        self.logger.info("RSE post-filter stats for: %s: %s", rse['name'], twFormat(rse, maxLength=8))
+        return rse
 
     def _cutPath(self, filePath):
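The resulting filter semantics of the new _isDeletable() below: a non-empty dirFilterIncl restricts cleaning to the listed branches, dirFilterExcl always wins, and protected LFNs are checked last. A compact, runnable illustration (paths invented):

    msConfig = {"dirFilterIncl": ["/store/unmerged/express"],
                "dirFilterExcl": ["/store/unmerged/express/prod/2019"]}
    protectedLFNs = {"/store/unmerged/express/prod/2021/1/1"}

    def isDeletable(dirPath):
        if msConfig["dirFilterIncl"] and not any(dirPath.startswith(p) for p in msConfig["dirFilterIncl"]):
            return False
        if any(dirPath.startswith(p) for p in msConfig["dirFilterExcl"]):
            return False
        return dirPath not in protectedLFNs

    assert isDeletable("/store/unmerged/express/prod/2020/1/12")
    assert not isDeletable("/store/unmerged/data/prod/2018/1/12")    # not included
    assert not isDeletable("/store/unmerged/express/prod/2019/1/1")  # excluded
    assert not isDeletable("/store/unmerged/express/prod/2021/1/1")  # protected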
@@ -661,82 +653,33 @@ def _cutPath(self, filePath):
         finalPath = os.path.join(*newPath)
         return finalPath
 
-    # @profile
-    def filterUnmergedFiles(self, rse):
-        """
-        This method is applying set compliment operation to the set of unmerged
-        files per RSE in order to exclude the protected LFNs.
-        :param rse: The RSE to work on
-        :return: rse
+    def _isDeletable(self, dirPath):
         """
-        rse['dirs']['toDelete'] = rse['dirs']['allUnmerged'] - self.protectedLFNs
-        rse['dirs']['protected'] = rse['dirs']['allUnmerged'] & self.protectedLFNs
+        Given a short directory path, verify if this directory can be
+        deleted or not. Checks are performed against:
+         * directory inclusion filter
+         * directory exclusion filter
+         * protected lfns
 
-        # The following check may seem redundant, but better stay safe than sorry
-        if not (rse['dirs']['toDelete'] | rse['dirs']['protected']) == rse['dirs']['allUnmerged']:
-            rse['counters']['dirsToDelete'] = -1
-            msg = "Incorrect set check while trying to estimate the final set for deletion."
-            raise MSUnmergedPlineExit(msg)
-
-        # Get rid of 'allUnmerged' directories
-        rse['dirs']['allUnmerged'].clear()
-
-        # NOTE: Here we may want to filter out all protected files from allUnmerged and leave just those
-        #       eligible for deletion. This will minimize the iteration time of the filters
-        #       from toDelete later on.
-        # while rse['files']['allUnmerged'
-
-        # Now create the filters for rse['files']['toDelete'] - those should be pure generators
-        # A simple generator:
-        def genFunc(pattern, iterable):
-            for i in iterable:
-                if i.startswith(pattern):
-                    yield i
-
-        # NOTE: If the 'dirFilterIncl' is non empty then the cleaning process will
-        #       be enclosed only in this part of the tree and will ignore anything
-        #       from /store/unmerged/ which does not belong to the included filter
-        # NOTE: 'dirFilterExcl' is always applied.
-
-        # Merge the additional filters into a final set to be applied:
-        dirFilterIncl = set(self.msConfig['dirFilterIncl'])
-        dirFilterExcl = set(self.msConfig['dirFilterExcl'])
-
-        # Update directory/files with no service filters
-        if not dirFilterIncl and not dirFilterExcl:
-            for dirName in rse['dirs']['toDelete']:
-                rse['files']['toDelete'][dirName] = genFunc(dirName, rse['files']['allUnmerged'])
-            rse['counters']['dirsToDelete'] = len(rse['files']['toDelete'])
-            self.logger.info("RSE: %s: %s", rse['name'], twFormat(rse, maxLength=8))
-            return rse
-
-        # If we are here, then there are service filters...
-        for dirName in rse['dirs']['toDelete']:
-            # apply exclusion filter
-            dirFilterExclMatch = []
-            for pathExcl in dirFilterExcl:
-                dirFilterExclMatch.append(dirName.startswith(pathExcl))
-            if any(dirFilterExclMatch):
-                # then it matched one of the exclusion paths
-                continue
-            if not dirFilterIncl:
-                # there is no inclusion filter, simply add this directory/files
-                rse['files']['toDelete'][dirName] = genFunc(dirName, rse['files']['allUnmerged'])
-                continue
-
-            # apply inclusion filter
-            for pathIncl in dirFilterIncl:
-                if dirName.startswith(pathIncl):
-                    rse['files']['toDelete'][dirName] = genFunc(dirName, rse['files']['allUnmerged'])
-                    break
+        :param dirPath: string with a shorter version of the LFN
+        :return: True if the directory can be deleted, False otherwise
+        """
+        # Check against the inclusion filter
+        if self.msConfig['dirFilterIncl']:
+            respFilter = [dirPath.startswith(filterItem) for filterItem in self.msConfig['dirFilterIncl']]
+            if any(respFilter) is False:
+                # does not match against any of the inclusion filters
+                return False
 
-        # Now apply the filters back to the set in rse['dirs']['toDelete']
-        rse['dirs']['toDelete'] = set(rse['files']['toDelete'].keys())
+        # Check against the exclusion filter
+        if self.msConfig['dirFilterExcl']:
+            respFilter = [dirPath.startswith(filterItem) for filterItem in self.msConfig['dirFilterExcl']]
+            if any(respFilter) is True:
+                # matches against at least one exclusion filter
+                return False
 
-        # Update the counters:
-        rse['counters']['dirsToDelete'] = len(rse['files']['toDelete'])
-        self.logger.info("RSE: %s: %s", rse['name'], twFormat(rse, maxLength=8))
-        return rse
+        # Finally, check against the protected LFNs
+        return dirPath not in self.protectedLFNs
 
     def getPfn(self, rse):
         """
@@ -747,10 +690,11 @@ def getPfn(self, rse):
         :param rse: The RSE to be checked
         :return: rse
         """
+        self.logger.info("Fetching PFN map for RSE: %s.", rse['name'])
         # NOTE: pfnPrefix here is considered the full part of the pfn up to the
         #       beginning of the lfn part rather than just the protocol prefix
-        if rse['files']['allUnmerged']:
-            lfn = next(iter(rse['files']['allUnmerged']))
+        if rse['dirs']['toDelete']:
+            lfn = next(iter(rse['dirs']['toDelete']))
             pfnDict = self.rucio.getPFN(rse['name'], lfn, operation='delete')
             pfnFull = pfnDict[lfn]
             if self.regStoreUnmergedPfn.match(pfnFull):
@@ -760,6 +704,10 @@
             msg = "Could not establish the correct pfn Prefix for RSE: %s. " % rse['name']
             msg += "Will fall back to calling Rucio on a directory basis for lfn to pfn resolution."
             self.logger.warning(msg)
+        if not rse['pfnPrefix']:
+            msg = f"Failed to resolve PFN from LFN for RSE: {rse['name']}. Will retry later."
+            raise MSUnmergedPlineExit(msg)
+
         return rse
 
     # @profile
@@ -771,6 +719,7 @@ def purgeRseObj(self, rse, dumpRSEtoLog=False):
         :param dumpRSEToLog: Dump the whole RSEobject into the service log.
         :return: rse
         """
+        self.logger.info("Purging RSE in-memory information for RSE: %s.", rse['name'])
         msg = "\n----------------------------------------------------------"
         msg += "\nMSUnmergedRSE: \n%s"
         msg += "\n----------------------------------------------------------"
@@ -788,6 +737,8 @@ def updateRSETimestamps(self, rse, start=True, end=True):
         :param rse: The RSE to work on
         :return: rse
         """
+        self.logger.info("Updating timestamps for RSE: %s. With start: %s, end: %s.", rse['name'], start, end)
+
         rseName = rse['name']
         currTime = time()
@@ -820,6 +771,8 @@ def updateServiceCounters(self, rse):
         :param pName: The pipeline name whose counters to be updated
         :return: rse
         """
+        self.logger.info("Updating service counters for RSE: %s.", rse['name'])
+
         pName = self.plineUnmerged.name
         self.plineCounters[pName]['totalNumFiles'] += rse['counters']['totalNumFiles']
         self.plineCounters[pName]['totalNumDirs'] += rse['counters']['totalNumDirs']
@@ -869,7 +822,7 @@ def uploadRSEToMongoDB(self, rse, fullRSEToDB=False, overwrite=True):
         :return: rse
         """
         try:
-            self.logger.info("RSE: %s Writing rse data to MongoDB.", rse['name'])
+            self.logger.info("Uploading RSE information to MongoDB for RSE: %s.", rse['name'])
             rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite, retryCount=self.msConfig['mongoDBRetryCount'])
         except NotPrimaryError:
             msg = "Could not write RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount']
diff --git a/src/python/WMCore/Services/RucioConMon/RucioConMon.py b/src/python/WMCore/Services/RucioConMon/RucioConMon.py
index 5a695e6727..a26e0a08ec 100644
--- a/src/python/WMCore/Services/RucioConMon/RucioConMon.py
+++ b/src/python/WMCore/Services/RucioConMon/RucioConMon.py
@@ -40,14 +40,13 @@ def __init__(self, url, logger=None, configDict=None):
         super(RucioConMon, self).__init__(configDict)
         self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint'])
 
-    def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False):
+    def _getResult(self, uri, callname="", clearCache=False, args=None):
         """
         Either fetch data from the cache file or query the data-service
         :param uri: The endpoint uri
         :param callname: alias for caller function
         :param clearCache: parameter to control the cache behavior
         :param args: additional parameters to HTTP request call
-        :param binary: specifies request for binary object from HTTP requests (e.g. zipped content)
         :return: A dictionary
         """
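The gzipped dump is now consumed one line at a time instead of being decompressed in a single shot. A minimal standalone sketch of the same pattern, assuming a local gzipped dump file in place of the cached HTTP stream that refreshCache() provides (empty lines are skipped here for clarity):

    import gzip

    def streamLfns(dumpFile):
        """Yield one LFN per line from a gzipped RucioConMon-style dump."""
        with gzip.open(dumpFile, "rb") as istream:
            for line in istream:
                lfn = line.decode("utf-8").rstrip("\n")
                if lfn:
                    yield lfn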
@@ -68,31 +67,26 @@ def _getResult(self, uri, callname="", clearCache=False, args=None):
         if clearCache:
             self.clearCache(cachedApi, args)
         results = '{}'  # explicitly define results which will be loaded by json.loads below
-        if binary:
-            with self.refreshCache(cachedApi, apiUrl, decoder=False, binary=True) as istream:
-                results = gzip.decompress(istream.read())
-                return results
-        else:
-            with self.refreshCache(cachedApi, apiUrl) as istream:
-                results = istream.read()
-
-        results = json.loads(results)
-        return results
+        with self.refreshCache(cachedApi, apiUrl, decoder=True, binary=False) as istream:
+            results = istream.read()
+        return json.loads(results)
 
-    def _getResultZipped(self, uri, callname="", clearCache=True, args=None):
+    def _getResultZipped(self, uri, callname="", clearCache=True):
         """
-        This method is retrieving a zipped file from the uri privided, instead
-        of the normal json
+        This method retrieves gzipped content instead of the standard json format.
         :param uri: The endpoint uri
         :param callname: alias for caller function
         :param clearCache: parameter to control the cache behavior
-        :param args: additional parameters to HTTP request call
-        :return: a list of LFNs
+        :return: yields a single record from the data retrieved
         """
-        data = self._getResult(uri, callname, clearCache, args, binary=True)
-        # convert bytes which we received upstream to string
-        data = decodeBytesToUnicode(data)
-        return [f for f in data.split('\n') if f]
+        cachedApi = callname
+        if clearCache:
+            self.clearCache(cachedApi)
+
+        with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream:
+            for line in istream:
+                line = decodeBytesToUnicode(line).replace("\n", "")
+                yield line
 
     def getRSEStats(self):
         """
@@ -109,7 +103,7 @@ def getRSEUnmerged(self, rseName, zipped=False):
         Gets the list of all unmerged files in an RSE
         :param rseName: The RSE whose list of unmerged files to be retrieved
         :param zipped: If True the interface providing the zipped lists will be called
-        :return: A list of unmerged files for the RSE in question
+        :return: a generator of unmerged files for the RSE in question
         """
         # NOTE: The default API provided by Rucio Consistency Monitor is in a form of a
         #       zipped file/stream. Currently we are using the newly provided json API
@@ -117,12 +111,14 @@ def getRSEUnmerged(self, rseName, zipped=False):
         #       implement the method with the zipped API and use disc cache for
         #       reading/streaming from file. This will prevent any set arithmetic
         #       in the future.
-        if not zipped:
-            uri = "files?rse=%s&format=json" % rseName
-            rseUnmerged = self._getResult(uri, callname=rseName)
-            return rseUnmerged
-        else:
+        if zipped:
             uri = "files?rse=%s&format=raw" % rseName
             callname = '{}.zipped'.format(rseName)
-            rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
-            return rseUnmerged
+            rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
+        else:
+            uri = "files?rse=%s&format=json" % rseName
+            callname = '{}.json'.format(rseName)
+            rseUnmerged = self._getResult(uri, callname=callname)
+        # now lazily return items
+        for item in rseUnmerged:
+            yield item
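With both branches ending in the common yield loop, getRSEUnmerged() is now a generator in either mode, so callers have to iterate it or materialise it explicitly. For instance (RSE name taken from the unit-test fixtures, endpoint as above):

    mgr = RucioConMon("https://cmsweb.cern.ch/rucioconmon/unmerged")
    # stream lazily, keeping only a counter in memory
    nLfns = 0
    for lfn in mgr.getRSEUnmerged("T2_TestRSE", zipped=True):
        nLfns += 1
    # or materialise the full list, as the unit test below does
    lfnList = list(mgr.getRSEUnmerged("T2_TestRSE", zipped=False))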
diff --git a/src/python/WMCore/Services/WMStatsServer/WMStatsServer.py b/src/python/WMCore/Services/WMStatsServer/WMStatsServer.py
index 72993002ad..7d55ea7eeb 100644
--- a/src/python/WMCore/Services/WMStatsServer/WMStatsServer.py
+++ b/src/python/WMCore/Services/WMStatsServer/WMStatsServer.py
@@ -128,7 +128,7 @@ def getChildParentDatasetMap(self, requestType="StepChain", parentageResolved=False):
     def getProtectedLFNs(self):
         """
         A method to be used for fetching a list of all protected lfns from WMStatServer
-        :returns: A list of lfns
+        :returns: a unique list of protected LFNs
         """
         callname = 'protectedlfns'
-        return self._getResult(callname, verb="GET")
+        return list(set(self._getResult(callname, verb="GET")))
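The list(set(...)) round trip deduplicates the service response but does not preserve its order; that is acceptable here because MSUnmerged only performs membership tests against protectedLFNs. A short illustration:

    lfns = ["/store/unmerged/a", "/store/unmerged/b", "/store/unmerged/a"]
    unique = list(set(lfns))
    assert len(unique) == 2 and "/store/unmerged/a" in unique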
diff --git a/test/python/WMCore_t/MicroService_t/MSUnmerged_t/MSUnmerged_t.py b/test/python/WMCore_t/MicroService_t/MSUnmerged_t/MSUnmerged_t.py
index 33a11b093e..4be2e86d57 100644
--- a/test/python/WMCore_t/MicroService_t/MSUnmerged_t/MSUnmerged_t.py
+++ b/test/python/WMCore_t/MicroService_t/MSUnmerged_t/MSUnmerged_t.py
@@ -8,7 +8,6 @@
 import os
 import unittest
 
-from future.utils import viewkeys
 from mock import mock
 
 from Utils.PythonVersion import PY3
@@ -70,8 +69,8 @@ def getBasicRSEData():
     rse = {"name": "T2_TestRSE",
            "counters": {"dirsToDeleteAll": 0},
            "dirs": {"allUnmerged": set(),
-                    "toDelete": {},
-                    "protected": []},
+                    "toDelete": set(),
+                    "protected": set()},
           "files": {"allUnmerged": set(lfns),
                     "toDelete": {},
                     "protected": []}
          }
@@ -168,13 +167,13 @@ def testPlineUnmerged(self):
         rse = self.msUnmerged.updateRSETimestamps(rse, start=True, end=False)
         rse = self.msUnmerged.consRecordAge(rse)
         rse = self.msUnmerged.getUnmergedFiles(rse)
-        rse = self.msUnmerged.filterUnmergedFiles(rse)
+
         rse = self.msUnmerged.getPfn(rse)
         rse = self.msUnmerged.cleanRSE(rse)
         rse = self.msUnmerged.updateServiceCounters(rse)
         rse = self.msUnmerged.updateRSETimestamps(rse, start=False, end=True)
         # self.msUnmerged.plineUnmerged.run(rse)
         expectedRSE = {'name': 'T2_US_Wisconsin',
-                       'pfnPrefix': None,
+                       'pfnPrefix': mock.ANY,
                        'isClean': False,
                        'rucioConMonStatus': None,
                        'timestamps': {'endTime': mock.ANY,
@@ -185,23 +184,18 @@ def testPlineUnmerged(self):
                        "counters": {"totalNumFiles": 11938,
                                     "totalNumDirs": 11,
                                     "dirsToDelete": 6,
-                                    "filesToDelete": 0,
+                                    "filesToDelete": 10934,
                                     "filesDeletedSuccess": 0,
                                     "filesDeletedFail": 0,
                                     "dirsDeletedSuccess": 0,
                                     "dirsDeletedFail": 0,
                                     "gfalErrors": {}},
-                       'files': {'allUnmerged': mock.ANY,
+                       'files': {'allUnmerged': [],
                                  'deletedFail': set(),
                                  'deletedSuccess': set(),
                                  'protected': {},
-                                 'toDelete': {'/store/unmerged/Phase2HLTTDRSummer20ReRECOMiniAOD/DYToLL_M-50_TuneCP5_14TeV-pythia8/FEVT/FlatPU0To200_pilot_111X_mcRun4_realistic_T15_v1-v2': mock.ANY,
-                                              '/store/unmerged/Run2016G/DoubleEG/MINIAOD/UL2016_MiniAODv2-v1': mock.ANY,
-                                              '/store/unmerged/SAM/testSRM/SAM-cms-lvs-gridftp.hep.wisc.edu': mock.ANY,
-                                              '/store/unmerged/SAM/testSRM/SAM-cms-lvs-gridftp.hep.wisc.edu/lcg-util': mock.ANY,
-                                              '/store/unmerged/SAM/testSRM/SAM-cmssrm.hep.wisc.edu': mock.ANY,
-                                              '/store/unmerged/SAM/testSRM/SAM-cmssrm.hep.wisc.edu/lcg-util': mock.ANY}},
-                       'dirs': {'allUnmerged': set(),
+                                 'toDelete': {}},
+                       'dirs': {'allUnmerged': [],
                                 "deletedSuccess": set(),
                                 "deletedFail": set(),
                                 'protected': {'/store/unmerged/RunIIAutumn18FSPremix/PMSSM_set_1_prompt_1_TuneCP2_13TeV-pythia8/AODSIM/GridpackScan_102X_upgrade2018_realistic_v15-v1',
@@ -233,45 +227,40 @@ def testCutPath(self):
 
     def testFilterInclDirectories(self):
         "Test MSUnmerged with including directories filter"
-        toDeleteDict = {"/store/unmerged/data/prod/2018/1/12": ["/store/unmerged/data/prod/2018/1/12/log6.tar"],
-                        "/store/unmerged/express/prod/2020/1/12": ["/store/unmerged/express/prod/2020/1/12/log8.tar",
-                                                                   "/store/unmerged/express/prod/2020/1/12/log9.tar"]}
+        toDeleteDict = {"/store/unmerged/data/prod/2018/1/12", "/store/unmerged/express/prod/2020/1/12"}
         rseData = getBasicRSEData()
 
         self.msUnmerged.msConfig['dirFilterIncl'] = ["/store/unmerged/data/prod/2018/",
                                                      "/store/unmerged/express"]
         self.msUnmerged.protectedLFNs = set()
-        filterData = self.msUnmerged.filterUnmergedFiles(rseData)
-        self.assertEqual(filterData['counters']['dirsToDelete'], 2)
-        self.assertItemsEqual(viewkeys(filterData['files']['toDelete']), viewkeys(toDeleteDict))
-        self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/data/prod/2018/1/12']),
-                              toDeleteDict['/store/unmerged/data/prod/2018/1/12'])
-        self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/express/prod/2020/1/12']),
-                              toDeleteDict['/store/unmerged/express/prod/2020/1/12'])
+        filterData = set()
+        for dirPath in rseData['dirs']['allUnmerged']:
+            if self.msUnmerged._isDeletable(dirPath):
+                filterData.add(dirPath)
+
+        self.assertEqual(len(filterData), 2)
+        self.assertItemsEqual(filterData, toDeleteDict)
 
     def testFilterExclDirectories(self):
         "Test MSUnmerged with excluding directories filter"
-        toDeleteDict = {"/store/unmerged/data/prod/2018/1/12": ["/store/unmerged/data/prod/2018/1/12/log6.tar"],
-                        "/store/unmerged/express/prod/2020/1/12": ["/store/unmerged/express/prod/2020/1/12/log8.tar",
-                                                                   "/store/unmerged/express/prod/2020/1/12/log9.tar"]}
+        toDeleteDict = {"/store/unmerged/data/prod/2018/1/12", "/store/unmerged/express/prod/2020/1/12"}
         rseData = getBasicRSEData()
 
         self.msUnmerged.msConfig['dirFilterExcl'] = ["/store/unmerged/logs",
                                                      "/store/unmerged/data/prod/2019",
                                                      "/store/unmerged/alan/prod"]
         self.msUnmerged.protectedLFNs = set()
-        filterData = self.msUnmerged.filterUnmergedFiles(rseData)
-        self.assertEqual(filterData['counters']['dirsToDelete'], 2)
-        self.assertItemsEqual(viewkeys(filterData['files']['toDelete']), viewkeys(toDeleteDict))
-        self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/data/prod/2018/1/12']),
-                              toDeleteDict['/store/unmerged/data/prod/2018/1/12'])
-        self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/express/prod/2020/1/12']),
-                              toDeleteDict['/store/unmerged/express/prod/2020/1/12'])
+        filterData = set()
+        for dirPath in rseData['dirs']['allUnmerged']:
+            if self.msUnmerged._isDeletable(dirPath):
+                filterData.add(dirPath)
+
+        self.assertEqual(len(filterData), 2)
+        self.assertItemsEqual(filterData, toDeleteDict)
 
     def testFilterInclExclDirectories(self):
         "Test MSUnmerged with including and excluding directories filter"
-        toDeleteDict = {"/store/unmerged/express/prod/2020/1/12": ["/store/unmerged/express/prod/2020/1/12/log8.tar",
-                                                                   "/store/unmerged/express/prod/2020/1/12/log9.tar"]}
+        toDeleteDict = {"/store/unmerged/express/prod/2020/1/12"}
         rseData = getBasicRSEData()
 
         self.msUnmerged.msConfig['dirFilterIncl'] = ["/store/unmerged/data/prod/2018/",
                                                      "/store/unmerged/express"]
@@ -279,8 +268,10 @@ def testFilterInclExclDirectories(self):
                                                      "/store/unmerged/data/prod",
                                                      "/store/unmerged/alan/prod"]
         self.msUnmerged.protectedLFNs = set()
-        filterData = self.msUnmerged.filterUnmergedFiles(rseData)
-        self.assertEqual(filterData['counters']['dirsToDelete'], 1)
-        self.assertItemsEqual(viewkeys(filterData['files']['toDelete']), viewkeys(toDeleteDict))
-        self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/express/prod/2020/1/12']),
-                              toDeleteDict['/store/unmerged/express/prod/2020/1/12'])
+        filterData = set()
+        for dirPath in rseData['dirs']['allUnmerged']:
+            if self.msUnmerged._isDeletable(dirPath):
+                filterData.add(dirPath)
+
+        self.assertEqual(len(filterData), 1)
+        self.assertItemsEqual(filterData, toDeleteDict)
diff --git a/test/python/WMCore_t/MicroService_t/MSUnmerged_t/test_gfal.py b/test/python/WMCore_t/MicroService_t/MSUnmerged_t/test_gfal.py
new file mode 100644
index 0000000000..545c2f7763
--- /dev/null
+++ b/test/python/WMCore_t/MicroService_t/MSUnmerged_t/test_gfal.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+import logging
+
+try:
+    import gfal2
+except ImportError:
+    # in case we do not have gfal2 installed
+    print("FAILED to import gfal2. Use it only in emulateGfal2=True mode!!!")
+    gfal2 = None
+
+
+def createGfal2Context(logLevel="normal", emulate=False):
+    """
+    Create a gfal2 context object
+    :param logLevel: string with the gfal2 log level
+    :param emulate: boolean to be used by unit tests
+    :return: the gfal2 context object
+    """
+    if emulate:
+        return None
+    ctx = gfal2.creat_context()
+    gfal2.set_verbose(gfal2.verbose_level.names[logLevel])
+    return ctx
+
+
+def testGFAL(ctx):
+    logger = logging.getLogger()
+    rseDirs = ["/store/unmerged/Run3Summer22EENanoAODv11/Wto2Q-3Jets_HT-200to400_TuneCP5_13p6TeV_madgraphMLM-pythia8/NANOAODSIM/126X_mcRun3_2022_realistic_postEE_v1-v3",
+               "/store/unmerged/RunIISummer20UL18NanoAODv9/GluGluHoffshell_HToWWToENuTauNu_TuneCP5_13TeV_MCFM701-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2"]
+
+    for dirPfn in rseDirs:
+        try:
+            # NOTE: For gfal2 rmdir() exit status of 0 is success
+            rmdirSuccess = ctx.rmdir(dirPfn) == 0
+        except gfal2.GError as gfalExc:
+            logger.warning("MISSING directory: %s, gfal code=%s", dirPfn, gfalExc.code)
+
+
+def main():
+    ctx = createGfal2Context()
+    testGFAL(ctx)
+    print("succeeded")
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
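Worth noting: test_gfal.py is a manual smoke-test script rather than a unit test, since it needs a gfal2-enabled environment and a valid grid proxy, and createGfal2Context() deliberately returns None in emulate mode. A guard along these lines (a sketch, not part of the patch) would make the emulate path safe to run anywhere:

    ctx = createGfal2Context(emulate=(gfal2 is None))
    if ctx is None:
        print("gfal2 bindings not available; skipping the rmdir probe")
    else:
        testGFAL(ctx)
        ctx.free()  # release gfal2 internal resources, as MSUnmerged.cleanRSE() does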
diff --git a/test/python/WMCore_t/Services_t/Rucio_t/RucioConMon_t.py b/test/python/WMCore_t/Services_t/Rucio_t/RucioConMon_t.py
index 77343c42f6..98607cf599 100644
--- a/test/python/WMCore_t/Services_t/Rucio_t/RucioConMon_t.py
+++ b/test/python/WMCore_t/Services_t/Rucio_t/RucioConMon_t.py
@@ -8,7 +8,6 @@
 import unittest
 
 from nose.plugins.attrib import attr
-
 from WMCore.Services.RucioConMon.RucioConMon import RucioConMon
 
 
@@ -17,17 +16,17 @@ class RucioConMonTest(unittest.TestCase):
     Unit tests for RucioConMon Service module
     """
-    @attr("integration")
+#    @attr("integration")
     def testGetRSEUnmerged(self):
         """
         Test getRSEUnmerged method using both zipped and unzipped requests
         This test uses specific rse name which can be changed to any other RSE.
         """
         # url = "https://cmsweb.cern.ch/rucioconmon/WM/files?rse=T2_TR_METU&format=raw"
-        mgr = RucioConMon("https://cmsweb.cern.ch/rucioconmon")
-        rseName = "T2_TR_METU"
-        dataUnzipped = mgr.getRSEUnmerged(rseName, zipped=False)
-        dataZipped = mgr.getRSEUnmerged(rseName, zipped=True)
+        mgr = RucioConMon("https://cmsweb.cern.ch/rucioconmon/unmerged")
+        rseName = "T2_RU_ITEP"
+        dataUnzipped = [item for item in mgr.getRSEUnmerged(rseName, zipped=False)]
+        dataZipped = [item for item in mgr.getRSEUnmerged(rseName, zipped=True)]
         self.assertTrue(dataUnzipped == dataZipped)