Added DagsHub Label Studio Inference for Customisable Remote backend #513

Merged
merged 32 commits on Oct 20, 2024
Changes from all commits
Commits (32)
2dc52cc
added local model-based annotation
jinensetpal Jun 19, 2024
3a677dc
removed torch dependency for auto predictor, generalized prediction f…
jinensetpal Jul 1, 2024
86e9d93
linter fixes
jinensetpal Jul 1, 2024
85b017d
broke extra long lines down
jinensetpal Jul 1, 2024
27bd1da
reverted fixes from incorrect linter param
jinensetpal Jul 1, 2024
2b6f4de
re-linted
jinensetpal Jul 1, 2024
67eeab3
bugfix
jinensetpal Jul 1, 2024
1b29531
more xl docstrings
jinensetpal Jul 1, 2024
f818556
better error enumeration
jinensetpal Jul 2, 2024
83ef416
bifurcated functions, not using dagshub.init, added rich progress, ad…
jinensetpal Jul 2, 2024
cb54e73
running 'dagshub.init' twice
jinensetpal Jul 3, 2024
da44129
clarified hook I/O
jinensetpal Jul 3, 2024
14c6209
optional field clarified
jinensetpal Jul 3, 2024
9ec4469
moved away from init, removed return_predictions argument, updated do…
jinensetpal Jul 3, 2024
7608f55
added option for custom host in predict/annotate_with_mlflow, used mu…
jinensetpal Jul 8, 2024
dc94883
fixed bug with annotation dictionary encoding
jinensetpal Jul 9, 2024
9693edf
encapsulated prediction with raw url & annotations
jinensetpal Jul 11, 2024
3b38715
added function that adds a remote LS backend
jinensetpal Jul 16, 2024
ea0ed18
auto-setup authtoken and annotation column, made post_hook optional, …
jinensetpal Jul 19, 2024
5c8d871
updated requirements, fixed uncaught failure from requests
jinensetpal Jul 22, 2024
23bb4da
better name for autolabelling extra
jinensetpal Jul 22, 2024
21b58c2
added annotation from config, trimmed dependencies, retries for clust…
jinensetpal Jul 25, 2024
323be85
retry ls initialization only, single ngrok-session
jinensetpal Jul 29, 2024
9020795
merged upstream
jinensetpal Jul 29, 2024
94e3530
added more progress detail, docstrings
jinensetpal Aug 1, 2024
67a3467
Formatting, unpinned cloudpickle and ngrok
kbolashev Sep 4, 2024
07bee0d
Merge branch 'master' into ls-remote+mlflow
kbolashev Sep 4, 2024
26165e1
Lint fix
kbolashev Sep 4, 2024
549d945
Use asyncio to run ngrok and fix bug
idonov8 Oct 5, 2024
a904ace
Making the function async instead of using asyncio
idonov8 Oct 8, 2024
6e9d3d2
Merge pull request #535 from idonov8/fix-async-bug
kbolashev Oct 8, 2024
5a92e76
Merge branch 'master' into ls-remote+mlflow
deanp70 Oct 18, 2024
2 changes: 1 addition & 1 deletion dagshub/common/cli.py
@@ -262,7 +262,7 @@ def upload(

FILENAME can be a directory.

REPO should be of the form <owner>/<repo-name>, i.e nirbarazida/yolov6.
REPO should be of the form <owner>/<repo-name>, e.g: nirbarazida/yolov6.

TARGET should include the full path inside the repo, including the filename itself.
If TARGET is omitted, it defaults to using the relative path to FILENAME from current working directory,
149 changes: 149 additions & 0 deletions dagshub/data_engine/model/datasource.py
@@ -8,12 +8,15 @@
import threading
import time
import uuid
import requests
import webbrowser
from contextlib import contextmanager
from dataclasses import dataclass, field
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union, Set, ContextManager, Tuple, Literal, Callable
from tenacity import retry, wait_fixed, stop_after_attempt, retry_if_exception_type


import rich.progress
from dataclasses_json import config, LetterCase, DataClassJsonMixin
@@ -42,6 +45,7 @@
DatasetFieldComparisonError,
FieldNotFoundError,
DatasetNotFoundError,
LSInitializingError,
)
from dagshub.data_engine.model.metadata import (
validate_uploading_metadata,
@@ -60,14 +64,19 @@
import pandas
import mlflow
import mlflow.entities
import cloudpickle
import ngrok
else:
plugin_server_module = lazy_load("dagshub.data_engine.voxel_plugin_server.server")
fo = lazy_load("fiftyone")
mlflow = lazy_load("mlflow")
pandas = lazy_load("pandas")
ngrok = lazy_load("ngrok")
cloudpickle = lazy_load("cloudpickle")

logger = logging.getLogger(__name__)

LS_ORCHESTRATOR_URL = "http://127.0.0.1"
DEFAULT_MLFLOW_ARTIFACT_NAME = "datasource.dagshub.json"
MLFLOW_DATASOURCE_TAG_NAME = "dagshub.datasets.datasource_id"
MLFLOW_DATASET_TAG_NAME = "dagshub.datasets.dataset_id"
@@ -167,6 +176,8 @@ def __init__(
self._explicit_update_ctx: Optional[MetadataContextManager] = None
self.assigned_dataset = from_dataset

self.ngrok_listener = None

@property
def has_explicit_context(self):
return self._explicit_update_ctx is not None
@@ -1024,6 +1035,144 @@ def _encode_query_for_frontend(self) -> str:
def fields(self) -> List[MetadataFieldSchema]:
return self.source.metadata_fields

@retry(retry=retry_if_exception_type(LSInitializingError), wait=wait_fixed(3), stop=stop_after_attempt(5))
async def add_annotation_model_from_config(self, config, project_name, ngrok_authtoken, port=9090):
"""
Initialize a LS backend for ML annotation using a preset configuration.

Args:
config: dictionary containing information about the mlflow model, hooks and LS label config
recommended to use with `get_config()` from preconfigured_models in the orchestrator repo
project_name: automatically adds backend to project
ngrok_authtoken: uses ngrok to forward local connection
port: (optional, default: 9090) port on which orchestrator is hosted
"""
ls_api_endpoint = multi_urljoin(self.source.repoApi.host, self.source.repo, "annotations/de/api/projects")

res = requests.get(
ls_api_endpoint,
headers={"Authorization": f"Bearer {dagshub.auth.get_token()}"},
)
if res.text.startswith("<!DOCTYPE html>"):
raise LSInitializingError()
elif res.status_code // 100 != 2:
raise ValueError(f"Adding backend failed! Response: {res.text}")
projects = {project["title"]: str(project["id"]) for project in res.json()["results"]}

if project_name not in projects:
res = requests.post(
ls_api_endpoint,
headers={"Authorization": f"Bearer {dagshub.auth.get_token()}", "Content-Type": "application/json"},
data=json.dumps({"title": project_name, "label_config": config.pop("label_config")}),
)
if res.status_code // 100 != 2:
raise ValueError(f"Adding backend failed! Response: {res.text}")
else:
res = requests.patch(
multi_urljoin(ls_api_endpoint, projects[project_name]),
headers={"Authorization": f"Bearer {dagshub.auth.get_token()}", "Content-Type": "application/json"},
data=json.dumps({"label_config": config.pop("label_config")}),
)
if res.status_code // 100 != 2:
raise ValueError(f"Adding backend failed! Response: {res.text}")

await self.add_annotation_model(**config, port=port, project_name=project_name, ngrok_authtoken=ngrok_authtoken)

@retry(retry=retry_if_exception_type(LSInitializingError), wait=wait_fixed(3), stop=stop_after_attempt(5))
async def add_annotation_model(
self,
repo: str,
name: str,
version: str = "latest",
post_hook: Callable[[Any], Any] = lambda x: x,
pre_hook: Callable[[Any], Any] = lambda x: x,
port: int = 9090,
project_name: Optional[str] = None,
ngrok_authtoken: Optional[str] = None,
) -> None:
"""
Initialize a LS backend for ML annotation.

Args:
repo: repository to extract the model from
name: name of the model in the mlflow registry
version: (optional, default: 'latest') version of the model in the mlflow registry
pre_hook: (optional, default: identity function) function that runs before datapoint is sent to the model
post_hook: (optional, default: identity function) function that converts mlflow model output
to the desired format
port: (optional, default: 9090) port on which orchestrator is hosted
project_name: (optional, default: None) automatically adds backend to project
ngrok_authtoken: (optional, default: None) uses ngrok to forward local connection
"""

def fn_encoder(fn):
return base64.b64encode(cloudpickle.dumps(fn)).decode("utf-8")

if not ngrok_authtoken and project_name:
raise ValueError("As `ngrok_authtoken` is not specified, project will have to be added manually.")
with get_rich_progress() as progress:
task = progress.add_task("Initializing LS Model...", total=1)
res = requests.post(
f"{LS_ORCHESTRATOR_URL}:{port}/configure",
headers={"Content-Type": "application/json"},
json=json.dumps(
{
"host": self.source.repoApi.host,
"username": self.source.repoApi.owner,
"repo": repo,
"model": name,
"version": version,
"authtoken": dagshub.auth.get_token(),
"datasource_repo": self.source.repo,
"datasource_name": self.source.name,
"pre_hook": fn_encoder(pre_hook),
"post_hook": fn_encoder(post_hook),
}
),
)
progress.update(task, advance=1, description="Configured LS model backend container")
if res.status_code // 100 != 2:
raise ValueError(f"Adding backend failed! Response: {res.text}")

if ngrok_authtoken:
if not self.ngrok_listener:
self.ngrok_listener = await ngrok.forward(port, authtoken=ngrok_authtoken)
endpoint = self.ngrok_listener.url()
else:
endpoint = f"{LS_ORCHESTRATOR_URL}:{port}/"
progress.update(task, advance=1, description="Configured any necessary forwarding")

if project_name:
ls_api_endpoint = multi_urljoin(self.source.repoApi.host, self.source.repo, "annotations/de/api")
res = requests.get(
multi_urljoin(ls_api_endpoint, "projects"),
headers={"Authorization": f"Bearer {dagshub.auth.get_token()}"},
)
if res.text.startswith("<!DOCTYPE html>"):
raise LSInitializingError()
elif res.status_code // 100 != 2:
raise ValueError(f"Adding backend failed! Response: {res.text}")
projects = {project["title"]: str(project["id"]) for project in res.json()["results"]}

if project_name not in projects:
raise ValueError(
f"{project_name} not in projects. Available project names: {list(projects.keys())}"
)

res = requests.post(
multi_urljoin(ls_api_endpoint, "ml"),
headers={"Authorization": f"Bearer {dagshub.auth.get_token()}", "Content-Type": "application/json"},
data=json.dumps({"url": endpoint, "project": projects[project_name]}),
)
if res.status_code // 100 == 2:
progress.update(task, advance=1, description="Added model to LS backend")
print("Backend added successfully!")
else:
raise ValueError(f"Adding backend failed! Response: {res.text}")
else:
progress.update(task, advance=1, description="Added model to LS backend")
print(f"Connection Established! Add LS endpoint: {endpoint} to your project.")

def annotate(self, fields_to_embed=None, fields_to_exclude=None) -> Optional[str]:
"""
Sends all datapoints in the datasource for annotation in Label Studio.
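
For orientation, here is a minimal usage sketch of the new datasource API added above (async, since `add_annotation_model` is a coroutine). The repo, datasource, model, and project names are placeholders rather than values from this PR; `add_annotation_model_from_config` works the same way but takes a preset config dict instead of individual model arguments.

import asyncio

from dagshub.data_engine import datasources

# Placeholder repo/datasource/model names -- illustrative only.
ds = datasources.get_datasource("<owner>/<repo>", name="my-datasource")

# Registers an MLflow model as a remote Label Studio ML backend.
# Assumes the annotation orchestrator is running locally on port 9090
# and that an ngrok authtoken is available for forwarding.
asyncio.run(
    ds.add_annotation_model(
        repo="<owner>/<repo>",
        name="my-registered-model",
        version="latest",
        project_name="my-ls-project",
        ngrok_authtoken="<ngrok-authtoken>",
    )
)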
8 changes: 8 additions & 0 deletions dagshub/data_engine/model/errors.py
@@ -75,3 +75,11 @@ def __init__(self, repo, id, name):

def __str__(self):
return f"Dataset with name {self.name} or id {self.id} not found " f"in repository {self.repo}"


class LSInitializingError(Exception):
def __init__(self):
super().__init__()

def __str__(self):
return "Label Studio on DagsHub is starting up! Please try again in a couple of seconds"
132 changes: 129 additions & 3 deletions dagshub/data_engine/model/query_result.py
@@ -6,7 +6,9 @@
from dataclasses import field, dataclass
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, List, Dict, Any, Optional, Union, Tuple, Literal
from typing import TYPE_CHECKING, List, Dict, Any, Optional, Union, Tuple, Literal, Callable
import json
import os
import os.path

import dacite
@@ -18,12 +20,13 @@
from dagshub_annotation_converter.ir.image import IRImageAnnotationBase
from pydantic import ValidationError

from dagshub.auth import get_token
from dagshub.common import config
from dagshub.common.analytics import send_analytics_event
from dagshub.common.download import download_files
from dagshub.common.helpers import sizeof_fmt, prompt_user, log_message
from dagshub.common.rich_util import get_rich_progress
from dagshub.common.util import lazy_load
from dagshub.common.util import lazy_load, multi_urljoin
from dagshub.data_engine.annotation import MetadataAnnotations
from dagshub.data_engine.annotation.voxel_conversion import (
add_voxel_annotations,
Expand All @@ -42,11 +45,13 @@
import dagshub.data_engine.voxel_plugin_server.server as plugin_server_module
import datasets as hf_ds
import tensorflow as tf
import mlflow
else:
plugin_server_module = lazy_load("dagshub.data_engine.voxel_plugin_server.server")
fo = lazy_load("fiftyone")
tf = lazy_load("tensorflow")
hf_ds = lazy_load("datasets")
mlflow = lazy_load("mlflow")

logger = logging.getLogger(__name__)

@@ -466,6 +471,86 @@ def download_binary_columns(
num_proc=num_proc,
)

def predict_with_mlflow_model(
self,
repo: str,
name: str,
host: Optional[str] = None,
version: str = "latest",
pre_hook: Callable[[Any], Any] = lambda x: x,
post_hook: Callable[[Any], Any] = lambda x: x,
batch_size: int = 1,
log_to_field: Optional[str] = None,
) -> Optional[list]:
"""
Sends all the datapoints returned in this QueryResult as prediction targets for
an MLFlow model registered on DagsHub.

Args:
repo: repository to extract the model from
name: name of the model in the mlflow registry
version: (optional, default: 'latest') version of the model in the mlflow registry
pre_hook: (optional, default: identity function) function that runs before datapoint is sent to the model
post_hook: (optional, default: identity function) function that converts mlflow model output
to the desired format
batch_size: (optional, default: 1) number of datapoints sent to the model per inference call
log_to_field: (optional, default: None) metadata field to log prediction results to in the data engine.
If None, predictions are only returned; if set, predictions are returned and also logged to that field.
"""

# to support dependency-free dataloading, `Batcher` is a barebones dataloader that sets up batched inference
class Batcher:
def __init__(self, dset, batch_size):
self.dset = dset
self.batch_size = batch_size

def __iter__(self):
self.curr_idx = 0
return self

def __next__(self):
self.curr_idx += self.batch_size
return [self.dset[idx] for idx in range(self.curr_idx - self.batch_size, self.curr_idx)]

if not host:
host = self.datasource.source.repoApi.host
prev_uri = mlflow.get_tracking_uri()
os.environ["MLFLOW_TRACKING_URI"] = multi_urljoin(host, f"{repo}.mlflow")
os.environ["MLFLOW_TRACKING_USERNAME"] = get_token()
os.environ["MLFLOW_TRACKING_PASSWORD"] = get_token()
try:
model = mlflow.pyfunc.load_model(f"models:/{name}/{version}")
finally:
os.environ["MLFLOW_TRACKING_URI"] = prev_uri

dset = DagsHubDataset(self, tensorizers=[lambda x: x])

predictions = {}
progress = get_rich_progress(rich.progress.MofNCompleteColumn())
task = progress.add_task("Running inference...", total=len(dset))
with progress:
for idx, local_paths in enumerate(
Batcher(dset, batch_size) if batch_size != 1 else dset
): # encapsulates dataset with batcher if necessary and iterates over it
for prediction, remote_path in zip(
post_hook(model.predict(pre_hook(local_paths))),
[result.path for result in self[idx * batch_size : (idx + 1) * batch_size]],
):
predictions[remote_path] = {
"data": {"image": multi_urljoin(self.datasource.source.root_raw_path, remote_path)},
"annotations": [prediction],
}
progress.update(task, advance=batch_size, refresh=True)

if log_to_field:
with self.datasource.metadata_context() as ctx:
for remote_path in predictions:
ctx.update_metadata(
remote_path, {log_to_field: json.dumps(predictions[remote_path]).encode("utf-8")}
)
return predictions

def get_annotations(self, **kwargs) -> "QueryResult":
"""
Loads all annotation fields using :func:`get_blob_fields`.
@@ -768,11 +853,52 @@ def visualize(self, visualizer: Literal["dagshub", "fiftyone"] = "dagshub", **kw

return sess

def annotate_with_mlflow_model(
self,
repo: str,
name: str,
post_hook: Callable = lambda x: x,
pre_hook: Callable = lambda x: x,
host: Optional[str] = None,
version: str = "latest",
batch_size: int = 1,
log_to_field: str = "annotation",
) -> Optional[str]:
"""
Sends all the datapoints returned in this QueryResult to an MLFlow model which automatically labels datapoints.

Args:
repo: repository to extract the model from
name: name of the model in the mlflow registry
version: (optional, default: 'latest') version of the model in the mlflow registry
pre_hook: (optional, default: identity function) function that runs
before the datapoint is sent to the model
post_hook: (optional, default: identity function) function that converts
mlflow model output to Label Studio format
batch_size: (optional, default: 1) batched annotation size
"""
self.predict_with_mlflow_model(
repo,
name,
host=host,
version=version,
pre_hook=pre_hook,
post_hook=post_hook,
batch_size=batch_size,
log_to_field=log_to_field,
)
self.datasource.metadata_field(log_to_field).set_annotation().apply()

def annotate(
self, open_project=True, ignore_warning=True, fields_to_embed=None, fields_to_exclude=None
self,
open_project: bool = True,
ignore_warning: bool = True,
fields_to_embed: Optional[List[str]] = None,
fields_to_exclude: Optional[List[str]] = None,
) -> Optional[str]:
"""
Sends all the datapoints returned in this QueryResult to be annotated in Label Studio on DagsHub.
Alternatively, uses MLFlow to automatically label datapoints.

Args:
open_project: Automatically open the Label Studio project in the browser
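
Finally, a hedged usage sketch of the two new `QueryResult` methods added in this file; the repo, model, and field names below are placeholders, and the default identity hooks are used.

from dagshub.data_engine import datasources

# Placeholder repo/datasource/model names -- illustrative only.
ds = datasources.get_datasource("<owner>/<repo>", name="my-datasource")
qr = ds.head()  # QueryResult over a sample of datapoints

# Batched inference with a registered MLflow model; the predictions dict is
# returned and, because log_to_field is set, also logged as datapoint metadata.
predictions = qr.predict_with_mlflow_model(
    repo="<owner>/<repo>",
    name="my-registered-model",
    version="latest",
    batch_size=4,
    log_to_field="prediction",
)

# Same flow, but the target field is additionally marked as an annotation
# field so the results show up as labels in Label Studio.
qr.annotate_with_mlflow_model(
    repo="<owner>/<repo>",
    name="my-registered-model",
    log_to_field="annotation",
)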