Skip to content

Commit

Permalink
mark transfer fail when add_replicas raise exception (#7866)
Browse files Browse the repository at this point in the history
  • Loading branch information
novicecpp authored Sep 11, 2023
1 parent e7c14e3 commit d4896fe
Showing 1 changed file with 62 additions and 20 deletions.
82 changes: 62 additions & 20 deletions src/python/ASO/Rucio/Actions/RegisterReplicas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import itertools
import copy
from rucio.rse.rsemanager import find_matching_scheme
from rucio.common.exception import RSENotFound, RSEProtocolNotSupported

import ASO.Rucio.config as config # pylint: disable=consider-using-from-import
from ASO.Rucio.Actions.BuildDBSDataset import BuildDBSDataset
Expand Down Expand Up @@ -42,8 +43,10 @@ def execute(self):
transferItemsWithoutLogfile = self.skipLogTransfers(transferGenerator)
preparedReplicasByRSE = self.prepare(transferItemsWithoutLogfile)
# Add file to rucio by RSE
successFileDocs = self.addFilesToRucio(preparedReplicasByRSE)
successFileDocs, failFileDocs = self.addFilesToRucio(preparedReplicasByRSE)
self.logger.debug(f'successFileDocs: {successFileDocs}')
self.logger.debug(f'failFileDocs: {failFileDocs}')
self.updateRESTFileDocStateToFailed(failFileDocs)
# Add replicas to transfer container
transferFileDocs = self.addReplicasToContainer(successFileDocs, self.transfer.transferContainer)
# Update state of files in REST in FILETRANSFERDB table
Expand Down Expand Up @@ -141,11 +144,12 @@ def addFilesToRucio(self, prepareReplicas):
:param prepareReplicas: dict return from `prepare()` method.
:type prepareReplicas: dict
:returns: a list contain dict with REST xfer id and replicas LFN.
:rtype: list
:returns: tuple of a list contain dict with REST xfer id and replicas LFN.
:rtype: tuple
"""

ret = []
retSuccess = []
retFail = []
self.logger.debug(f'Prepare replicas: {prepareReplicas}')
for rse, replicas in prepareReplicas.items():
self.logger.debug(f'Registering replicas from {rse}')
Expand All @@ -160,22 +164,35 @@ def addFilesToRucio(self, prepareReplicas):
# the new value.
# See https://github.com/dmwm/CMSRucio/issues/343#issuecomment-1543663323
self.rucioClient.add_replicas(rse, rs)
except Exception as ex:
# Note that 2 exceptions we encounter so far here is due to
# LFN to PFN converstion and RSE protocols.
# https://github.com/dmwm/CRABServer/issues/7632
self.logger.error(f'add_replicas(rse, r): rse={rse} rs={rs}')
raise RucioTransferException('Something wrong with adding new replicas') from ex
for idx, r in chunk:
fileDoc = {
'id': idx,
'name': r['name'],
'dataset': None,
'blockcomplete': None,
'ruleid': None,
}
ret.append(fileDoc)
return ret
for idx, r in chunk:
fileDoc = {
'id': idx,
'name': r['name'],
'dataset': None,
'blockcomplete': None,
'ruleid': None,
}
retSuccess.append(fileDoc)
except (RSENotFound, RSEProtocolNotSupported) as ex:
# Usually when exception occur it affect the whole chunk,
# no need to retry individually.
#
# Note that at this point, it still possible for job to
# retry because file is not register in replica yet.
self.logger.exception(ex)
self.logger.warning(f'Exception is raised at add_replicas(rse, rs)')
self.logger.debug(f'rse={rse}, rs={rs}')
for idx, r in chunk:
fileDoc = {
'id': idx,
'name': r['name'],
'dataset': None,
'blockcomplete': None,
'ruleid': None,
'failReason': f'RUCIO_Transfers.py raised an exception: {ex.__class__.__name__}'
}
retFail.append(fileDoc)
return retSuccess, retFail

def addReplicasToContainer(self, fileDocs, container):
"""
Expand Down Expand Up @@ -330,3 +347,28 @@ def updateRESTFileDocStateToSubmitted(self, fileDocs):
'list_of_fts_id': [x['ruleid'] for x in fileDocs]
}
updateToREST(self.crabRESTClient, 'filetransfers', 'updateTransfers', restFileDoc)

def updateRESTFileDocStateToFailed(self, fileDocs):
"""
Update files transfer state in filetransfersdb to FAILED to let PostJob put it on retry loop.
:param fileDocs: list of dict contains transferItems's ID and its
information.
:type fileDocs: list
"""
# NOTE: Do not put key with NoneType value to restFileDoc. On the server
# side, it will recognize as string 'None' instead of NoneType and
# break "if" in https://github.com/dmwm/CRABServer/blob/e7c14e3115c53c5542bb0884d618a45397f5886c/src/python/CRABInterface/RESTFileTransfers.py#L151
num = len(fileDocs)
restFileDoc = {
'asoworker': 'rucio',
'list_of_ids': [x['id'] for x in fileDocs],
'list_of_transfer_state': ['FAILED']*num,
#'list_of_dbs_blockname': None,
#'list_of_block_complete': None,
#'list_of_fts_instance': None,
'list_of_failure_reason': [x['failReason'] for x in fileDocs],
'list_of_retry_value': [0]*num,
#'list_of_fts_id': None,
}
updateToREST(self.crabRESTClient, 'filetransfers', 'updateTransfers', restFileDoc)

0 comments on commit d4896fe

Please sign in to comment.