Skip to content

Commit

Permalink
Improve spec docs and add convenience constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
epwalsh committed Apr 13, 2022
1 parent 424f8c7 commit cd24cb4
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Implemented `__str__` method on `Beaker` client for debugging.
- Improved documentation for `ExperimentSpec`, `TaskSpec`, and other related data models,
and added new convenience constructors such as `TaskSpec.new()`.

### Changed

Expand Down
196 changes: 192 additions & 4 deletions beaker/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ def _check_exactly_one_field_set(cls, values: Dict[str, Any]) -> Dict[str, Any]:
class DataMount(BaseModel):
"""
Describes how to mount a dataset into a task. All datasets are mounted read-only.
.. seealso::
This is used in the :data:`TaskSpec.datasets` property in :class:`TaskSpec`.
"""

source: DataSource
Expand Down Expand Up @@ -217,6 +220,40 @@ class DataMount(BaseModel):
setting the sub-path to ``path/to`` will result in the task seeing ``{mount_path}/file.csv``.
"""

@classmethod
def new(
cls,
mount_path: str,
sub_path: Optional[str] = None,
beaker: Optional[str] = None,
host_path: Optional[str] = None,
result: Optional[str] = None,
url: Optional[str] = None,
secret: Optional[str] = None,
) -> "DataMount":
"""
A convenience method for quickly creating a new :class:`DataMount`.
:param mount_path: The :data:`mount_path`.
:param sub_path: The :data:`sub_path`.
:param beaker: The :data:`beaker <DataSource.beaker>` argument to :class:`DataSource`.
:param host_path: The :data:`host_path <DataSource.host_path>` argument to :class:`DataSource`.
:param result: The :data:`result <DataSource.result>` argument to :class:`DataSource`.
:param url: The :data:`url <DataSource.url>` argument to :class:`DataSource`.
:param secret: The :data:`secret <DataSource.secret>` argument to :class:`DataSource`.
"""
return cls(
mount_path=mount_path,
sub_path=sub_path,
source=DataSource(
beaker=beaker,
host_path=host_path,
result=result,
url=url,
secret=secret,
),
)


class ResultSpec(BaseModel):
"""
Expand Down Expand Up @@ -295,7 +332,7 @@ class TaskContext(BaseModel):

@validator("priority")
def _validate_priority(cls, v: str) -> str:
if v not in {"low", "normal", "high"}:
if v is not None and v not in {"low", "normal", "high"}:
raise ValueError(
"Invalided 'priority'. Value must be one of 'low', 'normal', or 'high'."
)
Expand All @@ -304,13 +341,13 @@ def _validate_priority(cls, v: str) -> str:

class TaskSpec(BaseModel):
"""
A :class:`TaskSpec` defines a :class:`Task` within an :class:`ExperimentSpec`.
Tasks are Beaker's fundamental unit of work.
A Beaker experiment may contain multiple tasks.
A task may also depend on the results of another task in its experiment,
creating an execution graph.
A :class:`TaskSpec` defines a :class:`Task`.
"""

