Reuse local/hopsworks engine for copy, move and download ops
javierdlrm committed Nov 6, 2023
1 parent 827d0f8 commit e8f2e2e
Showing 4 changed files with 85 additions and 37 deletions.
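
With this change, ModelEngine no longer calls DatasetApi directly for copy, move and download; it routes every file operation through the engine selected at runtime, so the same code path uses native HDFS inside a Hopsworks cluster and the REST dataset API from external clients. A minimal sketch of that dispatch, assuming the engine classes are named after their modules and that the availability of native HDFS bindings decides between them (the get_engine() helper and its check are illustrative, not the actual hsml selection logic):

    # Illustrative sketch of the engine dispatch this commit relies on.
    # The selection condition and get_engine() are assumptions, not hsml internals.
    import importlib.util

    from hsml.engine import hopsworks_engine, local_engine


    def get_engine():
        # Assume native HDFS bindings are only importable inside a Hopsworks cluster.
        if importlib.util.find_spec("pydoop") is not None:
            return hopsworks_engine.HopsworksEngine()  # backed by NativeHdfsApi
        return local_engine.LocalEngine()  # backed by the REST DatasetApi


    engine = get_engine()
    # Identical calls in both environments; the engine picks the backend.
    engine.copy("Models/mnist/1", "Models/mnist/2")
    engine.download("Models/mnist/2/model.pkl", "/tmp/model.pkl")
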
14 changes: 14 additions & 0 deletions python/hsml/core/native_hdfs_api.py
@@ -43,4 +43,18 @@ def rm(self, path, recursive=True):
hdfs.rm(path, recursive=recursive)

def upload(self, local_path: str, remote_path: str):
# copy from local fs to hdfs
hdfs.put(local_path, remote_path)

def download(self, remote_path: str, local_path: str):
# copy from hdfs to local fs
print("Downloading file ...", end=" ")
hdfs.get(remote_path, local_path)

def copy(self, source_path: str, destination_path: str):
# both paths are hdfs paths
hdfs.cp(source_path, destination_path)

def move(self, source_path: str, destination_path: str):
# both paths are hdfs paths
hdfs.rename(source_path, destination_path)
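
The new NativeHdfsApi methods are thin wrappers over the in-cluster HDFS client (put, get, cp, rename). A hypothetical usage sketch; the project name and paths are made up, and the calls only work inside a Hopsworks environment where the hdfs module is available:

    # Hypothetical usage of the new wrappers (project and paths are illustrative).
    from hsml.core.native_hdfs_api import NativeHdfsApi

    api = NativeHdfsApi()
    api.upload("/tmp/model.pkl", "/Projects/demo/Models/mnist/1")                # local fs -> hdfs
    api.download("/Projects/demo/Models/mnist/1/model.pkl", "/tmp/model.pkl")    # hdfs -> local fs
    api.copy("/Projects/demo/Models/mnist/1", "/Projects/demo/Models/mnist/2")   # hdfs -> hdfs
    api.move("/Projects/demo/Models/mnist/2", "/Projects/demo/Models/mnist/3")   # hdfs -> hdfs
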
24 changes: 20 additions & 4 deletions python/hsml/engine/hopsworks_engine.py
@@ -25,24 +25,40 @@ def __init__(self):
self._native_hdfs_api = native_hdfs_api.NativeHdfsApi()

def mkdir(self, remote_path: str):
remote_path = self._preppend_project_path(remote_path)
remote_path = self._prepend_project_path(remote_path)
self._native_hdfs_api.mkdir(remote_path)
self._native_hdfs_api.chmod(remote_path, "ug+rwx")

def delete(self, remote_path: str):
remote_path = self._preppend_project_path(remote_path)
remote_path = self._prepend_project_path(remote_path)
self._native_hdfs_api.rm(remote_path)

def upload(self, local_path: str, remote_path: str):
local_path = self._get_abs_path(local_path)
remote_path = self._preppend_project_path(remote_path)
remote_path = self._prepend_project_path(remote_path)
self._native_hdfs_api.upload(local_path, remote_path)
self._native_hdfs_api.chmod(remote_path, "ug+rwx")

def download(self, remote_path: str, local_path: str):
local_path = self._get_abs_path(local_path)
remote_path = self._prepend_project_path(remote_path)
self._native_hdfs_api.download(remote_path, local_path)

def copy(self, source_path: str, destination_path: str):
# both paths are hdfs paths
source_path = self._prepend_project_path(source_path)
destination_path = self._prepend_project_path(destination_path)
self._native_hdfs_api.copy(source_path, destination_path)

def move(self, source_path: str, destination_path: str):
source_path = self._prepend_project_path(source_path)
destination_path = self._prepend_project_path(destination_path)
self._native_hdfs_api.move(source_path, destination_path)

def _get_abs_path(self, local_path: str):
return local_path if os.path.isabs(local_path) else os.path.abspath(local_path)

def _preppend_project_path(self, remote_path: str):
def _prepend_project_path(self, remote_path: str):
if not remote_path.startswith("/Projects/"):
_client = client.get_instance()
remote_path = "/Projects/{}/{}".format(_client._project_name, remote_path)
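HopsworksEngine resolves every remote path through the renamed _prepend_project_path helper before touching HDFS: project-relative paths get the /Projects/<project> prefix, paths that already start with /Projects/ pass through unchanged. A worked example, assuming the current project is named demo:

    # Assuming client.get_instance()._project_name == "demo":
    #   "Models/mnist/1"                 -> "/Projects/demo/Models/mnist/1"
    #   "/Projects/demo/Models/mnist/1"  -> unchanged
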
23 changes: 19 additions & 4 deletions python/hsml/engine/local_engine.py
@@ -25,22 +25,37 @@ def __init__(self):
self._dataset_api = dataset_api.DatasetApi()

def mkdir(self, remote_path: str):
remote_path = self._preppend_project_path(remote_path)
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.mkdir(remote_path)

def delete(self, remote_path: str):
remote_path = self._preppend_project_path(remote_path)
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.rm(remote_path)

def upload(self, local_path: str, remote_path: str):
local_path = self._get_abs_path(local_path)
remote_path = self._preppend_project_path(remote_path)
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.upload(local_path, remote_path)

def download(self, remote_path: str, local_path: str):
local_path = self._get_abs_path(local_path)
remote_path = self._prepend_project_path(remote_path)
self._dataset_api.download(remote_path, local_path)

def copy(self, source_path, destination_path):
source_path = self._prepend_project_path(source_path)
destination_path = self._prepend_project_path(destination_path)
self._dataset_api.copy(source_path, destination_path)

def move(self, source_path, destination_path):
source_path = self._prepend_project_path(source_path)
destination_path = self._prepend_project_path(destination_path)
self._dataset_api.move(source_path, destination_path)

def _get_abs_path(self, local_path: str):
return local_path if os.path.isabs(local_path) else os.path.abspath(local_path)

def _preppend_project_path(self, remote_path: str):
def _prepend_project_path(self, remote_path: str):
if not remote_path.startswith("/Projects/"):
_client = client.get_instance()
remote_path = "/Projects/{}/{}".format(_client._project_name, remote_path)
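LocalEngine now exposes the same mkdir/delete/upload/download/copy/move surface, but backed by DatasetApi, i.e. REST calls against the Hopsworks instance, so a copy or move runs server side without streaming file contents through the client. A hypothetical direct use (normally ModelEngine owns the instance, an hsml connection must already be established, and the class name is assumed from the module name):

    # Hypothetical direct use of the local engine; requires an active hsml connection.
    from hsml.engine.local_engine import LocalEngine

    engine = LocalEngine()
    engine.copy("Models/mnist/1", "Models/mnist/2")            # server-side copy via REST
    engine.download("Models/mnist/2/model.pkl", "model.pkl")   # streamed to the client
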
61 changes: 32 additions & 29 deletions python/hsml/engine/model_engine.py
@@ -73,7 +73,7 @@ def _upload_additional_resources(self, model_instance):
with open(input_example_path, "w+") as out:
json.dump(input_example, out, cls=util.NumpyEncoder)

self._dataset_api.upload(input_example_path, model_instance.version_path)
self._engine.upload(input_example_path, model_instance.version_path)
os.remove(input_example_path)
model_instance.input_example = None
if model_instance._model_schema is not None:
@@ -83,33 +83,34 @@ def _upload_additional_resources(self, model_instance):
with open(model_schema_path, "w+") as out:
out.write(model_schema.json())

self._dataset_api.upload(model_schema_path, model_instance.version_path)
self._engine.upload(model_schema_path, model_instance.version_path)
os.remove(model_schema_path)
model_instance.model_schema = None
return model_instance

def _copy_or_move_hopsfs_model(
self,
existing_model_path,
model_version_path,
from_hdfs_model_path,
to_model_version_path,
keep_original_files,
update_upload_progress,
):
"""Copy or move model files from a hdfs path to the model version folder in the Models dataset."""
# Strip hdfs prefix
if existing_model_path.startswith("hdfs:/"):
projects_index = existing_model_path.find("/Projects", 0)
existing_model_path = existing_model_path[projects_index:]
if from_hdfs_model_path.startswith("hdfs:/"):
projects_index = from_hdfs_model_path.find("/Projects", 0)
from_hdfs_model_path = from_hdfs_model_path[projects_index:]

n_dirs, n_files = 0, 0
for entry in self._dataset_api.list(existing_model_path, sort_by="NAME:desc")[
for entry in self._dataset_api.list(from_hdfs_model_path, sort_by="NAME:desc")[
"items"
]:
path = entry["attributes"]["path"]
_, file_name = os.path.split(path)
if keep_original_files:
self._dataset_api.copy(path, model_version_path + "/" + file_name)
self._engine.copy(path, to_model_version_path + "/" + file_name)
else:
self._dataset_api.move(path, model_version_path + "/" + file_name)
self._engine.move(path, to_model_version_path + "/" + file_name)
if "." in path:
n_files += 1
else:
@@ -118,46 +119,48 @@ def _copy_or_move_hopsfs_model(

def _upload_local_model(
self,
local_model_path,
dataset_model_name_path,
from_local_model_path,
to_model_version_path,
update_upload_progress,
):
"""Copy or upload model files from a local path to the model version folder in the Models dataset."""
n_dirs, n_files = 0, 0
for root, dirs, files in os.walk(local_model_path):
for root, dirs, files in os.walk(from_local_model_path):
# os.walk(local_model_path), where local_model_path is expected to be an absolute path
# - root is the absolute path of the directory being walked
# - dirs is the list of directory names present in the root dir
# - files is the list of file names present in the root dir
# we need to replace the local path prefix with the hdfs path prefix (i.e., /srv/hops/....../root with /Projects/.../)
remote_base_path = root.replace(local_model_path, dataset_model_name_path)
remote_base_path = root.replace(
from_local_model_path, to_model_version_path
)
for d_name in dirs:
self._engine.mkdir(remote_base_path + "/" + d_name)
n_dirs += 1
update_upload_progress(n_dirs, n_files)
for f_name in files:
self._engine.upload(
root + "/" + f_name, remote_base_path + "/" + f_name
)
self._engine.upload(root + "/" + f_name, remote_base_path)
n_files += 1
update_upload_progress(n_dirs, n_files)

def _save_model_from_local_or_hopsfs(
def _save_model_from_local_or_hopsfs_mount(
self, model_instance, model_path, keep_original_files, update_upload_progress
):
"""Save model files from a local path. The local path can be on hopsfs mount"""
# check hopsfs mount
if model_path.startswith(constants.MODEL_REGISTRY.HOPSFS_MOUNT_PREFIX):
self._copy_or_move_hopsfs_model(
existing_model_path=model_path.replace(
from_hdfs_model_path=model_path.replace(
constants.MODEL_REGISTRY.HOPSFS_MOUNT_PREFIX, ""
),
model_version_path=model_instance.version_path,
to_model_version_path=model_instance.version_path,
keep_original_files=keep_original_files,
update_upload_progress=update_upload_progress,
)
else:
self._upload_local_model(
local_model_path=model_path,
dataset_model_name_path=model_instance.version_path,
from_local_model_path=model_path,
to_model_version_path=model_instance.version_path,
update_upload_progress=update_upload_progress,
)
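
The renamed _save_model_from_local_or_hopsfs_mount keeps the two cases explicit: a path under the HopsFS mount is stripped of the mount prefix and its files are copied or moved server side, while a genuinely local path is walked with os.walk and uploaded file by file, rewriting each local root to the matching folder under the model version path. A worked example of the rewriting; the mount prefix, project name and paths are assumptions:

    # Assuming HOPSFS_MOUNT_PREFIX == "/hopsfs/" and version_path == "/Projects/demo/Models/mnist/1":
    #   "/hopsfs/Jupyter/my_model"     -> server-side copy/move of "Jupyter/my_model" into the version path
    #   "/srv/app/my_model"            -> walked locally; root "/srv/app/my_model/assets"
    #                                     uploads into "/Projects/demo/Models/mnist/1/assets"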

@@ -281,7 +284,7 @@ def update_upload_progress(n_dirs=0, n_files=0):
# Upload Model files from local path to /Models/{model_instance._name}/{model_instance._version}
# check local absolute
if os.path.isabs(model_path) and os.path.exists(model_path):
self._save_model_from_local_or_hopsfs(
self._save_model_from_local_or_hopsfs_mount(
model_instance=model_instance,
model_path=model_path,
keep_original_files=keep_original_files,
@@ -291,7 +294,7 @@ def update_upload_progress(n_dirs=0, n_files=0):
elif os.path.exists(
os.path.join(os.getcwd(), model_path)
): # check local relative
self._save_model_from_local_or_hopsfs(
self._save_model_from_local_or_hopsfs_mount(
model_instance=model_instance,
model_path=os.path.join(os.getcwd(), model_path),
keep_original_files=keep_original_files,
@@ -302,8 +305,8 @@ def update_upload_progress(n_dirs=0, n_files=0):
model_path
): # check hdfs relative and absolute
self._copy_or_move_hopsfs_model(
existing_model_path=model_path,
model_version_path=model_instance.version_path,
from_hdfs_model_path=model_path,
to_model_version_path=model_instance.version_path,
keep_original_files=keep_original_files,
update_upload_progress=update_upload_progress,
)
@@ -350,7 +353,7 @@ def download(self, model_instance):
block=True,
timeout=600,
)
self._dataset_api.download(
self._engine.download(
temp_download_dir + "/" + str(model_instance._version) + ".zip",
zip_path,
)
@@ -375,7 +378,7 @@ def read_file(self, model_instance, resource):
resource = os.path.basename(resource)
tmp_dir = tempfile.TemporaryDirectory(dir=os.getcwd())
local_resource_path = os.path.join(tmp_dir.name, resource)
self._dataset_api.download(
self._engine.download(
hdfs_resource_path,
local_resource_path,
)
@@ -391,7 +394,7 @@ def read_json(self, model_instance, resource):
try:
tmp_dir = tempfile.TemporaryDirectory(dir=os.getcwd())
local_resource_path = os.path.join(tmp_dir.name, resource)
self._dataset_api.download(
self._engine.download(
hdfs_resource_path,
local_resource_path,
)
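With download, read_file and read_json also routed through the engine, fetching model artifacts behaves the same inside and outside the cluster. A hypothetical end-to-end sketch; the model name and version are made up:

    # Hypothetical end-user flow; names are illustrative.
    import hsml

    connection = hsml.connection()        # reads host/project/API key from the environment or kwargs
    mr = connection.get_model_registry()
    model = mr.get_model("mnist", version=1)
    local_dir = model.download()          # served by HopsworksEngine or LocalEngine under the hood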
