Merge branch 'master' into rc0.5.56
adiprerepa authored Jun 30, 2021
2 parents de14162 + 6408dbe commit d607bce
Showing 6 changed files with 67 additions and 51 deletions.
55 changes: 55 additions & 0 deletions codalab/lib/beam/blobstorageuploader.py
@@ -0,0 +1,55 @@
from apache_beam.io.filesystemio import Uploader
from azure.storage.blob import (
    ContentSettings,
    BlobBlock,
)
from apache_beam.io.azure.blobstorageio import parse_azfs_path
import base64
from codalab.worker.un_gzip_stream import BytesBuffer

class BlobStorageUploader(Uploader):
    """An improved version of apache_beam.io.azure.blobstorageio.BlobStorageUploader
    that handles multipart streaming (block-by-block) uploads.
    TODO (Ashwin): contribute this back upstream to Apache Beam (https://github.com/codalab/codalab-worksheets/issues/3475).
    """
    def __init__(self, client, path, mime_type='application/octet-stream'):
        self._client = client
        self._path = path
        self._container, self._blob = parse_azfs_path(path)
        self._content_settings = ContentSettings(mime_type)

        self._blob_to_upload = self._client.get_blob_client(
            self._container, self._blob)

        self.block_number = 1
        self.buffer = BytesBuffer()
        self.block_list = []

    def put(self, data):
        # Note that Blob Storage currently can hold a maximum of 100,000 uncommitted
        # blocks, so with this implementation we can upload a file of at most
        # 100,000 blocks * 100 MiB, roughly 10 TiB. To exceed that limit, we must
        # either increase MIN_WRITE_SIZE or modify this class to call
        # commit_block_list more often (and not just at the end of the upload).
        MIN_WRITE_SIZE = 100 * 1024 * 1024
        # Maximum block size is 4000 MiB (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks).
        MAX_WRITE_SIZE = 4000 * 1024 * 1024

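        # `data` is a memoryview (hence the tobytes() copy below); keep
        # accumulating bytes until at least one full block is buffered.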
        self.buffer.write(data.tobytes())

        while len(self.buffer) >= MIN_WRITE_SIZE:
            # Take the first chunk off the buffer and write it to Blob Storage.
            chunk = self.buffer.read(MAX_WRITE_SIZE)
            self._write_to_blob(chunk)

    def _write_to_blob(self, data):
        # Block IDs must be base64 strings normalized to the same length.
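        # e.g. block 1 is encoded from the 32-character string '{0:-32d}'.format(1),
        # so every block ID ends up as a 44-character base64 string.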
        block_id = base64.b64encode('{0:-32d}'.format(self.block_number).encode()).decode()

        self._blob_to_upload.stage_block(block_id, data)
        self.block_list.append(BlobBlock(block_id))
        self.block_number += 1

    def finish(self):
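        # Flush whatever remains in the buffer (possibly less than
        # MIN_WRITE_SIZE), then commit all staged blocks in order.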
        self._write_to_blob(self.buffer)
        self._blob_to_upload.commit_block_list(self.block_list, content_settings=self._content_settings)
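For context, here is a minimal sketch of driving this uploader by hand, outside of Beam. The connection string, container, and file names are placeholders for illustration, not part of the commit:

from azure.storage.blob import BlobServiceClient
from codalab.lib.beam.blobstorageuploader import BlobStorageUploader

# Placeholder credentials and paths; substitute real values.
client = BlobServiceClient.from_connection_string("<connection-string>")
uploader = BlobStorageUploader(client, 'azfs://devstoreaccount1/mycontainer/large_file.bin')

with open('large_file.bin', 'rb') as f:
    while True:
        chunk = f.read(64 * 1024 * 1024)  # feed the uploader 64 MiB at a time
        if not chunk:
            break
        uploader.put(memoryview(chunk))  # put() expects a memoryview (it calls .tobytes())
uploader.finish()  # stages the final partial block and commits the block list

In normal operation Beam constructs the uploader itself; this only shows the put()/finish() protocol the class implements.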
5 changes: 5 additions & 0 deletions codalab/lib/beam/filesystems.py
@@ -1,6 +1,11 @@
 import os
 from azure.storage.blob import BlobServiceClient

+# Monkey-patch BlobStorageUploader
+from .blobstorageuploader import BlobStorageUploader
+import apache_beam.io.azure.blobstorageio
+apache_beam.io.azure.blobstorageio.BlobStorageUploader = BlobStorageUploader
+
 # Test connection string for Azurite (local development)
 TEST_CONN_STR = (
     "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
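The patch takes effect at import time, so codalab.lib.beam.filesystems must be imported before Beam constructs any Blob Storage writer. A rough sketch of the intended end-to-end effect (the azfs:// path below is a placeholder):

import codalab.lib.beam.filesystems  # noqa: F401 -- applies the monkey-patch above
from apache_beam.io.filesystems import FileSystems

# Writes to azfs:// paths should now stream block-by-block through the
# patched uploader instead of buffering the whole file before upload.
with FileSystems.create('azfs://account/container/output.bin') as stream:
    stream.write(b'...')

Patching the module attribute (rather than subclassing) means Beam-internal call sites that look up blobstorageio.BlobStorageUploader at call time also pick up the streaming implementation.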
5 changes: 1 addition & 4 deletions codalab/worker_manager/azure_batch_worker_manager.py
@@ -98,12 +98,9 @@ def start_worker_job(self) -> None:
         worker_image: str = 'codalab/worker:' + os.environ.get('CODALAB_VERSION', 'latest')
         worker_id: str = uuid.uuid4().hex
         logger.debug('Starting worker {} with image {}'.format(worker_id, worker_image))
-        work_dir_prefix: str = (
+        work_dir: str = (
             self.args.worker_work_dir_prefix if self.args.worker_work_dir_prefix else "/tmp/"
         )
-
-        # This needs to be a unique directory since Batch jobs may share a host
-        work_dir: str = os.path.join(work_dir_prefix, 'cl_worker_{}_work_dir'.format(worker_id))
         command: List[str] = self.build_command(worker_id, work_dir)

         task_container_run_options: List[str] = [
2 changes: 1 addition & 1 deletion docker_config/dockerfiles/Dockerfile.default-cpu
@@ -66,7 +66,7 @@ RUN python3 -m pip install -U --no-cache-dir \
     scikit-learn \
     scikit-image \
     nltk \
-    tensorflow-gpu==1.12.0 \
+    tensorflow==1.12.0 \
     tensorboard \
     keras \
     torch==1.1.0 \
47 changes: 3 additions & 44 deletions frontend/package-lock.json

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions setup.cfg
@@ -33,7 +33,7 @@ ignore_missing_imports = True

 [flake8]
 max-line-length = 200
-exclude = venv/*,var/*,alembic/*,frontend/*,codalab/lib/beam/ratarmount.py,codalab/lib/beam/streamingzipfile.py
+exclude = venv/*,var/*,alembic/*,frontend/*,codalab/lib/beam/ratarmount.py,codalab/lib/beam/blobstorageuploader.py,codalab/lib/beam/streamingzipfile.py

 # Ignore completely:
 # E203 - White space before ':', (conflicts with black)
@@ -53,7 +53,7 @@ per-file-ignores =
 # Modifying those texts in accordance with flake8 will change how the generated markdown look like
 ./scripts/gen-rest-docs.py: E501

-[mypy-apache_beam,apache_beam.io.filesystem,apache_beam.io.filesystems,apache_beam.io.localfilesystem,apache_beam.io]
+[mypy-apache_beam,apache_beam.io.filesystem,apache_beam.io.filesystems,apache_beam.io.localfilesystem,apache_beam.io,apache_beam.io.filesystemio,apache_beam.io.azure,apache_beam.io.azure.blobstorageio]
 ignore_missing_imports = True

 [mypy-azure.storage.blob]