image: ImageSource
Expand Down Expand Up @@ -369,15 +406,146 @@ class TaskSpec(BaseModel):
External hardware requirements, such as memory or GPU devices.
"""

@classmethod
def new(
cls,
name: str,
cluster: str,
beaker_image: Optional[str] = None,
docker_image: Optional[str] = None,
result_path: str = "/unused",
priority: Optional[str] = None,
**kwargs,
) -> "TaskSpec":
"""
A convenience method for quickly creating a new :class:`TaskSpec`.
:param name: The :data:`name` of the task.
:param cluster: The :data:`cluster <TaskContext.cluster>` name in the :data:`context`.
:param beaker_image: The :data:`beaker <ImageSource.beaker>` image name in the
:data:`image` source.
.. important::
Mutually exclusive with ``docker_image``.
:param docker_image: The :data:`docker <ImageSource.docker>` image name in the
:data:`image` source.
.. important::
Mutually exclusive with ``beaker_image``.
:param priority: The :data:`priority <TaskContext.priority>` of the :data:`context`.
:param kwargs: Additional kwargs are passed as-is to :class:`TaskSpec`.
:examples:
>>> task_spec = TaskSpec.new(
... "hello-world",
... "ai2/cpu-cluster",
... docker_image="hello-world",
... )
"""
return TaskSpec(
name=name,
image=ImageSource(beaker=beaker_image, docker=docker_image),
result=ResultSpec(path=result_path),
context=TaskContext(cluster=cluster, priority=priority),
**kwargs,
)

def with_resources(self, **kwargs) -> "TaskSpec":
"""
Return a new :class:`TaskSpec` with the given :data:`resources`.
:param kwargs: Key-word arguments are passed directly to :class:`TaskResources`.
:examples:
>>> task_spec = TaskSpec.new(
... "hello-world",
... "ai2/gpu-cluster",
... docker_image="hello-world",
... ).with_resources(gpu_count=2)
>>> assert task_spec.resources.gpu_count == 2
"""
return self.copy(deep=True, update={"resources": TaskResources(**kwargs)})

def with_data(self, mount_path: str, **kwargs) -> "TaskSpec":
"""
Return a new :class:`TaskSpec` with an additional input :data:`data <datasets>`.
:param mount_path: The :data:`mount_path <DataMount>` of the :class:`DataMount`.
:param kwargs: Additional kwargs are passed as-is to :meth:`DataMount.new()`.
:examples:
>>> task_spec = TaskSpec.new(
... "hello-world",
... "ai2/cpu-cluster",
... docker_image="hello-world",
... ).with_data("/data/foo", beaker="foo")
>>> assert task_spec.datasets
"""
return self.copy(
deep=True,
update={
"datasets": [d.copy(deep=True) for d in self.datasets or []]
+ [DataMount.new(mount_path, **kwargs)]
},
)

def with_env_var(
self, name: str, value: Optional[str] = None, secret: Optional[str] = None
) -> "TaskSpec":
"""
Return a new :class:`TaskSpec` with an additional input :data:`env_var <env_vars>`.
:param name: The :data:`name <EnvVar.name>` of the :class:`EnvVar`.
:param value: The :data:`value <EnvVar.value>` of the :class:`EnvVar`.
:param secret: The :data:`secret <EnvVar.secret>` of the :class:`EnvVar`.
:examples:
>>> task_spec = TaskSpec.new(
... "hello-world",
... "ai2/cpu-cluster",
... docker_image="hello-world",
... env_vars=[EnvVar(name="bar", value="secret!")],
... ).with_env_var("baz", value="top, top secret")
>>> assert len(task_spec.env_vars) == 2
"""
return self.copy(
deep=True,
update={
"env_vars": [d.copy(deep=True) for d in self.env_vars or []]
+ [EnvVar(name=name, value=value, secret=secret)]
},
)


class ExperimentSpec(BaseModel):
"""
Experiments are the main unit of execution in Beaker.
An :class:`ExperimentSpec` defines an :class:`Experiment`.
:examples:
>>> spec = ExperimentSpec(
... tasks=[
... TaskSpec(
... name="hello",
... image=ImageSource(docker="hello-world"),
... context=TaskContext(cluster="ai2/cpu-only"),
... result=ResultSpec(
... path="/unused" # required even if the task produces no output.
... ),
... ),
... ],
... )
"""

tasks: List[TaskSpec]
tasks: List[TaskSpec] = Field(default_factory=list)
"""
Specifications for each process to run.
"""
Expand Down Expand Up @@ -409,6 +577,26 @@ def from_file(cls, path: PathOrStr) -> "ExperimentSpec":
raw_spec = yaml.load(spec_file, Loader=yaml.SafeLoader)
return cls.from_json(raw_spec)

def with_task(self, task: TaskSpec) -> "ExperimentSpec":
"""
Return a new :class:`ExperimentSpec` with an additional task.
:param task: The task to add.
:examples:
>>> spec = ExperimentSpec().with_task(
... TaskSpec.new(
... "hello-world",
... "ai2/cpu-cluster",
... docker_image="hello-world",
... )
... )
"""
return self.copy(
deep=True, update={"tasks": [d.copy(deep=True) for d in self.tasks or []] + [task]}
)


class WorkspaceSize(BaseModel):
datasets: int
Expand Down

0 comments on commit cd24cb4

Please sign in to comment.