Skip to content

Commit

Permalink
Do not set annotation channel when missing from input data when readi…
Browse files Browse the repository at this point in the history
…ng EDF (mne-tools#12044)

Co-authored-by: Paul ROUJANSKY <[email protected]>
  • Loading branch information
paulroujansky and Paul ROUJANSKY authored Oct 2, 2023
1 parent 578f2a9 commit fd08b52
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 19 deletions.
2 changes: 1 addition & 1 deletion doc/changes/devel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Bugs
- Fix bug with axis clip box boundaries in :func:`mne.viz.plot_evoked_topo` and related functions (:gh:`11999` by `Eric Larson`_)
- Fix bug with ``subject_info`` when loading data from and exporting to EDF file (:gh:`11952` by `Paul Roujansky`_)
- Fix bug with delayed checking of :class:`info["bads"] <mne.Info>` (:gh:`12038` by `Eric Larson`_)
- Fix handling of channel information in annotations when loading data from and exporting to EDF file (:gh:`11960` :gh:`12017` by `Paul Roujansky`_)
- Fix handling of channel information in annotations when loading data from and exporting to EDF file (:gh:`11960` :gh:`12017` :gh:`12044` by `Paul Roujansky`_)
- Add missing ``overwrite`` and ``verbose`` parameters to :meth:`Transform.save() <mne.transforms.Transform.save>` (:gh:`12004` by `Marijn van Vliet`_)
- Fix parsing of eye-link :class:`~mne.Annotations` when ``apply_offsets=False`` is provided to :func:`~mne.io.read_raw_eyelink` (:gh:`12003` by `Mathieu Scheltienne`_)
- Correctly prune channel-specific :class:`~mne.Annotations` when creating :class:`~mne.Epochs` without the channel(s) included in the channel specific annotations (:gh:`12010` by `Mathieu Scheltienne`_)
Expand Down
33 changes: 16 additions & 17 deletions mne/io/edf/edf.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def __init__(
)
annotations = _read_annotations_edf(
tal_data[0],
ch_names=info["ch_names"],
encoding=encoding,
)
self.set_annotations(annotations, on_missing="warn")
Expand Down Expand Up @@ -1892,25 +1893,21 @@ def read_raw_gdf(


@fill_doc
def _read_annotations_edf(annotations, encoding="utf8"):
def _read_annotations_edf(annotations, ch_names=None, encoding="utf8"):
"""Annotation File Reader.
Parameters
----------
annotations : ndarray (n_chans, n_samples) | str
Channel data in EDF+ TAL format or path to annotation file.
ch_names : list of string
List of channels' names.
%(encoding_edf)s
Returns
-------
onset : array of float, shape (n_annotations,)
The starting time of annotations in seconds after ``orig_time``.
duration : array of float, shape (n_annotations,)
Durations of the annotations in seconds.
description : array of str, shape (n_annotations,)
Array of strings containing description for each annotation. If a
string, all the annotations are given the same description. To reject
epochs, use description starting with keyword 'bad'. See example above.
annot : instance of Annotations
The annotations.
"""
pat = "([+-]\\d+\\.?\\d*)(\x15(\\d+\\.?\\d*))?(\x14.*?)\x14\x00"
if isinstance(annotations, str):
Expand Down Expand Up @@ -1949,7 +1946,11 @@ def _read_annotations_edf(annotations, encoding="utf8"):
duration = float(ev[2]) if ev[2] else 0
for description in ev[3].split("\x14")[1:]:
if description:
if "@@" in description:
if (
"@@" in description
and ch_names is not None
and description.split("@@")[1] in ch_names
):
description, ch_name = description.split("@@")
key = f"{onset}_{duration}_{description}"
else:
Expand Down Expand Up @@ -1979,22 +1980,20 @@ def _read_annotations_edf(annotations, encoding="utf8"):
offset = -onset

if events:
onset, duration, description, ch_names = zip(*events.values())
onset, duration, description, annot_ch_names = zip(*events.values())
else:
onset, duration, description, ch_names = list(), list(), list(), list()
onset, duration, description, annot_ch_names = list(), list(), list(), list()

assert len(onset) == len(duration) == len(description) == len(ch_names)
assert len(onset) == len(duration) == len(description) == len(annot_ch_names)

annotations = Annotations(
return Annotations(
onset=onset,
duration=duration,
description=description,
orig_time=None,
ch_names=ch_names,
ch_names=annot_ch_names,
)

return annotations


def _get_annotations_gdf(edf_info, sfreq):
onset, duration, desc = list(), list(), list()
Expand Down
77 changes: 76 additions & 1 deletion mne/io/edf/tests/test_edf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import pytest

from mne import pick_types, Annotations
from mne.annotations import events_from_annotations, read_annotations
from mne.annotations import _ndarray_ch_names, events_from_annotations, read_annotations
from mne.datasets import testing
from mne.io import read_raw_edf, read_raw_bdf, read_raw_fif, edf, read_raw_gdf
from mne.io.tests.test_raw import _test_raw_reader
Expand Down Expand Up @@ -504,6 +504,81 @@ def test_read_utf8_annotations():
assert raw.annotations[1]["description"] == "仰卧"


def test_read_annotations_edf(tmp_path):
"""Test reading annotations from EDF file."""
annot = (
b"+1.1\x14Event A@@CH1\x14\x00\x00"
b"+1.2\x14Event A\x14\x00\x00"
b"+1.3\x14Event B@@CH1\x14\x00\x00"
b"+1.3\x14Event B@@CH2\x14\x00\x00"
b"+1.4\x14Event A@@CH3\x14\x00\x00"
b"+1.5\x14Event B\x14\x00\x00"
)
annot_file = tmp_path / "annotations.edf"
with open(annot_file, "wb") as f:
f.write(annot)

# Test reading annotations from channel data
with open(annot_file, "rb") as f:
tal_channel = _read_ch(
f,
subtype="EDF",
dtype="<i2",
samp=-1,
dtype_byte=None,
)

# Read annotations without input channel names: annotations are left untouched and
# assigned as global
annotations = _read_annotations_edf(tal_channel, ch_names=None, encoding="latin1")
assert_allclose(annotations.onset, [1.1, 1.2, 1.3, 1.3, 1.4, 1.5])
assert not any(annotations.duration) # all durations are 0
assert_array_equal(
annotations.description,
[
"Event A@@CH1",
"Event A",
"Event B@@CH1",
"Event B@@CH2",
"Event A@@CH3",
"Event B",
],
)
assert_array_equal(
annotations.ch_names, _ndarray_ch_names([(), (), (), (), (), ()])
)

# Read annotations with complete input channel names: each annotation is parsed and
# associated to a channel
annotations = _read_annotations_edf(
tal_channel, ch_names=["CH1", "CH2", "CH3"], encoding="latin1"
)
assert_allclose(annotations.onset, [1.1, 1.2, 1.3, 1.4, 1.5])
assert not any(annotations.duration) # all durations are 0
assert_array_equal(
annotations.description, ["Event A", "Event A", "Event B", "Event A", "Event B"]
)
assert_array_equal(
annotations.ch_names,
_ndarray_ch_names([("CH1",), (), ("CH1", "CH2"), ("CH3",), ()]),
)

# Read annotations with incomplete input channel names: "CH3" is missing from input
# channels, turning the related annotation into a global one
annotations = _read_annotations_edf(
tal_channel, ch_names=["CH1", "CH2"], encoding="latin1"
)
assert_allclose(annotations.onset, [1.1, 1.2, 1.3, 1.4, 1.5])
assert not any(annotations.duration) # all durations are 0
assert_array_equal(
annotations.description,
["Event A", "Event A", "Event B", "Event A@@CH3", "Event B"],
)
assert_array_equal(
annotations.ch_names, _ndarray_ch_names([("CH1",), (), ("CH1", "CH2"), (), ()])
)


def test_read_latin1_annotations(tmp_path):
"""Test if annotations encoded as Latin-1 can be read.
Expand Down

0 comments on commit fd08b52

Please sign in to comment.