
Synchronize between uploading file and generating index thread #4443

Open: wants to merge 64 commits into base: master

Commits (64 total; changes shown from 60 commits)
ac5ed66  POC: parallel file uploading and index creation (epicfaace, Aug 16, 2022)
b9fbc35  local test (wwwjn, Jan 2, 2023)
20aea6d  still buggy (wwwjn, Jan 3, 2023)
a1842a4  find error with GZipStrema (wwwjn, Jan 4, 2023)
ce7de68  clean (wwwjn, Jan 4, 2023)
78e2b0d  more tests (wwwjn, Jan 13, 2023)
14cbc59  more tests (wwwjn, Jan 21, 2023)
fda189a  might be good (wwwjn, Jan 25, 2023)
54507b9  clean (wwwjn, Jan 25, 2023)
40e0fe8  Merge remote-tracking branch 'origin/master' into parallel-upload (wwwjn, Feb 1, 2023)
de13ee5  indexed_gzip success, but does not work for folder (wwwjn, Feb 1, 2023)
2db90e7  works for both file and folder (wwwjn, Feb 1, 2023)
bde9f10  format (wwwjn, Feb 1, 2023)
cdb2240  test (wwwjn, Feb 10, 2023)
0a6e9da  Merge remote-tracking branch 'origin/master' into parallel-upload (wwwjn, Feb 14, 2023)
d4c6b58  fix unit test (wwwjn, Feb 14, 2023)
3a9f8d0  fix (wwwjn, Feb 15, 2023)
8ba4f68  fix (wwwjn, Feb 21, 2023)
0f409b0  fix half (wwwjn, Feb 21, 2023)
5c9cc8a  fix file size (wwwjn, Feb 21, 2023)
d4509f6  temporary fix to pass unittest (wwwjn, Feb 21, 2023)
99402f7  fix (wwwjn, Feb 22, 2023)
5f55e54  update file size (wwwjn, Feb 23, 2023)
f29e7fa  add API (wwwjn, Feb 28, 2023)
5cc9025  finish test1 (wwwjn, Mar 1, 2023)
cbbb4f9  checkout all tests (wwwjn, Mar 1, 2023)
5ce3bf8  fix client (wwwjn, Mar 1, 2023)
0490cf5  fix unittest (wwwjn, Mar 1, 2023)
90b13f0  fix client (wwwjn, Mar 8, 2023)
e48d91b  fix format (wwwjn, Mar 8, 2023)
32d39db  Merge remote-tracking branch 'origin/master' into parallel-upload (wwwjn, Mar 8, 2023)
7e8f511  add requirments (wwwjn, Mar 8, 2023)
28e5e14  fix upload string (wwwjn, Mar 8, 2023)
4111e81  add more print (wwwjn, Mar 15, 2023)
ef3c907  fix --force-compression (wwwjn, Mar 15, 2023)
c17aa9a  Merge branch 'master' into parallel-upload (wwwjn, Mar 15, 2023)
ce715e8  Merge remote-tracking branch 'origin/master' into parallel-upload (wwwjn, Mar 18, 2023)
8b28b1f  fix stream file error (wwwjn, Mar 18, 2023)
76d2888  comment (epicfaace, Mar 21, 2023)
61617b3  test (epicfaace, Mar 21, 2023)
b0af9ed  revert changes, simpler GHA (epicfaace, Mar 21, 2023)
7741c94  fix (epicfaace, Mar 21, 2023)
f193e80  revert gha changes (epicfaace, Mar 21, 2023)
77480f1  cleanup v1 (wwwjn, Mar 22, 2023)
1bd3bbd  delete pycache (wwwjn, Mar 22, 2023)
8bb7c5b  rm __pycache__ (wwwjn, Mar 22, 2023)
d6bdefd  more fix (wwwjn, Mar 22, 2023)
f0e3215  fmt (wwwjn, Mar 22, 2023)
6d86462  fix fmt (wwwjn, Mar 23, 2023)
2ab01be  fix docs (wwwjn, Mar 23, 2023)
f6ab881  change signed url expire time (wwwjn, Apr 5, 2023)
f4589ba  synchronize busy waiting version (wwwjn, Apr 10, 2023)
f89109b  Merge branch 'master' into parallel-upload (wwwjn, Apr 12, 2023)
c06f37d  fmt (wwwjn, Apr 12, 2023)
75f81af  fix upload1 (wwwjn, Apr 12, 2023)
6b96ebb  merge master (wwwjn, Apr 18, 2023)
9f524ef  Merge remote-tracking branch 'origin/master' into parallel-upload-sync (wwwjn, Apr 25, 2023)
7f02173  rm pycache files (wwwjn, Apr 26, 2023)
5d6c496  fix ignore files (wwwjn, Apr 26, 2023)
5c3c7b5  Merge branch 'master' into parallel-upload-sync (wwwjn, May 17, 2023)
12c3fe0  Merge branch 'master' into parallel-upload-sync (wwwjn, Jun 6, 2023)
b2cf26b  fix format (wwwjn, Jun 6, 2023)
3037b17  Merge branch 'parallel-upload-sync' of github.com:codalab/codalab-wor… (wwwjn, Jun 6, 2023)
a7b473d  add (wwwjn, Jun 29, 2023)
1 change: 1 addition & 0 deletions .gitignore
@@ -28,6 +28,7 @@ Thumbs.db
*MANIFEST
*.egg-info
venv*/
*/__pycache__/*

/nav
/tags
4 changes: 2 additions & 2 deletions codalab/common.py
@@ -286,7 +286,7 @@ def _get_azure_sas_url(self, path, **kwargs):
account_name=AZURE_BLOB_ACCOUNT_NAME,
container_name=AZURE_BLOB_CONTAINER_NAME,
account_key=AZURE_BLOB_ACCOUNT_KEY,
expiry=datetime.datetime.now() + datetime.timedelta(hours=1),
Review comment (Member): Should we refactor this into a constant? Also, is 10 hours enough or might we need more for even larger files?
expiry=datetime.datetime.now() + datetime.timedelta(hours=10),
blob_name=blob_name,
)
return f"{AZURE_BLOB_HTTP_ENDPOINT}/{AZURE_BLOB_CONTAINER_NAME}/{blob_name}?{sas_token}"
@@ -306,7 +306,7 @@ def _get_gcs_signed_url(self, path, **kwargs):
blob = bucket.blob(blob_name)
signed_url = blob.generate_signed_url(
version="v4",
expiration=datetime.timedelta(hours=1),
expiration=datetime.timedelta(hours=10),
method=kwargs.get("method", "GET"), # HTTP method. eg, GET, PUT
content_type=kwargs.get("request_content_type", None),
response_disposition=kwargs.get("content_disposition", None),
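One way to address the reviewer's suggestion would be to hoist the expiry into a single shared constant used by both the Azure SAS and GCS signed-URL helpers. A minimal sketch, assuming a constant name and helper that are not part of this PR:

# Hypothetical refactor sketch, not code from this PR.
import datetime

SIGNED_URL_EXPIRATION_HOURS = 10  # could be raised if very large files need longer transfer windows

def _signed_url_expiry_delta() -> datetime.timedelta:
    """Return the timedelta used when generating signed/SAS URLs."""
    return datetime.timedelta(hours=SIGNED_URL_EXPIRATION_HOURS)

# Usage inside _get_azure_sas_url / _get_gcs_signed_url (sketch):
#   expiry=datetime.datetime.now() + _signed_url_expiry_delta()
#   expiration=_signed_url_expiry_delta()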
29 changes: 29 additions & 0 deletions codalab/lib/beam/MultiReaderFileStream.py
@@ -10,11 +10,16 @@ class MultiReaderFileStream(BytesIO):
"""
NUM_READERS = 2

# Maximum memory usage <= MAX_BUF_SIZE + the largest num_bytes value passed to read()
Review comment (Member): What does this comment mean? Can you add a description of what MAX_BUF_SIZE is used for?
MAX_BUF_SIZE = 1024 * 1024 * 1024  # 1 GiB; a smaller value (e.g. 10 MiB) was used for testing
Review comment (Member): What value should it be for non-test?


def __init__(self, fileobj):
self._bufs = [BytesBuffer() for _ in range(0, self.NUM_READERS)]
self._pos = [0 for _ in range(0, self.NUM_READERS)]
self._fileobj = fileobj
self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffers.
self._current_max_buf_length = 0

class FileStreamReader(BytesIO):
def __init__(s, index):
@@ -36,15 +41,39 @@ def _fill_buf_bytes(self, index: int, num_bytes=None):
break
for i in range(0, self.NUM_READERS):
self._bufs[i].write(s)
self.find_largest_buffer()

def find_largest_buffer(self):
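"""Recompute self._current_max_buf_length as the length of the largest per-reader buffer."""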
self._current_max_buf_length = len(self._bufs[0])
Review comment (Member): Add a docstring comment
for i in range(1, self.NUM_READERS):
self._current_max_buf_length = max(self._current_max_buf_length, len(self._bufs[i]))
# print(f"find largest buffer: {self._current_max_buf_length} in thread: {threading.current_thread().name}")
Review comment (Member): remove comments


def read(self, index: int, num_bytes=None): # type: ignore
"""Read the specified number of bytes from the associated file.
index: index that specifies which reader is reading.
"""

# print(f"calling read() in thread {threading.current_thread().name}, num_bytes={num_bytes}")
# Busy-wait: once the largest buffer exceeds MAX_BUF_SIZE, only the slowest reader may proceed.
while(self._current_max_buf_length > self.MAX_BUF_SIZE and len(self._bufs[index]) < self._current_max_buf_length):
# Faster readers spin here until the slowest reader (the one holding the largest buffer) catches up.
# print(f"Busy waiting in thread: {threading.current_thread().name}, current max_len = {self._current_max_buf_length}, current_buf_size = {len(self._bufs[index])}")
pass

# If the current thread is the slowest reader, continue reading.
# If the current thread is the slowest reader and num_bytes > len(self._bufs[index]) (or num_bytes is None), the buffers will keep growing.
# Maximum memory usage <= MAX_BUF_SIZE + the largest num_bytes value passed to read()
self._fill_buf_bytes(index, num_bytes)
assert self._current_max_buf_length <= 2 * self.MAX_BUF_SIZE
if num_bytes is None:
num_bytes = len(self._bufs[index])
s = self._bufs[index].read(num_bytes)
self.find_largest_buffer()
# print("Current thread name: ", threading.current_thread().name)


self._pos[index] += len(s)
return s

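For context, a rough usage sketch of how two threads (one uploading the file, one building the index) might consume the same source stream through MultiReaderFileStream. The readers attribute, thread bodies, and callback names are assumptions for illustration, not code from this PR:

# Hypothetical sketch: two consumers of one underlying file object.
import threading

from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream

def run_parallel(fileobj, upload_chunk, index_chunk):
    stream = MultiReaderFileStream(fileobj)
    upload_reader, index_reader = stream.readers  # assumed: one FileStreamReader per consumer

    def upload_worker():
        # Reads chunks and hands them to blob storage (placeholder callback).
        while True:
            chunk = upload_reader.read(1024 * 1024)
            if not chunk:
                break
            upload_chunk(chunk)

    def index_worker():
        # Reads the same bytes independently to build the SQLiteIndexedTar index.
        while True:
            chunk = index_reader.read(1024 * 1024)
            if not chunk:
                break
            index_chunk(chunk)

    threads = [threading.Thread(target=upload_worker), threading.Thread(target=index_worker)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

The busy-wait in read() keeps the faster reader from running more than MAX_BUF_SIZE ahead of the slower one, which bounds memory; a threading.Condition could enforce the same bound without spinning, at the cost of slightly more locking code.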
2 changes: 1 addition & 1 deletion codalab/lib/beam/SQLiteIndexedTar.py
@@ -736,7 +736,7 @@ def _createIndex(
# In that case add that itself to the file index. This won't work when called recursively,
# so check stream offset.
fileCount = self.sqlConnection.execute('SELECT COUNT(*) FROM "files";').fetchone()[0]
if fileCount == 0: # Jiani: For Codalab, the bundle contains only
if fileCount == 0: # Jiani: For Codalab, the bundle contains only single files
# This branch is not used.
if self.printDebug >= 3:
print(f"Did not find any file in the given TAR: {self.tarFileName}. Assuming a compressed file.")
1 change: 1 addition & 0 deletions tests/unit/server/upload_download_test.py
@@ -71,6 +71,7 @@ def test_not_found(self):

def check_file_target_contents(self, target):
"""Checks to make sure that the specified file has the contents 'hello world'."""
# This can not be checked, Since
Review comment (Member): Update comment?
with self.download_manager.stream_file(target, gzipped=False) as f:
self.assertEqual(f.read(), b"hello world")
