diff --git a/codalab/lib/beam/blobstorageuploader.py b/codalab/lib/beam/blobstorageuploader.py
new file mode 100644
index 000000000..da90ae76b
--- /dev/null
+++ b/codalab/lib/beam/blobstorageuploader.py
@@ -0,0 +1,55 @@
+from apache_beam.io.filesystemio import Uploader
+from azure.storage.blob import (
+    ContentSettings,
+    BlobBlock,
+)
+from apache_beam.io.azure.blobstorageio import parse_azfs_path
+import base64
+from codalab.worker.un_gzip_stream import BytesBuffer
+
+class BlobStorageUploader(Uploader):
+    """An improved version of apache_beam.io.azure.blobstorageio.BlobStorageUploader
+    that handles multipart streaming (block-by-block) uploads.
+    TODO (Ashwin): contribute this back upstream to Apache Beam (https://github.com/codalab/codalab-worksheets/issues/3475).
+    """
+    def __init__(self, client, path, mime_type='application/octet-stream'):
+        self._client = client
+        self._path = path
+        self._container, self._blob = parse_azfs_path(path)
+        self._content_settings = ContentSettings(mime_type)
+
+        self._blob_to_upload = self._client.get_blob_client(
+            self._container, self._blob)
+
+        self.block_number = 1
+        self.buffer = BytesBuffer()
+        self.block_list = []
+
+    def put(self, data):
+        # Note that Blob Storage currently can hold a maximum of 100,000 uncommitted blocks.
+        # This means that with this current implementation, we can upload a file with a maximum
+        # size of 10 TiB to Blob Storage. To exceed that limit, we must either increase MIN_WRITE_SIZE
+        # or modify the implementation of this class to call commit_block_list more often (and not
+        # just at the end of the upload).
+        MIN_WRITE_SIZE = 100 * 1024 * 1024
+        # Maximum block size is 4000 MiB (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks).
+        MAX_WRITE_SIZE = 4000 * 1024 * 1024
+
+        self.buffer.write(data.tobytes())
+
+        while len(self.buffer) >= MIN_WRITE_SIZE:
+            # Take the first chunk off the buffer and write it to Blob Storage
+            chunk = self.buffer.read(MAX_WRITE_SIZE)
+            self._write_to_blob(chunk)
+
+    def _write_to_blob(self, data):
+        # block_id's have to be base-64 strings normalized to have the same length.
+        block_id = base64.b64encode('{0:-32d}'.format(self.block_number).encode()).decode()
+
+        self._blob_to_upload.stage_block(block_id, data)
+        self.block_list.append(BlobBlock(block_id))
+        self.block_number = self.block_number + 1
+
+    def finish(self):
+        self._write_to_blob(self.buffer)
+        self._blob_to_upload.commit_block_list(self.block_list, content_settings=self._content_settings)
diff --git a/codalab/lib/beam/filesystems.py b/codalab/lib/beam/filesystems.py
index 11a548994..90c4d62cd 100644
--- a/codalab/lib/beam/filesystems.py
+++ b/codalab/lib/beam/filesystems.py
@@ -1,6 +1,11 @@
 import os
 
 from azure.storage.blob import BlobServiceClient
