Skip to content

Commit

Permalink
first version which works using JDL to submit dag (#8486)
Browse files Browse the repository at this point in the history
  • Loading branch information
belforte authored Jun 11, 2024
1 parent 2023176 commit c92c4bd
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 16 deletions.
5 changes: 4 additions & 1 deletion src/python/TaskWorker/Actions/DagmanCreator.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ def makeJobSubmit(self, task):
info['accounting_group_user'] = info['userhn']
info = transform_strings(info)
info['faillimit'] = task['tm_fail_limit']
# tm_extrajdl and tm_user_config['acceleratorparams'] contain list of k=v
# assignements to be turned into classAds, so here we turn them from a python list of strings to
# a single string with k=v separated by \n which can be pasted into the Job.submit JDL
info['extra_jdl'] = '\n'.join(literal_eval(task['tm_extrajdl']))
if task['tm_user_config']['requireaccelerator']:
# hardcoding accelerator to GPU (SI currently only have nvidia GPU)
info['accelerator_jdl'] = '+RequiresGPU=1\nrequest_GPUs=1'
Expand All @@ -520,7 +524,6 @@ def makeJobSubmit(self, task):
info['accelerator_jdl'] += f"\n+CUDARuntime={classad.quote(cudaRuntime)}"
else:
info['accelerator_jdl'] = ''
info['extra_jdl'] = '\n'.join(literal_eval(task['tm_extrajdl']))
arch = info['jobarch_flatten'].split("_")[0] # extracts "slc7" from "slc7_amd64_gcc10"
required_os_list = ARCH_TO_OS.get(arch)
if not required_os_list:
Expand Down
37 changes: 27 additions & 10 deletions src/python/TaskWorker/Actions/DagmanSubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,12 @@
('+CRAB_UserVO', 'tm_user_vo'),
('+CRAB_SiteBlacklist', 'siteblacklist'),
('+CRAB_SiteWhitelist', 'sitewhitelist'),
('+RequestMemory', 'tm_maxmemory'),
('+RequestCpus', 'tm_numcores'),
('+CRAB_RequestedMemory', 'tm_maxmemory'),
('+CRAB_RequestedCores', 'tm_numcores'),
('+MaxWallTimeMins', 'tm_maxjobruntime'),
('+MaxWallTimeMinsRun', 'tm_maxjobruntime'),
('+MaxWallTimeMinsProbe', 'maxproberuntime'),
('+MaxWallTimeMinsTail', 'maxtailruntime'),
('JobPrio', 'tm_priority'),
('+CRAB_FailedNodeLimit', 'faillimit'),
('+CRAB_DashboardTaskType', 'taskType'),
('+CRAB_MaxIdle', 'maxidle'),
Expand All @@ -94,8 +93,22 @@ def addCRABInfoToJobJDL(jdl, info):
from the info directory
"""
for adName, dictName in SUBMIT_INFO:
if dictName in info and (info[dictName] is not None):
jdl[adName] = classad.quote(info[dictName])
if dictName in info and info[dictName] is not None:
jdl[adName] = info[dictName]
# extra_jdl and accelerator_jdl are not listed in SUBMIT_INFO
# and need ad-hoc handling, those are a string of `\n` separated k=v elements
if 'extra_jdl' in info and info['extra_jdl']:
for keyValue in info['extra_jdl'].split('\n'):
adName, adVal = keyValue.split(sep='=', maxsplit=1)
# remove any user-inserted spaces which would break schedd.submit #8420
adName = adName.strip()
adVal = adVal.strip()
jdl[adName] = adVal
if 'accelerator_jdl' in info and info['accelerator_jdl']:
for keyValue in info['accelerator_jdl'].split('\n'):
adName, adVal = keyValue.split(sep='=', maxsplit=1)
# these are built in our code w/o extra spaces
jdl[adName] = classad.ExprTree(str(adVal))


class ScheddStats(dict):
Expand Down Expand Up @@ -390,6 +403,7 @@ def executeInternal(self, info, inputFiles, **kwargs):
info['outputFilesString'] = ", ".join(outputFiles)
arg = "RunJobs.dag"

# for uniformity with values prepared in DagmanCreator (in JSON format), add double quotes
info['resthost'] = f"\"{self.crabserver.server['host']}\""
info['dbinstance'] = f'"{self.crabserver.getDbInstance()}"'

Expand Down Expand Up @@ -481,16 +495,19 @@ def submitDirect(self, schedd, cmd, arg, info): #pylint: disable=R0201
jobJDL["JobUniverse"] = 7
jobJDL["HoldKillSig"] = "SIGUSR1"
jobJDL["X509UserProxy"] = info['user_proxy']
jobJDL["Requirements"] = "TARGET.Cpus == 1" # see https://github.com/dmwm/CRABServer/issues/8456#issuecomment-2145887432
# submission command "priority" maps to jobAd "JobPrio" !
jobJDL["priority"] = int(info['tm_priority']) # info stores a string, but HTC wants a number
jobJDL["Requirements"] = "TARGET.Cpus >= 1" # see https://github.com/dmwm/CRABServer/issues/8456#issuecomment-2145887432
jobJDL["Requirements"] = True
environmentString = "PATH=/usr/bin:/bin CRAB3_VERSION=3.3.0-pre1"
environmentString += " CONDOR_ID=$(ClusterId).$(ProcId)"
environmentString += " " + " ".join(info['additional_environment_options'].split(';'))
# Environment command in JDL requires proper quotes https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#environment
jobJDL["Environment"] = classad.quote(environmentString)
jobJDL["+RemoteCondorSetup"] = classad.quote(info['remote_condor_setup'])
jobJDL["+CRAB_TaskSubmitTime"] = classad.quote(info['start_time'])
jobJDL['+CRAB_TaskLifetimeDays'] = classad.quote(TASKLIFETIME // 24 // 60 // 60)
jobJDL['+CRAB_TaskEndTime'] = classad.quote(int(info['start_time']) + TASKLIFETIME)
jobJDL["+CRAB_TaskSubmitTime"] = info['start_time'] # this is an int (seconds from epoch)
jobJDL['+CRAB_TaskLifetimeDays'] = TASKLIFETIME // 24 // 60 // 60
jobJDL['+CRAB_TaskEndTime'] = int(info['start_time']) + TASKLIFETIME
#For task management info see https://github.com/dmwm/CRABServer/issues/4681#issuecomment-302336451
jobJDL["LeaveJobInQueue"] = "True"
jobJDL["PeriodicHold"] = "time() > CRAB_TaskEndTime"
Expand All @@ -509,7 +526,7 @@ def submitDirect(self, schedd, cmd, arg, info): #pylint: disable=R0201
with open('subdag.jdl', 'w', encoding='utf-8') as fd:
print(subdagJDL, file=fd)

jobJDL["+TaskType"] = classad.quote("ROOT")
jobJDL["+TaskType"] = classad.quote("ROOT") # we want the ad value to be "ROOT", not ROOT
jobJDL["output"] = os.path.join(info['scratch'], "request.out")
jobJDL["error"] = os.path.join(info['scratch'], "request.err")
jobJDL["Cmd"] = cmd
Expand Down
10 changes: 5 additions & 5 deletions src/python/TaskWorker/Actions/PreJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def alter_submit(self, crab_retry):
Copy the content of the generic file Job.submit into a job-specific file
Job.<job_id>.submit and add attributes that are job-specific (e.g. CRAB_Retry).
Add also parameters that can be overwritten at each manual job resubmission
(e.g. MaxWallTimeMins, RequestMemory, RequestCpus, JobPrio, DESIRED_SITES).
(e.g. MaxWallTimeMins, RequestMemory, RequestCores, JobPrio, DESIRED_SITES).
"""
## Start the Job.<job_id>.submit content with the CRAB_Retry.
new_submit_text = '+CRAB_Retry = %d\n' % (crab_retry)
Expand Down Expand Up @@ -287,10 +287,10 @@ def alter_submit(self, crab_retry):
maxjobruntime = int(str(self.task_ad.lookup('MaxWallTimeMinsTail')))
elif 'MaxWallTimeMinsRun' in self.task_ad:
maxjobruntime = int(str(self.task_ad.lookup('MaxWallTimeMinsRun')))
if 'RequestMemory' in self.task_ad:
maxmemory = int(str(self.task_ad.lookup('RequestMemory')))
if 'RequestCpus' in self.task_ad:
numcores = int(str(self.task_ad.lookup('RequestCpus')))
if 'CRAB_RequestedMemory' in self.task_ad:
maxmemory = int(str(self.task_ad.lookup('CRAB_RequestedMemory')))
if 'CRAB_RequestedCores' in self.task_ad:
numcores = int(str(self.task_ad.lookup('CRAB_RequestedCores')))
if 'JobPrio' in self.task_ad:
priority = int(str(self.task_ad['JobPrio']))
if str(self.job_id) == '0': #jobids can be like 1-1 for subjobs
Expand Down

0 comments on commit c92c4bd

Please sign in to comment.