Multireaderfilestream Redesign #4595
Conversation
@@ -255,7 +255,7 @@ def write_fileobj(
conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '')
os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str
try:
-    CHUNK_SIZE = 16 * 1024
+    CHUNK_SIZE = 1024 * 1024
Why increase?
The upload speed with the smaller chunk size was too slow due to the sleep behavior that occurs on the faster reader, which is always the index reader.
It seems like there's no strong reason to keep the chunk size small, since the speed tradeoff is too large.
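For reference, a minimal sketch of how a chunk size like this drives the upload loop; `fileobj` and `upload_chunk` are stand-ins here, not the actual names used in write_fileobj:

```python
# Illustrative only: larger chunks mean fewer read() calls on the stream, so the
# faster reader hits the sleep/threshold check far less often.
CHUNK_SIZE = 1024 * 1024  # 1 MiB, up from 16 * 1024

def upload_in_chunks(fileobj, upload_chunk):
    while True:
        chunk = fileobj.read(CHUNK_SIZE)
        if not chunk:
            break
        upload_chunk(chunk)
```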
return
self._buffer += s

def read(self, index: int, num_bytes=0):  # type: ignore
Why 0? If optional, use None, and add type hint Optional[int]
return s

def peek(self, index: int, num_bytes):  # type: ignore
    self._fill_buf_bytes(index, num_bytes)
    s = self._bufs[index].peek(num_bytes)
    while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD:
Can we avoid duplicate code with read?
I made the change in the current iteration; the logic is that read is just a peek plus a position change. My concern is that this requires the lock to be released, which could potentially allow another thread to interpose during that release and do another read or peek. From what I can tell this shouldn't affect any logic, since the actual buffer reading occurs before the lock releases, but I could be wrong. It might cause some context switches, but with only 2 threads the effect should be minimal.
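Roughly, the idea described above looks like this (a sketch; the lock name and handling are assumptions, not the exact PR code):

```python
# Sketch of "read is just a peek and a position change". peek() may release the
# lock and sleep while this reader is too far ahead of the slowest one.
def read(self, index: int, num_bytes: int):
    s = self.peek(index, num_bytes)
    with self._lock:
        self._pos[index] += len(s)  # the only extra step on top of peek()
    return s
```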
num_bytes = len(self._bufs[index])
s = self._bufs[index].read(num_bytes)
self._pos[index] += len(s)
if num_bytes == None:
Do we ever call this with None? If not, don't support it.
I don't think so, but this emulates the default read() behavior of other io streams in Python, where None means read to the end of the fileobj, or in our case the buffer.
The end of the fileobj is semantically different from the end of the buffer (which is arbitrary), so I would avoid having None set the wrong expectation of a behavior we don't support.
Makes sense, I'll make the change.
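A minimal sketch of what the agreed-on signature could look like, with num_bytes required and no None path (the threshold/sleep handling is omitted here, and this is a hypothetical form, not the final PR code):

```python
# Hypothetical final form after the review comment: num_bytes must be an int,
# so there is no None case that pretends to mean "read to end of file".
def read(self, index: int, num_bytes: int):  # type: ignore
    self._fill_buf_bytes(index, num_bytes)
    s = self._bufs[index].read(num_bytes)
    self._pos[index] += len(s)
    return s
```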
This is great overall. Left a few comments so we can clean things up.
Reasons for making this change
The previous Multireaderfilestream design did not allow backwards seeks, which caused uploads of directories above a certain size (~15 MB) to break, because reading directory bundles requires seeking backwards by up to 20 MiB. The new design doesn't use BytesBuffers; instead it stores raw bytes from 32 MiB behind the slowest reader up to a maximum of 64 MiB ahead of the slowest reader. If a faster reader tries to read past this threshold, it sleeps until it is allowed to read more.
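A rough sketch of the threshold behavior described above (constants and attribute names are illustrative; the real class may differ):

```python
import time

class MultiReaderFileStreamSketch:
    LOOKBACK_LENGTH = 32 * 1024 * 1024  # bytes kept behind the slowest reader for backwards seeks
    MAX_THRESHOLD = 64 * 1024 * 1024    # how far ahead of the slowest reader a fast reader may get

    def _wait_for_slow_reader(self, index: int, num_bytes: int) -> None:
        # If this read would put the reader too far past the start of the shared
        # buffer, sleep until the slowest reader advances the buffer.
        while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD:
            time.sleep(0.1)
```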
Added Tests
UPLOAD TESTS
test_fileobj_tar_gz: Basic directory bundle test that did not exist previously. Checks that a basic directory has the correct files and file sizes.
test_large_fileobj_tar_gz: Tests a large directory bundle past the size threshold limit of the previous Multireaderfilestream design. Checks that all files are uploaded.
test_large_fileobj_tar_gz2: Tests a large directory bundle past the size threshold limit of the previous Multireaderfilestream design, with multiple large files. Checks that all files are uploaded and that file sizes are correct.
test_upload_memory: Tests that memory usage during Multireaderfilestream uploads stays below 100 MB as expected.
Manual speed test: Upload is faster than before with the larger 1 MB chunk size (increased from 16 KB).
MRFS TESTS
test_reader_distance: Tests that 2 readers always stay within the expected thresholds (see the sketch after this list).
test_seek: Tests that backwards seeks within the lookback range work as expected.
test_toofar_seek: Tests that backwards seeks that go too far raise errors.
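For example, the reader-distance check could take roughly this shape (hypothetical test body; the real test drives the actual stream class and thresholds):

```python
import threading

def test_reader_distance_sketch(stream, max_threshold):
    # Two readers at different speeds; assert the fast one never gets more than
    # max_threshold bytes ahead of the slowest reader.
    def read_all(index, chunk_size):
        while stream.read(index, chunk_size):
            assert stream._pos[index] - min(stream._pos) <= max_threshold

    fast = threading.Thread(target=read_all, args=(0, 1024 * 1024))
    slow = threading.Thread(target=read_all, args=(1, 16 * 1024))
    fast.start(); slow.start()
    fast.join(); slow.join()
```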
Related issues
Screenshots
Checklist