Skip to content

Commit

Permalink
Pre dag divide by zero fix 6926 (#6959)
Browse files Browse the repository at this point in the history
* protect against probe jobs returning no events. Fix #6926

* some pylint cleanups
  • Loading branch information
belforte authored Jan 12, 2022
1 parent 23d2bf4 commit da11c26
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions src/python/TaskWorker/Actions/PreDAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import tempfile
import functools
import subprocess
from ast import literal_eval

import classad

from ast import literal_eval
from WMCore.DataStructs.LumiList import LumiList

from ServerUtilities import getLock, newX509env, MAX_IDLE_JOBS, MAX_POST_JOBS
Expand Down Expand Up @@ -69,7 +69,7 @@ def readJobStatus(self):
#XXX Maybe the status_cache filename should be in a variable in ServerUtilities?
if not os.path.exists("task_process/status_cache.pkl"):
return
with open("task_process/status_cache.pkl",'rb') as fd:
with open("task_process/status_cache.pkl", 'rb') as fd:
statusCache = pickle.load(fd)
if not 'nodes' in statusCache:
return
Expand Down Expand Up @@ -174,9 +174,9 @@ def executeInternal(self, *args):

# need to use user proxy as credential for talking with cmsweb
config.TaskWorker.cmscert = os.environ.get('X509_USER_PROXY')
config.TaskWorker.cmskey = os.environ.get('X509_USER_PROXY')
config.TaskWorker.cmskey = os.environ.get('X509_USER_PROXY')
config.TaskWorker.envForCMSWEB = newX509env(X509_USER_CERT=config.TaskWorker.cmscert,
X509_USER_KEY=config.TaskWorker.cmskey)
X509_USER_KEY=config.TaskWorker.cmskey)

# need to get username from classAd to setup for Rucio access
task_ad = classad.parseOne(open(os.environ['_CONDOR_JOB_AD']))
Expand Down Expand Up @@ -208,12 +208,18 @@ def executeInternal(self, *args):
sumEventsThr += throughput
sumEventsSize += eventsize
count += 1
eventsThr = sumEventsThr / count
eventsSize = sumEventsSize / count
if count > 0:
eventsThr = sumEventsThr / count
eventsSize = sumEventsSize / count

self.logger.info("average throughput for %s jobs: %s evt/s", count, eventsThr)
self.logger.info("average eventsize for %s jobs: %s bytes", count, eventsSize)

if eventsThr == 0:
retmsg = "Splitting failed because all probe jobs failed or anyhow failed to provide estimates"
self.logger.error(retmsg)
return 1

maxSize = getattr(config.TaskWorker, 'automaticOutputSizeMaximum', 5 * 1000**3)
maxEvents = (maxSize / eventsSize) if eventsSize > 0 else 0

Expand Down Expand Up @@ -268,7 +274,10 @@ def executeInternal(self, *args):
creator = DagmanCreator(config, crabserver=None, rucioClient=rucioClient)
with config.TaskWorker.envForCMSWEB:
creator.createSubdag(split_result.result, task=task, parent=parent, stage=self.stage)
self.submitSubdag('RunJobs{0}.subdag'.format(self.prefix), getattr(config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS), getattr(config.TaskWorker, 'maxPost', MAX_POST_JOBS), self.stage)
self.submitSubdag('RunJobs{0}.subdag'.format(self.prefix),
getattr(config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS),
getattr(config.TaskWorker, 'maxPost', MAX_POST_JOBS),
self.stage)
except TaskWorkerException as e:
retmsg = "DAG creation failed with:\n{0}".format(e)
self.logger.error(retmsg)
Expand Down Expand Up @@ -333,7 +342,7 @@ def adjustLumisForCompletion(self, task, unprocessed):
# Now we turn lumis into something like:
# lumis=['1, 33, 35, 35, 37, 47, 49, 75, 77, 130, 133, 136','1,45,50,80']
# which is the format expected by buildLumiMask in the splitting algorithm
lumis = [",".join(str(l) for l in functools.reduce(lambda x, y:x + y, missing_compact[run])) for run in runs]
lumis = [",".join(str(l) for l in functools.reduce(lambda x, y: x + y, missing_compact[run])) for run in runs]

task['tm_split_args']['runs'] = runs
task['tm_split_args']['lumis'] = lumis
Expand Down

0 comments on commit da11c26

Please sign in to comment.