Heart rate estimation #85
base: main
Conversation
"outputs": [ | ||
{ | ||
"name": "stdout", | ||
"output_type": "stream", | ||
"text": [ | ||
"Shape of the original data: (64775, 2) (72947, 4)\n", | ||
"Shape of the overlapping segments: (64775, 2) (64361, 4)\n" | ||
] | ||
} | ||
], |
You still have execution counts here and there, which in itself isn't a problem, since you may want to show this output to a user without the user having to run the code. Just checking whether that was your intention.
Erm, eventually yes, but not for now actually, so apologies.
"source": [ | ||
"ppg_config = PPGConfig()\n", | ||
"imu_config = IMUConfig()\n", | ||
"metadatas_ppg, metadatas_imu = scan_and_sync_segments(os.path.join(path_to_sensor_data, 'ppg'),\n", | ||
" os.path.join(path_to_sensor_data, 'imu'))\n", | ||
"df_ppg_proc, df_imu_proc=preprocess_ppg_data(metadatas_ppg[0], metadatas_imu[0],\n", | ||
"df_ppg_proc, df_acc_proc=preprocess_ppg_data(metadatas_ppg[0], metadatas_imu[0],\n", |
"df_ppg_proc, df_acc_proc=preprocess_ppg_data(metadatas_ppg[0], metadatas_imu[0],\n", | |
"df_ppg_proc, df_acc_proc = preprocess_ppg_data(metadatas_ppg[0], metadatas_imu[0],\n", |
```diff
 self.window_step_size_s: int = 1
-self.min_hr_samples = min_window_length * self.sampling_frequency
 self.window_overlap_s = self.window_length_s - self.window_step_size_s
+self.min_hr_samples = int(min_window_length * self.sampling_frequency)
```
E.g., if the sampling frequency equals 25 Hz and the minimum window length equals 3.5 s, then `min_window_length * self.sampling_frequency` equals 25 * 3.5 = 87.5. Now, inconveniently, `int(87.5)` equals 87 (`int` truncates the fractional part). So perhaps, for these edge cases, you want to use `int(np.round(min_window_length * self.sampling_frequency))` or something along these lines.
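A quick illustration of this edge case (my own aside, not from the review):

```python
import numpy as np

fs = 25                  # sampling frequency in Hz
min_window_length = 3.5  # minimum window length in seconds

print(min_window_length * fs)                 # 87.5
print(int(min_window_length * fs))            # 87 -- int() truncates the fraction
print(int(np.round(min_window_length * fs)))  # 88 -- round first, then convert
```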
Good point! I'll adjust it. Do you have objections to `round` instead of `np.round`? Otherwise you have to import numpy as a module in the config @Erikpostt
There are some minor differences between the two methods, and I don't think that having to import `numpy` should restrict us to using `round()`. I'll let you decide on this matter.
Alright, but it doesn't seem that `np.round` is preferred over `round()`, right? So then we can just use `round()` @Erikpostt
No, they seem to have different methods for rounding, but I expect that, in case the rounding is ever done differently, it should not affect the results much.
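For what it's worth, a small comparison of the two (my own aside): both round half to even, so they usually agree on scalars; the practical differences are the return type and array support.

```python
import numpy as np

print(round(87.5), np.round(87.5))  # 88 88.0 -- both round half to even
print(round(86.5), np.round(86.5))  # 86 86.0

print(type(round(87.5)))     # <class 'int'>
print(type(np.round(87.5)))  # <class 'numpy.float64'>

print(np.round(np.array([86.5, 87.5])))  # [86. 88.] -- np.round works elementwise
```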
```python
def estimate_heart_rate(df: pd.DataFrame, df_ppg_preprocessed: pd.DataFrame, config:HeartRateExtractionConfig) -> pd.DataFrame:
```
To me it's a little ambiguous as to the meaning of `df`. Could you perhaps choose a different naming for `df`, and expand on the difference between these two dataframes in the docstrings?
```python
def estimate_heart_rate(df: pd.DataFrame, df_ppg_preprocessed: pd.DataFrame, config:HeartRateExtractionConfig) -> pd.DataFrame:
```
Suggested change:

```diff
-def estimate_heart_rate(df: pd.DataFrame, df_ppg_preprocessed: pd.DataFrame, config:HeartRateExtractionConfig) -> pd.DataFrame:
+def estimate_heart_rate(df: pd.DataFrame, df_ppg_preprocessed: pd.DataFrame, config: HeartRateExtractionConfig) -> pd.DataFrame:
```
```python
# Assign window-level probabilities to individual samples
ppg_post_prob = df.loc[:, DataColumns.PRED_SQA_PROBA].to_numpy()
#acc_label = df.loc[:, DataColumns.ACCELEROMETER_LABEL].to_numpy() # Adjust later in data columns to get the correct label, should first be integrated in feature extraction and classification
```
Is this intentional?
Do you mean that the acc_label is commented out? Because that is intentional for now, yes @Erikpostt
Yes, I was just wondering.
```python
v_start_idx, v_end_idx = extract_hr_segments(sqa_label, config.min_hr_samples)
```
Perhaps add brief comments (#) here to explain what happens, e.g., `# Add signal quality value to each window` and `# Extract HR segments above signal quality threshold`.
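Applied to the snippet, that might look roughly like the sketch below (an illustration only: the `assign_sqa_label` call is my assumption of what precedes the line above, the comments are the ones suggested):

```python
# Add signal quality value to each window
sqa_label = assign_sqa_label(ppg_post_prob, config)

# Extract HR segments above signal quality threshold
v_start_idx, v_end_idx = extract_hr_segments(sqa_label, config.min_hr_samples)
```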
```python
# Generate HR estimation time array
rel_segment_time = df_ppg_preprocessed.time[start_idx:end_idx].values
```
Is `.time` the same as `[DataColumns.TIME]`?
No, good point.
```python
rel_segment_time = df_ppg_preprocessed.time[start_idx:end_idx].values
n_full_segments = len(rel_segment_time) // config.hr_est_samples
hr_time = rel_segment_time[:n_full_segments * config.hr_est_samples : config.hr_est_samples] # relative time in seconds after the start of the segment
```
Perhaps you could clarify this process a little using comments (#), I'm not sure I understand it as it is now.
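For what it's worth, my own reading of the slicing as a toy example (an aside with assumed example values, not the author's explanation):

```python
import numpy as np

hr_est_samples = 60                      # samples per HR estimate (example value)
rel_segment_time = np.arange(150) / 30   # toy time axis: 150 samples at 30 Hz

# Number of complete HR windows that fit; a trailing partial window is dropped
n_full_segments = len(rel_segment_time) // hr_est_samples  # -> 2

# One timestamp per complete window: the start time of each hr_est_samples-sized block
hr_time = rel_segment_time[:n_full_segments * hr_est_samples:hr_est_samples]
print(hr_time)  # [0. 2.]
```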
Why did you create this config again? Is it necessary for your code to run? Or is this your way of showing you disagree with the current format? ;)
Just to tease you ;). No, all kidding aside: I still needed it to adjust the config.py, because it had been removed in the previous merge. However, I had already committed it to this branch before we started rewriting the configs, so it slipped out of sight. I'll remove it @Erikpostt
```python
from paradigma.config import HeartRateExtractionConfig
from paradigma.heart_rate.tfd import nonsep_gdtfd


def assign_sqa_label(ppg_prob, config: HeartRateExtractionConfig, acc_label=None) -> np.ndarray:
```
Suggested change:

```diff
-def assign_sqa_label(ppg_prob, config: HeartRateExtractionConfig, acc_label=None) -> np.ndarray:
+def assign_sqa_label(ppg_prob: np.ndarray, config: HeartRateExtractionConfig, acc_label: np.ndarray=None) -> np.ndarray:
```
```python
Parameters:
- ppg_prob: numpy array of probabilities for PPG
- acc_label: numpy array of labels for accelerometer (optional)
- fs: Sampling frequency
```
I think these need to be updated.
```python
# Calculate number of samples to shift for each epoch
samples_shift = config.window_step_size_s * fs
n_samples = int((len(ppg_prob) + config.window_overlap_s) * fs)
```
Same as a comment I made earlier: perhaps you first want to round before converting to an integer, to avoid 8.6 being truncated to 8.
```python
start_idx = int(np.floor((i - (samples_per_epoch - samples_shift)) / fs))
end_idx = int(np.floor(i / fs))
```
I believe `int` already does `np.floor` inherently, but you could check.
Not for negative numbers, and that's what I need, so I will update it to integer division because that's probably the most efficient option.
Could `i - (samples_per_epoch - samples_shift)` be negative? Smart thinking, though, about `np.floor` for negative values.
Yes, it can be, for the edge cases at the start where `i = 1:150` (because `samples_per_epoch` is 180 and `samples_shift` is 30).
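A quick demonstration of the behaviors being discussed (my own aside):

```python
import numpy as np

print(int(-1.5))       # -1  -- int() truncates toward zero
print(np.floor(-1.5))  # -2.0 -- floors toward negative infinity
print(-3 // 2)         # -2  -- integer division also floors

print(int(1.5), np.floor(1.5), 3 // 2)  # 1 1.0 1 -- all agree for positive values
```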
```python
if start_idx < 0:
    start_idx = 0
```
Perhaps, instead of this, you could change the earlier

```python
start_idx = int(np.floor((i - (samples_per_epoch - samples_shift)) / fs))
```

to:

```python
start_idx = max(0, int((i - (samples_per_epoch - samples_shift)) / fs))
```
```python
if end_idx > len(ppg_prob):
    end_idx = len(ppg_prob)
```
A similar approach could be used here.
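Presumably the analogous clamp here would be (my sketch, not from the review):

```python
end_idx = min(len(ppg_prob), int(i / fs))
```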
```python
# Calculate mean probability and majority voting for labels
data_prob[i] = np.mean(prob)
data_label_imu[i] = int(np.mean(label_imu) >= 0.5)
```
Use round here as well if appropriate.
In this case not, since it is a true/false statement which I want to convert to 0 or 1, so this doesn't have to be rounded.
Haha, you're right, not sure where this suggestion came from. You might want to store it as a boolean if TSDF allows (don't think it does yet though ...)
```python
# Set probability to zero if majority IMU label is 0
data_prob[data_label_imu == 0] = 0
sqa_label = (data_prob > config.threshold_sqa).astype(np.int8)
```
Two questions:

- Should the probability be (1) larger than, or (2) larger than or equal to?
- Why do you convert it to `np.int8`? Why don't you store it as a boolean?
- The probability is larger than (1) instead of (2) in my Matlab code; however, that is not entirely correct, so I want to have (2) and will adjust it here. I don't expect this to really matter for the results, because I don't expect many probabilities to be exactly at the threshold.
- I'll update this and store it as a boolean
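Combining both points, the line above would then presumably become (my sketch, not an actual suggestion from the review):

```diff
-sqa_label = (data_prob > config.threshold_sqa).astype(np.int8)
+sqa_label = data_prob >= config.threshold_sqa  # boolean array, >= per point (2)
```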
Great! You might run into issues with storing as boolean due to TSDF constraints. I'm sorry if I got your hopes up.
Luckily, this step doesn't have to be stored in TSDF :). It's just an input for extracting the PPG segments suited for HR estimation.
```python
Length of each segment (in seconds) to calculate the time-frequency distribution.
fs : int
    The sampling frequency of the PPG signal.
MA : int
```
I think you don't use this one anymore.
correct
""" | ||
|
||
# Constants to handle boundary effects | ||
edge_padding = 4 # Additional 4 seconds (2 seconds on both sides) |
I'd suggest assigning `edge_padding = 4 * fs` and `tfd_length = tfd_length * fs`, since you use `edge_padding` and `tfd_length` only in combination with `fs` in this function, right?
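In code, that suggestion would presumably look like the sketch below (my own aside; `fs`, `tfd_length`, and `ppg` are example stand-ins for the function's actual inputs):

```python
import numpy as np

fs = 30                   # sampling frequency in Hz (example value)
tfd_length = 30           # TFD segment length in seconds (example value)
ppg = np.zeros(10 * fs)   # placeholder PPG segment

# Convert durations from seconds to samples once, up front
edge_padding = 4 * fs          # 2 s of boundary padding on each side, in samples
tfd_length = tfd_length * fs   # now in samples

original_segment_length = (len(ppg) - edge_padding) / fs  # back to seconds where needed
```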
```python
original_segment_length = (len(ppg) - edge_padding * fs) / fs

# Determine the number of tfd_length-second segments
if original_segment_length > extended_epoch_length:
```
Larger, or larger or equal to?
```python
Returns
-------
hr_smooth_tfd : list
```
`np.array` or `np.ndarray` now, I believe.
```python
"""

def nonsep_gdtfd(x, kern_type=None, kern_params=None):
```
We have used `Union` so far, but apparently the correct way since Python 3.10 is using the pipe:

Suggested change:

```diff
-def nonsep_gdtfd(x, kern_type=None, kern_params=None):
+def nonsep_gdtfd(
+    x: np.ndarray,
+    kern_type: None | str = None,
+    kern_params: None | dict = None
+):
```
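As an aside (my addition, not part of the suggestion above): on Python versions before 3.10 the same annotation can be spelled with `typing.Optional`:

```python
from typing import Optional

import numpy as np


def nonsep_gdtfd(
    x: np.ndarray,
    kern_type: Optional[str] = None,   # equivalent to `None | str`
    kern_params: Optional[dict] = None,
):
    ...
```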
```python
x_fft = np.fft.fft(x)

# Generate the analytic signal in the frequency domain
H = [1] + list(np.repeat(2, N-1)) + [1] + list(np.repeat(0, N-1))
```
Perhaps you could use `np.concatenate` here, which works for more than 2 items as well:

```python
# Define three arrays
arr1 = np.array([1, 2, 3])
arr2 = np.array([4, 5, 6])
arr3 = np.array([7, 8, 9])

# Concatenate along the first axis (default)
result = np.concatenate((arr1, arr2, arr3))
print(result)  # Output: [1 2 3 4 5 6 7 8 9]
```

Or for lists:

```python
# Define three lists
list1 = [1, 2, 3]
list2 = [4, 5, 6]
list3 = [7, 8, 9]

# Concatenate lists with NumPy
result = np.concatenate((list1, list2, list3))
print(result)  # Output: [1 2 3 4 5 6 7 8 9]
```

I suppose that the FFT works faster on numpy arrays, but I'm not sure.
```python
return z

def gen_time_lag(z):
```
Suggested change:

```diff
-def gen_time_lag(z):
+def gen_time_lag(z: np.ndarray) -> np.ndarray:
```
```python
return tfd

def multiply_kernel_signal(tfd, kern_type, kern_params, N, Nh):
```
Suggested change:

```diff
-def multiply_kernel_signal(tfd, kern_type, kern_params, N, Nh):
+def multiply_kernel_signal(
+    tfd: np.ndarray,
+    kern_type: str,
+    kern_params: dict,
+    N: int,
+    Nh: int
+) -> np.ndarray:
```
```python
g_lag_slice = gen_doppler_lag_kern(kern_type, kern_params, N, m)

# Extract and transform the TFD slice for this lag
tfd_slice = np.fft.fft(tfd[:, m]) + 1j * np.fft.fft(tfd[:, Nh + m])
```
Suggested change:

```diff
-tfd_slice = np.fft.fft(tfd[:, m]) + 1j * np.fft.fft(tfd[:, Nh + m])
+tfd_slice = np.fft.fft(tfd[:, m]) + j * np.fft.fft(tfd[:, Nh + m])
```
```python
return tfd

def gen_doppler_lag_kern(kern_type, kern_params, N, lag_index):
```
Also fix parameter types in your other functions.
```python
elif kern_type == 'swvd':
    key = 'lag' # Key for the lag-independent kernel
    # Smoothed Wigner-Ville Distribution (Lag Independent kernel)
    if l < 2:
        raise ValueError("Need at least two window parameters for SWVD")
    win_length = kern_params['win_length']
    win_type = kern_params['win_type']
    win_param = kern_params['win_param'] if l >= 3 else 0
    win_param2 = kern_params['win_param2'] if l >= 4 else 1

    G1 = get_window(win_length, win_type, win_param)
    G1 = pad_window(G1, N)

    # Define window in the time domain or Doppler domain
    if win_param2 == 0:
        G1 = np.fft.fft(G1)
        G1 /= G1[0]

    g[:] = G1 # Assign the window to the kernel

elif kern_type == 'pwvd':
    key = 'doppler'
    # Pseudo-Wigner-Ville Distribution (Doppler Independent kernel)
    if l < 2:
        raise ValueError("Need at least two window parameters for PWVD")
    win_length = kern_params['win_length']
    win_type = kern_params['win_type']
    win_param = kern_params['win_param'] if l >= 3 else 0
    win_param2 = kern_params['win_param2'] if l >= 4 else 0

    G2 = get_window(win_length, win_type, win_param) # Generate the window, same per lag iteration
    G2 = pad_window(G2, N) # Zero-pad the window to the length of the signal
    G2 = G2[lag_index] # Extract the lag_index-th element of the window --> can this be done more efficiently? Since we only need one element of the window and the padding is similar for every iteration

    g[:] = G2 # Assign the window to the kernel
```
I think you could combine these.

First:

```python
if kern_type not in [X, Y, Z, A, B, C]:
    raise ValueError("You should pick one of ...")
```

Note: you can also put this at the end, like you've used it in the `else` statement; I only saw this later.

Then:

```python
if kern_type == 'wvd':
    ...
elif kern_type == 'sep':
    ...
else:
    if l < 2:
        raise ValueError(f"Need at least two window parameters for {kern_type.upper()}")
    if kern_type == 'swvd':
        key = 'lag'
    else:
        key = 'doppler'
    win_length = kern_params['win_length']
    win_type = kern_params['win_type']
    win_param = kern_params['win_param'] if l >= 3 else 0
    win_param2 = kern_params['win_param2'] if l >= 4 else 1

    G = get_window(win_length, win_type, win_param)
    G = pad_window(G, N)
    if kern_type == 'swvd' and win_param2 == 0:
        G = np.fft.fft(G)
        G /= G[0]
    elif kern_type == 'pwvd':
        G = G[lag_index]
    g[:] = G
```

And I don't think you have to assign `key` in this loop, because you don't seem to be using it.
```python
if win_param is None:
    win_param = []
```
Perhaps you could set the parameter `win_param: list = []`.
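One caveat worth flagging here (my own aside, not from the review): a mutable default like `[]` is created once and shared across calls in Python, which is why the `None`-sentinel pattern in the original code is generally preferred:

```python
def f(win_param: list = []):  # the same list object is reused on every call
    win_param.append(1)
    return win_param

print(f())  # [1]
print(f())  # [1, 1] -- state leaks between calls
```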
```python
win_param = []

# Get the window
win = get_win(win_length, win_type, win_param, dft_window)
```
I see your `get_win` function here assigns an empty list to `win_param` if it equals `None`. However, in the `get_win()` function you assign floats to this variable. Is this working as expected?
```python
R_even_half = np.complex128(tfd[n, :Nh]) + 1j * np.complex128(tfd[n, Nh:])
R_odd_half = np.complex128(tfd[n+1, :Nh]) + 1j * np.complex128(tfd[n+1, Nh:])
```
Suggested change:

```diff
-R_even_half = np.complex128(tfd[n, :Nh]) + 1j * np.complex128(tfd[n, Nh:])
-R_odd_half = np.complex128(tfd[n+1, :Nh]) + 1j * np.complex128(tfd[n+1, Nh:])
+R_even_half = np.complex128(tfd[n, :Nh]) + j * np.complex128(tfd[n, Nh:])
+R_odd_half = np.complex128(tfd[n+1, :Nh]) + j * np.complex128(tfd[n+1, Nh:])
```
```python
R_tslice_odd[N-mb] = np.conj(R_odd_half[mb])

# Perform FFT to compute time slices
tfd_time_slice = np.fft.fft(R_tslice_even + 1j * R_tslice_odd)
```
Suggested change:

```diff
-tfd_time_slice = np.fft.fft(R_tslice_even + 1j * R_tslice_odd)
+tfd_time_slice = np.fft.fft(R_tslice_even + j * R_tslice_odd)
```
```diff
@@ -288,23 +289,23 @@ def synchronization(ppg_meta, imu_meta):

     return segment_ppg_total, segment_imu_total

-def extract_overlapping_segments(df_ppg, df_imu, time_column_ppg='time', time_column_imu='time'):
+def extract_overlapping_segments(df_ppg, df_acc, time_column_ppg='time', time_column_imu='time'):
```
Don't forget to set data types here too.
Looking good! I have a couple of suggestions, mostly related to fixing data types and simplifying code. I would also suggest adding more comments (#) in the code to make sure the user/dev understands the processing of the data.
Hi Erik,

Here's the PR for HR estimation. It also contains a few renaming changes for the preprocessing, but I think those speak for themselves. After this I'll focus on efficiency ;).