Add submitrefused 7989 (#8564)
* add SubmissionRefusedException in StageoutCheck

* move upload msg to common location in Handler

* use SubmissionRefusedException in DBSDataDiscovery

* use SubmissionRefusedException in Splitter

* use SubmissionRefusedException also for Rucio StageoutCheck

* use SubmissionRefusedException in Rucio and User DataDiscovery

* use SubmissionRefusedException in Dagman* actions

* use SubmissionRefusedException in Rucio actions

* pylint

* raise SubmissionRefused also when an error is caught in Handler
belforte authored Jul 30, 2024
1 parent 3e49d49 commit e6227ee
Showing 12 changed files with 129 additions and 87 deletions.
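For orientation, here is a minimal sketch of the new exception and the base class it presumably extends. This is an assumption: the actual definitions live in src/python/TaskWorker/WorkerExceptions.py, whose diff is not among the hunks shown below; the retry attribute and constructor signature are inferred from how the Handler and handleNewTask hunks use them.

# Sketch only; the real definitions are in TaskWorker/WorkerExceptions.py,
# whose diff is not shown on this page.
class TaskWorkerException(Exception):
    """Base class for TaskWorker errors; 'retry' tells the Handler whether
    the failed action may be retried."""
    def __init__(self, message, retry=False):
        super().__init__(message)
        self.retry = retry

class SubmissionRefusedException(TaskWorkerException):
    """Raised when a task can never be submitted as configured (invalid
    dataset, over-long LFN, oversized tape recall, ...). The Handler marks
    such tasks SUBMITREFUSED instead of retrying them."""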
9 changes: 5 additions & 4 deletions src/python/ServerUtilities.py
@@ -411,8 +411,9 @@ def isFailurePermanent(reason, gridJob=False):
permanent failure and submit task or not.
"""

checkQuota = " Please check that you have write access to destination site and that your quota is not exceeded, use crab checkwrite for more information."
refuseToSubmit = " Can't submit task because write check at destination site fails."
checkQuota = "Please check that you have write access to destination site\n and that your quota is not exceeded"
checkQuota += "\n use crab checkwrite for more information."
refuseToSubmit = " Can't submit task because write check at destination site fails.\n"
if gridJob:
refuseToSubmit = ""
for exitCode in STAGEOUT_ERRORS:
@@ -497,14 +498,14 @@ def getUsernameFromTaskname(taskname):

def getTimeFromTaskname(taskname):
""" Get the submission time from the taskname and return the seconds since epoch
- corresponding to it. The function is not currently used.
+ corresponding to it.
"""

# validate taskname. In principle not necessary, but..
if not isinstance(taskname, str):
raise TypeError('In ServerUtilities.getTimeFromTaskname: "taskname" parameter must be a string')
stime = taskname.split(':')[0] # s stands for string
- stimePattern = '^\d{6}_\d{6}$'
+ stimePattern = r'^\d{6}_\d{6}$'
if not re.match(stimePattern, stime):
raise ValueError('In ServerUtilities.getTimeFromTaskname: "taskname" parameter must match %s' % stimePattern)
# convert the time
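The getTimeFromTaskname hunk only trims the docstring and makes the regex a raw string; the actual time conversion is collapsed above. As a rough usage sketch, assuming the usual YYMMDD_HHMMSS taskname prefix interpreted as UTC (this helper is illustrative, not the repository code):

import calendar
import re
import time

def timeFromTaskname(taskname):  # hypothetical helper, for illustration
    # e.g. '240730_142530:belforte_crab_mytask' -> seconds since the epoch
    stime = taskname.split(':')[0]
    if not re.match(r'^\d{6}_\d{6}$', stime):
        raise ValueError(f'taskname must start with YYMMDD_HHMMSS, got {stime}')
    # interpret the prefix as UTC and convert to epoch seconds
    return calendar.timegm(time.strptime(stime, '%y%m%d_%H%M%S'))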
36 changes: 18 additions & 18 deletions src/python/TaskWorker/Actions/DBSDataDiscovery.py
@@ -15,7 +15,7 @@
from RucioUtils import getNativeRucioClient, getTapeRecallUsage

from ServerUtilities import MAX_LUMIS_IN_BLOCK, parseDBSInstance, isDatasetUserDataset
- from TaskWorker.WorkerExceptions import TaskWorkerException, TapeDatasetException
+ from TaskWorker.WorkerExceptions import TaskWorkerException, TapeDatasetException, SubmissionRefusedException
from TaskWorker.Actions.DataDiscovery import DataDiscovery
from TaskWorker.Actions.RucioActions import RucioAction

@@ -35,9 +35,9 @@ def checkDatasetStatus(self, dataset, kwargs):
""" as the name says """
res = self.dbs.dbs.listDatasets(dataset=dataset, detail=1, dataset_access_type='*')
if not res:
raise TaskWorkerException(f"Cannot find dataset {dataset} in {self.dbsInstance} DBS instance")
raise SubmissionRefusedException(f"Cannot find dataset {dataset} in {self.dbsInstance} DBS instance")
if len(res) > 1:
raise TaskWorkerException(f"Found more than one dataset while checking in DBS the status of {dataset}")
raise SubmissionRefusedException(f"Found more than one dataset while checking in DBS the status of {dataset}")
res = res[0]
#import pprint
#self.logger.info("Input dataset details: %s", pprint.pformat(res))
@@ -53,7 +53,7 @@ def checkDatasetStatus(self, dataset, kwargs):
msg += " To allow CRAB to consider a dataset that is not 'VALID', set Data.allowNonValidInputDataset = True in the CRAB configuration."
msg += " Notice that this will not force CRAB to run over all files in the dataset;"
msg += " CRAB will still check if there are any valid files in the dataset and run only over those files."
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)
msg = f"The input dataset {dataset} is not 'VALID' but '{accessType}'."
msg += " CRAB will check if there are any valid files in the dataset and run only over those files."
if accessType == 'DEPRECATED':
@@ -92,7 +92,7 @@ def checkBlocksSize(self, blocks):
msg += "\nusing FileBased split algorithm and avoiding any additional request"
msg += "\nwich may cause lumi information to be looked up. See CRAB FAQ for more info:"
msg += "\nhttps://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3FAQ"
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)

@staticmethod
def lumiOverlap(d1, d2):
@@ -119,22 +119,22 @@ def validateInputUserFiles(self, dataset=None, files=None):
for file in files:
Lexicon.lfn(file) # will raise if file is not a valid lfn
except AssertionError as ex:
raise TaskWorkerException(f"input file is not a valid LFN: {file}") from ex
raise SubmissionRefusedException(f"input file is not a valid LFN: {file}") from ex
filesInDataset = self.dbs.listDatasetFiles(datasetPath=dataset)
for file in files:
if not file in filesInDataset:
raise TaskWorkerException(f"input file use does not belong to input dataset: {file}")
raise SubmissionRefusedException(f"input file use does not belong to input dataset: {file}")
@staticmethod
def validateInputBlocks(dataset=None, blocks=None):
""" make sure that blocks in input list match input dataset """
try:
for block in blocks:
Lexicon.block(block) # will raise if file is not a valid lfn
except AssertionError as ex:
raise TaskWorkerException(f"input block is not a valid block name: {block}") from ex
raise SubmissionRefusedException(f"input block is not a valid block name: {block}") from ex
for block in blocks:
if not block.split('#')[0] == dataset:
raise TaskWorkerException(f"input block does not belong to input dataset: {block}")
raise SubmissionRefusedException(f"input block does not belong to input dataset: {block}")

def execute(self, *args, **kwargs):
"""
@@ -213,7 +213,7 @@ def executeInternal(self, *args, **kwargs): # pylint: disable=unused-argument
except DBSReaderError as dbsexc:
# dataset not found in DBS is a known use case
if str(dbsexc).find('No matching data'):
- raise TaskWorkerException(
+ raise SubmissionRefusedException(
f"CRAB could not find dataset {inputDataset} in this DBS instance: {dbsurl}"
) from dbsexc
raise
@@ -297,7 +297,7 @@ def executeInternal(self, *args, **kwargs): # pylint: disable=unused-argument
if 'T3_CH_CERNBOX' in v:
useCernbox = True
if useCernbox:
- raise TaskWorkerException(
+ raise SubmissionRefusedException(
"USER dataset is located at T3_CH_CERNBOX, but this location \n"+\
"is not available to CRAB jobs."
)
@@ -337,7 +337,7 @@ def executeInternal(self, *args, **kwargs): # pylint: disable=unused-argument
self.logger.warning(msg)
if not secondaryLocationsMap:
msg = f"No locations found for secondaryDataset {secondaryDataset}."
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)


# From now on code is not dependent from having used Rucio or PhEDEx
@@ -425,7 +425,7 @@ def executeInternal(self, *args, **kwargs): # pylint: disable=unused-argument
f" and contact the experts if the error persists.\nError reason: {ex}"
) from ex
if not filedetails:
- raise TaskWorkerException(
+ raise SubmissionRefusedException(
("Cannot find any file inside the dataset. Please, check your dataset in DAS\n%s\n" +
"Aborting submission. Resubmitting your task will not help.") %
(f"https://cmsweb.cern.ch/das/request?instance={self.dbsInstance}&input=dataset={inputDataset}")
Expand All @@ -438,7 +438,7 @@ def executeInternal(self, *args, **kwargs): # pylint: disable=unused-argument
tempDir=kwargs['tempDir'])

if not result.result:
raise TaskWorkerException(("Cannot find any valid file inside the dataset. Please, check your dataset in DAS, %s.\n" +
raise SubmissionRefusedException(("Cannot find any valid file inside the dataset. Please, check your dataset in DAS, %s.\n" +
"Aborting submission. Resubmitting your task will not help.") %
(f"https://cmsweb.cern.ch/das/request?instance={self.dbsInstance}&input=dataset={inputDataset}"))

@@ -545,15 +545,15 @@ def executeTapeRecallPolicy(self, inputDataset, inputBlocks, totalSizeBytes):
msg = f"Task could not be submitted because not all blocks of dataset {inputDataset} are on DISK"
if not userHasQuota:
msg += overQmsg
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)
msg += "\nWill request a full disk copy for you. See"
msg += "\n https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3FAQ#crab_submit_fails_with_Task_coul"
elif inputBlocks:
if totalSizeTB < maxTierToBlockRecallSizeTB:
msg = "Task could not be submitted because blocks specified in 'Data.inputBlocks' are not on disk."
if not userHasQuota:
msg += overQmsg
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)
msg += "\nWill request a disk copy for you. See"
msg += "\n https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3FAQ#crab_submit_fails_with_Task_coul"
else:
@@ -562,13 +562,13 @@ def executeTapeRecallPolicy(self, inputDataset, inputBlocks, totalSizeBytes):
" ({totalSizeTB}TB/{maxTierToBlockRecallSizeTB}TB) "\
"to issue automatically recall from TAPE."
msg += "\nIf you need these blocks, contact Data Transfer team via https://its.cern.ch/jira/browse/CMSTRANSF"
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)
else:
msg = "Some blocks are on TAPE only and will not be processed."
msg += "\nThis dataset is too large for automatic recall from TAPE."
msg += "\nIf you can do with only a part of the dataset, use Data.inputBlocks configuration."
msg += "\nIf you need the full dataset, contact Data Transfer team via https://its.cern.ch/jira/browse/CMSTRANSF"
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)
return msg


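Condensing the executeTapeRecallPolicy hunks above: when the requested data is on TAPE only, CRAB either triggers an automatic disk recall or refuses the submission, depending on size thresholds and the user's Rucio quota. A sketch under stated assumptions; the enclosing size checks and branch order are partially collapsed in the diff, and any parameter name not visible above is made up:

# Condensed sketch of the visible policy; import assumed available.
from TaskWorker.WorkerExceptions import SubmissionRefusedException

def tapeRecallPolicy(inputBlocks, totalSizeTB, maxRecallSizeTB,
                     maxTierToBlockRecallSizeTB, userHasQuota):
    # refuse outright when the request exceeds the automatic-recall limits
    if inputBlocks and totalSizeTB >= maxTierToBlockRecallSizeTB:
        raise SubmissionRefusedException(
            "blocks exceed the allowed size to issue automatic recall from TAPE")
    if not inputBlocks and totalSizeTB >= maxRecallSizeTB:
        raise SubmissionRefusedException(
            "dataset too large for automatic recall; use Data.inputBlocks")
    # refuse when the user has no Rucio quota left for the recall rule
    if not userHasQuota:
        raise SubmissionRefusedException("user is over the Rucio quota")
    return "will request a disk copy; see the CRAB3 FAQ"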
17 changes: 8 additions & 9 deletions src/python/TaskWorker/Actions/DagmanCreator.py
@@ -3,6 +3,7 @@
Generates the condor submit files and the master DAG.
"""
# pylint: disable=invalid-name # have a lot of snake_case variables here from "old times"

import os
import re
@@ -18,10 +19,9 @@
from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME
from ServerUtilities import getLock, downloadFromS3

- import TaskWorker.WorkerExceptions
import TaskWorker.DataObjects.Result
from TaskWorker.Actions.TaskAction import TaskAction
- from TaskWorker.WorkerExceptions import TaskWorkerException
+ from TaskWorker.WorkerExceptions import TaskWorkerException, SubmissionRefusedException
from RucioUtils import getWritePFN
from CMSGroupMapper import get_egroup_users

@@ -246,8 +246,7 @@ def validateLFNs(path, outputFiles):
msg = "\nYour task specifies an output LFN %d-char long " % len(testLfn)
msg += "\n which exceeds maximum length of 500"
msg += "\n and therefore can not be handled in our DataBase"
- raise TaskWorker.WorkerExceptions.TaskWorkerException(msg)
- return
+ raise SubmissionRefusedException(msg)

def validateUserLFNs(path, outputFiles):
"""
@@ -274,7 +273,7 @@ def validateUserLFNs(path, outputFiles):
msg = "\nYour task specifies an output LFN %d-char long " % len(testLfn)
msg += "\n which exceeds maximum length of 500"
msg += "\n and therefore can not be handled in our DataBase"
- raise TaskWorker.WorkerExceptions.TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)
return

def transform_strings(data):
@@ -376,7 +375,7 @@ def populateGlideinMatching(self, info):
_, _, arch, _ = m.groups()
if arch not in SCRAM_TO_ARCH:
msg = f"Job configured for non-supported ScramArch '{arch}'"
- raise TaskWorker.WorkerExceptions.TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)
info['required_arch'] = SCRAM_TO_ARCH.get(arch)
# if arch == "amd64":
# info['required_arch'] = "X86_64"
@@ -527,7 +526,7 @@ def makeJobSubmit(self, task):
arch = info['jobarch_flatten'].split("_")[0] # extracts "slc7" from "slc7_amd64_gcc10"
required_os_list = ARCH_TO_OS.get(arch)
if not required_os_list:
raise TaskWorkerException(f"Unsupported architecture {arch}")
raise SubmissionRefusedException(f"Unsupported architecture {arch}")
# ARCH_TO_OS.get("slc7") gives a list with one item only: ['rhel7']
info['opsys_req'] = f'+REQUIRED_OS="{required_os_list[0]}"'

@@ -593,7 +592,7 @@ def makeDagSpecs(self, task, sitead, siteinfo, jobgroup, block, availablesites,
msg = "\nYour task specifies an output LFN which fails validation in"
msg += "\n WMCore/Lexicon and therefore can not be handled in our DataBase"
msg += "\nError detail: %s" % (str(ex))
- raise TaskWorker.WorkerExceptions.TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg) from ex
groupid = len(siteinfo['group_sites'])
siteinfo['group_sites'][groupid] = list(availablesites)
siteinfo['group_datasites'][groupid] = list(datasites)
@@ -884,7 +883,7 @@ def createSubdag(self, splitterResult, **kwargs):
msg += " Please try to submit a new task (resubmit will not work)"
msg += " and contact the experts if the error persists."
msg += "\nError reason: %s" % (str(ex))
- raise TaskWorker.WorkerExceptions.TaskWorkerException(msg)
+ raise TaskWorkerException(msg) from ex
else:
possiblesites = locations
## At this point 'possiblesites' should never be empty.
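The validateLFNs and validateUserLFNs hunks above only swap the exception type; the construction of the test LFN is collapsed. A self-contained sketch of the 500-character limit they enforce, where the '_9999' job-counter suffix is a guess at what the collapsed code appends:

# Sketch of the LFN-length check; suffix and structure are assumptions.
import os
from TaskWorker.WorkerExceptions import SubmissionRefusedException

def checkLfnLength(path, outputFiles, maxLen=500):
    for fileName in outputFiles:
        base, ext = os.path.splitext(fileName)
        testLfn = os.path.join(path, f"{base}_9999{ext}")  # guessed suffix
        if len(testLfn) > maxLen:
            # permanent, user-fixable problem: refuse the submission
            raise SubmissionRefusedException(
                f"Your task specifies an output LFN {len(testLfn)}-char long"
                f" which exceeds maximum length of {maxLen}"
                " and therefore can not be handled in our DataBase")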
4 changes: 2 additions & 2 deletions src/python/TaskWorker/Actions/DagmanSubmitter.py
@@ -22,7 +22,7 @@

from TaskWorker.DataObjects import Result
from TaskWorker.Actions import TaskAction
- from TaskWorker.WorkerExceptions import TaskWorkerException
+ from TaskWorker.WorkerExceptions import TaskWorkerException, SubmissionRefusedException

if sys.version_info >= (3, 0):
from urllib.parse import urlencode # pylint: disable=no-name-in-module
@@ -184,7 +184,7 @@ def checkMemoryWalltime(info, task, cmd, logger, warningUploader):
msg = f"Task requests {memory} MB of memory, above the allowed maximum of {absmaxmemory}"
msg += f" for a {ncores} core(s) job.\n"
logger.error(msg)
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)
if memory is not None and memory > MAX_MEMORY_PER_CORE:
if ncores is not None and ncores < 2:
msg = f"Task requests {memory} MB of memory, but only {MAX_MEMORY_PER_CORE} are guaranteed to be available."
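checkMemoryWalltime now refuses, rather than retries, tasks whose memory request can never be matched. A sketch of the two visible checks; how absmaxmemory is derived is collapsed above, so both limits are plain parameters here and the warn callback stands in for the real warningUploader:

# Sketch only; parameter wiring is an assumption.
from TaskWorker.WorkerExceptions import SubmissionRefusedException

def checkMemory(memory, ncores, absmaxmemory, maxMemoryPerCore, warn):
    if memory is not None and memory > absmaxmemory:
        # no slot can ever satisfy this request: permanent refusal
        raise SubmissionRefusedException(
            f"Task requests {memory} MB of memory, above the allowed maximum"
            f" of {absmaxmemory} for a {ncores} core(s) job.")
    if memory is not None and memory > maxMemoryPerCore:
        if ncores is not None and ncores < 2:
            # the real code only warns here: the job may match, but slowly
            warn(f"Task requests {memory} MB of memory, but only"
                 f" {maxMemoryPerCore} are guaranteed to be available.")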
15 changes: 11 additions & 4 deletions src/python/TaskWorker/Actions/Handler.py
@@ -25,8 +25,10 @@
from TaskWorker.Actions.UserDataDiscovery import UserDataDiscovery
from TaskWorker.Actions.RucioDataDiscovery import RucioDataDiscovery
from TaskWorker.Actions.DagmanResubmitter import DagmanResubmitter
- from TaskWorker.WorkerExceptions import WorkerHandlerException, TapeDatasetException, TaskWorkerException
+ from TaskWorker.WorkerExceptions import WorkerHandlerException, TapeDatasetException,\
+     TaskWorkerException, SubmissionRefusedException

+ from CRABUtils.TaskUtils import updateTaskStatus, uploadWarning
from ServerUtilities import uploadToS3


@@ -92,8 +94,13 @@ def executeAction(self, nextinput, work):
"""
try:
output = work.execute(nextinput, task=self.task, tempDir=self.tempDir)
- except TapeDatasetException as tde:
-     raise TapeDatasetException(str(tde)) from tde
+ except TapeDatasetException as e:
+     raise TapeDatasetException(str(e)) from e
+ except SubmissionRefusedException as e:
+     uploadWarning(self.crabserver, self.task['tm_taskname'], str(e), self.logger)
+     updateTaskStatus(self.crabserver, taskName=self.task['tm_taskname'],
+                      status='SUBMITREFUSED', logger=self.logger)
+     raise SubmissionRefusedException(str(e)) from e
except TaskWorkerException as twe:
self.logger.debug(str(traceback.format_exc())) # print the stacktrace only in debug mode
raise WorkerHandlerException(str(twe), retry=twe.retry) from twe # TaskWorker error, do not add traceback
@@ -186,7 +193,7 @@ def handleNewTask(resthost, dbInstance, config, task, procnum, *args, **kwargs):
elif task.get('tm_user_files'):
handler.addWork(UserDataDiscovery(config=config, crabserver=crabserver, procnum=procnum))
else:
raise WorkerHandlerException("Neither inputDataset nor userInputFiles specified", retry=0)
raise SubmissionRefusedException("Neither inputDataset nor userInputFiles specified", retry=0)
elif task['tm_job_type'] == 'PrivateMC':
handler.addWork(MakeFakeFileSet(config=config, crabserver=crabserver, procnum=procnum))
handler.addWork(Splitter(config=config, crabserver=crabserver, procnum=procnum))
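With the Handler change above, any pre-submission action can refuse a task and have it parked as SUBMITREFUSED, with the reason uploaded as a task warning visible to the user. A hypothetical action illustrating the contract; only the exception, the TaskAction/Result types, and the "Neither inputDataset..." condition appear in the hunks, everything else (class name, task keys, import paths) is illustrative:

from TaskWorker.Actions.TaskAction import TaskAction
from TaskWorker.DataObjects.Result import Result  # assumed import path
from TaskWorker.WorkerExceptions import SubmissionRefusedException

class MyPrecheck(TaskAction):  # hypothetical action, for illustration
    def execute(self, *args, **kwargs):
        task = kwargs['task']
        # 'tm_input_dataset' is a guess; the hunks only show tm_user_files
        if not task.get('tm_input_dataset') and not task.get('tm_user_files'):
            # permanent, user-fixable problem: refuse rather than retry;
            # Handler.executeAction uploads the message and sets SUBMITREFUSED
            raise SubmissionRefusedException(
                "Neither inputDataset nor userInputFiles specified")
        return Result(task=task, result='OK')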
4 changes: 2 additions & 2 deletions src/python/TaskWorker/Actions/RucioActions.py
@@ -13,7 +13,7 @@
from ServerUtilities import MAX_DAYS_FOR_TAPERECALL, MAX_TB_TO_RECALL_AT_A_SINGLE_SITE
from ServerUtilities import TASKLIFETIME
from TaskWorker.WorkerUtilities import uploadWarning
- from TaskWorker.WorkerExceptions import TaskWorkerException, TapeDatasetException
+ from TaskWorker.WorkerExceptions import TaskWorkerException, TapeDatasetException, SubmissionRefusedException


class RucioAction():
@@ -187,7 +187,7 @@ def recallData(self, dataToRecall=None, sizeToRecall=0, tapeLocations=None, msgH
# Sanity check
if teraBytesToRecall > 1e3:
msg = msgHead + f"\nDataset size {teraBytesToRecall} TB. Will not trigger automatic recall for >1PB. Contact DataOps"
- raise TaskWorkerException(msg)
+ raise SubmissionRefusedException(msg)

# a friendly-formatted string to print the size
recallSize = f"{teraBytesToRecall:.0f} TBytes" if teraBytesToRecall > 0 else f"{sizeToRecall / 1e9:.0f} GBytes"
(Diffs for the remaining 6 changed files are not shown.)
