Fix Azure storage upload (#598)
allegroai committed Mar 15, 2022
1 parent 835d951 commit 237c724
Showing 2 changed files with 23 additions and 21 deletions.
41 changes: 21 additions & 20 deletions clearml/storage/helper.py
@@ -565,6 +565,7 @@ def upload(self, src_path, dest_path=None, extra=None, async_enable=False, cb=No
dest_path = os.path.basename(src_path)

dest_path = self._canonize_url(dest_path)
+ dest_path = dest_path.replace('\\', '/')

if cb and self.scheme in _HttpDriver.schemes:
# store original callback
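Note on the added replace(): the destination is an object-storage key, which always uses forward slashes, while a dest_path derived from a Windows source path can carry backslashes (os.path.basename only splits on the platform separator). A tiny illustration of the kind of input this guards against, with a made-up path:

    import os

    src_path = r"C:\models\weights.bin"        # Windows-style source path
    dest_path = os.path.basename(src_path)     # on POSIX this is still 'C:\\models\\weights.bin'
    dest_path = dest_path.replace('\\', '/')   # normalized key: 'C:/models/weights.bin'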
@@ -687,7 +688,7 @@ def next_chunk(astream):
# try to get file size
try:
if isinstance(self._driver, _HttpDriver) and obj:
- obj = self._driver._get_download_object(obj)
+ obj = self._driver._get_download_object(obj) # noqa
total_size_mb = float(obj.headers.get('Content-Length', 0)) / (1024 * 1024)
elif hasattr(obj, 'size'):
size = obj.size
@@ -722,9 +723,10 @@ def next_chunk(astream):
if not skip_zero_size_check and Path(temp_local_path).stat().st_size <= 0:
raise Exception('downloaded a 0-sized file')

- # if we are on windows, we need to remove the target file before renaming
+ # if we are on Windows, we need to remove the target file before renaming
# otherwise posix rename will overwrite the target
if os.name != 'posix':
+ # noinspection PyBroadException
try:
os.remove(local_path)
except Exception:
@@ -757,8 +759,7 @@ def next_chunk(astream):
if delete_on_failure:
# noinspection PyBroadException
try:
- if temp_local_path:
-     os.remove(temp_local_path)
+ os.remove(temp_local_path)
except Exception:
pass
return None
@@ -1710,7 +1711,7 @@ class _AzureBlobServiceStorageDriver(_Driver):

class _Container(object):
def __init__(self, name, config, account_url):
- self.MAX_SINGLE_PUT_SIZE = 16 * 1024 * 1024
+ self.MAX_SINGLE_PUT_SIZE = 4 * 1024 * 1024
self.SOCKET_TIMEOUT = (300, 2000)
self.name = name
self.config = config
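Dropping MAX_SINGLE_PUT_SIZE from 16 MiB to 4 MiB lowers the threshold above which an upload is split into blocks instead of going out as a single Put Blob request. A minimal sketch of how such a limit is typically applied when constructing the newer azure.storage.blob client (the account URL and credential are placeholders, and passing the value through this exact kwarg is an assumption about the surrounding code):

    from azure.storage.blob import BlobServiceClient

    MAX_SINGLE_PUT_SIZE = 4 * 1024 * 1024  # uploads larger than this get chunked into blocks

    service = BlobServiceClient(
        account_url="https://<account>.blob.core.windows.net",  # placeholder
        credential="<account-key>",                              # placeholder
        max_single_put_size=MAX_SINGLE_PUT_SIZE,
    )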
@@ -1775,17 +1776,9 @@ def create_blob_from_path(
progress_callback=progress_callback,
)
else:
- client = self.__blob_service.get_blob_client(container_name, blob_name)
- with open(path, "rb") as file:
-     first_chunk = True
-     for chunk in iter((lambda: file.read(self.MAX_SINGLE_PUT_SIZE)), b""):
-         if first_chunk:
-             client.upload_blob(chunk, overwrite=True, max_concurrency=max_connections)
-             first_chunk = False
-         else:
-             from azure.storage.blob import BlockType # noqa
-
-             client.upload_blob(chunk, BlockType.AppendBlob)
+ self.create_blob_from_data(
+     container_name, None, blob_name, open(path, "rb"), max_connections=max_connections
+ )

def delete_blob(self, container_name, blob_name):
if self.__legacy:
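The removed branch uploaded each 4 MiB chunk with its own upload_blob() call, sending the later chunks as append blocks; a block blob cannot be assembled that way, which is consistent with the broken uploads this commit fixes. The replacement hands the open file object to create_blob_from_data and lets the SDK split it into blocks. A rough sketch of the equivalent direct call, assuming create_blob_from_data ends up in upload_blob() (the helper name is made up):

    def upload_file(blob_service, container_name, blob_name, path, max_connections=2):
        # hypothetical helper: stream the file once and let the SDK chunk it
        client = blob_service.get_blob_client(container_name, blob_name)
        with open(path, "rb") as stream:
            client.upload_blob(stream, overwrite=True, max_concurrency=max_connections)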
@@ -1839,13 +1832,17 @@ def get_blob_to_path(self, container_name, blob_name, path, max_connections=10,
progress_callback=progress_callback,
)
else:
- client = self.__blob_service.get_blob_client(container_name, blob_name, max_concurrency=max_connections)
+ client = self.__blob_service.get_blob_client(container_name, blob_name)
with open(path, "wb") as file:
- return client.download_blob().download_to_stream(file)
+ return client.download_blob(max_concurrency=max_connections).download_to_stream(file)

def is_legacy(self):
return self.__legacy

+ @property
+ def blob_service(self):
+     return self.__blob_service

@attrs
class _Object(object):
container = attrib()
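On the download side, max_concurrency is moved from get_blob_client() to download_blob(), which is where parallel range downloads are actually controlled in the newer SDK. A minimal sketch of the corrected call shape (helper name and arguments are illustrative):

    def download_file(blob_service, container_name, blob_name, path, max_connections=10):
        # hypothetical helper mirroring the non-legacy branch of get_blob_to_path
        client = blob_service.get_blob_client(container_name, blob_name)
        with open(path, "wb") as stream:
            client.download_blob(max_concurrency=max_connections).download_to_stream(stream)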
@@ -1953,7 +1950,10 @@ def download_object_as_stream(self, obj, verbose, *_, **__):
obj.blob_name,
progress_callback=cb,
)
- return blob.content
+ if container.is_legacy():
+     return blob.content
+ else:
+     return blob

def download_object(self, obj, local_path, overwrite_existing=True, delete_on_failure=True, callback=None, **_):
p = Path(local_path)
@@ -1981,7 +1981,8 @@ def callback_func(current, total):
max_connections=10,
progress_callback=callback_func,
)
- download_done.wait()
+ if container.is_legacy():
+     download_done.wait()

def test_upload(self, test_path, config, **_):
container = self.get_container(config=config)
3 changes: 2 additions & 1 deletion clearml/storage/manager.py
@@ -232,6 +232,7 @@ def upload_folder(cls, local_folder, remote_url, match_wildcard=None):
if not Path(local_folder).is_dir():
base_logger.error("Local folder '{}' does not exist".format(local_folder))
return
+ local_folder = str(Path(local_folder))
results = []
helper = StorageHelper.get(remote_url)
with ThreadPool() as pool:
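Passing local_folder through Path() and back to str normalizes separators and strips redundant or trailing ones, so the folder prefix has a predictable length if it is later sliced off each file path to build the remote destination (that slicing happens elsewhere in upload_folder and is an assumption here, not shown in this hunk). A small illustration:

    import os
    from pathlib import Path

    local_folder = "data//models/"              # as a user might pass it
    local_folder = str(Path(local_folder))      # -> 'data/models' on POSIX

    file_path = os.path.join(local_folder, "weights.bin")
    relative = file_path[len(local_folder):].lstrip(os.path.sep)   # 'weights.bin'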
@@ -299,7 +300,7 @@ def download_folder(
continue
local_url = os.path.join(
str(Path(local_folder)),
- str(Path(remote_path[len(remote_url):].lstrip(os.path.sep)))
+ str(Path(remote_path[len(remote_url):])).lstrip(os.path.sep)
)
if not os.path.exists(local_url) or os.path.getsize(local_url) == 0:
results.append(
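The reordering of lstrip() matters mainly on Windows: remote paths use '/', while os.path.sep is '\\', so stripping the separator before Path() normalization removed nothing and the resulting local path kept a leading separator, letting it escape the target folder when joined. A small illustration using PureWindowsPath to simulate Windows behavior (the value is made up):

    import ntpath                           # Windows path rules, importable on any OS
    from pathlib import PureWindowsPath

    remote_part = "/artifacts/model.bin"    # remote URLs always use '/'

    # old order: lstrip('\\') removes nothing, the leading separator survives normalization
    old = str(PureWindowsPath(remote_part.lstrip(ntpath.sep)))    # '\\artifacts\\model.bin'

    # new order: normalize first, then strip the leading separator
    new = str(PureWindowsPath(remote_part)).lstrip(ntpath.sep)    # 'artifacts\\model.bin'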
