Skip to content

Commit

Permalink
Adatp Pub_rucio to scope:name in dbs_blockname (#8533)
Browse files Browse the repository at this point in the history
  • Loading branch information
belforte authored Jun 27, 2024
1 parent b65f5e8 commit 61ebff7
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions src/python/Publisher/PublisherMasterRucio.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ def __init__(self, confFile=None, quiet=False, debug=True, testMode=False):
def active_tasks(self, crabServer):
"""
:param crabServer: CRABRest object to access proper REST as createdin __init__ method
:return: a list of dictionaries [{'task':taskname, 'username':username , 'fileDicts':list_of_filedicts}, ...].
First two keys are obvious, 'fileDicts' is a list of dictionaries, one per file with keys:
:return: a list of dictionaries, one for each task which has files waiting for publication
in filetransfersdb: [{'task':taskname, 'username':username , 'scope':rucioScope,
'fileDicts':list_of_filedicts}, ...].
First three keys are obvious, 'fileDicts' is a list of dictionaries, one per file with keys:
'username', 'cache_url', 'source_lfn', 'publication_state', 'destination',
'last_update', 'input_dataset', 'dbs_url', 'aso_worker',
'transfer_state', 'destination_lfn', 'dbs_blockname', 'block_complete'
Expand Down Expand Up @@ -131,6 +133,10 @@ def active_tasks(self, crabServer):
for file in filesToPublish:
if file['taskname'] == task:
taskDict['username'] = file['username']
if file['dbs_blockname']: # Ruio_ASO puts there a scope:name DID
(rucioScope, blockName) = file['dbs_blockname'].split(':')
taskDict['scope'] = rucioScope
file['dbs_blockname'] = blockName
taskDict['destination'] = file['destination']
fileList.append(file)
taskDict['fileDicts'] = fileList
Expand Down Expand Up @@ -284,19 +290,12 @@ def startSlave(self, task): # pylint: disable=too-many-branches, too-many-local
if blockName not in numAcquiredFilesPerBlock:
numAcquiredFilesPerBlock[blockName] = 0
numAcquiredFilesPerBlock[blockName] += 1
blocksToPublish.add(blockName)
blocksToPublish.add(blockName)

# make sure that we have acquired all files in that block. ref #8491
for blockName in blocksToPublish.copy():
countOfAcquiredFilesInBlock = numAcquiredFilesPerBlock[blockName]
# start by "finding" rucio scope, later we will pick this from the filetransfers DB
rucioScope = f"user.{task['username']}"
# beware group scope
destLfnParts = task['fileDicts'][0]['destination_lfn'].split('/')
if destLfnParts[2] == 'group':
groupName = destLfnParts[4]
rucioScope = f"group.{groupName}"
countOfFilesInRucioDataset = len(list(self.rucio.list_content(scope=rucioScope, name=blockName)))
countOfFilesInRucioDataset = len(list(self.rucio.list_content(scope=task['scope'], name=blockName)))
if countOfAcquiredFilesInBlock != countOfFilesInRucioDataset:
# wait until we have acquired all files which Rucio says are in this block
blocksToPublish.remove(blockName)
Expand Down

0 comments on commit 61ebff7

Please sign in to comment.