From aae9530b10f45adce32f0fea46e273d102c40692 Mon Sep 17 00:00:00 2001 From: Charles Markello Date: Wed, 27 Jul 2016 00:17:01 +0000 Subject: [PATCH] Accommodating pipes in programs.py Adding in docker pipes unit test Added docs to docker_call and fixed docker_call unit test Changed datasize for docker_call pipes unit test to 1GB Polished docker_call functionality Added stderr to file handle support in docker_call --- src/toil_scripts/lib/programs.py | 113 ++++++++++++++++++--- src/toil_scripts/lib/test/test_programs.py | 22 +++- 2 files changed, 118 insertions(+), 17 deletions(-) diff --git a/src/toil_scripts/lib/programs.py b/src/toil_scripts/lib/programs.py index bb03a416..2e66a0bc 100644 --- a/src/toil_scripts/lib/programs.py +++ b/src/toil_scripts/lib/programs.py @@ -2,6 +2,7 @@ import subprocess import logging from bd2k.util.exceptions import panic +from toil_scripts.lib import require _log = logging.getLogger(__name__) @@ -15,34 +16,66 @@ def mock_mode(): return True if int(os.environ.get('TOIL_SCRIPTS_MOCK_MODE', '0')) else False -def docker_call(tool, +def docker_call(tool=None, + tools=None, parameters=None, work_dir='.', rm=True, env=None, outfile=None, + errfile=None, inputs=None, outputs=None, docker_parameters=None, check_output=False, + return_stderr=False, mock=None): """ Calls Docker, passing along parameters and tool. - :param str tool: Name of the Docker image to be used (e.g. quay.io/ucsc_cgl/samtools) + :param (str tool | list[str] tools): Name of the Docker image to be used (e.g. quay.io/ucsc_cgl/samtools) + OR str list of names of the Docker images and order to be used when piping commands to + Docker. (e.g. ['quay.io/ucsc_cgl/samtools', 'ubuntu']). Both tool and tools are mutually + exclusive parameters to docker_call. :param list[str] parameters: Command line arguments to be passed to the tool :param str work_dir: Directory to mount into the container via `-v`. Destination convention is /data :param bool rm: Set to True to pass `--rm` flag. :param dict[str,str] env: Environment variables to be added (e.g. dict(JAVA_OPTS='-Xmx15G')) :param bool sudo: If True, prepends `sudo` to the docker call - :param file outfile: Pipe output of Docker call to file handle + :param file outfile: Pipe stdout of Docker call to file handle + :param file errfile: Pipe stderr of Docker call to file handle :param list[str] inputs: A list of the input files. :param dict[str,str] outputs: A dictionary containing the outputs files as keys with either None or a url. The value is only used if mock=True :param dict[str,str] docker_parameters: Parameters to pass to docker :param bool check_output: When True, this function returns docker's output + :param bool return_stderr: When True, this function includes stderr in docker's output :param bool mock: Whether to run in mock mode. If this variable is unset, its value will be determined by the environment variable. + + Piping docker commands can be done in one of two ways depending on use case: + Running a pipe in docker in 'pipe-in-single-container' mode produces command structure + docker '... | ... | ...' where each '...' command corresponds to each element in the 'parameters' + argument that uses a docker container. This is the most efficient method if you want to run a pipe of + commands where each command uses the same docker container. + + Running a pipe in docker in 'pipe-of-containers' mode produces command structure + docker '...' | docker '...' | docker '...' where each '...' command corresponds to each element in + the 'parameters' argument that uses a docker container and each 'docker' tool in the pipe + corresponds to each element in the 'tool' argument + + Examples for running command 'head -c 1M /dev/urandom | gzip | gunzip | md5sum 1>&2': + Running 'pipe-in-single-container' mode: + command= ['head -c 1M /dev/urandom', 'gzip', 'gunzip', 'md5sum 1>&2'] + docker_work_dir=curr_work_dir + docker_tools=['ubuntu'] + stdout = docker_call(work_dir=docker_work_dir, parameters=command, tools=docker_tools, check_output=True) + + Running 'pipe-of-containers' mode: + command= ['head -c 1M /dev/urandom', 'gzip', 'gunzip', 'md5sum 1>&2'] + docker_work_dir=curr_work_dir + docker_tools=['ubuntu', 'ubuntu', 'ubuntu', 'ubuntu'] + stdout = docker_call(work_dir=docker_work_dir, parameters=command, tools=docker_tools, check_output=True) """ from toil_scripts.lib.urls import download_url @@ -83,28 +116,63 @@ def docker_call(tool, if env: for e, v in env.iteritems(): base_docker_call.extend(['-e', '{}={}'.format(e, v)]) + if docker_parameters: base_docker_call += docker_parameters + + docker_call = [] + + require(bool(tools) != bool(tool), 'Either "tool" or "tools" must contain a value, but not both') + + # Pipe functionality + # each element in the parameters list must represent a sub-pipe command + if bool(tools): + if len(tools) > 1: + require(len(tools) == len(parameters), "Both 'tools'({}) and 'parameters'({}) arguments must\ + contain the same number of elements".format(len(tools), len(parameters))) + # If tool is a list containing multiple docker container name strings + # then format the docker call in the 'pipe-of-containers' mode + docker_call.extend(base_docker_call + ['--entrypoint /bin/bash', tools[0], '-c \'{}\''.format(parameters[0])]) + for i in xrange(1, len(tools)): + docker_call.extend(['|'] + base_docker_call + ['-i --entrypoint /bin/bash', tools[i], '-c \'{}\''.format(parameters[i])]) + docker_call = " ".join(docker_call) + _log.debug("Calling docker with %s." % docker_call) + + elif len(tools) == 1: + # If tool is a list containing a single docker container name string + # then format the docker call in the 'pipe-in-single-container' mode + docker_call.extend(base_docker_call + ['--entrypoint /bin/bash', tools[0], '-c \'{}\''.format(" | ".join(parameters))]) + docker_call = " ".join(docker_call) + _log.debug("Calling docker with %s." % docker_call) + + else: + assert False + else: + docker_call = " ".join(base_docker_call + [tool] + parameters) + _log.debug("Calling docker with %s." % docker_call) - _log.debug("Calling docker with %s." % " ".join(base_docker_call + [tool] + parameters)) - - docker_call = base_docker_call + [tool] + parameters - + try: if outfile: - subprocess.check_call(docker_call, stdout=outfile) + if errfile: + subprocess.check_call(docker_call, stdout=outfile, stderr=errfile, shell=True) + else: + subprocess.check_call(docker_call, stdout=outfile, shell=True) else: if check_output: - return subprocess.check_output(docker_call) + if return_stderr: + return subprocess.check_output(docker_call, shell=True, stderr=subprocess.STDOUT) + else: + return subprocess.check_output(docker_call, shell=True) else: - subprocess.check_call(docker_call) + subprocess.check_call(docker_call, shell=True) # Fix root ownership of output files except: # Panic avoids hiding the exception raised in the try block with panic(): - _fix_permissions(base_docker_call, tool, work_dir) + _fix_permissions(base_docker_call, tool, tools, work_dir) else: - _fix_permissions(base_docker_call, tool, work_dir) + _fix_permissions(base_docker_call, tool, tools, work_dir) for filename in outputs.keys(): if not os.path.isabs(filename): @@ -112,8 +180,8 @@ def docker_call(tool, assert(os.path.isfile(filename)) -def _fix_permissions(base_docker_call, tool, work_dir): - """ +def _fix_permissions(base_docker_call, tool, tools, work_dir): + """ Fix permission of a mounted Docker directory by reusing the tool :param list base_docker_call: Docker run parameters @@ -122,5 +190,18 @@ def _fix_permissions(base_docker_call, tool, work_dir): """ base_docker_call.append('--entrypoint=chown') stat = os.stat(work_dir) - command = base_docker_call + [tool] + ['-R', '{}:{}'.format(stat.st_uid, stat.st_gid), '/data'] - subprocess.check_call(command) + if tools: + command_list = [] + for tool in tools: + command = base_docker_call + [tool] + ['-R', '{}:{}'.format(stat.st_uid, stat.st_gid), '/data'] + command_list.append(command) + + for command in command_list: + subprocess.check_call(command) + else: + command = base_docker_call + [tool] + ['-R', '{}:{}'.format(stat.st_uid, stat.st_gid), '/data'] + subprocess.check_call(command) + + + + diff --git a/src/toil_scripts/lib/test/test_programs.py b/src/toil_scripts/lib/test/test_programs.py index 3c75ac4f..7ab86519 100644 --- a/src/toil_scripts/lib/test/test_programs.py +++ b/src/toil_scripts/lib/test/test_programs.py @@ -1,5 +1,5 @@ import os - +import re def test_docker_call(tmpdir): from toil_scripts.lib.programs import docker_call @@ -12,3 +12,23 @@ def test_docker_call(tmpdir): with open(fpath, 'w') as f: docker_call(tool='ubuntu', env=dict(foo='bar'), parameters=['printenv', 'foo'], outfile=f) assert open(fpath).read() == 'bar\n' + + # Test pipe functionality + # download ubuntu docker image + docker_call(work_dir=work_dir, tool="ubuntu") + command1 = ['head -c 1G /dev/urandom | tee /data/first', 'gzip', 'gunzip', 'md5sum 1>&2'] + command2 = ['md5sum /data/first 1>&2'] + # Test 'pipe-in-single-container' mode + docker_tools1=['ubuntu'] + stdout1 = docker_call(work_dir=work_dir, parameters=command1, tools=docker_tools1, check_output=True, return_stderr=True) + stdout2 = docker_call(work_dir=work_dir, parameters=command2, tool='ubuntu', check_output=True, return_stderr=True) + test1 = re.findall(r"([a-fA-F\d]{32})", stdout1) + test2 = re.findall(r"([a-fA-F\d]{32})", stdout2) + assert test1[0] == test2[0] + # Test 'pipe-of-containers' mode + docker_tools2=['ubuntu', 'ubuntu', 'ubuntu', 'ubuntu'] + stdout1 = docker_call(work_dir=work_dir, parameters=command1, tools=docker_tools2, check_output=True, return_stderr=True) + stdout2 = docker_call(work_dir=work_dir, parameters=command2, tool='ubuntu', check_output=True, return_stderr=True) + test1 = re.findall(r"([a-fA-F\d]{32})", stdout1) + test2 = re.findall(r"([a-fA-F\d]{32})", stdout2) + assert test1[0] == test2[0]