diff --git a/src/python/TaskWorker/Actions/PreDAG.py b/src/python/TaskWorker/Actions/PreDAG.py index a1fa998b07..73e1ebbade 100644 --- a/src/python/TaskWorker/Actions/PreDAG.py +++ b/src/python/TaskWorker/Actions/PreDAG.py @@ -32,10 +32,10 @@ import tempfile import functools import subprocess +from ast import literal_eval import classad -from ast import literal_eval from WMCore.DataStructs.LumiList import LumiList from ServerUtilities import getLock, newX509env, MAX_IDLE_JOBS, MAX_POST_JOBS @@ -69,7 +69,7 @@ def readJobStatus(self): #XXX Maybe the status_cache filname should be in a variable in ServerUtilities? if not os.path.exists("task_process/status_cache.pkl"): return - with open("task_process/status_cache.pkl",'rb') as fd: + with open("task_process/status_cache.pkl", 'rb') as fd: statusCache = pickle.load(fd) if not 'nodes' in statusCache: return @@ -174,9 +174,9 @@ def executeInternal(self, *args): # need to use user proxy as credential for talking with cmsweb config.TaskWorker.cmscert = os.environ.get('X509_USER_PROXY') - config.TaskWorker.cmskey = os.environ.get('X509_USER_PROXY') + config.TaskWorker.cmskey = os.environ.get('X509_USER_PROXY') config.TaskWorker.envForCMSWEB = newX509env(X509_USER_CERT=config.TaskWorker.cmscert, - X509_USER_KEY=config.TaskWorker.cmskey) + X509_USER_KEY=config.TaskWorker.cmskey) # need to get username from classAd to setup for Rucio access task_ad = classad.parseOne(open(os.environ['_CONDOR_JOB_AD'])) @@ -208,12 +208,18 @@ def executeInternal(self, *args): sumEventsThr += throughput sumEventsSize += eventsize count += 1 - eventsThr = sumEventsThr / count - eventsSize = sumEventsSize / count + if count > 0: + eventsThr = sumEventsThr / count + eventsSize = sumEventsSize / count self.logger.info("average throughput for %s jobs: %s evt/s", count, eventsThr) self.logger.info("average eventsize for %s jobs: %s bytes", count, eventsSize) + if eventsThr == 0: + retmsg = "Splitting failed because all probe jobs failed or anyhow failed to provide estimates" + self.logger.error(retmsg) + return 1 + maxSize = getattr(config.TaskWorker, 'automaticOutputSizeMaximum', 5 * 1000**3) maxEvents = (maxSize / eventsSize) if eventsSize > 0 else 0 @@ -268,7 +274,10 @@ def executeInternal(self, *args): creator = DagmanCreator(config, crabserver=None, rucioClient=rucioClient) with config.TaskWorker.envForCMSWEB: creator.createSubdag(split_result.result, task=task, parent=parent, stage=self.stage) - self.submitSubdag('RunJobs{0}.subdag'.format(self.prefix), getattr(config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS), getattr(config.TaskWorker, 'maxPost', MAX_POST_JOBS), self.stage) + self.submitSubdag('RunJobs{0}.subdag'.format(self.prefix), + getattr(config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS), + getattr(config.TaskWorker, 'maxPost', MAX_POST_JOBS), + self.stage) except TaskWorkerException as e: retmsg = "DAG creation failed with:\n{0}".format(e) self.logger.error(retmsg) @@ -333,7 +342,7 @@ def adjustLumisForCompletion(self, task, unprocessed): # Now we turn lumis it into something like: # lumis=['1, 33, 35, 35, 37, 47, 49, 75, 77, 130, 133, 136','1,45,50,80'] # which is the format expected by buildLumiMask in the splitting algorithm - lumis = [",".join(str(l) for l in functools.reduce(lambda x, y:x + y, missing_compact[run])) for run in runs] + lumis = [",".join(str(l) for l in functools.reduce(lambda x, y: x + y, missing_compact[run])) for run in runs] task['tm_split_args']['runs'] = runs task['tm_split_args']['lumis'] = lumis