Flow for one Job through CRAB3 and ASO
HERE IS A PSEUDOCODE DESCRIPTION OF THE LOGIC
#=======================================================
# THE DAG NODE

def TheDAG_Node():
    Execute(PreJob)       # possibly effectively incorporates a delay for HC
    Submit(HTCondor Job)  # on the vanilla pool aka grid
    Execute(PostJob)
    if PostJobExitCode == 1:
        # go back to the beginning; max retry is 3
        pass  # for syntax highlighting
    elif PostJobExitCode == 4:
        # DEFER the PostJob, i.e. re-execute it in 30 minutes
        pass
    else:
        Terminate()
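For concreteness, here is a minimal runnable sketch of how those exit codes steer one node. This is an illustration, not CRAB3 code: in production the loop belongs to HTCondor DAGMan (node retries plus a deferred POST script), and `run_pre_job`, `submit_job` and `run_post_job` are hypothetical stand-ins.

```python
import time

MAX_RETRIES = 3
DEFER_SECONDS = 30 * 60  # PostJob exit code 4: re-run the PostJob in 30 minutes

def run_dag_node(run_pre_job, submit_job, run_post_job):
    """Hypothetical driver mimicking what DAGMan does for one node."""
    retries = 0
    while retries < MAX_RETRIES:
        run_pre_job(retries)
        submit_job()                  # runs on the vanilla pool aka grid
        code = run_post_job(retries)
        while code == 4:              # DEFER: re-execute the PostJob later
            time.sleep(DEFER_SECONDS)
            code = run_post_job(retries)
        if code == 1:                 # recoverable failure: retry the whole node
            retries += 1
            continue
        return code                   # 0 = success, 2 = permanent failure
    return 2                          # retries exhausted: node ends as Failed
```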
def PostJob():
    if HTCondorJob is OK:
        if ProcessStageOut() is OK:
            # putting the file metadata in Oracle is the very last thing, since
            # we want the publication to start only when the job finished successfully
            try:
                UploadMetaDataInOracle()
            except:
                Retry()  # run a new job somewhere else, up to 2 times
        else:
            Retry()
    else:
        Retry()
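The same decision tree as a small runnable function; the callables are hypothetical stand-ins for the real checks. The point to notice is the ordering: metadata reaches Oracle only after a fully successful stage-out, so publication can never begin for a job that later fails.

```python
def post_job(job_ok, process_stage_out, upload_metadata, retry):
    """Hypothetical PostJob skeleton built from the pseudocode above."""
    if not job_ok:
        return retry()       # the HTCondor job itself failed
    if not process_stage_out():
        return retry()       # stage-out failed
    try:
        upload_metadata()    # deliberately the very last step
    except Exception:
        return retry()       # run a new job somewhere else, up to 2 times
    return 0                 # success: the DAG node ends
```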
def ProcessStageOut():
    if JobWrapperOnWorkerNodeFailedToPutDocInCouchDB:
        try:
            PutDocumentInCouchDB(status=NEW)  # nretry configured in ASO
        except:
            exit(4)  # 4 means we retry the PostJob in 30 minutes, i.e. retry to put the document in CouchDB
    while DocumentStatusInCouchDB == NEW or ACQUIRED:
        pass  # sleep and poll CouchDB
    if DocumentStatusInCouchDB == DONE:
        exit(0)  # will end the DAG node
    if DocumentStatusInCouchDB == FAILED:
        Retry()
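Sketched as runnable Python below; `put_document` and `get_status` are hypothetical stand-ins for the real CouchDB calls, and the 5-minute poll interval is an assumption for illustration.

```python
import sys
import time

POLL_SECONDS = 5 * 60  # assumed polling interval, illustration only

def process_stage_out(doc_in_couch, put_document, get_status, retry):
    if not doc_in_couch:                 # the wrapper on the WN failed to inject it
        try:
            put_document(status="new")   # nretry configured in ASO
        except Exception:
            sys.exit(4)                  # defer the PostJob: try again in 30 minutes
    status = get_status()
    while status in ("new", "acquired"):
        time.sleep(POLL_SECONDS)         # sleep and poll CouchDB
        status = get_status()
    if status == "done":
        sys.exit(0)                      # stage-out succeeded, the DAG node ends
    if status == "failed":
        retry()                          # exits with 1 or 2, see Retry() below
```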
def Retry():
    RetryCount += 1
    if RetryCount < 3:
        if error is recoverable:
            exit(1)  # will make the DAG retry the node
        else:
            exit(2)  # will end the DAG node as Failed
    else:
        exit(2)  # will end the DAG node as Failed
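In DAGMan terms this convention maps onto a node retry count plus an abort code (RETRY ... UNLESS-EXIT). A minimal sketch of the exit-code logic, with `MAX_RETRIES` and the recoverability test passed in as assumptions:

```python
import sys

MAX_RETRIES = 3

def retry(retry_count, recoverable):
    """Exit-code convention taken from the pseudocode above."""
    retry_count += 1
    if retry_count < MAX_RETRIES and recoverable:
        sys.exit(1)  # DAGMan retries the node
    sys.exit(2)      # DAGMan ends the node as Failed
```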
#==============================================================
# THE ASO SIDE (simplified view for a single user)
# see https://github.com/dmwm/AsyncStageout/wiki
# a few components run in parallel forever

start(AsynchTransfer)
start(FTSMonitor)
start(Reporter)
start(RetryManager)
start(DBSPublisher)
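The "run forever in parallel" pattern as a thread-based sketch. The real ASO components are separate daemons, so this only illustrates the structure; `work` and `period` are hypothetical.

```python
import threading
import time

def start_component(name, work, period):
    """Run one component's polling cycle forever in its own thread."""
    def loop():
        while True:
            work()
            time.sleep(period)
    thread = threading.Thread(target=loop, name=name, daemon=True)
    thread.start()
    return thread

# hypothetical wiring: one cycle function per component
# start_component("AsynchTransfer", transfer_cycle, 300)
# start_component("FTSMonitor", monitor_cycle, 300)
```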
def AsynchTransfer():
    while True:
        Poll CouchDB every 5 min
        if DocumentStatus == NEW or RETRY:
            DocumentStatus = ACQUIRED
            FileList += File in this document
        if len(FileList) < file_limit_per_job:
            submit(FTS transfer request)
        else:
            split into multiple FTS jobs
        Add JSON to FTSMonitor dropbox/Input
        reset(FileList)
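The batching step sketched in Python. The document shape, the dropbox layout and the value of `file_limit_per_job` are assumptions taken from the pseudocode, not from the ASO configuration:

```python
import json
import os

FILE_LIMIT_PER_JOB = 100  # assumed stand-in for file_limit_per_job

def chunk(files, limit):
    """Split the accumulated file list into one or more FTS jobs."""
    for i in range(0, len(files), limit):
        yield files[i:i + limit]

def transfer_cycle(acquired_docs, dropbox_input="dropbox/Input"):
    file_list = [f for doc in acquired_docs for f in doc["files"]]
    for n, fts_job in enumerate(chunk(file_list, FILE_LIMIT_PER_JOB)):
        # one JSON per FTS job lands in the FTSMonitor dropbox
        with open(os.path.join(dropbox_input, "job_%d.json" % n), "w") as fh:
            json.dump({"files": fts_job}, fh)
```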
def FTSMonitor():
    while True:
        for json in dropbox/Input:
            if FTS_status not QUEUED:
                completed = True  # includes transfers which timed out in FTS
            if TimeOut(6 hours):  # this is the internal ASO timeout
                completed = True
                status = FAILED
                reason = file_timeout
            if completed:
                write status and reason in dropbox/Output
                remove json from dropbox/Input  # done by a cleanup script via crontab
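The two completion paths (FTS finished vs. the 6-hour internal timeout) as a sketch; `fts_status` is a hypothetical query function and the JSON layout is assumed:

```python
import json
import os
import time

ASO_TIMEOUT = 6 * 3600  # internal ASO timeout, not an FTS setting

def monitor_cycle(fts_status, in_dir="dropbox/Input", out_dir="dropbox/Output"):
    for name in os.listdir(in_dir):
        with open(os.path.join(in_dir, name)) as fh:
            job = json.load(fh)
        status, reason = fts_status(job), None
        completed = status != "QUEUED"  # includes transfers FTS itself timed out
        if time.time() - job["submit_time"] > ASO_TIMEOUT:
            completed, status, reason = True, "FAILED", "file_timeout"
        if completed:
            with open(os.path.join(out_dir, name), "w") as fh:
                json.dump({"status": status, "reason": reason}, fh)
            # the input JSON is removed later by a cleanup script via crontab
```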
def Reporter():
    while True:
        for json in dropbox/Output:
            if status == DONE:
                DocumentStatus in CouchDB = DONE
            if status == FAILED:
                if error is recoverable:  # N.B. TimeOut is not recoverable
                    DocumentStatus in CouchDB = RETRY
                else:
                    DocumentStatus in CouchDB = FAILED
            remove json from dropbox/Output
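The status mapping as a sketch; `set_status` and `is_recoverable` are hypothetical stand-ins, and the timeout reason is explicitly treated as non-recoverable, as noted above:

```python
def report(set_status, status, reason, is_recoverable):
    """Map one dropbox/Output result onto the CouchDB document status."""
    if status == "DONE":
        set_status("done")
    elif status == "FAILED":
        if reason != "file_timeout" and is_recoverable(reason):
            set_status("retry")   # RetryManager will recycle it
        else:
            set_status("failed")  # the PostJob will see this and call Retry()
```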
def RetryManager():
    while True:
        poll CouchDB
        if DocumentStatus == RETRY:
            sleep(2 hours)  # cooloff
            DocumentStatus = NEW
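The pseudocode sleeps inline; a per-document cooloff check is a more realistic shape for a polling daemon, sketched here under that assumption (`failure_time` and `set_status` are hypothetical):

```python
import time

COOLOFF_SECONDS = 2 * 3600  # cooloff before a failed transfer is retried

def retry_cycle(docs_in_retry, set_status):
    for doc in docs_in_retry:
        if time.time() - doc["failure_time"] > COOLOFF_SECONDS:
            set_status(doc, "new")  # AsynchTransfer will acquire it again
```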
def DBSPublisher():
    while True:
        poll CouchDB
        if DocumentStatus == DONE and publish == 1 and not Published:
            FileList += File in this document
        if len(FileList) > some limit (or 6 hours since last publication):
            publish in DBS
            for file in FileList:
                Publish flag in CouchDB = True
            reset(FileList)
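The publish-by-count-or-age rule as a sketch; `MAX_FILES` stands in for "some limit" and the callables are hypothetical:

```python
import time

MAX_FILES = 100      # assumed stand-in for "some limit"
MAX_WAIT = 6 * 3600  # publish at least every 6 hours regardless of size

def publisher_cycle(state, ready_docs, publish_in_dbs, mark_published):
    # ready_docs: status done, publish flag set, not yet published
    for doc in ready_docs:
        state["files"].extend(doc["files"])
    overdue = time.time() - state["last_publication"] > MAX_WAIT
    if len(state["files"]) > MAX_FILES or overdue:
        publish_in_dbs(state["files"])
        for f in state["files"]:
            mark_published(f)  # set the Published flag in CouchDB
        state["files"] = []
        state["last_publication"] = time.time()
```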
#===========================================================================
# FTS
# Currently we do not enable retries, so it is pretty simple. Nothing more to say.