Skip to content

Commit

Permalink
fix: fix Z series values in events() output, and add individual axis …
Browse files Browse the repository at this point in the history
…indices (#167)

* fix: fix events z index

* style(pre-commit.ci): auto fixes [...]

* change hash

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
tlambert03 and pre-commit-ci[bot] authored Aug 31, 2023
1 parent a70c051 commit f9feb42
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 45 deletions.
2 changes: 1 addition & 1 deletion scripts/download_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

# this is just here to invalidate the github actions cache
# change it when a new file is added to the test data in the dropbox folder
__HASH__ = "a1b2c3d4-e5f6-g7h8-i9j0-k1l2m3n4o5p6"
__HASH__ = "a1b2c3d4-e5f6-g7h8-i9j0-j1l2m3n4o5p6"


def main() -> None:
Expand Down
6 changes: 2 additions & 4 deletions src/nd2/_ome.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,13 @@ def nd2_ome_metadata(f: ND2File) -> m.OME:
for c_idx, ch in enumerate(meta.channels or ())
]

axes = [AXIS._MAP[x.type] for x in f.experiment]
planes: list[m.Plane] = []
for s_idx, loop_idx in zip(range(rdr._seq_count()), rdr.loop_indices()):
coords = dict(zip(axes, loop_idx))
fm = rdr.frame_metadata(s_idx)
planes.extend(
m.Plane(
the_z=coords.get(AXIS.Z, 0), # or loop_idx[fm_ch.loops.ZStackLoop]
the_t=coords.get(AXIS.TIME, 0), # or loop_idx[fm_ch.loops.TimeLoop]
the_z=loop_idx.get(AXIS.Z, 0),
the_t=loop_idx.get(AXIS.TIME, 0),
the_c=c_idx,
# exposure_time=...,
# exposure_time_unit=...,
Expand Down
5 changes: 3 additions & 2 deletions src/nd2/_parse/_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import numpy as np

from nd2 import _util
from nd2 import structures as strct
from nd2._sdk_types import ELxModalityMask, EventMeaning, StimulationType

Expand Down Expand Up @@ -648,13 +649,13 @@ def load_frame_metadata(
meta: strct.Metadata,
exp_loops: list[ExpLoop],
frame_time: float,
loop_indices: tuple[int, ...],
loop_indices: dict[str, int],
) -> strct.FrameMetadata:
xy_loop_idx = global_meta["loops"].get("XYPosLoop", -1)
z_loop_idx = global_meta["loops"].get("ZStackLoop", -1)
if 0 <= xy_loop_idx < len(exp_loops):
xy_params = cast("XYPosLoopParams", exp_loops[xy_loop_idx].parameters)
point = xy_params.points[loop_indices[xy_loop_idx]]
point = xy_params.points[loop_indices[_util.AXIS.POSITION]]
name = point.name
x, y, z = point.stagePositionUm
if not xy_params.isSettingZ:
Expand Down
19 changes: 11 additions & 8 deletions src/nd2/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from os import PathLike
from typing import Any, Callable, ClassVar, Mapping, Sequence, Union

from typing_extensions import Final

from nd2.readers import ND2Reader

StrOrPath = Union[str, PathLike]
Expand Down Expand Up @@ -108,17 +110,18 @@ def rgb_int_to_tuple(rgb: int) -> tuple[int, int, int]:
# these are used has headers in the events() table
TIME_KEY = "Time [s]"
Z_SERIES_KEY = "Z-Series"
POSITION_NAME = "Position Name"


class AXIS:
X = "X"
Y = "Y"
Z = "Z"
CHANNEL = "C"
RGB = "S"
TIME = "T"
POSITION = "P"
UNKNOWN = "U"
X: Final = "X"
Y: Final = "Y"
Z: Final = "Z"
CHANNEL: Final = "C"
RGB: Final = "S"
TIME: Final = "T"
POSITION: Final = "P"
UNKNOWN: Final = "U"

_MAP: ClassVar[dict[str, str]] = {
"Unknown": UNKNOWN,
Expand Down
76 changes: 46 additions & 30 deletions src/nd2/readers/_modern/modern_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,23 @@ def experiment(self) -> list[structures.ExpLoop]:
self._experiment = load_experiment(0, self._raw_experiment)
return self._experiment

def loop_indices(self) -> list[tuple[int, ...]]:
return list(product(*[range(x.count) for x in self.experiment()]))
def loop_indices(self) -> list[dict[str, int]]:
"""Return a list of dicts of loop indices for each frame.
Examples
--------
>>> with nd2.ND2File("path/to/file.nd2") as f:
... f.loop_indices()
[
{'Z': 0, 'T': 0, 'C': 0},
{'Z': 0, 'T': 0, 'C': 1},
{'Z': 0, 'T': 0, 'C': 2},
...
]
"""
axes = [_util.AXIS._MAP[x.type] for x in self.experiment()]
indices = product(*(range(x.count) for x in self.experiment()))
return [dict(zip(axes, x)) for x in indices]

def _img_exp_events(self) -> list[structures.ExperimentEvent]:
"""Parse and return all Image and Experiment events."""
Expand Down Expand Up @@ -398,40 +413,41 @@ def _acquisition_data(self) -> dict[str, Sequence[Any]]:
{
"Time [s]": [0.0, 0.0, 0.0, ...],
"Z-Series": [-1.0, 0., 1.0, ...],
"Index": [0, 1, 2, ...],
"T Index": [0, 0, 0, ...],
"Z Index": [0, 1, 2, ...],
}
"""
data: dict[str, np.ndarray | Sequence] = {}
data: dict[str, list] = {}
frame_times = self._cached_frame_times()
if frame_times:
data[_util.TIME_KEY] = [x / 1000 for x in frame_times]

# FIXME: this whole thing is dumb... must be a better way
experiment = self.experiment()
for i, z_loop in enumerate(experiment):
if not isinstance(z_loop, structures.ZStackLoop):
continue

z_positions = [
z_loop.parameters.stepUm * (i - z_loop.parameters.homeIndex)
for i in range(z_loop.count)
]
if not z_loop.parameters.bottomToTop:
z_positions.reverse()

def _seq_z_pos(
seq_index: int, z_idx: int = i, _zp: list[float] = z_positions
) -> float:
"""Convert a sequence index to a coordinate tuple."""
for n, _loop in enumerate(experiment):
if n == z_idx:
return _zp[seq_index % _loop.count]
seq_index //= _loop.count
raise ValueError("Invalid sequence index or z_idx")

seq_count = self._seq_count()
data[_util.Z_SERIES_KEY] = np.array(
[_seq_z_pos(i) for i in range(seq_count)]
)
loop_indices = self.loop_indices()
for frame_idx, loop_idx in enumerate(loop_indices):
data.setdefault("Index", []).append(frame_idx)
for axis, value in loop_idx.items():
data.setdefault(f"{axis} Index", []).append(value)

for loop in self.experiment():
if isinstance(loop, structures.ZStackLoop):
# zpos is a list of actual z positions at each z-index in a single stack
# e.g. [-1, -.5, 0, 0.5, 1]
params = loop.parameters
zpos = [
params.stepUm * (i - params.homeIndex) for i in range(loop.count)
]
if not params.bottomToTop:
zpos.reverse()
data[_util.Z_SERIES_KEY] = [
zpos[frame_index[_util.AXIS.Z]] for frame_index in loop_indices
]
elif isinstance(loop, structures.XYPosLoop):
names = [p.name or "" for p in loop.parameters.points]
data[_util.POSITION_NAME] = [
names[frame_index[_util.AXIS.POSITION]]
for frame_index in loop_indices
]

return data # type: ignore [return-value]

Expand Down
22 changes: 22 additions & 0 deletions tests/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pathlib import Path

import nd2
import pandas as pd
import pytest

DATA = Path(__file__).parent / "data"


@pytest.mark.parametrize("fname", ["t3p3z5c3.nd2", "t3p3c3z5.nd2", "t1t1t1p3c3z5.nd2"])
def test_events(fname: str) -> None:
with nd2.ND2File(DATA / fname) as f:
events = f.events()

df = pd.DataFrame(events)
expected_coords = ([0] * 5 + [1000] * 5 + [2000] * 5) * 3
assert all(df["X Coord [µm]"] == expected_coords)
assert all(df["Y Coord [µm]"] == expected_coords)
assert all(df["Z-Series"] == [-1, -0.5, 0, 0.5, 1] * 9)
assert all(df["T Index"] == [0] * 15 + [1] * 15 + [2] * 15)
assert all(df["Z Index"] == [0, 1, 2, 3, 4] * 9)
assert all(df["Position Name"] == (["p1"] * 5 + [""] * 5 + ["p3"] * 5) * 3)
6 changes: 6 additions & 0 deletions tests/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ def test_recorded_data() -> None:
row_0 = [rd[h][0] for h in headers]
assert headers == [
_util.TIME_KEY,
"Index",
"T Index",
"Z Index",
"Z-Series",
"Camera 1 Temperature [°C]",
"Laser Power; 1.channel [%]",
Expand All @@ -145,6 +148,9 @@ def test_recorded_data() -> None:
]
assert row_0 == [
0.44508349828422067,
0,
0,
0,
-2.0,
-5.0,
0.0,
Expand Down

0 comments on commit f9feb42

Please sign in to comment.