[IO-432][external] Pull video frames concurrently (#475)
rslota authored Oct 31, 2022
1 parent 8983c4d commit c575cb6
Showing 3 changed files with 104 additions and 25 deletions.
110 changes: 88 additions & 22 deletions darwin/dataset/download_manager.py
@@ -7,7 +7,7 @@
import time
import urllib
from pathlib import Path
from typing import Any, Callable, Iterator, Tuple
from typing import Any, Callable, Iterable, List, Tuple

import deprecation
import requests
@@ -40,7 +40,7 @@ def download_all_images_from_annotations(
use_folders: bool = False,
video_frames: bool = False,
force_slots: bool = False,
) -> Tuple[Callable[[], Iterator[Any]], int]:
) -> Tuple[Callable[[], Iterable[Any]], int]:
"""
Downloads all the images corresponding to a project.
@@ -115,22 +115,20 @@ def download_all_images_from_annotations(
print(f"Removing {existing_image} as there is no corresponding annotation")
existing_image.unlink()
# Create the generator with the partial functions
count = len(annotations_to_download_path)
generator = lambda: (
functools.partial(
download_image_from_annotation,
download_functions: List = []
for annotation_path in annotations_to_download_path:
file_download_functions = lazy_download_image_from_annotation(
api_key,
api_url,
annotation_path,
images_path,
annotation_format,
use_folders,
video_frames,
force_slots,
)
for annotation_path in annotations_to_download_path
)
return generator, count
download_functions.extend(file_download_functions)

return lambda: download_functions, len(download_functions)
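
The change above replaces the lazily-built generator of `functools.partial` objects with a flat list of zero-argument callables, one per source file and one per video frame, which is what lets a worker pool pull frames of the same video concurrently. A minimal sketch of the resulting pattern, using a hypothetical `fake_download` stand-in rather than darwin's real download helper:

import functools
from typing import Callable, List

def fake_download(url: str, path: str) -> None:
    # Hypothetical stand-in for the real per-file/per-frame download.
    print(f"downloading {url} -> {path}")

download_functions: List[Callable[[], None]] = [
    functools.partial(fake_download, f"https://example.com/frames/{i:07d}.png", f"/tmp/{i:07d}.png")
    for i in range(3)
]
generator, count = (lambda: download_functions), len(download_functions)

for download in generator():  # each element is a zero-argument callable
    download()                # runs exactly one download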


@deprecation.deprecated(
@@ -180,7 +178,56 @@ def download_image_from_annotation(
console = Console()

if annotation_format == "json":
_download_image_from_json_annotation(
downloadables = _download_image_from_json_annotation(
api_key, annotation_path, images_path, use_folders, video_frames, force_slots
)
for downloadable in downloadables:
downloadable()
else:
console.print("[bold red]Unsupported file format. Please use 'json'.")
raise NotImplementedError


def lazy_download_image_from_annotation(
api_key: str,
annotation_path: Path,
images_path: Path,
annotation_format: str,
use_folders: bool,
video_frames: bool,
force_slots: bool,
) -> Iterable[Callable[[], None]]:
"""
Returns functions to download an image given an annotation. Same as `download_image_from_annotation`
but returns callables that trigger the download instead of fetching the files internally.
Parameters
----------
api_key : str
API Key of the current team
annotation_path : Path
Path where the annotation is located
images_path : Path
Path where to download the image
annotation_format : str
Format of the annotations. Currently only JSON is supported
use_folders : bool
Recreate folder structure
video_frames : bool
Pulls video frame images instead of video files
force_slots : bool
Pulls all slots of items into a deeper file structure ({prefix}/{item_name}/{slot_name}/{file_name})
Raises
------
NotImplementedError
If the format of the annotation is not supported.
"""

console = Console()

if annotation_format == "json":
return _download_image_from_json_annotation(
api_key, annotation_path, images_path, use_folders, video_frames, force_slots
)
else:
@@ -189,11 +236,16 @@ def download_image_from_annotation(


def _download_image_from_json_annotation(
api_key: str, annotation_path: Path, image_path: Path, use_folders: bool, video_frames: bool, force_slots: bool
) -> None:
api_key: str,
annotation_path: Path,
image_path: Path,
use_folders: bool,
video_frames: bool,
force_slots: bool,
) -> Iterable[Callable[[], None]]:
annotation = parse_darwin_json(annotation_path, count=0)
if annotation is None:
return None
return []

# If we are using folders, extract the path for the image and create the folder if needed
sub_path = annotation.remote_path if use_folders else Path("/")
@@ -203,12 +255,17 @@ def _download_image_from_json_annotation(
annotation.slots.sort(key=lambda slot: slot.name or "0")
if len(annotation.slots) > 0:
if force_slots:
_download_all_slots_from_json_annotation(annotation, api_key, parent_path, video_frames)
return _download_all_slots_from_json_annotation(annotation, api_key, parent_path, video_frames)
else:
_download_single_slot_from_json_annotation(annotation, api_key, parent_path, annotation_path, video_frames)
return _download_single_slot_from_json_annotation(
annotation, api_key, parent_path, annotation_path, video_frames
)

return []


def _download_all_slots_from_json_annotation(annotation, api_key, parent_path, video_frames):
generator = []
for slot in annotation.slots:
slot_path = parent_path / sanitize_filename(annotation.filename) / sanitize_filename(slot.name)
slot_path.mkdir(exist_ok=True, parents=True)
@@ -218,30 +275,34 @@ def _download_all_slots_from_json_annotation(annotation, api_key, parent_path, v
video_path.mkdir(exist_ok=True, parents=True)
for i, frame_url in enumerate(slot.frame_urls or []):
path = video_path / f"{i:07d}.png"
_download_image(frame_url, path, api_key)
generator.append(functools.partial(_download_image, frame_url, path, api_key))
else:
for upload in slot.source_files:
file_path = slot_path / sanitize_filename(upload["file_name"])
_download_image(upload["url"], file_path, api_key)
_update_local_path(annotation, upload["url"], file_path)
generator.append(
functools.partial(_download_image_with_trace, annotation, upload["url"], file_path, api_key)
)
return generator


def _download_single_slot_from_json_annotation(annotation, api_key, parent_path, annotation_path, video_frames):
slot = annotation.slots[0]
generator = []

if video_frames and slot.type != "image":
video_path: Path = parent_path / annotation_path.stem
video_path.mkdir(exist_ok=True, parents=True)
for i, frame_url in enumerate(slot.frame_urls or []):
path = video_path / f"{i:07d}.png"
_download_image(frame_url, path, api_key)
generator.append(functools.partial(_download_image, frame_url, path, api_key))
else:
if len(slot.source_files) > 0:
image_url = slot.source_files[0]["url"]
filename = slot.source_files[0]["file_name"]
image_path = parent_path / sanitize_filename(filename or annotation.filename)
_download_image(image_url, image_path, api_key)
_update_local_path(annotation, image_url, image_path)

generator.append(functools.partial(_download_image_with_trace, annotation, image_url, image_path, api_key))
return generator


def _update_local_path(annotation: AnnotationFile, url, local_path):
@@ -383,6 +444,11 @@ def _download_image(url: str, path: Path, api_key: str) -> None:
time.sleep(1)


def _download_image_with_trace(annotation, image_url, image_path, api_key):
_download_image(image_url, image_path, api_key)
_update_local_path(annotation, image_url, image_path)


def _fetch_multiple_files(path: Path, response: requests.Response) -> None:
obj = response.json()
if "urls" not in obj:
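
For reference, a usage sketch of the new `lazy_download_image_from_annotation` helper; the API key and paths below are placeholders, not values taken from this commit. Nothing is downloaded until each returned callable is invoked:

from pathlib import Path

from darwin.dataset.download_manager import lazy_download_image_from_annotation

download_callables = lazy_download_image_from_annotation(
    api_key="<team-api-key>",                           # placeholder
    annotation_path=Path("annotations/my_video.json"),  # placeholder
    images_path=Path("images"),
    annotation_format="json",
    use_folders=False,
    video_frames=True,   # pull individual frame images instead of the video file
    force_slots=False,
)

for download in download_callables:
    download()  # performs one download per call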
8 changes: 7 additions & 1 deletion darwin/dataset/remote_dataset.py
@@ -1,4 +1,5 @@
import json
import os
import shutil
import tempfile
import zipfile
@@ -309,7 +310,12 @@ def pull(

# If blocking is selected, download the dataset on the file system
if blocking:
exhaust_generator(progress=progress(), count=count, multi_threaded=multi_threaded)
max_workers = None
env_max_workers = os.getenv("DARWIN_DOWNLOAD_FILES_CONCURRENCY")
if env_max_workers and int(env_max_workers) > 0:
max_workers = int(env_max_workers)

exhaust_generator(progress=progress(), count=count, multi_threaded=multi_threaded, worker_count=max_workers)
return None, count
else:
return progress, count
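
A sketch of how the new DARWIN_DOWNLOAD_FILES_CONCURRENCY environment variable caps the worker count of a blocking pull; obtaining the RemoteDataset is assumed here (via the standard darwin client) and is not part of this commit:

import os

from darwin.client import Client

os.environ["DARWIN_DOWNLOAD_FILES_CONCURRENCY"] = "4"  # must parse as a positive integer

client = Client.local()                                    # assumes a configured local darwin client
dataset = client.get_remote_dataset("my-team/my-dataset")  # placeholder dataset slug
dataset.pull(video_frames=True)                            # blocking pull now uses at most 4 workers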
11 changes: 9 additions & 2 deletions darwin/dataset/utils.py
@@ -170,7 +170,9 @@ def _f(x: Any) -> Any:
return x()


def exhaust_generator(progress: Generator, count: int, multi_threaded: bool) -> List[Dict[str, Any]]:
def exhaust_generator(
progress: Generator, count: int, multi_threaded: bool, worker_count: Optional[int] = None
) -> List[Dict[str, Any]]:
"""
Exhausts the generator passed as a parameter. Can be done multi-threaded if desired.
@@ -182,6 +184,8 @@ def exhaust_generator(progress: Generator, count: int, multi_threaded: bool) ->
Size of the generator.
multi_threaded : bool
Flag for multi-threaded enabled operations.
worker_count : Optional[int]
Number of workers to use if multi_threaded=True. By default the CPU count is used.
Returns
-------
@@ -195,8 +199,11 @@ def exhaust_generator(progress: Generator, count: int, multi_threaded: bool) ->
def update(*a):
progress_bar.completed += 1

if worker_count is None:
worker_count = mp.cpu_count()

with Live(progress_bar):
with mp.Pool(mp.cpu_count()) as pool:
with mp.Pool(worker_count) as pool:
for f in progress:
responses.append(pool.apply_async(_f, args=(f,), callback=update))
pool.close()
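
Finally, a direct sketch of the extended `exhaust_generator` with the new `worker_count` argument, using picklable stand-in tasks rather than the real download callables:

from functools import partial

from darwin.dataset.utils import exhaust_generator

def square(x: int) -> int:
    # Stand-in for a download callable; must be picklable for multiprocessing.Pool.
    return x * x

if __name__ == "__main__":
    tasks = [partial(square, i) for i in range(8)]
    # worker_count=2 overrides the default pool size of mp.cpu_count()
    exhaust_generator(progress=tasks, count=len(tasks), multi_threaded=True, worker_count=2)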
