diff --git a/src/python/Publisher/PublisherMasterRucio.py b/src/python/Publisher/PublisherMasterRucio.py index 581b2a9b42..1e4ed670a3 100644 --- a/src/python/Publisher/PublisherMasterRucio.py +++ b/src/python/Publisher/PublisherMasterRucio.py @@ -78,8 +78,10 @@ def __init__(self, confFile=None, quiet=False, debug=True, testMode=False): def active_tasks(self, crabServer): """ :param crabServer: CRABRest object to access proper REST as createdin __init__ method - :return: a list of dictionaries [{'task':taskname, 'username':username , 'fileDicts':list_of_filedicts}, ...]. - First two keys are obvious, 'fileDicts' is a list of dictionaries, one per file with keys: + :return: a list of dictionaries, one for each task which has files waiting for publication + in filetransfersdb: [{'task':taskname, 'username':username , 'scope':rucioScope, + 'fileDicts':list_of_filedicts}, ...]. + First three keys are obvious, 'fileDicts' is a list of dictionaries, one per file with keys: 'username', 'cache_url', 'source_lfn', 'publication_state', 'destination', 'last_update', 'input_dataset', 'dbs_url', 'aso_worker', 'transfer_state', 'destination_lfn', 'dbs_blockname', 'block_complete' @@ -131,6 +133,10 @@ def active_tasks(self, crabServer): for file in filesToPublish: if file['taskname'] == task: taskDict['username'] = file['username'] + if file['dbs_blockname']: # Ruio_ASO puts there a scope:name DID + (rucioScope, blockName) = file['dbs_blockname'].split(':') + taskDict['scope'] = rucioScope + file['dbs_blockname'] = blockName taskDict['destination'] = file['destination'] fileList.append(file) taskDict['fileDicts'] = fileList @@ -284,19 +290,12 @@ def startSlave(self, task): # pylint: disable=too-many-branches, too-many-local if blockName not in numAcquiredFilesPerBlock: numAcquiredFilesPerBlock[blockName] = 0 numAcquiredFilesPerBlock[blockName] += 1 - blocksToPublish.add(blockName) + blocksToPublish.add(blockName) # make sure that we have acquired all files in that block. ref #8491 for blockName in blocksToPublish.copy(): countOfAcquiredFilesInBlock = numAcquiredFilesPerBlock[blockName] - # start by "finding" rucio scope, later we will pick this from the filetransfers DB - rucioScope = f"user.{task['username']}" - # beware group scope - destLfnParts = task['fileDicts'][0]['destination_lfn'].split('/') - if destLfnParts[2] == 'group': - groupName = destLfnParts[4] - rucioScope = f"group.{groupName}" - countOfFilesInRucioDataset = len(list(self.rucio.list_content(scope=rucioScope, name=blockName))) + countOfFilesInRucioDataset = len(list(self.rucio.list_content(scope=task['scope'], name=blockName))) if countOfAcquiredFilesInBlock != countOfFilesInRucioDataset: # wait until we have acquired all files which Rucio says are in this block blocksToPublish.remove(blockName)