Paging (aka chunking, batch) tools #29
Replies: 3 comments
-
Here's a method that can be useful to make a "chunked reads" version of an Azure-blob-backed store:

```python
def get_chunked_data(self, k, chunk_size=8192):
    blob_client = self._container_client.get_blob_client(blob=self._id_of_key(k))
    # Download the blob and return a StorageStreamDownloader
    stream_downloader = blob_client.download_blob()
    # Return an iterator that yields chunks of the data
    return stream_downloader.chunks(chunk_size)
```

The question is now: What is the general pattern here? We would like to have some general tools to handle these chunked-read situations. For example: if you have a new system, you need to specify how to get a chunk reader, and that reader is used in the …
-
Appendable

This is a very common and general use case: we want to append some streaming bytes to an open file, or upload a big file to some remote blob storage service (e.g. s3 or azure), etc. We'd like to be able to do things like:

```python
for chk in stream:
    s[k] += chk

# or, if some open/close management is needed:
with s[k]:
    for chk in stream:
        s[k].extend(chk)

# or
consume(map(s[k].append, chks))  # where `consume = lambda it: [_ for _ in it]`
```

It should be obvious from the code above that this ability to append to …

I'm thinking more along the lines of an …

Most probably this code will go in the already existing dol.appendable module.

Now to the question of … One thing is sure: we want to make it as close to the builtin types as possible. That is, we shouldn't align with file and DB operations, but with simple builtin types. Here we're mutating the …

The strange world of …
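To make the desired interface concrete, here's a minimal, hypothetical sketch (plain Python, not dol code) of a file-backed store whose values support `+=`, `.extend`/`.append`, and `with`-managed appends. All the names (`AppendableFile`, `AppendableFiles`) are made up for illustration, and bytes values plus a local directory are assumed as the backend.

```python
import os
import tempfile


class AppendableFile:
    """A value wrapper that appends bytes to the file at `path`."""

    def __init__(self, path):
        self.path = path
        self._fp = None  # an open file handle, if one is being managed

    def __enter__(self):
        # keep one handle open so many appends don't re-open the file each time
        self._fp = open(self.path, 'ab')
        return self

    def __exit__(self, *exc):
        self._fp.close()
        self._fp = None

    def extend(self, chk: bytes):
        if self._fp is not None:
            self._fp.write(chk)
        else:  # no managed handle: open, append, close
            with open(self.path, 'ab') as fp:
                fp.write(chk)

    append = extend  # for bytes, appending a chunk and extending coincide

    def __iadd__(self, chk: bytes):
        self.extend(chk)
        return self


class AppendableFiles:
    """A store whose values are AppendableFile objects rooted at `rootdir`."""

    def __init__(self, rootdir):
        self.rootdir = rootdir

    def __getitem__(self, k):
        return AppendableFile(os.path.join(self.rootdir, k))

    def __setitem__(self, k, v):
        # `s[k] += chk` ends with a set; the append already happened in __iadd__,
        # so writing an AppendableFile back is a no-op
        if not isinstance(v, AppendableFile):
            with open(os.path.join(self.rootdir, k), 'wb') as fp:
                fp.write(v)


s = AppendableFiles(tempfile.mkdtemp())
stream = [b'hello ', b'world']

for chk in stream:
    s['k'] += chk  # appends each chunk to the file behind 'k'

v = s['k']
with v:  # open once, append many times
    v.extend(b'!')

with open(os.path.join(s.rootdir, 'k'), 'rb') as fp:
    assert fp.read() == b'hello world!'
```

Note the wrinkle that `s[k] += chk` ends with a `__setitem__` call, which the store has to treat as a no-op; that's part of the strange world of augmented assignment on store values.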
-
Here's a proposal, which I'll push (or PR) tomorrow:

```python
from collections.abc import MutableMapping
from functools import partial
from operator import add


def read_add_write(store, key, iterable, add_iterables=add):
    """Retrieve the value stored at `key` (if any), add `iterable` to it, and write it back."""
    if key in store:
        store[key] = add_iterables(store[key], iterable)
    else:
        store[key] = iterable


class Extender:
    def __init__(
        self,
        store: MutableMapping,
        key,
        *,
        extend_store_value=read_add_write,
        append_method=None,
    ):
        self.store = store
        self.key = key
        self.extend_store_value = extend_store_value
        if append_method is not None:
            self.append = partial(append_method, self)

    def extend(self, iterable):
        """Extend the value stored at `self.key` with `iterable`."""
        return self.extend_store_value(self.store, self.key, iterable)

    def __iadd__(self, iterable):
        self.extend(iterable)
        return self  # return self so `x += ...` keeps `x` bound to the Extender


store = {'a': 'pple'}
# test normal extend
a_extender = Extender(store, 'a')
a_extender.extend('sauce')
assert store == {'a': 'pplesauce'}
# test creation (when key is not in store)
b_extender = Extender(store, 'b')
b_extender.extend('anana')
assert store == {'a': 'pplesauce', 'b': 'anana'}
# you can use the += operator too
b_extender += ' split'
assert store == {'a': 'pplesauce', 'b': 'anana split'}
# test append
# Need to define an append method that makes sense.
# Here, with strings, we can just call extend.
b_bis_extender = Extender(store, 'b', append_method=lambda self, obj: self.extend(obj))
b_bis_extender.append('s')
assert store == {'a': 'pplesauce', 'b': 'anana splits'}
# But if our "extend" values were lists, we'd need to have a different append method,
# one that puts the single object into a list, so that its sum with the existing list
# is a list.
store = {'c': [1,2,3]}
c_extender = Extender(store, 'c', append_method=lambda self, obj: self.extend([obj]))
c_extender.append(4)
assert store == {'c': [1,2,3,4]}
# And if the values were tuples, we'd have to put the single object into a tuple.
store = {'d': (1,2,3)}
d_extender = Extender(store, 'd', append_method=lambda self, obj: self.extend((obj,)))
d_extender.append(4)
assert store == {'d': (1,2,3,4)}
# Now, the default extend method is `read_add_write`, which retrieves the existing
# value, sums it to the new value, and writes it back to the store.
# If the values of your store have a sum defined (i.e. an `__add__` method),
# **and** that sum method does what you want, then you can use the default
# `extend_store_value` function.
# O ye numpy users, beware! The sum of numpy arrays is an elementwise sum,
# not a concatenation (you'd have to use `np.concatenate` for that).
import numpy as np
store = {'e': np.array([1,2,3])}
e_extender = Extender(store, 'e')
e_extender.extend(np.array([4,5,6]))
assert all(store['e'] == np.array([5,7,9]))
# This is what the `extend_store_value` function is for: you can pass it a function
# that does what you want.
store = {'f': np.array([1,2,3])}
def extend_store_value_for_numpy(store, key, iterable):
    store[key] = np.concatenate([store[key], iterable])
f_extender = Extender(store, 'f', extend_store_value=extend_store_value_for_numpy)
f_extender.extend(np.array([4,5,6]))
assert all(store['f'] == np.array([1,2,3,4,5,6]))
# WARNING: Note that the `extend_store_value` defined here doesn't accommodate
# the case where the key is not in the store. It is the user's responsibility to
# handle that aspect in the `extend_store_value` they provide.
# For your convenience, the default `read_add_write` (which **does** handle the
# non-existing key case, by simply writing the value to the store) has an
# `add_iterables` argument that can be set to whatever makes sense for your use case.
from functools import partial
store = {'g': np.array([1,2,3])}
extend_store_value_for_numpy = partial(
read_add_write, add_iterables=lambda x, y: np.concatenate([x, y])
)
g_extender = Extender(store, 'g', extend_store_value=extend_store_value_for_numpy)
g_extender.extend(np.array([4,5,6]))
assert all(store['g'] == np.array([1,2,3,4,5,6]))
# TODO: Continue this: Make a store that returns file objects, and make an Extender
# that appends to those files.
# TODO: Make this an actual class of dol.filesys
def append_to_file(store, key, iterable):
    with store[key] as f:
        for item in iterable:
            f.write(item)
from dol import temp_dir
# TODO: Show how to use `Extender` with `wrap_kvs` to make stores that return Extenders,
# thereby enabling efficient `store[key] += iterable` syntax,
# and explore how `store[key]` might be made to conserve its original type and behavior
# (discussing the pros and cons of each approach).
```
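Picking up the file-appending TODO above, here's a minimal sketch of a store whose values are file objects opened in append mode, used with `Extender` through `append_to_file`. `OpenFileStore` is a hypothetical helper (not a dol class), and a `tempfile` directory stands in for `dol.temp_dir`, whose exact behavior isn't assumed here.

```python
import os
import tempfile


class OpenFileStore:
    """Hypothetical store whose values are file objects opened in append mode."""

    def __init__(self, rootdir):
        self.rootdir = rootdir

    def __getitem__(self, k):
        # the open file is itself a context manager, which is all `append_to_file` needs
        return open(os.path.join(self.rootdir, k), 'a')

    def __contains__(self, k):
        return os.path.isfile(os.path.join(self.rootdir, k))


rootdir = tempfile.mkdtemp()
files = OpenFileStore(rootdir)

log_extender = Extender(files, 'log.txt', extend_store_value=append_to_file)
log_extender.extend(['hello ', 'world'])
log_extender.extend(['!'])  # each extend opens, appends, and closes the file

with open(os.path.join(rootdir, 'log.txt')) as f:
    assert f.read() == 'hello world!'
```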
-
The need for tools to carry out chunked iteration (or paging, or batch operations), for both reads and writes, comes up regularly. It shows up not only when we have big files, but in general whenever there's a lot of data and/or IO is expensive (for example, remote DBs). So our tooling needs to get something reusable for this soon.
Here's the discussion for that.
Some related use cases
Links
The `Stream` class was added to dol (well, py2store) to provide more fine-tuned control over iteration. Its original purpose was precisely to read large files. Namely, a store would return a `Stream` instance that could then be consumed, automatically taking care of any paging/chunking concerns, but also preparing, filtering... This `Stream` then grew up to be called `Creek`, in a namesake creek package that accumulated many other streaming tools. Other dol use cases here.