From e16ca162b33c07d1bd5893235ea75e85e53797e1 Mon Sep 17 00:00:00 2001 From: David Stansby Date: Mon, 26 Aug 2024 08:48:39 +0100 Subject: [PATCH 1/3] Start work on blosc v2 --- numcodecs/blosc_v2.py | 180 +++++++++++++++++++ numcodecs/tests/test_blosc_v2.py | 286 +++++++++++++++++++++++++++++++ 2 files changed, 466 insertions(+) create mode 100644 numcodecs/blosc_v2.py create mode 100644 numcodecs/tests/test_blosc_v2.py diff --git a/numcodecs/blosc_v2.py b/numcodecs/blosc_v2.py new file mode 100644 index 00000000..af3d063b --- /dev/null +++ b/numcodecs/blosc_v2.py @@ -0,0 +1,180 @@ +""" +An attempt at replacing bundled versin of blosc with +the blosc-python package, which provides pre-build wheels. + +List of functions to deprecate: +[ +'destroy', +'init', +'compname_to_compcode', +'cbuffer_sizes', +'cbuffer_metainfo', +'err_bad_cname', +'decompress_partial' +] + +List of behaviour to deprecate: +- Passing cname as bytes + +""" + +from numcodecs.abc import Codec +import numpy as np + +import blosc +from blosc import ( + BITSHUFFLE, + SHUFFLE, + NOSHUFFLE, + MAX_BUFFERSIZE, + MAX_THREADS, + MAX_TYPESIZE, + VERSION_STRING, + VERSION_DATE, +) + +__all__ = [ + "BITSHUFFLE", + "SHUFFLE", + "NOSHUFFLE", + "MAX_BUFFERSIZE", + "MAX_THREADS", + "MAX_TYPESIZE", + "VERSION_STRING", + "VERSION_DATE", + "list_compressors", + 'get_nthreads', +] + +AUTOBLOCKS = 0 +AUTOSHUFFLE = -1 +_shuffle_repr = ['AUTOSHUFFLE', 'NOSHUFFLE', 'SHUFFLE', 'BITSHUFFLE'] + + +def list_compressors() -> list[str]: + return blosc.compressor_list() + + +def get_nthreads() -> int: + nthreads = blosc.set_nthreads(1) + blosc.set_nthreads(nthreads) + return nthreads + + +def set_nthreads(nthreads: int) -> None: + blosc.set_nthreads(nthreads) + + +def cbuffer_complib(source): + return blosc.get_clib(source) + + +def _check_not_object_array(arr): + if arr.dtype == object: + raise TypeError("object arrays are not supported") + + +def _check_buffer_size(buf, max_buffer_size): + if isinstance(buf, np.ndarray): + size = buf.nbytes + else: + size = len(buf) + + if size > max_buffer_size: + msg = f"Codec does not support buffers of > {max_buffer_size} bytes" + raise ValueError(msg) + + +def compress(source, cname: str, clevel: int, shuffle: int = SHUFFLE, blocksize=AUTOBLOCKS): + if shuffle == AUTOSHUFFLE: + if source.itemsize == 1: + shuffle = BITSHUFFLE + else: + shuffle = SHUFFLE + blosc.set_blocksize(blocksize) + if isinstance(source, np.ndarray): + _check_not_object_array(source) + result = blosc.compress_ptr( + source.ctypes.data, + source.size, + source.dtype.itemsize, + cname=cname, + clevel=clevel, + shuffle=shuffle, + ) + else: + result = blosc.compress(source, cname=cname, clevel=clevel, shuffle=shuffle) + blosc.set_blocksize(AUTOBLOCKS) + return result + + +def decompress(source, dest: np.ndarray | bytearray | None = None): + if dest is None: + return blosc.decompress(source) + elif isinstance(dest, np.ndarray): + _check_not_object_array(dest) + blosc.decompress_ptr(source, dest.ctypes.data) + else: + dest[:] = blosc.decompress(source) + + +class Blosc(Codec): + """Codec providing compression using the Blosc meta-compressor. + + Parameters + ---------- + cname : string, optional + A string naming one of the compression algorithms available within blosc, e.g., + 'zstd', 'blosclz', 'lz4', 'lz4hc', 'zlib' or 'snappy'. + clevel : integer, optional + An integer between 0 and 9 specifying the compression level. + shuffle : integer, optional + Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, + bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will + be used otherwise. The default is `SHUFFLE`. + blocksize : int + The requested size of the compressed blocks. If 0 (default), an automatic + blocksize will be used. + + See Also + -------- + numcodecs.zstd.Zstd, numcodecs.lz4.LZ4 + + """ + + codec_id = 'blosc' + NOSHUFFLE = NOSHUFFLE + SHUFFLE = SHUFFLE + BITSHUFFLE = BITSHUFFLE + AUTOSHUFFLE = AUTOSHUFFLE + max_buffer_size = 2**31 - 1 + + def __init__(self, cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=AUTOBLOCKS): + self.cname = cname + if isinstance(cname, str): + self._cname_bytes = cname.encode('ascii') + else: + self._cname_bytes = cname + self.clevel = clevel + self.shuffle = shuffle + self.blocksize = blocksize + + def encode(self, buf): + _check_buffer_size(buf, self.max_buffer_size) + return compress( + buf, self.cname, clevel=self.clevel, shuffle=self.shuffle, blocksize=self.blocksize + ) + + def decode(self, buf, out=None): + _check_buffer_size(buf, self.max_buffer_size) + return decompress(buf, out) + + def __repr__(self): + r = '%s(cname=%r, clevel=%r, shuffle=%s, blocksize=%s)' % ( + type(self).__name__, + self.cname, + self.clevel, + _shuffle_repr[self.shuffle + 1], + self.blocksize, + ) + return r diff --git a/numcodecs/tests/test_blosc_v2.py b/numcodecs/tests/test_blosc_v2.py new file mode 100644 index 00000000..94230293 --- /dev/null +++ b/numcodecs/tests/test_blosc_v2.py @@ -0,0 +1,286 @@ +from multiprocessing import Pool +from multiprocessing.pool import ThreadPool + + +import numpy as np +import pytest + + +try: + from numcodecs import blosc_v2 as blosc + from numcodecs.blosc_v2 import Blosc +except ImportError: # pragma: no cover + pytest.skip("numcodecs.blosc not available", allow_module_level=True) + + +from numcodecs.tests.common import ( + check_encode_decode, + check_config, + check_backwards_compatibility, + check_err_decode_object_buffer, + check_err_encode_object_buffer, + check_max_buffer_size, +) + + +codecs = [ + Blosc(shuffle=Blosc.SHUFFLE), + Blosc(clevel=0, shuffle=Blosc.SHUFFLE), + Blosc(cname='lz4', shuffle=Blosc.SHUFFLE), + Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE), + Blosc(cname='lz4', clevel=5, shuffle=Blosc.SHUFFLE), + Blosc(cname='lz4', clevel=9, shuffle=Blosc.BITSHUFFLE), + Blosc(cname='zlib', clevel=1, shuffle=0), + Blosc(cname='zstd', clevel=1, shuffle=1), + Blosc(cname='blosclz', clevel=1, shuffle=2), + None, # was snappy + Blosc(shuffle=Blosc.SHUFFLE, blocksize=0), + Blosc(shuffle=Blosc.SHUFFLE, blocksize=2**8), + Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=2**8), +] + + +# mix of dtypes: integer, float, bool, string +# mix of shapes: 1D, 2D, 3D +# mix of orders: C, F +arrays = [ + np.arange(1000, dtype='i4'), + np.linspace(1000, 1001, 1000, dtype='f8'), + np.random.normal(loc=1000, scale=1, size=(100, 10)), + np.random.randint(0, 2, size=1000, dtype=bool).reshape(100, 10, order='F'), + np.random.choice([b'a', b'bb', b'ccc'], size=1000).reshape(10, 10, 10), + np.random.randint(0, 2**60, size=1000, dtype='u8').view('M8[ns]'), + np.random.randint(0, 2**60, size=1000, dtype='u8').view('m8[ns]'), + np.random.randint(0, 2**25, size=1000, dtype='u8').view('M8[m]'), + np.random.randint(0, 2**25, size=1000, dtype='u8').view('m8[m]'), + np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[ns]'), + np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[ns]'), + np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[m]'), + np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[m]'), +] + + +def _skip_null(codec): + if codec is None: + pytest.skip("codec has been removed") + + +@pytest.fixture(scope='module', params=[True, False, None]) +def use_threads(request): + return request.param + + +@pytest.mark.parametrize('array', arrays) +@pytest.mark.parametrize('codec', codecs) +def test_encode_decode(array, codec): + _skip_null(codec) + check_encode_decode(array, codec) + + +""" +@pytest.mark.parametrize('codec', codecs) +@pytest.mark.parametrize( + 'array', + [ + pytest.param(x) if len(x.shape) == 1 else pytest.param(x, marks=[pytest.mark.xfail]) + for x in arrays + ], +) +def test_partial_decode(codec, array): + _skip_null(codec) + check_encode_decode_partial(array, codec) +""" + + +def test_config(): + codec = Blosc(cname='zstd', clevel=3, shuffle=1) + check_config(codec) + codec = Blosc(cname='lz4', clevel=1, shuffle=2, blocksize=2**8) + check_config(codec) + + +def test_repr(): + expect = "Blosc(cname='zstd', clevel=3, shuffle=SHUFFLE, blocksize=0)" + actual = repr(Blosc(cname='zstd', clevel=3, shuffle=Blosc.SHUFFLE, blocksize=0)) + assert expect == actual + expect = "Blosc(cname='lz4', clevel=1, shuffle=NOSHUFFLE, blocksize=256)" + actual = repr(Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=256)) + assert expect == actual + expect = "Blosc(cname='zlib', clevel=9, shuffle=BITSHUFFLE, blocksize=512)" + actual = repr(Blosc(cname='zlib', clevel=9, shuffle=Blosc.BITSHUFFLE, blocksize=512)) + assert expect == actual + expect = "Blosc(cname='blosclz', clevel=5, shuffle=AUTOSHUFFLE, blocksize=1024)" + actual = repr(Blosc(cname='blosclz', clevel=5, shuffle=Blosc.AUTOSHUFFLE, blocksize=1024)) + assert expect == actual + + +def test_eq(): + assert Blosc() == Blosc() + assert Blosc(cname='lz4') != Blosc(cname='zstd') + assert Blosc(clevel=1) != Blosc(clevel=9) + assert Blosc(cname='lz4') != 'foo' + + +@pytest.mark.skip("blosc-python has no way to get blocksize") +def test_compress_blocksize_default(use_threads): + arr = np.arange(1000, dtype='i4') + + blosc.use_threads = use_threads + + # default blocksize + enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE) + _, _, blocksize = blosc.cbuffer_sizes(enc) + assert blocksize > 0 + + # explicit default blocksize + enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=0) + _, _, blocksize = blosc.cbuffer_sizes(enc) + assert blocksize > 0 + + +@pytest.mark.skip("blosc-python has no way to get cbuffer sizes") +@pytest.mark.parametrize('bs', (2**7, 2**8)) +def test_compress_blocksize(use_threads, bs): + arr = np.arange(1000, dtype='i4') + + blosc.use_threads = use_threads + + enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=bs) + _, _, blocksize = blosc.cbuffer_sizes(enc) + assert blocksize == bs + + +def test_compress_complib(use_threads): + arr = np.arange(1000, dtype='i4') + expected_complibs = { + 'lz4': 'LZ4', + 'lz4hc': 'LZ4', + 'blosclz': 'BloscLZ', + 'zlib': 'Zlib', + 'zstd': 'Zstd', + } + blosc.use_threads = use_threads + for cname in blosc.list_compressors(): + enc = blosc.compress(arr, cname, 1, Blosc.NOSHUFFLE) + complib = blosc.cbuffer_complib(enc) + expected_complib = expected_complibs[cname] + assert complib == expected_complib + with pytest.raises(ValueError): + # capitalized cname + blosc.compress(arr, b'LZ4', 1) + with pytest.raises(ValueError): + # bad cname + blosc.compress(arr, b'foo', 1) + + +@pytest.mark.skip("blosc-python has no way to get cbuffer metainfo") +@pytest.mark.parametrize('dtype', ['i1', 'i2', 'i4', 'i8']) +def test_compress_metainfo(dtype, use_threads): + arr = np.arange(1000, dtype=dtype) + for shuffle in Blosc.NOSHUFFLE, Blosc.SHUFFLE, Blosc.BITSHUFFLE: + blosc.use_threads = use_threads + for cname in blosc.list_compressors(): + enc = blosc.compress(arr, cname, 1, shuffle) + typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc) + assert typesize == arr.dtype.itemsize + assert did_shuffle == shuffle + + +@pytest.mark.skip("blosc-python has no way to get cbuffer metainfo") +def test_compress_autoshuffle(use_threads): + arr = np.arange(8000) + for dtype in 'i1', 'i2', 'i4', 'i8', 'f2', 'f4', 'f8', 'bool', 'S10': + varr = arr.view(dtype) + blosc.use_threads = use_threads + for cname in blosc.list_compressors(): + enc = blosc.compress(varr, cname.encode(), 1, Blosc.AUTOSHUFFLE) + typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc) + assert typesize == varr.dtype.itemsize + if typesize == 1: + assert did_shuffle == Blosc.BITSHUFFLE + else: + assert did_shuffle == Blosc.SHUFFLE + + +def test_config_blocksize(): + # N.B., we want to be backwards compatible with any config where blocksize is not + # explicitly stated + + # blocksize not stated + config = dict(cname='lz4', clevel=1, shuffle=Blosc.SHUFFLE) + codec = Blosc.from_config(config) + assert codec.blocksize == 0 + + # blocksize stated + config = dict(cname='lz4', clevel=1, shuffle=Blosc.SHUFFLE, blocksize=2**8) + codec = Blosc.from_config(config) + assert codec.blocksize == 2**8 + + +def test_backwards_compatibility(): + check_backwards_compatibility(Blosc.codec_id, arrays, codecs) + + +def _encode_worker(data): + compressor = Blosc(cname='zlib', clevel=9, shuffle=Blosc.SHUFFLE) + enc = compressor.encode(data) + return enc + + +def _decode_worker(enc): + compressor = Blosc() + data = compressor.decode(enc) + return data + + +@pytest.mark.parametrize('pool', (Pool, ThreadPool)) +def test_multiprocessing(use_threads, pool): + data = np.arange(1000000) + enc = _encode_worker(data) + + pool = pool(5) + + try: + blosc.use_threads = use_threads + + # test with process pool and thread pool + + # test encoding + enc_results = pool.map(_encode_worker, [data] * 5) + assert all(len(enc) == len(e) for e in enc_results) + + # test decoding + dec_results = pool.map(_decode_worker, [enc] * 5) + assert all(data.nbytes == len(d) for d in dec_results) + + # tidy up + pool.close() + pool.join() + + finally: + blosc.use_threads = None # restore default + + +def test_err_decode_object_buffer(): + check_err_decode_object_buffer(Blosc()) + + +def test_err_encode_object_buffer(): + check_err_encode_object_buffer(Blosc()) + + +@pytest.mark.skip("blosc can decode empty data fine") +def test_decompression_error_handling(): + for codec in codecs: + _skip_null(codec) + with pytest.raises(RuntimeError): + codec.decode(bytearray()) + with pytest.raises(RuntimeError): + codec.decode(bytearray(0)) + + +@pytest.mark.parametrize("codec", codecs) +def test_max_buffer_size(codec): + _skip_null(codec) + assert codec.max_buffer_size == 2**31 - 1 + check_max_buffer_size(codec) From ed2cff2fe933bc6ddff551a8d90ad1244c280e79 Mon Sep 17 00:00:00 2001 From: David Stansby Date: Sat, 31 Aug 2024 20:44:46 +0100 Subject: [PATCH 2/3] Add blosc dep --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3fb3ad0d..59d50711 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ for use in data storage and communication applications.""" readme = "README.rst" dependencies = [ "numpy>=1.7", + "blosc==1.11.2", ] requires-python = ">=3.10" dynamic = [ @@ -65,7 +66,7 @@ msgpack = [ ] zfpy = [ "zfpy>=1.0.0", - "numpy<2.0.0", + "numpy<2.0.0", ] pcodec = [ "pcodec>=0.2.0", From 44f1c36c43d777d5157fa2408e9f16c3c51a515d Mon Sep 17 00:00:00 2001 From: David Stansby Date: Sun, 1 Sep 2024 11:07:40 +0100 Subject: [PATCH 3/3] Replace c-blosc extension --- numcodecs/__init__.py | 2 - numcodecs/{blosc_v2.py => blosc.py} | 66 +++- numcodecs/blosc.pyx | 578 ---------------------------- numcodecs/tests/test_blosc.py | 91 +---- numcodecs/tests/test_blosc_v2.py | 286 -------------- setup.py | 80 +--- 6 files changed, 66 insertions(+), 1037 deletions(-) rename numcodecs/{blosc_v2.py => blosc.py} (71%) delete mode 100644 numcodecs/blosc.pyx delete mode 100644 numcodecs/tests/test_blosc_v2.py diff --git a/numcodecs/__init__.py b/numcodecs/__init__.py index 47c2c616..628c4fd0 100644 --- a/numcodecs/__init__.py +++ b/numcodecs/__init__.py @@ -52,9 +52,7 @@ ncores = multiprocessing.cpu_count() except OSError: # pragma: no cover ncores = 1 - blosc.init() blosc.set_nthreads(min(8, ncores)) - atexit.register(blosc.destroy) with suppress(ImportError): from numcodecs import zstd diff --git a/numcodecs/blosc_v2.py b/numcodecs/blosc.py similarity index 71% rename from numcodecs/blosc_v2.py rename to numcodecs/blosc.py index af3d063b..3b55c29f 100644 --- a/numcodecs/blosc_v2.py +++ b/numcodecs/blosc.py @@ -52,20 +52,30 @@ def list_compressors() -> list[str]: + """Get a list of compressors supported in blosc.""" return blosc.compressor_list() def get_nthreads() -> int: + """ + Get the number of threads that Blosc uses internally for compression and + decompression. + """ nthreads = blosc.set_nthreads(1) blosc.set_nthreads(nthreads) return nthreads def set_nthreads(nthreads: int) -> None: + """ + Set the number of threads that Blosc uses internally for compression and + decompression. + """ blosc.set_nthreads(nthreads) -def cbuffer_complib(source): +def cbuffer_complib(source) -> str: + """Return the name of the compression library used to compress `source`.""" return blosc.get_clib(source) @@ -86,6 +96,32 @@ def _check_buffer_size(buf, max_buffer_size): def compress(source, cname: str, clevel: int, shuffle: int = SHUFFLE, blocksize=AUTOBLOCKS): + """ + Compress data. + + Parameters + ---------- + source : bytes-like + Data to be compressed. Can be any object supporting the buffer + protocol. + cname : bytes + Name of compression library to use. + clevel : int + Compression level. + shuffle : int + Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, + bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will + be used otherwise. The default is `SHUFFLE`. + blocksize : int + The requested size of the compressed blocks. If 0, an automatic blocksize will + be used. + + Returns + ------- + dest : bytes + Compressed data. + + """ if shuffle == AUTOSHUFFLE: if source.itemsize == 1: shuffle = BITSHUFFLE @@ -109,6 +145,23 @@ def compress(source, cname: str, clevel: int, shuffle: int = SHUFFLE, blocksize= def decompress(source, dest: np.ndarray | bytearray | None = None): + """ + Decompress data. + + Parameters + ---------- + source : bytes-like + Compressed data, including blosc header. Can be any object supporting the buffer + protocol. + dest : array-like, optional + Object to decompress into. + + Returns + ------- + dest : bytes + Object containing decompressed data. + + """ if dest is None: return blosc.decompress(source) elif isinstance(dest, np.ndarray): @@ -119,7 +172,8 @@ def decompress(source, dest: np.ndarray | bytearray | None = None): class Blosc(Codec): - """Codec providing compression using the Blosc meta-compressor. + """ + Codec providing compression using the Blosc meta-compressor. Parameters ---------- @@ -170,11 +224,5 @@ def decode(self, buf, out=None): return decompress(buf, out) def __repr__(self): - r = '%s(cname=%r, clevel=%r, shuffle=%s, blocksize=%s)' % ( - type(self).__name__, - self.cname, - self.clevel, - _shuffle_repr[self.shuffle + 1], - self.blocksize, - ) + r = f'{type(self).__name__}(cname={self.cname!r}, clevel={self.clevel!r}, shuffle={_shuffle_repr[self.shuffle + 1]}, blocksize={self.blocksize})' return r diff --git a/numcodecs/blosc.pyx b/numcodecs/blosc.pyx deleted file mode 100644 index 45e5ea10..00000000 --- a/numcodecs/blosc.pyx +++ /dev/null @@ -1,578 +0,0 @@ -# cython: embedsignature=True -# cython: profile=False -# cython: linetrace=False -# cython: binding=False -# cython: language_level=3 -import threading -import multiprocessing -import os - - -from cpython.buffer cimport PyBUF_ANY_CONTIGUOUS, PyBUF_WRITEABLE -from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING - - -from .compat_ext cimport Buffer -from .compat_ext import Buffer -from .compat import ensure_contiguous_ndarray -from .abc import Codec - - -cdef extern from "blosc.h": - cdef enum: - BLOSC_MAX_OVERHEAD, - BLOSC_VERSION_STRING, - BLOSC_VERSION_DATE, - BLOSC_NOSHUFFLE, - BLOSC_SHUFFLE, - BLOSC_BITSHUFFLE, - BLOSC_MAX_BUFFERSIZE, - BLOSC_MAX_THREADS, - BLOSC_MAX_TYPESIZE, - BLOSC_DOSHUFFLE, - BLOSC_DOBITSHUFFLE, - BLOSC_MEMCPYED - - void blosc_init() - void blosc_destroy() - int blosc_get_nthreads() - int blosc_set_nthreads(int nthreads) - int blosc_set_compressor(const char *compname) - void blosc_set_blocksize(size_t blocksize) - char* blosc_list_compressors() - int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes, - void* src, void* dest, size_t destsize) nogil - int blosc_decompress(void *src, void *dest, size_t destsize) nogil - int blosc_getitem(void* src, int start, int nitems, void* dest) - int blosc_compname_to_compcode(const char* compname) - int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize, size_t nbytes, - const void* src, void* dest, size_t destsize, - const char* compressor, size_t blocksize, - int numinternalthreads) nogil - int blosc_decompress_ctx(const void* src, void* dest, size_t destsize, - int numinternalthreads) nogil - void blosc_cbuffer_sizes(const void* cbuffer, size_t* nbytes, size_t* cbytes, - size_t* blocksize) - char* blosc_cbuffer_complib(const void* cbuffer) - void blosc_cbuffer_metainfo(const void* cbuffer, size_t* typesize, int* flags) - - -MAX_OVERHEAD = BLOSC_MAX_OVERHEAD -MAX_BUFFERSIZE = BLOSC_MAX_BUFFERSIZE -MAX_THREADS = BLOSC_MAX_THREADS -MAX_TYPESIZE = BLOSC_MAX_TYPESIZE -VERSION_STRING = BLOSC_VERSION_STRING -VERSION_DATE = BLOSC_VERSION_DATE -VERSION_STRING = VERSION_STRING.decode() -VERSION_DATE = VERSION_DATE.decode() -__version__ = VERSION_STRING -NOSHUFFLE = BLOSC_NOSHUFFLE -SHUFFLE = BLOSC_SHUFFLE -BITSHUFFLE = BLOSC_BITSHUFFLE -# automatic shuffle -AUTOSHUFFLE = -1 -# automatic block size - let blosc decide -AUTOBLOCKS = 0 - -# synchronization -try: - mutex = multiprocessing.Lock() -except OSError: - mutex = None -except ImportError: - mutex = None - -# store ID of process that first loads the module, so we can detect a fork later -_importer_pid = os.getpid() - - -def init(): - """Initialize the Blosc library environment.""" - blosc_init() - - -def destroy(): - """Destroy the Blosc library environment.""" - blosc_destroy() - - -def compname_to_compcode(cname): - """Return the compressor code associated with the compressor name. If the compressor - name is not recognized, or there is not support for it in this build, -1 is returned - instead.""" - if isinstance(cname, str): - cname = cname.encode('ascii') - return blosc_compname_to_compcode(cname) - - -def list_compressors(): - """Get a list of compressors supported in the current build.""" - s = blosc_list_compressors() - s = s.decode('ascii') - return s.split(',') - - -def get_nthreads(): - """Get the number of threads that Blosc uses internally for compression and - decompression.""" - return blosc_get_nthreads() - - -def set_nthreads(int nthreads): - """Set the number of threads that Blosc uses internally for compression and - decompression.""" - return blosc_set_nthreads(nthreads) - - -def cbuffer_sizes(source): - """Return information about a compressed buffer, namely the number of uncompressed - bytes (`nbytes`) and compressed (`cbytes`). It also returns the `blocksize` (which - is used internally for doing the compression by blocks). - - Returns - ------- - nbytes : int - cbytes : int - blocksize : int - - """ - cdef: - Buffer buffer - size_t nbytes, cbytes, blocksize - - # obtain buffer - buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - - # determine buffer size - blosc_cbuffer_sizes(buffer.ptr, &nbytes, &cbytes, &blocksize) - - # release buffers - buffer.release() - - return nbytes, cbytes, blocksize - - -def cbuffer_complib(source): - """Return the name of the compression library used to compress `source`.""" - cdef: - Buffer buffer - - # obtain buffer - buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - - # determine buffer size - complib = blosc_cbuffer_complib(buffer.ptr) - - # release buffers - buffer.release() - - complib = complib.decode('ascii') - - return complib - - -def cbuffer_metainfo(source): - """Return some meta-information about the compressed buffer in `source`, including - the typesize, whether the shuffle or bit-shuffle filters were used, and the - whether the buffer was memcpyed. - - Returns - ------- - typesize - shuffle - memcpyed - - """ - cdef: - Buffer buffer - size_t typesize - int flags - - # obtain buffer - buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - - # determine buffer size - blosc_cbuffer_metainfo(buffer.ptr, &typesize, &flags) - - # release buffers - buffer.release() - - # decompose flags - if flags & BLOSC_DOSHUFFLE: - shuffle = SHUFFLE - elif flags & BLOSC_DOBITSHUFFLE: - shuffle = BITSHUFFLE - else: - shuffle = NOSHUFFLE - memcpyed = flags & BLOSC_MEMCPYED - - return typesize, shuffle, memcpyed - - -def err_bad_cname(cname): - raise ValueError('bad compressor or compressor not supported: %r; expected one of ' - '%s' % (cname, list_compressors())) - - -def compress(source, char* cname, int clevel, int shuffle=SHUFFLE, - int blocksize=AUTOBLOCKS): - """Compress data. - - Parameters - ---------- - source : bytes-like - Data to be compressed. Can be any object supporting the buffer - protocol. - cname : bytes - Name of compression library to use. - clevel : int - Compression level. - shuffle : int - Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, - bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will - be used otherwise. The default is `SHUFFLE`. - blocksize : int - The requested size of the compressed blocks. If 0, an automatic blocksize will - be used. - - Returns - ------- - dest : bytes - Compressed data. - - """ - - cdef: - char *source_ptr - char *dest_ptr - Buffer source_buffer - size_t nbytes, itemsize - int cbytes - bytes dest - - # check valid cname early - cname_str = cname.decode('ascii') - if cname_str not in list_compressors(): - err_bad_cname(cname_str) - - # setup source buffer - source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - source_ptr = source_buffer.ptr - nbytes = source_buffer.nbytes - itemsize = source_buffer.itemsize - - # determine shuffle - if shuffle == AUTOSHUFFLE: - if itemsize == 1: - shuffle = BITSHUFFLE - else: - shuffle = SHUFFLE - elif shuffle not in [NOSHUFFLE, SHUFFLE, BITSHUFFLE]: - raise ValueError('invalid shuffle argument; expected -1, 0, 1 or 2, found %r' % - shuffle) - - try: - - # setup destination - dest = PyBytes_FromStringAndSize(NULL, nbytes + BLOSC_MAX_OVERHEAD) - dest_ptr = PyBytes_AS_STRING(dest) - - # perform compression - if _get_use_threads(): - # allow blosc to use threads internally - - # N.B., we are using blosc's global context, and so we need to use a lock - # to ensure no-one else can modify the global context while we're setting it - # up and using it. - with mutex: - - # set compressor - compressor_set = blosc_set_compressor(cname) - if compressor_set < 0: - # shouldn't happen if we checked against list of compressors - # already, but just in case - err_bad_cname(cname_str) - - # set blocksize - blosc_set_blocksize(blocksize) - - # perform compression - with nogil: - cbytes = blosc_compress(clevel, shuffle, itemsize, nbytes, source_ptr, - dest_ptr, nbytes + BLOSC_MAX_OVERHEAD) - - else: - with nogil: - cbytes = blosc_compress_ctx(clevel, shuffle, itemsize, nbytes, source_ptr, - dest_ptr, nbytes + BLOSC_MAX_OVERHEAD, - cname, blocksize, 1) - - finally: - - # release buffers - source_buffer.release() - - # check compression was successful - if cbytes <= 0: - raise RuntimeError('error during blosc compression: %d' % cbytes) - - # resize after compression - dest = dest[:cbytes] - - return dest - - -def decompress(source, dest=None): - """Decompress data. - - Parameters - ---------- - source : bytes-like - Compressed data, including blosc header. Can be any object supporting the buffer - protocol. - dest : array-like, optional - Object to decompress into. - - Returns - ------- - dest : bytes - Object containing decompressed data. - - """ - cdef: - int ret - char *source_ptr - char *dest_ptr - Buffer source_buffer - Buffer dest_buffer = None - size_t nbytes, cbytes, blocksize - - # setup source buffer - source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - source_ptr = source_buffer.ptr - - # determine buffer size - blosc_cbuffer_sizes(source_ptr, &nbytes, &cbytes, &blocksize) - - # setup destination buffer - if dest is None: - # allocate memory - dest = PyBytes_FromStringAndSize(NULL, nbytes) - dest_ptr = PyBytes_AS_STRING(dest) - dest_nbytes = nbytes - else: - arr = ensure_contiguous_ndarray(dest) - dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE) - dest_ptr = dest_buffer.ptr - dest_nbytes = dest_buffer.nbytes - - try: - - # guard condition - if dest_nbytes < nbytes: - raise ValueError('destination buffer too small; expected at least %s, ' - 'got %s' % (nbytes, dest_nbytes)) - - # perform decompression - if _get_use_threads(): - # allow blosc to use threads internally - with nogil: - ret = blosc_decompress(source_ptr, dest_ptr, nbytes) - else: - with nogil: - ret = blosc_decompress_ctx(source_ptr, dest_ptr, nbytes, 1) - - finally: - - # release buffers - source_buffer.release() - if dest_buffer is not None: - dest_buffer.release() - - # handle errors - if ret <= 0: - raise RuntimeError('error during blosc decompression: %d' % ret) - - return dest - - -def decompress_partial(source, start, nitems, dest=None): - """**Experimental** - Decompress data of only a part of a buffer. - - Parameters - ---------- - source : bytes-like - Compressed data, including blosc header. Can be any object supporting the buffer - protocol. - start: int, - Offset in item where we want to start decoding - nitems: int - Number of items we want to decode - dest : array-like, optional - Object to decompress into. - - - Returns - ------- - dest : bytes - Object containing decompressed data. - - """ - cdef: - int ret - int encoding_size - int nitems_bytes - int start_bytes - char *source_ptr - char *dest_ptr - Buffer source_buffer - Buffer dest_buffer = None - - # setup source buffer - source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - source_ptr = source_buffer.ptr - - # get encoding size from source buffer header - encoding_size = source[3] - - # convert variables to handle type and encoding sizes - nitems_bytes = nitems * encoding_size - start_bytes = (start * encoding_size) - - # setup destination buffer - if dest is None: - dest = PyBytes_FromStringAndSize(NULL, nitems_bytes) - dest_ptr = PyBytes_AS_STRING(dest) - dest_nbytes = nitems_bytes - else: - arr = ensure_contiguous_ndarray(dest) - dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE) - dest_ptr = dest_buffer.ptr - dest_nbytes = dest_buffer.nbytes - - # try decompression - try: - if dest_nbytes < nitems_bytes: - raise ValueError('destination buffer too small; expected at least %s, ' - 'got %s' % (nitems_bytes, dest_nbytes)) - ret = blosc_getitem(source_ptr, start, nitems, dest_ptr) - - finally: - source_buffer.release() - if dest_buffer is not None: - dest_buffer.release() - - # ret refers to the number of bytes returned from blosc_getitem. - if ret <= 0: - raise RuntimeError('error during blosc partial decompression: %d', ret) - - return dest - - -# set the value of this variable to True or False to override the -# default adaptive behaviour -use_threads = None - - -def _get_use_threads(): - global use_threads - proc = multiprocessing.current_process() - - # check if locks are available, and if not no threads - if not mutex: - return False - - # check for fork - if proc.pid != _importer_pid: - # If this module has been imported in the parent process, and the current process - # is a fork, attempting to use blosc in multi-threaded mode will cause a - # program hang, so we force use of blosc ctx functions, i.e., no threads. - return False - - if use_threads in [True, False]: - # user has manually overridden the default behaviour - _use_threads = use_threads - - else: - # Adaptive behaviour: allow blosc to use threads if it is being called from the - # main Python thread in the main Python process, inferring that it is being run - # from within a single-threaded, single-process program; otherwise do not allow - # blosc to use threads, inferring it is being run from within a multi-threaded - # program or multi-process program - - if proc.name != 'MainProcess': - _use_threads = False - elif hasattr(threading, 'main_thread'): - _use_threads = (threading.main_thread() == threading.current_thread()) - else: - _use_threads = threading.current_thread().name == 'MainThread' - - return _use_threads - - -_shuffle_repr = ['AUTOSHUFFLE', 'NOSHUFFLE', 'SHUFFLE', 'BITSHUFFLE'] - - -class Blosc(Codec): - """Codec providing compression using the Blosc meta-compressor. - - Parameters - ---------- - cname : string, optional - A string naming one of the compression algorithms available within blosc, e.g., - 'zstd', 'blosclz', 'lz4', 'lz4hc', 'zlib' or 'snappy'. - clevel : integer, optional - An integer between 0 and 9 specifying the compression level. - shuffle : integer, optional - Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, - bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will - be used otherwise. The default is `SHUFFLE`. - blocksize : int - The requested size of the compressed blocks. If 0 (default), an automatic - blocksize will be used. - - See Also - -------- - numcodecs.zstd.Zstd, numcodecs.lz4.LZ4 - - """ - - codec_id = 'blosc' - NOSHUFFLE = NOSHUFFLE - SHUFFLE = SHUFFLE - BITSHUFFLE = BITSHUFFLE - AUTOSHUFFLE = AUTOSHUFFLE - max_buffer_size = 2**31 - 1 - - def __init__(self, cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=AUTOBLOCKS): - self.cname = cname - if isinstance(cname, str): - self._cname_bytes = cname.encode('ascii') - else: - self._cname_bytes = cname - self.clevel = clevel - self.shuffle = shuffle - self.blocksize = blocksize - - def encode(self, buf): - buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) - return compress(buf, self._cname_bytes, self.clevel, self.shuffle, self.blocksize) - - def decode(self, buf, out=None): - buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) - return decompress(buf, out) - - def decode_partial(self, buf, int start, int nitems, out=None): - '''**Experimental**''' - buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) - return decompress_partial(buf, start, nitems, dest=out) - - def __repr__(self): - r = '%s(cname=%r, clevel=%r, shuffle=%s, blocksize=%s)' % \ - (type(self).__name__, - self.cname, - self.clevel, - _shuffle_repr[self.shuffle + 1], - self.blocksize) - return r diff --git a/numcodecs/tests/test_blosc.py b/numcodecs/tests/test_blosc.py index 907b2307..d969545a 100644 --- a/numcodecs/tests/test_blosc.py +++ b/numcodecs/tests/test_blosc.py @@ -7,7 +7,7 @@ try: - from numcodecs import blosc + from numcodecs import blosc as blosc from numcodecs.blosc import Blosc except ImportError: # pragma: no cover pytest.skip("numcodecs.blosc not available", allow_module_level=True) @@ -15,7 +15,6 @@ from numcodecs.tests.common import ( check_encode_decode, - check_encode_decode_partial, check_config, check_backwards_compatibility, check_err_decode_object_buffer, @@ -78,19 +77,6 @@ def test_encode_decode(array, codec): check_encode_decode(array, codec) -@pytest.mark.parametrize('codec', codecs) -@pytest.mark.parametrize( - 'array', - [ - pytest.param(x) if len(x.shape) == 1 else pytest.param(x, marks=[pytest.mark.xfail]) - for x in arrays - ], -) -def test_partial_decode(codec, array): - _skip_null(codec) - check_encode_decode_partial(array, codec) - - def test_config(): codec = Blosc(cname='zstd', clevel=3, shuffle=1) check_config(codec) @@ -120,33 +106,6 @@ def test_eq(): assert Blosc(cname='lz4') != 'foo' -def test_compress_blocksize_default(use_threads): - arr = np.arange(1000, dtype='i4') - - blosc.use_threads = use_threads - - # default blocksize - enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize > 0 - - # explicit default blocksize - enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE, 0) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize > 0 - - -@pytest.mark.parametrize('bs', (2**7, 2**8)) -def test_compress_blocksize(use_threads, bs): - arr = np.arange(1000, dtype='i4') - - blosc.use_threads = use_threads - - enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE, bs) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize == bs - - def test_compress_complib(use_threads): arr = np.arange(1000, dtype='i4') expected_complibs = { @@ -158,7 +117,7 @@ def test_compress_complib(use_threads): } blosc.use_threads = use_threads for cname in blosc.list_compressors(): - enc = blosc.compress(arr, cname.encode(), 1, Blosc.NOSHUFFLE) + enc = blosc.compress(arr, cname, 1, Blosc.NOSHUFFLE) complib = blosc.cbuffer_complib(enc) expected_complib = expected_complibs[cname] assert complib == expected_complib @@ -170,33 +129,6 @@ def test_compress_complib(use_threads): blosc.compress(arr, b'foo', 1) -@pytest.mark.parametrize('dtype', ['i1', 'i2', 'i4', 'i8']) -def test_compress_metainfo(dtype, use_threads): - arr = np.arange(1000, dtype=dtype) - for shuffle in Blosc.NOSHUFFLE, Blosc.SHUFFLE, Blosc.BITSHUFFLE: - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(arr, cname.encode(), 1, shuffle) - typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc) - assert typesize == arr.dtype.itemsize - assert did_shuffle == shuffle - - -def test_compress_autoshuffle(use_threads): - arr = np.arange(8000) - for dtype in 'i1', 'i2', 'i4', 'i8', 'f2', 'f4', 'f8', 'bool', 'S10': - varr = arr.view(dtype) - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(varr, cname.encode(), 1, Blosc.AUTOSHUFFLE) - typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc) - assert typesize == varr.dtype.itemsize - if typesize == 1: - assert did_shuffle == Blosc.BITSHUFFLE - else: - assert did_shuffle == Blosc.SHUFFLE - - def test_config_blocksize(): # N.B., we want to be backwards compatible with any config where blocksize is not # explicitly stated @@ -264,17 +196,8 @@ def test_err_encode_object_buffer(): check_err_encode_object_buffer(Blosc()) -def test_decompression_error_handling(): - for codec in codecs: - _skip_null(codec) - with pytest.raises(RuntimeError): - codec.decode(bytearray()) - with pytest.raises(RuntimeError): - codec.decode(bytearray(0)) - - -def test_max_buffer_size(): - for codec in codecs: - _skip_null(codec) - assert codec.max_buffer_size == 2**31 - 1 - check_max_buffer_size(codec) +@pytest.mark.parametrize("codec", codecs) +def test_max_buffer_size(codec): + _skip_null(codec) + assert codec.max_buffer_size == 2**31 - 1 + check_max_buffer_size(codec) diff --git a/numcodecs/tests/test_blosc_v2.py b/numcodecs/tests/test_blosc_v2.py deleted file mode 100644 index 94230293..00000000 --- a/numcodecs/tests/test_blosc_v2.py +++ /dev/null @@ -1,286 +0,0 @@ -from multiprocessing import Pool -from multiprocessing.pool import ThreadPool - - -import numpy as np -import pytest - - -try: - from numcodecs import blosc_v2 as blosc - from numcodecs.blosc_v2 import Blosc -except ImportError: # pragma: no cover - pytest.skip("numcodecs.blosc not available", allow_module_level=True) - - -from numcodecs.tests.common import ( - check_encode_decode, - check_config, - check_backwards_compatibility, - check_err_decode_object_buffer, - check_err_encode_object_buffer, - check_max_buffer_size, -) - - -codecs = [ - Blosc(shuffle=Blosc.SHUFFLE), - Blosc(clevel=0, shuffle=Blosc.SHUFFLE), - Blosc(cname='lz4', shuffle=Blosc.SHUFFLE), - Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE), - Blosc(cname='lz4', clevel=5, shuffle=Blosc.SHUFFLE), - Blosc(cname='lz4', clevel=9, shuffle=Blosc.BITSHUFFLE), - Blosc(cname='zlib', clevel=1, shuffle=0), - Blosc(cname='zstd', clevel=1, shuffle=1), - Blosc(cname='blosclz', clevel=1, shuffle=2), - None, # was snappy - Blosc(shuffle=Blosc.SHUFFLE, blocksize=0), - Blosc(shuffle=Blosc.SHUFFLE, blocksize=2**8), - Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=2**8), -] - - -# mix of dtypes: integer, float, bool, string -# mix of shapes: 1D, 2D, 3D -# mix of orders: C, F -arrays = [ - np.arange(1000, dtype='i4'), - np.linspace(1000, 1001, 1000, dtype='f8'), - np.random.normal(loc=1000, scale=1, size=(100, 10)), - np.random.randint(0, 2, size=1000, dtype=bool).reshape(100, 10, order='F'), - np.random.choice([b'a', b'bb', b'ccc'], size=1000).reshape(10, 10, 10), - np.random.randint(0, 2**60, size=1000, dtype='u8').view('M8[ns]'), - np.random.randint(0, 2**60, size=1000, dtype='u8').view('m8[ns]'), - np.random.randint(0, 2**25, size=1000, dtype='u8').view('M8[m]'), - np.random.randint(0, 2**25, size=1000, dtype='u8').view('m8[m]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[ns]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[ns]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[m]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[m]'), -] - - -def _skip_null(codec): - if codec is None: - pytest.skip("codec has been removed") - - -@pytest.fixture(scope='module', params=[True, False, None]) -def use_threads(request): - return request.param - - -@pytest.mark.parametrize('array', arrays) -@pytest.mark.parametrize('codec', codecs) -def test_encode_decode(array, codec): - _skip_null(codec) - check_encode_decode(array, codec) - - -""" -@pytest.mark.parametrize('codec', codecs) -@pytest.mark.parametrize( - 'array', - [ - pytest.param(x) if len(x.shape) == 1 else pytest.param(x, marks=[pytest.mark.xfail]) - for x in arrays - ], -) -def test_partial_decode(codec, array): - _skip_null(codec) - check_encode_decode_partial(array, codec) -""" - - -def test_config(): - codec = Blosc(cname='zstd', clevel=3, shuffle=1) - check_config(codec) - codec = Blosc(cname='lz4', clevel=1, shuffle=2, blocksize=2**8) - check_config(codec) - - -def test_repr(): - expect = "Blosc(cname='zstd', clevel=3, shuffle=SHUFFLE, blocksize=0)" - actual = repr(Blosc(cname='zstd', clevel=3, shuffle=Blosc.SHUFFLE, blocksize=0)) - assert expect == actual - expect = "Blosc(cname='lz4', clevel=1, shuffle=NOSHUFFLE, blocksize=256)" - actual = repr(Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=256)) - assert expect == actual - expect = "Blosc(cname='zlib', clevel=9, shuffle=BITSHUFFLE, blocksize=512)" - actual = repr(Blosc(cname='zlib', clevel=9, shuffle=Blosc.BITSHUFFLE, blocksize=512)) - assert expect == actual - expect = "Blosc(cname='blosclz', clevel=5, shuffle=AUTOSHUFFLE, blocksize=1024)" - actual = repr(Blosc(cname='blosclz', clevel=5, shuffle=Blosc.AUTOSHUFFLE, blocksize=1024)) - assert expect == actual - - -def test_eq(): - assert Blosc() == Blosc() - assert Blosc(cname='lz4') != Blosc(cname='zstd') - assert Blosc(clevel=1) != Blosc(clevel=9) - assert Blosc(cname='lz4') != 'foo' - - -@pytest.mark.skip("blosc-python has no way to get blocksize") -def test_compress_blocksize_default(use_threads): - arr = np.arange(1000, dtype='i4') - - blosc.use_threads = use_threads - - # default blocksize - enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize > 0 - - # explicit default blocksize - enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=0) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize > 0 - - -@pytest.mark.skip("blosc-python has no way to get cbuffer sizes") -@pytest.mark.parametrize('bs', (2**7, 2**8)) -def test_compress_blocksize(use_threads, bs): - arr = np.arange(1000, dtype='i4') - - blosc.use_threads = use_threads - - enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=bs) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize == bs - - -def test_compress_complib(use_threads): - arr = np.arange(1000, dtype='i4') - expected_complibs = { - 'lz4': 'LZ4', - 'lz4hc': 'LZ4', - 'blosclz': 'BloscLZ', - 'zlib': 'Zlib', - 'zstd': 'Zstd', - } - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(arr, cname, 1, Blosc.NOSHUFFLE) - complib = blosc.cbuffer_complib(enc) - expected_complib = expected_complibs[cname] - assert complib == expected_complib - with pytest.raises(ValueError): - # capitalized cname - blosc.compress(arr, b'LZ4', 1) - with pytest.raises(ValueError): - # bad cname - blosc.compress(arr, b'foo', 1) - - -@pytest.mark.skip("blosc-python has no way to get cbuffer metainfo") -@pytest.mark.parametrize('dtype', ['i1', 'i2', 'i4', 'i8']) -def test_compress_metainfo(dtype, use_threads): - arr = np.arange(1000, dtype=dtype) - for shuffle in Blosc.NOSHUFFLE, Blosc.SHUFFLE, Blosc.BITSHUFFLE: - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(arr, cname, 1, shuffle) - typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc) - assert typesize == arr.dtype.itemsize - assert did_shuffle == shuffle - - -@pytest.mark.skip("blosc-python has no way to get cbuffer metainfo") -def test_compress_autoshuffle(use_threads): - arr = np.arange(8000) - for dtype in 'i1', 'i2', 'i4', 'i8', 'f2', 'f4', 'f8', 'bool', 'S10': - varr = arr.view(dtype) - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(varr, cname.encode(), 1, Blosc.AUTOSHUFFLE) - typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc) - assert typesize == varr.dtype.itemsize - if typesize == 1: - assert did_shuffle == Blosc.BITSHUFFLE - else: - assert did_shuffle == Blosc.SHUFFLE - - -def test_config_blocksize(): - # N.B., we want to be backwards compatible with any config where blocksize is not - # explicitly stated - - # blocksize not stated - config = dict(cname='lz4', clevel=1, shuffle=Blosc.SHUFFLE) - codec = Blosc.from_config(config) - assert codec.blocksize == 0 - - # blocksize stated - config = dict(cname='lz4', clevel=1, shuffle=Blosc.SHUFFLE, blocksize=2**8) - codec = Blosc.from_config(config) - assert codec.blocksize == 2**8 - - -def test_backwards_compatibility(): - check_backwards_compatibility(Blosc.codec_id, arrays, codecs) - - -def _encode_worker(data): - compressor = Blosc(cname='zlib', clevel=9, shuffle=Blosc.SHUFFLE) - enc = compressor.encode(data) - return enc - - -def _decode_worker(enc): - compressor = Blosc() - data = compressor.decode(enc) - return data - - -@pytest.mark.parametrize('pool', (Pool, ThreadPool)) -def test_multiprocessing(use_threads, pool): - data = np.arange(1000000) - enc = _encode_worker(data) - - pool = pool(5) - - try: - blosc.use_threads = use_threads - - # test with process pool and thread pool - - # test encoding - enc_results = pool.map(_encode_worker, [data] * 5) - assert all(len(enc) == len(e) for e in enc_results) - - # test decoding - dec_results = pool.map(_decode_worker, [enc] * 5) - assert all(data.nbytes == len(d) for d in dec_results) - - # tidy up - pool.close() - pool.join() - - finally: - blosc.use_threads = None # restore default - - -def test_err_decode_object_buffer(): - check_err_decode_object_buffer(Blosc()) - - -def test_err_encode_object_buffer(): - check_err_encode_object_buffer(Blosc()) - - -@pytest.mark.skip("blosc can decode empty data fine") -def test_decompression_error_handling(): - for codec in codecs: - _skip_null(codec) - with pytest.raises(RuntimeError): - codec.decode(bytearray()) - with pytest.raises(RuntimeError): - codec.decode(bytearray(0)) - - -@pytest.mark.parametrize("codec", codecs) -def test_max_buffer_size(codec): - _skip_null(codec) - assert codec.max_buffer_size == 2**31 - 1 - check_max_buffer_size(codec) diff --git a/setup.py b/setup.py index 815224aa..f1c47cba 100644 --- a/setup.py +++ b/setup.py @@ -49,82 +49,6 @@ def error(*msg): print('[numcodecs]', *msg, **kwargs) -def blosc_extension(): - info('setting up Blosc extension') - - extra_compile_args = base_compile_args.copy() - define_macros = [] - - # setup blosc sources - blosc_sources = [f for f in glob('c-blosc/blosc/*.c') if 'avx2' not in f and 'sse2' not in f] - include_dirs = [os.path.join('c-blosc', 'blosc')] - - # add internal complibs - blosc_sources += glob('c-blosc/internal-complibs/lz4*/*.c') - blosc_sources += glob('c-blosc/internal-complibs/snappy*/*.cc') - blosc_sources += glob('c-blosc/internal-complibs/zlib*/*.c') - blosc_sources += glob('c-blosc/internal-complibs/zstd*/common/*.c') - blosc_sources += glob('c-blosc/internal-complibs/zstd*/compress/*.c') - blosc_sources += glob('c-blosc/internal-complibs/zstd*/decompress/*.c') - blosc_sources += glob('c-blosc/internal-complibs/zstd*/dictBuilder/*.c') - include_dirs += [d for d in glob('c-blosc/internal-complibs/*') if os.path.isdir(d)] - include_dirs += [d for d in glob('c-blosc/internal-complibs/*/*') if os.path.isdir(d)] - include_dirs += [d for d in glob('c-blosc/internal-complibs/*/*/*') if os.path.isdir(d)] - # remove minizip because Python.h 3.8 tries to include crypt.h - include_dirs = [d for d in include_dirs if 'minizip' not in d] - define_macros += [ - ('HAVE_LZ4', 1), - # ('HAVE_SNAPPY', 1), - ('HAVE_ZLIB', 1), - ('HAVE_ZSTD', 1), - ] - # define_macros += [('CYTHON_TRACE', '1')] - - # SSE2 - if have_sse2 and not disable_sse2: - info('compiling Blosc extension with SSE2 support') - extra_compile_args.append('-DSHUFFLE_SSE2_ENABLED') - blosc_sources += [f for f in glob('c-blosc/blosc/*.c') if 'sse2' in f] - if os.name == 'nt': - define_macros += [('__SSE2__', 1)] - else: - info('compiling Blosc extension without SSE2 support') - - # AVX2 - if have_avx2 and not disable_avx2: - info('compiling Blosc extension with AVX2 support') - extra_compile_args.append('-DSHUFFLE_AVX2_ENABLED') - blosc_sources += [f for f in glob('c-blosc/blosc/*.c') if 'avx2' in f] - if os.name == 'nt': - define_macros += [('__AVX2__', 1)] - else: - info('compiling Blosc extension without AVX2 support') - - # include assembly files - if cpuinfo.platform.machine() == 'x86_64': - extra_objects = [ - S[:-1] + 'o' for S in glob("c-blosc/internal-complibs/zstd*/decompress/*amd64.S") - ] - else: - extra_objects = [] - - sources = ['numcodecs/blosc.pyx'] - - # define extension module - extensions = [ - Extension( - 'numcodecs.blosc', - sources=sources + blosc_sources, - include_dirs=include_dirs, - define_macros=define_macros, - extra_compile_args=extra_compile_args, - extra_objects=extra_objects, - ), - ] - - return extensions - - def zstd_extension(): info('setting up Zstandard extension') @@ -358,8 +282,8 @@ def run(self): def run_setup(with_extensions): if with_extensions: ext_modules = ( - blosc_extension() - + zstd_extension() + # blosc_extension() + zstd_extension() + lz4_extension() + compat_extension() + shuffle_extension()