Skip to content

Commit

Permalink
Merge pull request #874 from neuropsychology/read_xdf
Browse files Browse the repository at this point in the history
[Feature] read_xdf() - easily read and tidy up signals from Lab Streaming Layer (LSL) into a DataFrame
  • Loading branch information
DominiqueMakowski authored Aug 13, 2023
2 parents e7d8320 + e8f2d78 commit 4a20e15
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 6 deletions.
Binary file added data/labstreaminglayer.xdf
Binary file not shown.
21 changes: 18 additions & 3 deletions neurokit2/bio/bio_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@


def bio_process(
ecg=None, rsp=None, eda=None, emg=None, ppg=None, eog=None, keep=None, sampling_rate=1000
ecg=None,
rsp=None,
eda=None,
emg=None,
ppg=None,
eog=None,
keep=None,
sampling_rate=1000,
):
"""**Automated processing of bio signals**
Expand Down Expand Up @@ -204,13 +211,21 @@ def bio_process(

# Additional channels to keep
if keep is not None:
keep = keep.reset_index(drop=True)
if isinstance(keep, pd.DataFrame) or isinstance(keep, pd.Series):
keep = keep.reset_index(drop=True)
else:
raise ValueError("The 'keep' argument must be a DataFrame or Series.")

bio_df = pd.concat([bio_df, keep], axis=1)

# RSA
if ecg is not None and rsp is not None:
rsa = hrv_rsa(
ecg_signals, rsp_signals, rpeaks=None, sampling_rate=sampling_rate, continuous=True
ecg_signals,
rsp_signals,
rpeaks=None,
sampling_rate=sampling_rate,
continuous=True,
)
bio_df = pd.concat([bio_df, rsa], axis=1)

Expand Down
10 changes: 9 additions & 1 deletion neurokit2/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
from .read_acqknowledge import read_acqknowledge
from .read_bitalino import read_bitalino
from .read_video import read_video
from .read_xdf import read_xdf
from .write_csv import write_csv

__all__ = ["read_acqknowledge", "read_bitalino", "read_video", "data", "write_csv"]
__all__ = [
"read_acqknowledge",
"read_bitalino",
"read_xdf",
"read_video",
"data",
"write_csv",
]
4 changes: 2 additions & 2 deletions neurokit2/data/read_bitalino.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ def read_bitalino(filename):
Reads and loads a BITalino file into a Pandas DataFrame.
The function outputs both the dataframe and the information (such as the sampling rate)
retrieved from the OpenSignals file).
retrieved from the OpenSignals file.
Parameters
----------
filename : str
Filename (with or without the extension) of an OpenSignals file (e.g., ``"data.txt"``).
Path (with or without the extension) of an OpenSignals file (e.g., ``"data.txt"``).
Returns
----------
Expand Down
140 changes: 140 additions & 0 deletions neurokit2/data/read_xdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd


def read_xdf(filename, upsample=2, fillmissing=None):
    """**Read and tidy an XDF file**

    Reads and tidies an XDF file with multiple streams into a Pandas DataFrame.
    The function outputs both the dataframe and the information (such as the sampling rate).

    Note that, as XDF can store streams with different sampling rates and different time stamps,
    **the function will resample all streams to 2 times (default) the highest sampling rate** (to
    minimize aliasing). The final sampling rate can be found in the ``info`` dictionary.

    .. note::

        This function requires the *pyxdf* module to be installed. You can install it with
        ``pip install pyxdf``.

    Parameters
    ----------
    filename : str
        Path (with the extension) of an XDF file (e.g., ``"data.xdf"``).
    upsample : float
        Factor by which to upsample the data. Default is 2, which means that the data will be
        resampled to 2 times the highest sampling rate. You can increase that to further reduce
        edge-distortion, especially for high frequency signals like EEG.
    fillmissing : float
        The maximum duration in seconds of missing data to fill. ``None`` (default) will
        interpolate all missing values and prevent issues with NaNs. However, it might be important
        to keep the missing intervals (e.g., ``fillmissing=1`` to keep interruptions of more than
        1 s) typically corresponding to signal loss or streaming interruptions and exclude them
        from further analysis.

    Returns
    ----------
    df : DataFrame
        All the streams of the XDF file merged on a common, evenly spaced time index.
    info : dict
        The metadata information, containing among other things the original and effective
        sampling rates of each stream, the final resampled ``"sampling_rate"``, and the
        per-stream dataframes (``"data"``).

    See Also
    --------
    .read_bitalino, .signal_resample

    Examples
    --------
    .. ipython:: python

      import neurokit2 as nk

      # data, info = nk.read_xdf("data.xdf")
      # sampling_rate = info["sampling_rate"]

    """
    try:
        import pyxdf
    except ImportError as e:
        # Single concatenated message: passing two comma-separated strings to
        # ImportError would make str(err) render as a tuple repr.
        raise ImportError(
            "The 'pyxdf' module is required for this function to run. "
            "Please install it first (`pip install pyxdf`)."
        ) from e

    # Load file
    # TODO: would be nice to be able to stream a file from URL
    streams, header = pyxdf.load_xdf(filename)

    # Get smallest time stamp across all streams to later use as offset (zero point)
    min_ts = min(min(s["time_stamps"]) for s in streams)

    # Loop through all the streams and convert to dataframes
    dfs = []
    for stream in streams:
        # Get column names from the stream's channel metadata and make dataframe
        channels_info = stream["info"]["desc"][0]["channels"][0]["channel"]
        cols = [channels_info[i]["label"][0] for i in range(len(channels_info))]
        dat = pd.DataFrame(stream["time_series"], columns=cols)

        # Special treatment for some devices
        if stream["info"]["name"][0] == "Muse":
            # Rename GYRO channels and add ACCelerometer magnitude
            if stream["info"]["type"][0] == "GYRO":
                dat = dat.rename(columns={"X": "GYRO_X", "Y": "GYRO_Y", "Z": "GYRO_Z"})
                dat["ACC"] = np.sqrt(
                    dat["GYRO_X"] ** 2 + dat["GYRO_Y"] ** 2 + dat["GYRO_Z"] ** 2
                )

            # Muse - PPG data has three channels: ambient, infrared, red
            if stream["info"]["type"][0] == "PPG":
                dat = dat.rename(columns={"PPG1": "LUX", "PPG2": "PPG", "PPG3": "RED"})
                # Zeros suggest interruptions, better to replace with NaNs (I think?)
                dat["PPG"] = dat["PPG"].replace(0, value=np.nan)
                dat["LUX"] = dat["LUX"].replace(0, value=np.nan)

        # Get time stamps and offset from minimum time stamp
        dat.index = pd.to_datetime(stream["time_stamps"] - min_ts, unit="s")
        dfs.append(dat)

    # Store info of each stream ----------------------------------------------------------------

    # Store metadata (one entry per stream, in stream order)
    info = {
        "sampling_rates_original": [
            float(s["info"]["nominal_srate"][0]) for s in streams
        ],
        "sampling_rates_effective": [
            float(s["info"]["effective_srate"]) for s in streams
        ],
        "datetime": header["info"]["datetime"][0],
        "data": dfs,
    }

    # Synchronize ------------------------------------------------------------------------------
    # Merge all dataframes by timestamps
    # Note: this is a critical step, as it inserts timestamps and makes the index non-evenly spread
    df = dfs[0]
    for i in range(1, len(dfs)):
        df = pd.merge(df, dfs[i], how="outer", left_index=True, right_index=True)
    df = df.sort_index()

    # Resample and Interpolate -----------------------------------------------------------------
    # Final sampling rate will be `upsample` times the maximum sampling rate
    # (to minimize aliasing during interpolation)
    info["sampling_rate"] = int(np.max(info["sampling_rates_original"]) * upsample)
    if fillmissing is not None:
        # Convert the maximum gap from seconds to a number of samples at the target rate
        fillmissing = int(info["sampling_rate"] * fillmissing)

    # Create new index with evenly spaced timestamps
    idx = pd.date_range(
        df.index.min(), df.index.max(), freq=str(1000 / info["sampling_rate"]) + "ms"
    )
    # https://stackoverflow.com/questions/47148446/pandas-resample-interpolate-is-producing-nans
    df = (
        df.reindex(df.index.union(idx))
        .interpolate(method="index", limit=fillmissing)
        .reindex(idx)
    )

    return df, info

0 comments on commit 4a20e15

Please sign in to comment.