diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7955b0f9b..d36d3b342 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ Write the date in place of the "Unreleased" in the case a new version is release
 
 - Add adapters for reading back assets with the image/jpeg and multipart/related;type=image/jpeg mimetypes.
+- Refactor CSVAdapter to allow pd.read_csv kwargs
 
 ## v0.1.0b10 (2024-10-11)
diff --git a/docs/source/reference/service.md b/docs/source/reference/service.md
index 213a81d57..69f87acb8 100644
--- a/docs/source/reference/service.md
+++ b/docs/source/reference/service.md
@@ -25,7 +25,7 @@ or its dask counterpart.
 .. autosummary::
    :toctree: generated
 
-   tiled.adapters.csv.read_csv
+   tiled.adapters.csv.CSVAdapter
    tiled.adapters.excel.ExcelAdapter
    tiled.adapters.hdf5.HDF5Adapter
    tiled.adapters.netcdf.read_netcdf
diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py
index 61b24e98f..9f33dbb54 100644
--- a/tiled/_tests/test_catalog.py
+++ b/tiled/_tests/test_catalog.py
@@ -13,7 +13,7 @@
 import tifffile
 import xarray
 
-from ..adapters.csv import read_csv
+from ..adapters.csv import CSVAdapter
 from ..adapters.dataframe import ArrayAdapter
 from ..adapters.tiff import TiffAdapter
 from ..catalog import in_memory
@@ -236,7 +236,7 @@ async def test_write_dataframe_external_direct(a, tmpdir):
     filepath = str(tmpdir / "file.csv")
     data_uri = ensure_uri(filepath)
     df.to_csv(filepath, index=False)
-    dfa = read_csv(data_uri)
+    dfa = CSVAdapter.from_uris(data_uri)
     structure = asdict(dfa.structure())
     await a.create_node(
         key="x",
diff --git a/tiled/_tests/test_jpeg.py b/tiled/_tests/test_jpeg.py
index 3e7abc77c..ed12bcd02 100644
--- a/tiled/_tests/test_jpeg.py
+++ b/tiled/_tests/test_jpeg.py
@@ -32,7 +32,7 @@ def client(tmpdir_module):
     tree = MapAdapter(
         {
-            "color": JPEGAdapter(ensure_uri(path)),
+            "color": JPEGAdapter.from_uris(ensure_uri(path)),
             "sequence": JPEGSequenceAdapter.from_uris(
                 [ensure_uri(filepath) for filepath in filepaths]
             ),
diff --git a/tiled/_tests/test_writing.py b/tiled/_tests/test_writing.py
index 571aaad59..555292af5 100644
--- a/tiled/_tests/test_writing.py
+++ b/tiled/_tests/test_writing.py
@@ -537,7 +537,11 @@ def test_write_with_specified_mimetype(tree):
     df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
     structure = TableStructure.from_pandas(df)
 
-    for mimetype in [PARQUET_MIMETYPE, "text/csv", APACHE_ARROW_FILE_MIME_TYPE]:
+    for mimetype in [
+        PARQUET_MIMETYPE,
+        "text/csv",
+        APACHE_ARROW_FILE_MIME_TYPE,
+    ]:
         x = client.new(
             "table",
             [
diff --git a/tiled/adapters/array.py b/tiled/adapters/array.py
index c849bc799..4edcae966 100644
--- a/tiled/adapters/array.py
+++ b/tiled/adapters/array.py
@@ -49,6 +49,7 @@ def __init__(
         self._structure = structure
         self._metadata = metadata or {}
         self.specs = specs or []
+        self.access_policy = access_policy
 
     @classmethod
     def from_array(
@@ -100,30 +101,12 @@ def __repr__(self) -> str:
 
     @property
     def dims(self) -> Optional[Tuple[str, ...]]:
-        """
-
-        Returns
-        -------
-
-        """
         return self._structure.dims
 
     def metadata(self) -> JSON:
-        """
-
-        Returns
-        -------
-
-        """
         return self._metadata
 
     def structure(self) -> ArrayStructure:
-        """
-
-        Returns
-        -------
-
-        """
         return self._structure
 
     def read(
diff --git a/tiled/adapters/arrow.py b/tiled/adapters/arrow.py
index 84e5afffb..61882faf0 100644
--- a/tiled/adapters/arrow.py
+++ b/tiled/adapters/arrow.py
@@ -48,6 +48,19 @@ def __init__(
         self.specs = list(specs or [])
         self.access_policy = access_policy
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: TableStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "ArrowAdapter":
+        data_uris = [a.data_uri for a in assets]
+        return cls(data_uris, structure, metadata, specs, access_policy)
+
     def metadata(self) -> JSON:
         """
diff --git a/tiled/adapters/awkward_buffers.py b/tiled/adapters/awkward_buffers.py
index 8a595e6d8..7d27191b7 100644
--- a/tiled/adapters/awkward_buffers.py
+++ b/tiled/adapters/awkward_buffers.py
@@ -2,7 +2,7 @@
 A directory containing awkward buffers, one file per form key.
 """
 from pathlib import Path
-from typing import List, Optional
+from typing import Dict, List, Optional, Union
 
 import awkward.forms
 
@@ -73,3 +73,17 @@ def from_directory(
             specs=specs,
             access_policy=access_policy,
         )
+
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: AwkwardStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "AwkwardBuffersAdapter":
+        return cls.from_directory(
+            assets[0].data_uri, structure, metadata, specs, access_policy
+        )
diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py
index 08eed0554..09e8492b1 100644
--- a/tiled/adapters/csv.py
+++ b/tiled/adapters/csv.py
@@ -4,6 +4,7 @@
 import dask.dataframe
 import pandas
 
+from ..structures.array import ArrayStructure, BuiltinDtype, StructDtype
 from ..structures.core import Spec, StructureFamily
 from ..structures.data_source import Asset, DataSource, Management
 from ..structures.table import TableStructure
@@ -66,7 +67,7 @@ def read_csv(
 
 
 class CSVAdapter:
-    """ """
+    """Adapter for tabular data stored as partitioned text (csv) files"""
 
     structure_family = StructureFamily.table
 
@@ -77,68 +78,89 @@ def __init__(
         metadata: Optional[JSON] = None,
         specs: Optional[List[Spec]] = None,
         access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
     ) -> None:
-        """
+        """Adapter for partitioned tabular data stored as a sequence of text (csv) files
 
         Parameters
         ----------
-        data_uris :
+        data_uris : list of uris to csv files
         structure :
         metadata :
         specs :
        access_policy :
+        kwargs : dict
+            any keyword arguments that can be passed to the pandas.read_csv function, e.g. names, sep, dtype, etc.
         """
-        # TODO Store data_uris instead and generalize to non-file schemes.
-        self._partition_paths = [path_from_uri(uri) for uri in data_uris]
+        self._file_paths = [path_from_uri(uri) for uri in data_uris]
         self._metadata = metadata or {}
+        self._read_csv_kwargs = kwargs
         if structure is None:
-            table = dask.dataframe.read_csv(self._partition_paths)
+            table = dask.dataframe.read_csv(
+                self._file_paths[0], **self._read_csv_kwargs
+            )
             structure = TableStructure.from_dask_dataframe(table)
+            structure.npartitions = len(self._file_paths)
         self._structure = structure
         self.specs = list(specs or [])
         self.access_policy = access_policy
 
-    def metadata(self) -> JSON:
-        """
-
-        Returns
-        -------
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: TableStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "CSVAdapter":
+        return cls(
+            [ast.data_uri for ast in assets],
+            structure,
+            metadata,
+            specs,
+            access_policy,
+            **kwargs,
+        )
 
-        """
-        return self._metadata
+    @classmethod
+    def from_uris(
+        cls,
+        data_uris: Union[str, List[str]],
+        structure: Optional[TableStructure] = None,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "CSVAdapter":
+        if isinstance(data_uris, str):
+            data_uris = [data_uris]
 
-    @property
-    def dataframe_adapter(self) -> TableAdapter:
-        """
+        return cls(data_uris, structure, metadata, specs, access_policy, **kwargs)
 
-        Returns
-        -------
+    def __repr__(self) -> str:
+        return f"{type(self).__name__}({self._structure.columns!r})"
 
-        """
-        partitions = []
-        for path in self._partition_paths:
-            if not Path(path).exists():
-                partition = None
-            else:
-                partition = dask.dataframe.read_csv(path)
-            partitions.append(partition)
-        return DataFrameAdapter(partitions, self._structure)
+    def metadata(self) -> JSON:
+        return self._metadata
 
     @classmethod
     def init_storage(cls, data_uri: str, structure: TableStructure) -> List[Asset]:
-        """
+        """Initialize partitioned csv storage
 
         Parameters
         ----------
-        data_uri :
-        structure :
+        data_uri : str
+            location of the dataset, should point to a folder in which partitioned csv files will be created
+        structure : TableStructure
+            description of the data structure
 
         Returns
         -------
-
+            list of assets with each element corresponding to individual partition files
         """
-        directory = path_from_uri(data_uri)
-        directory.mkdir(parents=True, exist_ok=True)
+        path_from_uri(data_uri).mkdir(parents=True, exist_ok=True)
         assets = [
             Asset(
                 data_uri=f"{data_uri}/partition-{i}.csv",
@@ -153,90 +175,92 @@ def init_storage(cls, data_uri: str, structure: TableStructure) -> List[Asset]:
     def append_partition(
         self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
     ) -> None:
-        """
+        """Append data to an existing partition
 
         Parameters
         ----------
-        data :
-        partition :
-
-        Returns
-        -------
+        data : dask.dataframe.DataFrame or pandas.DataFrame
+            data to be appended
+        partition : int
+            index of the partition to be appended to
 
         """
-        uri = self._partition_paths[partition]
+        uri = self._file_paths[partition]
         data.to_csv(uri, index=False, mode="a", header=False)
 
     def write_partition(
         self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
     ) -> None:
-        """
+        """Write data to a new partition or overwrite an existing one
 
         Parameters
         ----------
-        data :
-        partition :
-
-        Returns
-        -------
+        data : dask.dataframe.DataFrame or pandas.DataFrame
+            data to be written
+        partition : int
+            index of the partition to be written to
 
         """
-        uri = self._partition_paths[partition]
+        uri = self._file_paths[partition]
         data.to_csv(uri, index=False)
 
     def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
-        """
+        """Default writing function to a dataset with a single partition
 
         Parameters
         ----------
-        data :
-
-        Returns
-        -------
+        data : dask.dataframe.DataFrame or pandas.DataFrame
+            data to be written
 
         """
         if self.structure().npartitions != 1:
             raise NotImplementedError
-        uri = self._partition_paths[0]
+        uri = self._file_paths[0]
         data.to_csv(uri, index=False)
 
-    def read(
-        self, *args: Any, **kwargs: Any
-    ) -> Union[pandas.DataFrame, dask.dataframe.DataFrame]:
+    def read(self, fields: Optional[List[str]] = None) -> dask.dataframe.DataFrame:
         """
 
         Parameters
         ----------
-        args :
-        kwargs :
+        fields :
 
         Returns
         -------
 
         """
-        return self.dataframe_adapter.read(*args, **kwargs)
+        dfs = [
+            self.read_partition(i, fields=fields) for i in range(len(self._file_paths))
+        ]
 
-    def read_partition(self, *args: Any, **kwargs: Any) -> pandas.DataFrame:
-        """
+        return dask.dataframe.concat(dfs, axis=0)
+
+    def read_partition(
+        self,
+        indx: int,
+        fields: Optional[List[str]] = None,
+    ) -> dask.dataframe.DataFrame:
+        """Read a single partition
 
         Parameters
         ----------
-        args :
-        kwargs :
+        indx : int
+            index of the partition to read
+        fields :
 
         Returns
         -------
 
         """
-        return self.dataframe_adapter.read_partition(*args, **kwargs)
 
-    def structure(self) -> TableStructure:
-        """
+        df = dask.dataframe.read_csv(self._file_paths[indx], **self._read_csv_kwargs)
 
-        Returns
-        -------
+        if fields is not None:
+            df = df[fields]
 
-        """
+        return df.compute()
+
+    def structure(self) -> TableStructure:
         return self._structure
 
     def get(self, key: str) -> Union[ArrayAdapter, None]:
@@ -276,9 +300,9 @@ def generate_data_sources(
         """
         return [
             DataSource(
-                structure_family=self.dataframe_adapter.structure_family,
+                structure_family=StructureFamily.table,
                 mimetype=mimetype,
-                structure=dict_or_none(self.dataframe_adapter.structure()),
+                structure=dict_or_none(self.structure()),
                 parameters={},
                 management=Management.external,
                 assets=[
@@ -292,59 +316,105 @@ def generate_data_sources(
             )
         ]
 
-    @classmethod
-    def from_single_file(
-        cls,
-        data_uri: str,
-        structure: Optional[TableStructure] = None,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "CSVAdapter":
-        """
-
-        Parameters
-        ----------
-        data_uri :
-        structure :
-        metadata :
-        specs :
-        access_policy :
-
-        Returns
-        -------
-
-        """
-        return cls(
-            [data_uri],
-            structure=structure,
-            metadata=metadata,
-            specs=specs,
-            access_policy=access_policy,
-        )
-
     def __getitem__(self, key: str) -> ArrayAdapter:
-        """
+        """Get an ArrayAdapter for a single column
 
         Parameters
         ----------
-        key :
+        key : str
+            column name to get
 
         Returns
         -------
-
+            An array adapter corresponding to a single column in the table.
         """
         # Must compute to determine shape.
         return ArrayAdapter.from_array(self.read([key])[key].values)
 
     def items(self) -> Iterator[Tuple[str, ArrayAdapter]]:
-        """
+        """Iterator over table columns
 
         Returns
         -------
-
+            Tuples of column names and corresponding ArrayAdapters
         """
         yield from (
             (key, ArrayAdapter.from_array(self.read([key])[key].values))
             for key in self._structure.columns
         )
+
+
+class CSVArrayAdapter(ArrayAdapter):
+    """Adapter for array-type data stored as partitioned csv files"""
+
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "CSVArrayAdapter":
+        """Adapter for partitioned array data stored as a sequence of csv files
+
+        Parameters
+        ----------
+        assets : list of assets pointing to csv files
+        structure :
+        metadata :
+        specs :
+        access_policy :
+        kwargs : dict
+            any keyword arguments that can be passed to the pandas.read_csv function, e.g. names, sep, dtype, etc.
+        """
+
+        # Load the array lazily with Dask
+        file_paths = [path_from_uri(ast.data_uri) for ast in assets]
+        dtype_numpy = structure.data_type.to_numpy_dtype()
+        nrows = kwargs.pop("nrows", None)  # dask doesn't accept nrows
+        ddf = dask.dataframe.read_csv(file_paths, dtype=dtype_numpy, **kwargs)
+        chunks_0: tuple[int, ...] = structure.chunks[0]  # chunking along the rows dimension (when not stackable)
+
+        if not dtype_numpy.isbuiltin:
+            # Structural np dtype (0) -- return a records array
+            # NOTE: dask.DataFrame.to_records() allows one to pass `index=False` to drop the index column, but as
+            #       of dask ver. 2024.2.1 it seems broken and doesn't do anything. Instead, we set an index to any
+            #       (first) column in the df to prevent it from creating an extra one.
+            array = ddf.set_index(ddf.columns[0]).to_records(lengths=chunks_0)
+        else:
+            # Simple np dtype (1 or 2) -- all fields have the same type -- return a usual array
+            array = ddf.to_dask_array(lengths=chunks_0)
+
+        # Possibly extend or cut the table according to the nrows parameter
+        if nrows is not None:
+            # TODO: this pulls all the data and can take long to compute. Instead, we can open the files and
+            #       iterate over the rows directly, which is about 4-5 times faster for 50K rows.
+            #       Can also just .compute() and return a np array instead
+            nrows_actual = len(ddf)
+            if nrows > nrows_actual:
+                padding = dask.array.zeros_like(array, shape=(nrows - nrows_actual, *array.shape[1:]))
+                array = dask.array.append(array[:nrows_actual, ...], padding, axis=0)
+            else:
+                array = array[:nrows, ...]
+
+        array = array.reshape(structure.shape).rechunk(structure.chunks)
+
+        return cls(
+            array,
+            structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
+
+    @classmethod
+    def from_uris(
+        cls,
+        file_paths: Union[str, List[str]],
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "CSVArrayAdapter":
+        # TODO!!!
+        array = dask.dataframe.read_csv(file_paths, **kwargs).to_dask_array()
+        return cls.from_array(array)  # type: ignore
diff --git a/tiled/adapters/excel.py b/tiled/adapters/excel.py
index 384aac86c..04f1caacf 100644
--- a/tiled/adapters/excel.py
+++ b/tiled/adapters/excel.py
@@ -1,10 +1,15 @@
-from typing import Any
+from typing import Any, Dict, List, Optional, Union
 
 import dask.dataframe
 import pandas
 
 from ..adapters.mapping import MapAdapter
+from ..structures.core import Spec
+from ..structures.data_source import Asset
+from ..structures.table import TableStructure
 from .dataframe import DataFrameAdapter
+from .protocols import AccessPolicy
+from .type_alliases import JSON
 
 
 class ExcelAdapter(MapAdapter):
@@ -81,3 +86,23 @@ def from_uri(cls, data_uri: str, **kwargs: Any) -> "ExcelAdapter":
         """
         file = pandas.ExcelFile(data_uri)
         return cls.from_file(file)
+
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: TableStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "ExcelAdapter":
+        data_uri = assets[0].data_uri
+        return cls.from_uri(
+            data_uri,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+            **kwargs,
+        )
diff --git a/tiled/adapters/hdf5.py b/tiled/adapters/hdf5.py
index 6606d8b13..106945e5b 100644
--- a/tiled/adapters/hdf5.py
+++ b/tiled/adapters/hdf5.py
@@ -3,7 +3,7 @@
 import sys
 import warnings
 from pathlib import Path
-from typing import Any, Iterator, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
 
 import h5py
 import numpy
@@ -13,6 +13,7 @@
 from ..iterviews import ItemsView, KeysView, ValuesView
 from ..structures.array import ArrayStructure
 from ..structures.core import Spec, StructureFamily
+from ..structures.data_source import Asset
 from ..structures.table import TableStructure
 from ..utils import node_repr, path_from_uri
 from .array import ArrayAdapter
@@ -103,6 +104,28 @@ def __init__(
         self._provided_metadata = metadata or {}
         super().__init__()
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "HDF5Adapter":
+        return hdf5_lookup(
+            data_uri=assets[0].data_uri,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+            swmr=kwargs.get("swmr", SWMR_DEFAULT),  # type: ignore
+            libver=kwargs.get("libver", "latest"),  # type: ignore
+            dataset=kwargs.get("dataset"),  # type: ignore
+            path=kwargs.get("path"),  # type: ignore
+        )
+
     @classmethod
     def from_file(
         cls,
diff --git a/tiled/adapters/jpeg.py b/tiled/adapters/jpeg.py
index ae6a6ffa8..b6a489ad3 100644
--- a/tiled/adapters/jpeg.py
+++ b/tiled/adapters/jpeg.py
@@ -1,5 +1,5 @@
 import builtins
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 import numpy as np
 from numpy._typing import NDArray
@@ -7,6 +7,7 @@
 from ..structures.array import ArrayStructure, BuiltinDtype
 from ..structures.core import Spec, StructureFamily
+from ..structures.data_source import Asset
 from ..utils import path_from_uri
 from .protocols import AccessPolicy
 from .resource_cache import with_resource_cache
@@ -30,7 +31,7 @@ def __init__(
         self,
         data_uri: str,
         *,
-        structure: Optional[ArrayStructure] = None,
+        structure: ArrayStructure,
         metadata: Optional[JSON] = None,
         specs: Optional[List[Spec]] = None,
         access_policy: Optional[AccessPolicy] = None,
@@ -45,22 +46,64 @@ def __init__(
         specs :
         access_policy :
         """
-        if not isinstance(data_uri, str):
-            raise Exception
         filepath = path_from_uri(data_uri)
         cache_key = (Image.open, filepath)
         self._file = with_resource_cache(cache_key, Image.open, filepath)
         self.specs = specs or []
         self._provided_metadata = metadata or {}
         self.access_policy = access_policy
+        self._structure = structure
+
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "JPEGAdapter":
+        return cls(
+            assets[0].data_uri,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
+
+    @classmethod
+    def from_uris(
+        cls,
+        data_uris: Union[str, List[str]],
+        structure: Optional[ArrayStructure] = None,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "JPEGAdapter":
+        if not isinstance(data_uris, str):
+            data_uris = data_uris[0]
+
+        filepath = path_from_uri(data_uris)
+        cache_key = (Image.open, filepath)
+        _file = with_resource_cache(cache_key, Image.open, filepath)
+
         if structure is None:
-            arr = np.asarray(self._file)
+            arr = np.asarray(_file)
             structure = ArrayStructure(
                 shape=arr.shape,
                 chunks=tuple((dim,) for dim in arr.shape),
                 data_type=BuiltinDtype.from_numpy_dtype(arr.dtype),
             )
-        self._structure = structure
+
+        return cls(
+            data_uris,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
 
     def metadata(self) -> JSON:
         """
diff --git a/tiled/adapters/netcdf.py b/tiled/adapters/netcdf.py
index 3b9d588fb..0e986e41d 100644
--- a/tiled/adapters/netcdf.py
+++ b/tiled/adapters/netcdf.py
@@ -1,8 +1,14 @@
 from pathlib import Path
-from typing import List, Union
+from typing import Dict, List, Optional, Union
 
 import xarray
 
+from ..server.schemas import Asset
+from ..structures.core import Spec
+from ..structures.table import TableStructure
+from ..utils import path_from_uri
+from .protocols import AccessPolicy
+from .type_alliases import JSON
 from .xarray import DatasetAdapter
@@ -19,3 +25,21 @@ def read_netcdf(filepath: Union[str, List[str], Path]) -> DatasetAdapter:
     """
     ds = xarray.open_dataset(filepath, decode_times=False)
     return DatasetAdapter.from_dataset(ds)
+
+
+class NetCDFAdapter:
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: Optional[
+            TableStructure
+        ] = None,  # NOTE: ContainerStructure? ArrayStructure?
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "NetCDFAdapter":
+        filepath = path_from_uri(assets[0].data_uri)
+
+        return read_netcdf(filepath)  # type: ignore
diff --git a/tiled/adapters/parquet.py b/tiled/adapters/parquet.py
index cc7138c00..ce8c2e216 100644
--- a/tiled/adapters/parquet.py
+++ b/tiled/adapters/parquet.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Any, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union
 
 import dask.dataframe
 import pandas
@@ -43,6 +43,20 @@ def __init__(
         self.specs = list(specs or [])
         self.access_policy = access_policy
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: TableStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "ParquetDatasetAdapter":
+        return cls(
+            [ast.data_uri for ast in assets], structure, metadata, specs, access_policy
+        )
+
     def metadata(self) -> JSON:
         """
diff --git a/tiled/adapters/protocols.py b/tiled/adapters/protocols.py
index 01d5cdacc..5a6e50ef3 100644
--- a/tiled/adapters/protocols.py
+++ b/tiled/adapters/protocols.py
@@ -19,6 +19,11 @@
 class BaseAdapter(Protocol):
+    # @abstractmethod
+    # @classmethod
+    # def from_assets(cls, assets, kwargs) -> "BaseAdapter":
+    #     pass
+
     @abstractmethod
     def metadata(self) -> JSON:
         pass
diff --git a/tiled/adapters/sequence.py b/tiled/adapters/sequence.py
index 4e0058ced..1d2218833 100644
--- a/tiled/adapters/sequence.py
+++ b/tiled/adapters/sequence.py
@@ -1,13 +1,14 @@
 import builtins
 from abc import abstractmethod
 from pathlib import Path
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 import numpy as np
 from numpy._typing import NDArray
 
 from ..structures.array import ArrayStructure, BuiltinDtype
-from ..structures.core import Spec
+from ..structures.core import Spec, StructureFamily
+from ..structures.data_source import Asset
 from ..utils import path_from_uri
 from .protocols import AccessPolicy
 from .type_alliases import JSON, NDSlice
@@ -22,7 +23,7 @@ class FileSequenceAdapter:
     When subclassing, define the `_load_from_files` method specific for a particular file type.
     """
 
-    structure_family = "array"
+    structure_family = StructureFamily.array
 
     @classmethod
     def from_uris(
@@ -33,21 +34,6 @@ def from_uris(
         specs: Optional[List[Spec]] = None,
         access_policy: Optional[AccessPolicy] = None,
     ) -> "FileSequenceAdapter":
-        """
-
-        Parameters
-        ----------
-        data_uris :
-        structure :
-        metadata :
-        specs :
-        access_policy :
-
-        Returns
-        -------
-
-        """
-
         return cls(
             filepaths=[path_from_uri(data_uri) for data_uri in data_uris],
             structure=structure,
@@ -56,6 +42,24 @@ def from_uris(
             access_policy=access_policy,
         )
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "FileSequenceAdapter":
+        return cls(
+            filepaths=[path_from_uri(a.data_uri) for a in assets],
+            structure=structure,
+            specs=specs,
+            metadata=metadata,
+            access_policy=access_policy,
+        )
+
     def __init__(
         self,
         filepaths: List[Path],
diff --git a/tiled/adapters/sparse_blocks_parquet.py b/tiled/adapters/sparse_blocks_parquet.py
index 1a5ed7dbb..d76e9c337 100644
--- a/tiled/adapters/sparse_blocks_parquet.py
+++ b/tiled/adapters/sparse_blocks_parquet.py
@@ -1,5 +1,5 @@
 import itertools
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 import dask.base
 import dask.dataframe
@@ -68,6 +68,19 @@ def __init__(
         self.specs = list(specs or [])
         self.access_policy = access_policy
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: COOStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "SparseBlocksParquetAdapter":
+        data_uris = [a.data_uri for a in assets]
+        return cls(data_uris, structure, metadata, specs, access_policy)
+
     @classmethod
     def init_storage(
         cls,
diff --git a/tiled/adapters/table.py b/tiled/adapters/table.py
index 4b7d3ff0c..c07a73438 100644
--- a/tiled/adapters/table.py
+++ b/tiled/adapters/table.py
@@ -147,12 +147,6 @@ def __init__(
         self.access_policy = access_policy
 
     def __repr__(self) -> str:
-        """
-
-        Returns
-        -------
-
-        """
         return f"{type(self).__name__}({self._structure.columns!r})"
 
     def __getitem__(self, key: str) -> ArrayAdapter:
@@ -170,11 +164,11 @@ def __getitem__(self, key: str) -> ArrayAdapter:
         return ArrayAdapter.from_array(self.read([key])[key].values)
 
     def items(self) -> Iterator[Tuple[str, ArrayAdapter]]:
-        """
+        """Iterator over table columns
 
         Returns
         -------
-
+            Tuples of column names and corresponding ArrayAdapters
         """
         yield from (
             (key, ArrayAdapter.from_array(self.read([key])[key].values))
@@ -230,7 +224,7 @@ def read(
     def read_partition(
         self,
         partition: int,
-        fields: Optional[str] = None,
+        fields: Optional[List[str]] = None,
     ) -> Union[pandas.DataFrame, dask.dataframe.DataFrame]:
         """
 
@@ -245,7 +239,7 @@ def read_partition(
         """
         df = self._partitions[partition]
         if df is None:
-            raise RuntimeError(f"partition {partition} has not be stored yet")
+            raise RuntimeError(f"Partition {partition} has not been stored yet.")
         if fields is not None:
             df = df[fields]
         if isinstance(df, dask.dataframe.DataFrame):
diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py
index 75ef69ad1..16420b53a 100644
--- a/tiled/adapters/tiff.py
+++ b/tiled/adapters/tiff.py
@@ -6,6 +6,7 @@
 from ..structures.array import ArrayStructure, BuiltinDtype
 from ..structures.core import Spec, StructureFamily
+from ..structures.data_source import Asset
 from ..utils import path_from_uri
 from .protocols import AccessPolicy
 from .resource_cache import with_resource_cache
@@ -68,6 +69,24 @@ def __init__(
         )
         self._structure = structure
 
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> "TiffAdapter":
+        return cls(
+            assets[0].data_uri,
+            structure=structure,
+            metadata=metadata,
+            specs=specs,
+            access_policy=access_policy,
+        )
+
     def metadata(self) -> JSON:
         """
diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py
index 7a914965c..0ab3ffce2 100644
--- a/tiled/adapters/zarr.py
+++ b/tiled/adapters/zarr.py
@@ -2,7 +2,7 @@
 import collections.abc
 import os
 import sys
-from typing import Any, Iterator, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
 
 import zarr.core
 import zarr.hierarchy
@@ -27,18 +27,6 @@ def read_zarr(
     structure: Optional[ArrayStructure] = None,
     **kwargs: Any,
 ) -> Union["ZarrGroupAdapter", ArrayAdapter]:
-    """
-
-    Parameters
-    ----------
-    data_uri :
-    structure :
-    kwargs :
-
-    Returns
-    -------
-
-    """
     filepath = path_from_uri(data_uri)
     zarr_obj = zarr.open(filepath)  # Group or Array
     adapter: Union[ZarrGroupAdapter, ArrayAdapter]
@@ -409,3 +397,35 @@ def inlined_contents_enabled(self, depth: int) -> bool:
 
         """
         return depth <= INLINED_DEPTH
+
+
+class ZarrAdapter:
+    @classmethod
+    def from_assets(
+        cls,
+        assets: List[Asset],
+        structure: ArrayStructure,  # NOTE: possibly need to be a Union of Array and Mapping structures
+        metadata: Optional[JSON] = None,
+        specs: Optional[List[Spec]] = None,
+        access_policy: Optional[AccessPolicy] = None,
+        **kwargs: Optional[Union[str, List[str], Dict[str, str]]],
+    ) -> Union[ZarrGroupAdapter, ArrayAdapter]:
+        zarr_obj = zarr.open(path_from_uri(assets[0].data_uri))  # Group or Array
+        if isinstance(zarr_obj, zarr.hierarchy.Group):
+            return ZarrGroupAdapter(
+                zarr_obj,
+                structure=structure,
+                metadata=metadata,
+                specs=specs,
+                access_policy=access_policy,
+                **kwargs,
+            )
+        else:
+            return ZarrArrayAdapter(
+                zarr_obj,
+                structure=structure,
+                metadata=metadata,
+                specs=specs,
+                access_policy=access_policy,
+                **kwargs,
+            )
diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py
index b3b407955..17e215249 100644
--- a/tiled/catalog/adapter.py
+++ b/tiled/catalog/adapter.py
@@ -449,12 +449,11 @@ async def lookup_adapter(
     async def get_adapter(self):
         (data_source,) = self.data_sources
         try:
-            adapter_factory = self.context.adapters_by_mimetype[data_source.mimetype]
+            adapter_class = self.context.adapters_by_mimetype[data_source.mimetype]
         except KeyError:
             raise RuntimeError(
                 f"Server configuration has no adapter for mimetype {data_source.mimetype!r}"
             )
-        parameters = collections.defaultdict(list)
         for asset in data_source.assets:
             if asset.parameter is None:
                 continue
@@ -464,8 +463,7 @@ async def get_adapter(self):
                     f"Only 'file://...' scheme URLs are currently supported, not {asset.data_uri}"
                 )
             if scheme == "file":
-                # Protect against misbehaving clients reading from unintended
-                # parts of the filesystem.
+                # Protect against misbehaving clients reading from unintended parts of the filesystem.
                 asset_path = path_from_uri(asset.data_uri)
                 for readable_storage in self.context.readable_storage:
                     if Path(
@@ -479,18 +477,16 @@ async def get_adapter(self):
                         f"Refusing to serve {asset.data_uri} because it is outside "
                         "the readable storage area for this server."
                     )
-            if asset.num is None:
-                parameters[asset.parameter] = asset.data_uri
-            else:
-                parameters[asset.parameter].append(asset.data_uri)
-        adapter_kwargs = dict(parameters)
-        adapter_kwargs.update(data_source.parameters)
-        adapter_kwargs["specs"] = self.node.specs
-        adapter_kwargs["metadata"] = self.node.metadata_
-        adapter_kwargs["structure"] = data_source.structure
-        adapter_kwargs["access_policy"] = self.access_policy
         adapter = await anyio.to_thread.run_sync(
-            partial(adapter_factory, **adapter_kwargs)
+            partial(
+                adapter_class.from_assets,
+                data_source.assets,
+                structure=data_source.structure,
+                specs=self.node.specs,
+                metadata=self.node.metadata_,
+                access_policy=self.access_policy,
+                **data_source.parameters,
+            ),
         )
         for query in self.queries:
             adapter = adapter.search(query)
diff --git a/tiled/client/register.py b/tiled/client/register.py
index b46c31bbb..662b77e07 100644
--- a/tiled/client/register.py
+++ b/tiled/client/register.py
@@ -296,10 +296,12 @@ async def register_single_item(
         )
         unhandled_items.append(item)
         return
-    adapter_factory = settings.adapters_by_mimetype[mimetype]
+    adapter_class = settings.adapters_by_mimetype[mimetype]
     logger.info("  Resolved mimetype '%s' with adapter for '%s'", mimetype, item)
     try:
-        adapter = await anyio.to_thread.run_sync(adapter_factory, ensure_uri(item))
+        adapter = await anyio.to_thread.run_sync(
+            adapter_class.from_uris, [ensure_uri(item)]
+        )
     except Exception:
         logger.exception("  SKIPPED: Error constructing adapter for '%s':", item)
         return
@@ -408,7 +410,9 @@ async def register_image_sequence(node, name, sequence, settings):
     adapter_class = settings.adapters_by_mimetype[mimetype]
     key = settings.key_from_filename(name)
     try:
-        adapter = adapter_class([ensure_uri(filepath) for filepath in sequence])
+        adapter = adapter_class.from_uris(
+            [ensure_uri(filepath) for filepath in sequence]
+        )
     except Exception:
         logger.exception("  SKIPPED: Error constructing adapter for '%s'", name)
         return
diff --git a/tiled/mimetypes.py b/tiled/mimetypes.py
index 04759330f..219314117 100644
--- a/tiled/mimetypes.py
+++ b/tiled/mimetypes.py
@@ -13,33 +13,40 @@
 AWKWARD_BUFFERS_MIMETYPE = "application/x-awkward-buffers"
 DEFAULT_ADAPTERS_BY_MIMETYPE = OneShotCachedMap(
     {
-        "image/tiff": lambda: importlib.import_module(
-            "..adapters.tiff", __name__
-        ).TiffAdapter,
+        # "image/tiff": lambda: importlib.import_module(
+        #     "..adapters.tiff", __name__
+        # ).TiffAdapter,
         "multipart/related;type=image/tiff": lambda: importlib.import_module(
             "..adapters.tiff", __name__
-        ).TiffSequenceAdapter.from_uris,
+        ).TiffSequenceAdapter,
         "image/jpeg": lambda: importlib.import_module(
             "..adapters.jpeg", __name__
         ).JPEGAdapter,
         "multipart/related;type=image/jpeg": lambda: importlib.import_module(
             "..adapters.jpeg", __name__
-        ).JPEGSequenceAdapter.from_uris,
+        ).JPEGSequenceAdapter,
         "text/csv": lambda: importlib.import_module(
             "..adapters.csv", __name__
         ).CSVAdapter,
-        # "text/csv": lambda: importlib.import_module(
-        #     "..adapters.csv", __name__
-        # ).CSVAdapter.from_single_file,
+        "multipart/related;type=text/csv": lambda: importlib.import_module(
+            "..adapters.csv", __name__
+        ).CSVAdapter,
+        # https://www.rfc-editor.org/rfc/rfc4180#section-3
+        "text/csv;header=present": lambda: importlib.import_module(
+            "..adapters.csv", __name__
+        ).CSVAdapter,
+        "text/csv;header=absent": lambda: importlib.import_module(
+            "..adapters.csv", __name__
+        ).CSVArrayAdapter,
         XLSX_MIME_TYPE: lambda: importlib.import_module(
             "..adapters.excel", __name__
-        ).ExcelAdapter.from_uri,
+        ).ExcelAdapter,
         "application/x-hdf5": lambda: importlib.import_module(
             "..adapters.hdf5", __name__
-        ).hdf5_lookup,
+        ).HDF5Adapter,
         "application/x-netcdf": lambda: importlib.import_module(
             "..adapters.netcdf", __name__
-        ).read_netcdf,
+        ).NetCDFAdapter,
         PARQUET_MIMETYPE: lambda: importlib.import_module(
             "..adapters.parquet", __name__
         ).ParquetDatasetAdapter,
@@ -48,10 +55,10 @@
         ).SparseBlocksParquetAdapter,
         ZARR_MIMETYPE: lambda: importlib.import_module(
             "..adapters.zarr", __name__
-        ).read_zarr,
+        ).ZarrAdapter,
         AWKWARD_BUFFERS_MIMETYPE: lambda: importlib.import_module(
             "..adapters.awkward_buffers", __name__
-        ).AwkwardBuffersAdapter.from_directory,
+        ).AwkwardBuffersAdapter,
         APACHE_ARROW_FILE_MIME_TYPE: lambda: importlib.import_module(
             "..adapters.arrow", __name__
         ).ArrowAdapter,
@@ -62,9 +69,7 @@
 
 DEFAULT_REGISTERATION_ADAPTERS_BY_MIMETYPE.set(
     "text/csv",
-    lambda: importlib.import_module(
-        "..adapters.csv", __name__
-    ).CSVAdapter.from_single_file,
+    lambda: importlib.import_module("..adapters.csv", __name__).CSVAdapter,
 )
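
For reference, a minimal usage sketch of the refactored CSVAdapter described in this diff. The file URI and column names below are hypothetical; the keyword arguments are forwarded to pandas.read_csv / dask.dataframe.read_csv, as in the new __init__ and read_partition.

    from tiled.adapters.csv import CSVAdapter

    # A single URI yields one partition; a list of URIs maps to multiple partitions.
    adapter = CSVAdapter.from_uris(
        "file:///tmp/example.csv",   # hypothetical local file
        sep=",",                     # forwarded to pandas.read_csv / dask.dataframe.read_csv
        dtype={"A": "int64"},        # also forwarded; used when the structure is inferred
    )
    table_structure = adapter.structure()  # TableStructure; npartitions == number of URIs
    df = adapter.read(fields=["A"])        # concatenates read_partition() over all partitions

On the server side, the same adapter is now constructed from registered assets via CSVAdapter.from_assets, which is how the catalog's get_adapter resolves mimetypes after this change.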