allow writing into Data Volume in job submission #9

Open
wants to merge 10 commits into base: develop
157 changes: 121 additions & 36 deletions py2/SciServer/Jobs.py
@@ -10,6 +10,29 @@
import pandas as pd


class QueryOutputType:
FILE_CSV = "FILE_CSV"
FILE_TSV = "FILE_TSV"
FILE_BSV = "FILE_BSV"
FILE_JSON = "FILE_JSON"
FILE_FITS = "FILE_FITS"
FILE_VOTABLE = "FILE_VOTABLE"
TABLE = "TABLE"

class QueryOutput:

def __init__(self):
self.outputs = []

def add_output(self, output_name="query_result.json", output_type = QueryOutputType.FILE_JSON, result_number=1):
if output_type not in [t for t in dir(QueryOutputType) if not t.startswith("__")]:
raise Exception("Unsupported output_type " + str(output_type))
self.outputs.append({'location':output_name, 'type':output_type , 'resultNumber':result_number})

def get_outputs(self):
return self.outputs
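
For illustration, a minimal sketch of how the new QueryOutput helper above might be used; the file names are hypothetical:

# Request the first statement's result as CSV and the second as JSON.
outputs = QueryOutput()
outputs.add_output(output_name="stars.csv", output_type=QueryOutputType.FILE_CSV, result_number=1)
outputs.add_output(output_name="galaxies.json", output_type=QueryOutputType.FILE_JSON, result_number=2)
print(outputs.get_outputs())
# [{'location': 'stars.csv', 'type': 'FILE_CSV', 'resultNumber': 1},
#  {'location': 'galaxies.json', 'type': 'FILE_JSON', 'resultNumber': 2}]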



def getDockerComputeDomains():
"""
@@ -210,7 +233,7 @@ def getJobsList(top=10, open=None, start=None, end=None, type='all'):
if(type=='docker'):
url = Config.RacmApiURL + "/jobm/rest/dockerjobs?"

url = url + topString + startString + endString + "TaskName=" + taskName;
url = url + topString + startString + endString + openString + "TaskName=" + taskName;

headers = {'X-Auth-Token': token, "Content-Type": "application/json"}
res = requests.get(url, headers=headers, stream=True)
@@ -273,6 +296,54 @@ def getDockerJobsListQuick(top=10, open=None, start=None, end=None, labelReg=Non



def getJobQueues(returnType="pandas"):
"""
Gets information about the queues of jobs submitted to all compute domains, including the ranking of jobs that users have
already submitted to a queue for execution, as well as the ranking that a new job would get if submitted to that queue.
:param returnType: if set to "pandas" (default setting), then it returns a pandas dataframe. Else, it returns a dictionary object.
:return: a pandas dataframe by default, with each row containing information about a submitted job. See the 'returnType' parameter for other return options.
The resultset columns are: the name of the compute domain, the type or class of compute domain, the queue Id
(that uniquely identifies the compute domain), the ranking of a job in the queue (smaller ranking has higher priority),
the name of the user who owns the job (names other than the requestor's are hashed as a number), the job status
(rows with status=0 are placeholders to see where a newly submitted job to the queue would be placed in the ranking),
job submission time, and job start time.
:raises: Throws an exception if the HTTP request to the Authentication URL returns an error, or if the HTTP request to the JOBM API returns an error.
:example: queues = Jobs.getJobQueues();
.. seealso:: Jobs.getJobsList, Jobs.submitNotebookJob, Jobs.submitShellCommandJob, Jobs.getJobStatus, Jobs.getDockerComputeDomains, Jobs.cancelJob,
"""
token = Authentication.getToken()
if token is not None and token != "":

if Config.isSciServerComputeEnvironment():
taskName = "Compute.SciScript-Python.Jobs.getJobQueues"
else:
taskName = "SciScript-Python.Jobs.getJobQueues"


url = Config.RacmApiURL + "/jobm/rest/jobs/queues?TaskName=" + taskName

headers = {'X-Auth-Token': token, "Content-Type": "application/json"}
res = requests.get(url, headers=headers, stream=True)

if res.status_code != 200:
raise Exception("Error when getting job queues from JOBM API.\nHttp Response from JOBM API returned status code " +
str(res.status_code) + ":\n" + res.content.decode());
j = json.loads(res.content.decode())
# change dict 'j' so it can be used to create a pandas dataframe
if returnType=="pandas":
j['data']=j.pop('rows')
j.pop('hrNames')
df=pd.read_json(json.dumps(j), orient='split')
df.fillna('',inplace=True)
return df
else:
return j
else:
raise Exception("User token is not defined. First log into SciServer.")




def getJobDescription(jobId):
"""
Gets the definition of the job,
@@ -344,8 +415,8 @@ def submitNotebookJob(notebookPath, dockerComputeDomain=None, dockerImageName=No
:param notebookPath: path of the notebook within the filesystem mounted in SciServer-Compute (string). Example: notebookPath = '/home/idies/workspace/persistent/JupyterNotebook.ipynb'
:param dockerComputeDomain: object (dictionary) that defines a Docker compute domain. A list of these kinds of objects available to the user is returned by the function Jobs.getDockerComputeDomains().
:param dockerImageName: name (string) of the Docker image for executing the notebook. E.g., dockerImageName="Python (astro)". An array of available Docker images is defined as the 'images' property in the dockerComputeDomain object.
:param userVolumes: a list with the names of user volumes (with optional write permissions) that will be mounted to the docker Image. E.g.: userVolumes = [{'name':'persistent', 'needsWriteAccess':False},{'name':'scratch', , 'needsWriteAccess':True}] . A list of available user volumes can be found as the 'userVolumes' property in the dockerComputeDomain object. If userVolumes=None, then all available user volumes are mounted, with 'needsWriteAccess' = True if the user has Write permissions on the volume.
:param dataVolumes: a list with the names of data volumes that will be mounted to the docker Image. E.g.: dataVolumes=[{"name":"SDSS_DAS"}, {"name":"Recount"}]. A list of available data volumes can be found as the 'volumes' property in the dockerComputeDomain object. If dataVolumes=None, then all available data volumes are mounted.
:param userVolumes: a list with the names of user volumes (with optional write permissions) that will be mounted to the docker Image. E.g.: userVolumes = [{'name':'persistent', 'needsWriteAccess':False},{'name':'scratch', 'needsWriteAccess':True}] . A list of available user volumes can be found as the 'userVolumes' property in the dockerComputeDomain object. If userVolumes='all', then all available user volumes are mounted, with 'needsWriteAccess' = True if the user has Write permissions on the volume.
:param dataVolumes: a list with the names of data volumes (with optional write permissions) that will be mounted to the docker Image. E.g.: dataVolumes=[{"name":"SDSS_DAS", 'needsWriteAccess':False}, {"name":"Recount"}]. A list of available data volumes can be found as the 'volumes' property in the dockerComputeDomain object. If dataVolumes='all', then all available data volumes are mounted (without write permissions).
:param resultsFolderPath: full path to results folder (string) where the original notebook is copied to and executed. E.g.: /home/idies/workspace/rootVolume/username/userVolume/jobsFolder. If not set, then a default folder will be set automatically.
:param parameters: string containing parameters that the notebook might need during its execution. This string is written in the 'parameters.txt' file in the same directory level where the notebook is being executed.
:param jobAlias: alias (string) of job, defined by the user.
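
For illustration, a sketch of a notebook-job submission using the new dataVolumes write option described above; domain, image, volume, and user names are placeholders:

# Mount 'SDSS_DAS' read-only and explicitly request write access on 'Recount'.
dockerComputeDomain = Jobs.getDockerComputeDomains()[0]
jobId = Jobs.submitNotebookJob(
    notebookPath='/home/idies/workspace/Storage/myUserName/persistent/JupyterNotebook.ipynb',
    dockerComputeDomain=dockerComputeDomain,
    dockerImageName='SciServer Essentials',
    userVolumes=[{'name': 'persistent', 'rootVolumeName': 'Storage', 'owner': 'myUserName', 'needsWriteAccess': True}],
    dataVolumes=[{'name': 'SDSS_DAS'}, {'name': 'Recount', 'needsWriteAccess': True}],
    parameters='param1=1',
    jobAlias='myNotebookJob')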
@@ -379,7 +450,7 @@ def submitNotebookJob(notebookPath, dockerComputeDomain=None, dockerImageName=No
raise Exception("dockerComputeDomain has no docker images available for the user.");

uVols = [];
if userVolumes is None:
if userVolumes == 'all':
for vol in dockerComputeDomain.get('userVolumes'):
if 'write' in vol.get('allowedActions'):
uVols.append({'userVolumeId': vol.get('id'), 'needsWriteAccess': True});
@@ -393,7 +464,7 @@ def submitNotebookJob(notebookPath, dockerComputeDomain=None, dockerImageName=No
for vol in dockerComputeDomain.get('userVolumes'):
if vol.get('name') == uVol.get('name') and vol.get('rootVolumeName') == uVol.get('rootVolumeName') and vol.get('owner') == uVol.get('owner'):
found = True;
if (uVol.has_key('needsWriteAccess')):
if uVol.get('needsWriteAccess') is not None:
if uVol.get('needsWriteAccess') == True and 'write' in vol.get('allowedActions'):
uVols.append({'userVolumeId': vol.get('id'), 'needsWriteAccess': True});
else:
@@ -408,17 +479,26 @@ def submitNotebookJob(notebookPath, dockerComputeDomain=None, dockerImageName=No
raise Exception("User volume '" + uVol.get('name') + "' not found within Compute domain")

datVols = [];
if dataVolumes is None:
if dataVolumes == 'all':
for vol in dockerComputeDomain.get('volumes'):
datVols.append({'name': vol.get('name')});
datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': False});
Contributor @glemson commented on Dec 29, 2021:
Just noting here that we treat data volumes differently from user volumes: for the latter (user volumes), needsWriteAccess is decided by user privilege when 'all'; for the former (data volumes), we want users to explicitly ask for write access.


else:
for dVol in dataVolumes:
Contributor commented:
It is still possible for the input argument dataVolumes to be None; in that case this code would throw an error, I think.
Maybe protect against that by setting dataVolumes to [] if the input was None?
Same for userVolumes.
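
A minimal sketch of the guard suggested in this comment (not part of the submitted change), placed near the top of submitNotebookJob and submitShellCommandJob:

# Treat None the same as "mount nothing" instead of failing when iterating.
if userVolumes is None:
    userVolumes = []
if dataVolumes is None:
    dataVolumes = []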

found = False;
for vol in dockerComputeDomain.get('volumes'):
if vol.get('name') == dVol.get('name'):
found = True;
datVols.append({'name': vol.get('name')});
if dVol.get('needsWriteAccess') is not None:
if dVol.get('needsWriteAccess') is True:
Contributor commented:
Why is the constraint on "'write' in allowedActions" removed?

Contributor (author) replied:
It's a bug fix (vol does not have an 'allowedActions' field).

datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': True});
else:
datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': False});
else:
if vol.get('writable') is True:
Contributor commented:
Again here: if a user does not ask for write access, we should not give it by default, even if they are allowed to write there.
I'd rather err on the side of caution.

Contributor (author) replied:
Fixed that.

datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': True});
else:
datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': False});

if not found:
raise Exception("Data volume '" + dVol.get('name') + "' not found within Compute domain")
@@ -457,11 +537,11 @@ def submitShellCommandJob(shellCommand, dockerComputeDomain = None, dockerImageN
:param dockerComputeDomain: object (dictionary) that defines a Docker compute domain. A list of these kinds of objects available to the user is returned by the function Jobs.getDockerComputeDomains().
:param dockerImageName: name (string) of the Docker image for executing the shell command. E.g., dockerImageName="Python (astro)". An array of available Docker images is defined as the 'images' property in the dockerComputeDomain object.
:param userVolumes: a list with the names of user volumes (with optional write permissions) that will be mounted to the docker Image.
E.g., userVolumes = [{'name':'persistent', 'needsWriteAccess':False},{'name':'scratch', , 'needsWriteAccess':True}]
A list of available user volumes can be found as the 'userVolumes' property in the dockerComputeDomain object. If userVolumes=None, then all available user volumes are mounted, with 'needsWriteAccess' = True if the user has Write permissions on the volume.
:param dataVolumes: a list with the names of data volumes that will be mounted to the docker Image.
E.g., dataVolumes=[{"name":"SDSS_DAS"}, {"name":"Recount"}].
A list of available data volumes can be found as the 'volumes' property in the dockerComputeDomain object. If dataVolumes=None, then all available data volumes are mounted.
E.g., userVolumes = [{'name':'persistent', 'needsWriteAccess':False},{'name':'scratch', 'needsWriteAccess':True}]
A list of available user volumes can be found as the 'userVolumes' property in the dockerComputeDomain object. If userVolumes='all', then all available user volumes are mounted, with 'needsWriteAccess' = True if the user has Write permissions on the volume.
:param dataVolumes: a list with the names of data volumes (with optional write permissions) that will be mounted to the docker Image.
E.g., dataVolumes=[{"name":"SDSS_DAS", 'needsWriteAccess':False}, {"name":"Recount"}].
A list of available data volumes can be found as the 'volumes' property in the dockerComputeDomain object. If dataVolumes='all', then all available data volumes are mounted (without write permissions).
:param resultsFolderPath: full path to results folder (string) where the shell command is executed. E.g.: /home/idies/workspace/rootVolume/username/userVolume/jobsFolder. If not set, then a default folder will be set automatically.
:param jobAlias: alias (string) of job, defined by the user.
:return: the job ID (int)
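
For illustration, a sketch of a shell-command job that writes into a data volume, using the new dataVolumes option; the command, domain, image, volume, and user names are placeholders:

dockerComputeDomain = Jobs.getDockerComputeDomains()[0]
jobId = Jobs.submitShellCommandJob(
    shellCommand='cp /home/idies/workspace/temporary/input.txt /home/idies/workspace/Recount/output/',
    dockerComputeDomain=dockerComputeDomain,
    dockerImageName='SciServer Essentials',
    userVolumes=[{'name': 'persistent', 'rootVolumeName': 'Storage', 'owner': 'myUserName', 'needsWriteAccess': False}],
    dataVolumes=[{'name': 'Recount', 'needsWriteAccess': True}],
    jobAlias='writeIntoDataVolume')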
@@ -495,7 +575,7 @@ def submitShellCommandJob(shellCommand, dockerComputeDomain = None, dockerImageN
raise Exception("dockerComputeDomain has no docker images available for the user.");

uVols = [];
if userVolumes is None:
if userVolumes == 'all':
for vol in dockerComputeDomain.get('userVolumes'):
if 'write' in vol.get('allowedActions'):
uVols.append({'userVolumeId': vol.get('id'), 'needsWriteAccess': True});
@@ -507,9 +587,9 @@ def submitShellCommandJob(shellCommand, dockerComputeDomain = None, dockerImageN
for uVol in userVolumes:
found = False;
for vol in dockerComputeDomain.get('userVolumes'):
if vol.get('name') == uVol.get('name'):
if vol.get('name') == uVol.get('name') and vol.get('rootVolumeName') == uVol.get('rootVolumeName') and vol.get('owner') == uVol.get('owner'):
found = True;
if (uVol.has_key('needsWriteAccess')):
if uVol.get('needsWriteAccess') is not None:
if uVol.get('needsWriteAccess') == True and 'write' in vol.get('allowedActions'):
uVols.append({'userVolumeId': vol.get('id'), 'needsWriteAccess': True});
else:
@@ -524,17 +604,26 @@ def submitShellCommandJob(shellCommand, dockerComputeDomain = None, dockerImageN
raise Exception("User volume '" + uVol.get('name') + "' not found within Compute domain")

datVols = [];
if dataVolumes is None:
if dataVolumes == 'all':
for vol in dockerComputeDomain.get('volumes'):
datVols.append({'name': vol.get('name')});
datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': False});

else:
for dVol in dataVolumes:
found = False;
for vol in dockerComputeDomain.get('volumes'):
if vol.get('name') == dVol.get('name'):
found = True;
datVols.append({'name': vol.get('name')});
if dVol.get('needsWriteAccess') is not None:
if dVol.get('needsWriteAccess') is True:
datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': True});
else:
datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': False});
else:
if vol.get('writable') is True:
datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': True});
else:
datVols.append({'id': vol.get('id'), 'name': vol.get('name'), 'writable': False});

if not found:
raise Exception("Data volume '" + dVol.get('name') + "' not found within Compute domain")
@@ -563,14 +652,14 @@ def submitShellCommandJob(shellCommand, dockerComputeDomain = None, dockerImageN
else:
raise Exception("User token is not defined. First log into SciServer.")

def submitRDBQueryJob(sqlQuery, rdbComputeDomain=None, databaseContextName = None, resultsName='queryResults', resultsFolderPath="", jobAlias = ""):
def submitRDBQueryJob(sqlQuery, rdbComputeDomain=None, databaseContextName = None, query_output = None, resultsFolderPath="", jobAlias = ""):
"""
Submits a sql query for execution (as an asynchronous job) inside a relational database (RDB) compute domain.

:param sqlQuery: sql query (string)
:param sqlQuery: sql query statement (string)
:param rdbComputeDomain: object (dictionary) that defines a relational database (RDB) compute domain. A list of these kinds of objects available to the user is returned by the function Jobs.getRDBComputeDomains().
:param databaseContextName: database context name (string) on which the sql query is executed.
:param resultsName: name (string) of the table or file (without file type ending) that contains the query result. In case the sql query has multiple statements, should be set to a list of names (e.g., ['result1','result2']).
:param query_output: if a string, then it is the name of the json file that contains the query output result. Otherwise, it is an object of class QueryOutput, which contains the definitions of the output result types for each subquery in the sql query statement.
:param resultsFolderPath: full path to results folder (string) where query output tables are written into. E.g.: /home/idies/workspace/rootVolume/username/userVolume/jobsFolder . If not set, then a default folder will be set automatically.
:param jobAlias: alias (string) of job, defined by the user.
:return: a dictionary containing the definition of the submitted job.
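
For illustration, a sketch of how query_output might be passed using the QueryOutput helper added in this PR; domain, context, table, and file names are placeholders:

# Write the first statement's result as CSV and the second as FITS.
rdbComputeDomain = Jobs.getRDBComputeDomains()[0]
query_output = Jobs.QueryOutput()
query_output.add_output(output_name="result1.csv", output_type=Jobs.QueryOutputType.FILE_CSV, result_number=1)
query_output.add_output(output_name="result2.fits", output_type=Jobs.QueryOutputType.FILE_FITS, result_number=2)
job = Jobs.submitRDBQueryJob(
    sqlQuery="SELECT TOP 10 * FROM TableA; SELECT TOP 10 * FROM TableB",
    rdbComputeDomain=rdbComputeDomain,
    databaseContextName="myDatabaseContext",
    query_output=query_output,
    jobAlias="myQueryJob")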
@@ -602,21 +691,17 @@ def submitRDBQueryJob(sqlQuery, rdbComputeDomain=None, databaseContextName = Non
else:
raise Exception("rbdComputeDomain has no database contexts available for the user.");

targets = [];
if type(resultsName) == str:
targets.append({'location': resultsName, 'type': 'FILE_CSV', 'resultNumber': 1});
elif type(resultsName) == list:
if len(set(resultsName)) != len(resultsName):
raise Exception("Elements of parameter 'resultsName' must be unique");

for i in range(len(resultsName)):
if type(resultsName[i]) == str:
targets.append({'location': resultsName[i], 'type': 'FILE_CSV', 'resultNumber': i+1});
else:
raise Exception("Elements of array 'resultsName' are not strings");

if type(query_output) == str or query_output is None:
targets = QueryOutput()
if type(query_output) == str:
name = query_output
else:
name = "queryResult.json"
targets.add_output(output_name=name, output_type=QueryOutputType.FILE_JSON, result_number=1)
elif type(query_output) == QueryOutput:
targets = query_output
else:
raise Exception("Type of parameter 'resultsName' is not supported");
raise Exception("Unsupported query_output type " + str(type(query_output)))


rdbDomainId = rdbComputeDomain.get('id');