Introduce compute_id for each computation (#382)
* Make sure the operation name is passed through to the function running on the executor.

* Introduce `compute_id` for each computation

* Store historical metadata about the computation, and timeline plots, in the history/<compute-id> directory

* Update examples to point to new location of history files
tomwhite authored Feb 16, 2024
1 parent 09c0ae8 commit 53b889d
Showing 18 changed files with 67 additions and 34 deletions.
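Taken together, the changes give every computation a unique timestamped identifier and group all of its history artifacts under a single per-run directory. A minimal sketch of the scheme (the format string and paths match the diffs below):

```python
from datetime import datetime
from pathlib import Path

# The id format introduced in cubed/core/plan.py:
compute_id = f"compute-{datetime.now().strftime('%Y%m%dT%H%M%S.%f')}"

# The history and timeline extensions now write everything here:
history_path = Path("history") / compute_id
history_path.mkdir(parents=True, exist_ok=True)
# e.g. history/compute-20240216T101530.123456/plan.csv, events.csv,
#      stats.csv, timeline.png
```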
7 changes: 5 additions & 2 deletions cubed/core/plan.py
@@ -197,18 +197,21 @@ def execute(
):
dag = self._finalize_dag(optimize_graph, optimize_function)

compute_id = f"compute-{datetime.now().strftime('%Y%m%dT%H%M%S.%f')}"

if callbacks is not None:
event = ComputeStartEvent(dag, resume)
event = ComputeStartEvent(compute_id, dag, resume)
[callback.on_compute_start(event) for callback in callbacks]
executor.execute_dag(
dag,
compute_id=compute_id,
callbacks=callbacks,
resume=resume,
spec=spec,
**kwargs,
)
if callbacks is not None:
event = ComputeEndEvent(dag)
event = ComputeEndEvent(compute_id, dag)
[callback.on_compute_end(event) for callback in callbacks]

def num_tasks(self, optimize_graph=True, optimize_function=None, resume=None):
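For user code, the id arrives on the start and end events. A sketch of a callback that logs it (assuming `Callback` can be imported from `cubed.runtime.types`, alongside the event classes changed later in this diff):

```python
from cubed.runtime.types import Callback

class ComputeIdLogger(Callback):
    """Toy callback that echoes the id of each computation."""

    def on_compute_start(self, event):
        print(f"compute started: {event.compute_id}")

    def on_compute_end(self, event):
        print(f"compute finished: {event.compute_id}")
```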
11 changes: 5 additions & 6 deletions cubed/extensions/history.py
@@ -1,4 +1,3 @@
import time
from dataclasses import asdict
from pathlib import Path

@@ -32,11 +31,11 @@ def on_task_end(self, event):
def on_compute_end(self, event):
self.plan_df = pd.DataFrame(self.plan)
self.events_df = pd.DataFrame(self.events)
Path("history").mkdir(exist_ok=True)
id = int(time.time())
self.plan_df_path = Path(f"history/plan-{id}.csv")
self.events_df_path = Path(f"history/events-{id}.csv")
self.stats_df_path = Path(f"history/stats-{id}.csv")
history_path = Path(f"history/{event.compute_id}")
history_path.mkdir(parents=True, exist_ok=True)
self.plan_df_path = history_path / "plan.csv"
self.events_df_path = history_path / "events.csv"
self.stats_df_path = history_path / "stats.csv"
self.plan_df.to_csv(self.plan_df_path, index=False)
self.events_df.to_csv(self.events_df_path, index=False)

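With the per-computation layout, post-run analysis no longer has to match timestamped file names; everything for one run sits under one directory. A hypothetical inspection session (the compute id below is made up):

```python
import pandas as pd
from pathlib import Path

history_path = Path("history") / "compute-20240216T101530.123456"  # hypothetical id
plan_df = pd.read_csv(history_path / "plan.csv")
events_df = pd.read_csv(history_path / "events.csv")
print(plan_df.head())
```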
5 changes: 3 additions & 2 deletions cubed/extensions/timeline.py
@@ -24,7 +24,8 @@ def on_task_end(self, event):

def on_compute_end(self, event):
end_tstamp = time.time()
create_timeline(self.stats, self.start_tstamp, end_tstamp)
dst = f"history/{event.compute_id}"
create_timeline(self.stats, self.start_tstamp, end_tstamp, dst)


# copy of lithops function of the same name, and modified for different field names
@@ -87,6 +88,6 @@ def create_timeline(stats, start_tstamp, end_tstamp, dst=None):
)
else:
dst = os.path.expanduser(dst) if "~" in dst else dst
dst = "{}_{}".format(os.path.realpath(dst), "timeline.png")
dst = "{}/{}".format(os.path.realpath(dst), "timeline.png")

fig.savefig(dst)
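Note the one-character fix at the end of `create_timeline`: with `_` as the separator the plot was written as a sibling of the destination directory (`history/compute-..._timeline.png`); with `/` it is written inside it. A quick stdlib illustration of the difference:

```python
import os

dst = "history/compute-20240216T101530.123456"  # hypothetical destination
print("{}_{}".format(os.path.realpath(dst), "timeline.png"))  # old: sibling file
print("{}/{}".format(os.path.realpath(dst), "timeline.png"))  # new: inside the directory
```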
2 changes: 1 addition & 1 deletion cubed/runtime/executors/asyncio.py
@@ -34,7 +34,7 @@ async def async_map_unordered(
inputs = next(input_batches)

task_create_tstamp = time.time()
tasks = {task: i for i, task in create_futures_func(inputs, **kwargs)}
tasks = {task: i for i, task in create_futures_func(inputs, name=name, **kwargs)}
pending = set(tasks.keys())
t = time.monotonic()
start_times = {f: t for f in pending}
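The operation name is now forwarded to `create_futures_func`, so tasks can be labelled per operation. A self-contained toy of the same pattern (not Cubed's actual futures factory) using the `name` parameter of `asyncio.create_task`:

```python
import asyncio

async def work(x):
    return x + 1

def create_futures_func(inputs, name=None, **kwargs):
    # yields (index, task) pairs, mirroring the dict comprehension above
    return [(i, asyncio.create_task(work(x), name=name)) for i, x in enumerate(inputs)]

async def main():
    tasks = {task: i for i, task in create_futures_func([1, 2, 3], name="add-one")}
    print(await asyncio.gather(*tasks))  # [2, 3, 4]

asyncio.run(main())
```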
4 changes: 3 additions & 1 deletion cubed/runtime/executors/beam.py
@@ -82,7 +82,9 @@ def expand(self, pcoll):
class BeamDagExecutor(DagExecutor):
"""An execution engine that uses Apache Beam."""

def execute_dag(self, dag, callbacks=None, resume=None, spec=None, **kwargs):
def execute_dag(
self, dag, callbacks=None, resume=None, spec=None, compute_id=None, **kwargs
):
dag = dag.copy()
pipeline = beam.Pipeline(**kwargs)

5 changes: 4 additions & 1 deletion cubed/runtime/executors/coiled.py
@@ -22,6 +22,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
**coiled_kwargs: Mapping[str, Any],
) -> None:
# Note this currently only builds the task graph for each stage once it gets to that stage in computation
@@ -31,7 +32,9 @@
input = list(
pipeline.mappable
) # coiled expects a sequence (it calls `len` on it)
for _, stats in coiled_function.map(input, config=pipeline.config):
for _, stats in coiled_function.map(
input, config=pipeline.config, name=name
):
if callbacks is not None:
if name is not None:
stats["name"] = name
4 changes: 3 additions & 1 deletion cubed/runtime/executors/dask_distributed_async.py
@@ -26,7 +26,7 @@

# note we can't call `pipeline_func` just `func` here as it clashes with `dask.distributed.Client.map``
@execution_stats
def run_func(input, pipeline_func=None, config=None, name=None):
def run_func(input, pipeline_func=None, config=None, name=None, compute_id=None):
result = pipeline_func(input, config=config)
return result

@@ -152,6 +152,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
compute_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
) -> None:
@@ -163,6 +164,7 @@
resume=resume,
spec=spec,
compute_kwargs=compute_kwargs,
compute_id=compute_id,
**merged_kwargs,
)
)
10 changes: 7 additions & 3 deletions cubed/runtime/executors/lithops.py
@@ -33,7 +33,7 @@
logger = logging.getLogger(__name__)


def run_func(input, func=None, config=None, name=None):
def run_func(input, func=None, config=None, name=None, compute_id=None):
result = func(input, config=config)
return result

@@ -186,11 +186,12 @@ def execute_dag(
[run_func],
[pipeline.mappable],
[name],
use_backups=use_backups,
return_stats=True,
# kwargs below
func=pipeline.function,
config=pipeline.config,
name=name,
use_backups=use_backups,
return_stats=True,
):
handle_callbacks(callbacks, stats)
else:
@@ -213,6 +214,7 @@
group_names,
use_backups=use_backups,
return_stats=True,
# TODO: kwargs
):
handle_callbacks(callbacks, stats)

@@ -242,6 +244,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
@@ -250,5 +253,6 @@
callbacks=callbacks,
resume=resume,
spec=spec,
compute_id=compute_id,
**merged_kwargs,
)
6 changes: 4 additions & 2 deletions cubed/runtime/executors/modal.py
@@ -72,7 +72,7 @@ def check_runtime_memory(spec):
retries=2,
cloud="aws",
)
def run_remotely(input, func=None, config=None):
def run_remotely(input, func=None, config=None, name=None, compute_id=None):
print(f"running remotely on {input} in {os.getenv('MODAL_REGION')}")
# note we can't use the execution_stat decorator since it doesn't work with modal decorators
result, stats = execute_with_stats(func, input, config=config)
@@ -96,7 +96,7 @@ def __enter__(self):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path

@modal.method()
def run_remotely(self, input, func=None, config=None):
def run_remotely(self, input, func=None, config=None, name=None, compute_id=None):
print(f"running remotely on {input} in {os.getenv('MODAL_REGION')}")
# note we can't use the execution_stat decorator since it doesn't work with modal decorators
result, stats = execute_with_stats(func, input, config=config)
@@ -152,6 +152,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
@@ -160,5 +161,6 @@
callbacks=callbacks,
resume=resume,
spec=spec,
compute_id=compute_id,
**merged_kwargs,
)
2 changes: 2 additions & 0 deletions cubed/runtime/executors/modal_async.py
@@ -156,6 +156,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
@@ -165,6 +166,7 @@
callbacks=callbacks,
resume=resume,
spec=spec,
compute_id=compute_id,
**merged_kwargs,
)
)
15 changes: 11 additions & 4 deletions cubed/runtime/executors/python.py
@@ -1,4 +1,4 @@
from typing import Any, Callable, Optional, Sequence
from typing import Optional, Sequence

from networkx import MultiDiGraph

@@ -7,8 +7,8 @@
from cubed.spec import Spec


def exec_stage_func(func: Callable[..., Any], *args, **kwargs):
return func(*args, **kwargs)
def exec_stage_func(input, func=None, config=None, name=None, compute_id=None):
return func(input, config=config)


class PythonDagExecutor(DagExecutor):
@@ -20,12 +20,19 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
**kwargs,
) -> None:
for name, node in visit_nodes(dag, resume=resume):
pipeline: CubedPipeline = node["pipeline"]
for m in pipeline.mappable:
exec_stage_func(pipeline.function, m, config=pipeline.config)
exec_stage_func(
m,
pipeline.function,
config=pipeline.config,
name=name,
compute_id=compute_id,
)
if callbacks is not None:
event = TaskEndEvent(name=name)
[callback.on_task_end(event) for callback in callbacks]
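With this change the executors converge on one calling convention: the mappable element first, then `func`, `config`, `name`, and `compute_id` as keywords, where the last two are carried along only for observability. A runnable toy of that contract:

```python
def run_func(input, func=None, config=None, name=None, compute_id=None):
    # name and compute_id are metadata; only func and config affect the result
    return func(input, config=config)

def double(x, config=None):
    return x * 2

print(run_func(21, func=double, name="op-001", compute_id="compute-test"))  # 42
```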
6 changes: 4 additions & 2 deletions cubed/runtime/executors/python_async.py
@@ -16,8 +16,8 @@


@execution_stats
def run_func(input, func=None, config=None, name=None):
print(f"{name}: running on {input}")
def run_func(input, func=None, config=None, name=None, compute_id=None):
print(f"{compute_id} {name}: running on {input}")
result = func(input, config=config)
return result

@@ -126,6 +126,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
**kwargs,
) -> None:
asyncio.run(
Expand All @@ -134,6 +135,7 @@ def execute_dag(
callbacks=callbacks,
resume=resume,
spec=spec,
compute_id=compute_id,
**kwargs,
)
)
6 changes: 6 additions & 0 deletions cubed/runtime/types.py
@@ -28,6 +28,9 @@ class CubedPipeline:
class ComputeStartEvent:
"""Callback information about a computation that is about to start."""

compute_id: str
"""ID of the computation."""

dag: MultiDiGraph
"""The computation DAG."""

@@ -39,6 +42,9 @@ class ComputeStartEvent:
class ComputeEndEvent:
"""Callback information about a computation that has finished."""

compute_id: str
"""ID of the computation."""

dag: MultiDiGraph
"""The computation DAG."""

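Given the dataclass-style field declarations, `compute_id` becomes the first positional field, matching the constructor calls in `plan.py`. A sketch of constructing the events directly (the id is made up, and dataclass constructors are assumed from the field annotations):

```python
from networkx import MultiDiGraph
from cubed.runtime.types import ComputeStartEvent, ComputeEndEvent

dag = MultiDiGraph()
start = ComputeStartEvent("compute-20240216T101530.123456", dag, False)
end = ComputeEndEvent("compute-20240216T101530.123456", dag)
print(start.compute_id == end.compute_id)  # True
```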
2 changes: 1 addition & 1 deletion cubed/tests/runtime/utils.py
@@ -17,7 +17,7 @@ def write_int_to_file(path, i):
f.write(str(i))


def deterministic_failure(path, timing_map, i):
def deterministic_failure(path, timing_map, i, *, name=None):
"""A function that can either run normally, run slowly, or raise
an exception, depending on input and invocation count.
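The bare `*` makes `name` keyword-only in the test helper, so call sites must spell it out, mirroring how the executors now pass it:

```python
def deterministic_failure(path, timing_map, i, *, name=None):
    ...

deterministic_failure("/tmp/x", {}, 0, name="op-001")  # OK
# deterministic_failure("/tmp/x", {}, 0, "op-001")     # TypeError
```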
4 changes: 2 additions & 2 deletions examples/lithops/aws-lambda/README.md
@@ -52,10 +52,10 @@ These will take longer to run as they operate on more data.


The last two examples use `TimelineVisualizationCallback` which produce a plot showing the timeline of events in the task lifecycle.
The plots are `png` files and are written in the `plots` directory with a timestamp. Open the latest one with
The plots are `png` files and are written in the `history` directory in a directory with a timestamp. Open the latest one with

```shell
open plots/$(ls plots | tail -1)
open $(ls -d history/compute-* | tail -1)/timeline.png
```

## Cleaning up
4 changes: 2 additions & 2 deletions examples/lithops/gcf/README.md
@@ -52,10 +52,10 @@ These will take longer to run as they operate on more data.


The last two examples use `TimelineVisualizationCallback` which produce a plot showing the timeline of events in the task lifecycle.
The plots are `png` files and are written in the `plots` directory with a timestamp. Open the latest one with
The plots are `png` files and are written in the `history` directory in a directory with a timestamp. Open the latest one with

```shell
open plots/$(ls plots | tail -1)
open $(ls -d history/compute-* | tail -1)/timeline.png
```

## Cleaning up
4 changes: 2 additions & 2 deletions examples/modal/aws/README.md
@@ -45,8 +45,8 @@ python modal-matmul-random.py "s3://cubed-modal-$USER-temp"
These will take longer to run as they operate on more data.

The last two examples use `TimelineVisualizationCallback` which produce a plot showing the timeline of events in the task lifecycle.
The plots are `png` files and are written in the `plots` directory with a timestamp. Open the latest one with
The plots are `png` files and are written in the `history` directory in a directory with a timestamp. Open the latest one with

```shell
open plots/$(ls plots | tail -1)
open $(ls -d history/compute-* | tail -1)/timeline.png
```
4 changes: 2 additions & 2 deletions examples/modal/gcp/README.md
@@ -45,8 +45,8 @@ python modal-matmul-random.py "gs://cubed-modal-$USER-temp"
These will take longer to run as they operate on more data.

The last two examples use `TimelineVisualizationCallback` which produce a plot showing the timeline of events in the task lifecycle.
The plots are `png` files and are written in the `plots` directory with a timestamp. Open the latest one with
The plots are `png` files and are written in the `history` directory in a directory with a timestamp. Open the latest one with

```shell
open plots/$(ls plots | tail -1)
open $(ls -d history/compute-* | tail -1)/timeline.png
```
