Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre dag divide by zero fix 6926 #6959

Merged
merged 2 commits into from
Jan 12, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/python/TaskWorker/Actions/PreDAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import tempfile
import functools
import subprocess
from ast import literal_eval

import classad

from ast import literal_eval
from WMCore.DataStructs.LumiList import LumiList

from ServerUtilities import getLock, newX509env, MAX_IDLE_JOBS, MAX_POST_JOBS
Expand Down Expand Up @@ -69,7 +69,7 @@ def readJobStatus(self):
#XXX Maybe the status_cache filname should be in a variable in ServerUtilities?
if not os.path.exists("task_process/status_cache.pkl"):
return
with open("task_process/status_cache.pkl",'rb') as fd:
with open("task_process/status_cache.pkl", 'rb') as fd:
statusCache = pickle.load(fd)
if not 'nodes' in statusCache:
return
Expand Down Expand Up @@ -174,9 +174,9 @@ def executeInternal(self, *args):

# need to use user proxy as credential for talking with cmsweb
config.TaskWorker.cmscert = os.environ.get('X509_USER_PROXY')
config.TaskWorker.cmskey = os.environ.get('X509_USER_PROXY')
config.TaskWorker.cmskey = os.environ.get('X509_USER_PROXY')
config.TaskWorker.envForCMSWEB = newX509env(X509_USER_CERT=config.TaskWorker.cmscert,
X509_USER_KEY=config.TaskWorker.cmskey)
X509_USER_KEY=config.TaskWorker.cmskey)

# need to get username from classAd to setup for Rucio access
task_ad = classad.parseOne(open(os.environ['_CONDOR_JOB_AD']))
Expand Down Expand Up @@ -208,12 +208,18 @@ def executeInternal(self, *args):
sumEventsThr += throughput
sumEventsSize += eventsize
count += 1
eventsThr = sumEventsThr / count
eventsSize = sumEventsSize / count
if count > 0:
eventsThr = sumEventsThr / count
eventsSize = sumEventsSize / count

self.logger.info("average throughput for %s jobs: %s evt/s", count, eventsThr)
self.logger.info("average eventsize for %s jobs: %s bytes", count, eventsSize)

if eventsThr == 0:
retmsg = "Splitting failed because all probe jobs failed or anyhow failed to provide estimates"
self.logger.error(retmsg)
return 1

maxSize = getattr(config.TaskWorker, 'automaticOutputSizeMaximum', 5 * 1000**3)
maxEvents = (maxSize / eventsSize) if eventsSize > 0 else 0

Expand Down Expand Up @@ -268,7 +274,10 @@ def executeInternal(self, *args):
creator = DagmanCreator(config, crabserver=None, rucioClient=rucioClient)
with config.TaskWorker.envForCMSWEB:
creator.createSubdag(split_result.result, task=task, parent=parent, stage=self.stage)
self.submitSubdag('RunJobs{0}.subdag'.format(self.prefix), getattr(config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS), getattr(config.TaskWorker, 'maxPost', MAX_POST_JOBS), self.stage)
self.submitSubdag('RunJobs{0}.subdag'.format(self.prefix),
getattr(config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS),
getattr(config.TaskWorker, 'maxPost', MAX_POST_JOBS),
self.stage)
except TaskWorkerException as e:
retmsg = "DAG creation failed with:\n{0}".format(e)
self.logger.error(retmsg)
Expand Down Expand Up @@ -333,7 +342,7 @@ def adjustLumisForCompletion(self, task, unprocessed):
# Now we turn lumis it into something like:
# lumis=['1, 33, 35, 35, 37, 47, 49, 75, 77, 130, 133, 136','1,45,50,80']
# which is the format expected by buildLumiMask in the splitting algorithm
lumis = [",".join(str(l) for l in functools.reduce(lambda x, y:x + y, missing_compact[run])) for run in runs]
lumis = [",".join(str(l) for l in functools.reduce(lambda x, y: x + y, missing_compact[run])) for run in runs]

task['tm_split_args']['runs'] = runs
task['tm_split_args']['lumis'] = lumis
Expand Down