+# Monkey-patch BlobStorageUploader
+from .blobstorageuploader import BlobStorageUploader
+import apache_beam.io.azure.blobstorageio
+apache_beam.io.azure.blobstorageio.BlobStorageUploader = BlobStorageUploader
+
 # Test connection string for Azurite (local development)
 TEST_CONN_STR = (
     "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
diff --git a/codalab/worker_manager/azure_batch_worker_manager.py b/codalab/worker_manager/azure_batch_worker_manager.py
index 71d8603aa..fad705dad 100644
--- a/codalab/worker_manager/azure_batch_worker_manager.py
+++ b/codalab/worker_manager/azure_batch_worker_manager.py
@@ -98,12 +98,9 @@ def start_worker_job(self) -> None:
         worker_image: str = 'codalab/worker:' + os.environ.get('CODALAB_VERSION', 'latest')
         worker_id: str = uuid.uuid4().hex
         logger.debug('Starting worker {} with image {}'.format(worker_id, worker_image))
-        work_dir_prefix: str = (
+        work_dir: str = (
             self.args.worker_work_dir_prefix if self.args.worker_work_dir_prefix else "/tmp/"
         )
-
-        # This needs to be a unique directory since Batch jobs may share a host
-        work_dir: str = os.path.join(work_dir_prefix, 'cl_worker_{}_work_dir'.format(worker_id))
         command: List[str] = self.build_command(worker_id, work_dir)
 
         task_container_run_options: List[str] = [
diff --git a/docker_config/dockerfiles/Dockerfile.default-cpu b/docker_config/dockerfiles/Dockerfile.default-cpu
index f87256f50..031e16541 100644
--- a/docker_config/dockerfiles/Dockerfile.default-cpu
+++ b/docker_config/dockerfiles/Dockerfile.default-cpu
@@ -66,7 +66,7 @@ RUN python3 -m pip install -U --no-cache-dir \
     scikit-learn \
     scikit-image \
     nltk \
-    tensorflow-gpu==1.12.0 \
+    tensorflow==1.12.0 \
    tensorboard \
     keras \
     torch==1.1.0 \
diff --git a/frontend/package-lock.json b/frontend/package-lock.json
index 1323606ad..b81744a2a 100644
--- a/frontend/package-lock.json
+++ b/frontend/package-lock.json
@@ -5645,17 +5645,6 @@
       "integrity": "sha1-/xnt6Kml5XkyQUewwR8PvLq+1jk=",
       "dev": true
     },
-    "clipboard": {
-      "version": "2.0.8",
-      "resolved": "https://registry.npmjs.org/clipboard/-/clipboard-2.0.8.tgz",
-      "integrity": "sha512-Y6WO0unAIQp5bLmk1zdThRhgJt/x3ks6f30s3oE3H1mgIEU33XyQjEf8gsf6DxC7NPX8Y1SsNWjUjL/ywLnnbQ==",
-      "optional": true,
-      "requires": {
-        "good-listener": "^1.2.2",
-        "select": "^1.1.2",
-        "tiny-emitter": "^2.0.0"
-      }
-    },
     "cliui": {
       "version": "5.0.0",
       "resolved": "https://registry.npmjs.org/cliui/-/cliui-5.0.0.tgz",
@@ -6946,12 +6935,6 @@
       "integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=",
       "dev": true
     },
-    "delegate": {
-      "version": "3.2.0",
-      "resolved": "https://registry.npmjs.org/delegate/-/delegate-3.2.0.tgz",
-      "integrity": "sha512-IofjkYBZaZivn0V8nnsMJGBr4jVLxHDheKSW88PyxS5QC4Vo9ZbZVvhzlSxY87fVq3STR6r+4cGepyHkcWOQSw==",
-      "optional": true
-    },
     "depd": {
       "version": "1.1.2",
       "resolved": "https://registry.npmjs.org/depd/-/depd-1.1.2.tgz",
@@ -8942,15 +8925,6 @@
         }
       }
     },
-    "good-listener": {
-      "version": "1.2.2",
-      "resolved": "https://registry.npmjs.org/good-listener/-/good-listener-1.2.2.tgz",
-      "integrity": "sha1-1TswzfkxPf+33JoNR3CWqm0UXFA=",
-      "optional": true,
-      "requires": {
-        "delegate": "^3.1.2"
-      }
-    },
     "graceful-fs": {
       "version": "4.2.4",
       "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.4.tgz",
@@ -14571,12 +14545,9 @@
       }
     },
     "prismjs": {
-      "version": "1.23.0",
-      "resolved": "https://registry.npmjs.org/prismjs/-/prismjs-1.23.0.tgz",
-      "integrity": "sha512-c29LVsqOaLbBHuIbsTxaKENh1N2EQBOHaWv7gkHN4dgRbxSREqDnDbtFJYdpPauS4YCplMSNCABQ6Eeor69bAA==",
-      "requires": {
-        "clipboard": "^2.0.0"
-      }
+      "version": "1.24.0",
+      "resolved": "https://registry.npmjs.org/prismjs/-/prismjs-1.24.0.tgz",
+      "integrity": "sha512-SqV5GRsNqnzCL8k5dfAjCNhUrF3pR0A9lTDSCUZeh/LIshheXJEaP0hwLz2t4XHivd2J/v2HR+gRnigzeKe3cQ=="
     },
     "process": {
       "version": "0.11.10",
@@ -16630,12 +16601,6 @@
       "resolved": "https://registry.npmjs.org/seamless-immutable/-/seamless-immutable-7.1.4.tgz",
       "integrity": "sha512-XiUO1QP4ki4E2PHegiGAlu6r82o5A+6tRh7IkGGTVg/h+UoeX4nFBeCGPOhb4CYjvkqsfm/TUtvOMYC1xmV30A=="
     },
-    "select": {
-      "version": "1.1.2",
-      "resolved": "https://registry.npmjs.org/select/-/select-1.1.2.tgz",
-      "integrity": "sha1-DnNQrN7ICxEIUoeG7B1EGNEbOW0=",
-      "optional": true
-    },
     "select-hose": {
       "version": "2.0.0",
       "resolved": "https://registry.npmjs.org/select-hose/-/select-hose-2.0.0.tgz",
@@ -17981,12 +17946,6 @@
       "integrity": "sha1-QFQRqOfmM5/mTbmiNN4R3DHgK9Q=",
       "dev": true
     },
-    "tiny-emitter": {
-      "version": "2.1.0",
-      "resolved": "https://registry.npmjs.org/tiny-emitter/-/tiny-emitter-2.1.0.tgz",
-      "integrity": "sha512-NB6Dk1A9xgQPMoGqC5CVXn123gWyte215ONT5Pp5a0yt4nlEoO1ZWeCwpncaekPHXO60i47ihFnZPiRPjRMq4Q==",
-      "optional": true
-    },
     "tiny-invariant": {
       "version": "1.1.0",
       "resolved": "https://registry.npmjs.org/tiny-invariant/-/tiny-invariant-1.1.0.tgz",
diff --git a/setup.cfg b/setup.cfg
index 822d2ad9f..b00b5dc14 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -33,7 +33,7 @@ ignore_missing_imports = True
 
 [flake8]
 max-line-length = 200
-exclude = venv/*,var/*,alembic/*,frontend/*,codalab/lib/beam/ratarmount.py,codalab/lib/beam/streamingzipfile.py
+exclude = venv/*,var/*,alembic/*,frontend/*,codalab/lib/beam/ratarmount.py,codalab/lib/beam/blobstorageuploader.py,codalab/lib/beam/streamingzipfile.py
 
 # Ignore completely:
 # E203 - White space before ':', (conflicts with black)
@@ -53,7 +53,7 @@ per-file-ignores =
 # Modifying those texts in accordance with flake8 will change how the generated markdown look like
 ./scripts/gen-rest-docs.py: E501
 
-[mypy-apache_beam,apache_beam.io.filesystem,apache_beam.io.filesystems,apache_beam.io.localfilesystem,apache_beam.io]
+[mypy-apache_beam,apache_beam.io.filesystem,apache_beam.io.filesystems,apache_beam.io.localfilesystem,apache_beam.io,apache_beam.io.filesystemio,apache_beam.io.azure,apache_beam.io.azure.blobstorageio]
 ignore_missing_imports = True
 
 [mypy-azure.storage.blob]