Skip to content

Commit

Permalink
tests: validate last sample hold
Browse files Browse the repository at this point in the history
- After a non-cyclic buffer is pushed, the last sample will idle
at the output untill a new sample is pushed or the channel is stopped.
- When the oposite channel idles at 0V we test for an interval of +/- 0.2V
because a small spikes appears ocasionally if the swing on the channel that is being
pushed is to large.

Signed-off-by: Adrian Stanea <[email protected]>
  • Loading branch information
Adrian-Stanea committed Aug 30, 2024
1 parent 339e134 commit 153794f
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 16 deletions.
250 changes: 248 additions & 2 deletions tests/analog_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import random
import sys
import reset_def_values as reset
from helpers import get_result_files, save_data_to_csv, plot_to_file
from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file
from open_context import ctx_timeout, ctx
from create_files import results_file, results_dir, csv

from shapefile import shape_gen, Shape

# dicts that will be saved to csv files
shape_csv_vals = {}
ampl_csv_vals = {}
Expand Down Expand Up @@ -1265,4 +1267,248 @@ def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude=
f'buffer_glitch_plot_ch{channel}_{waveform}.png',
data_marked=filtered_peaks)

return num_peaks
return num_peaks


def get_experiment_config_for_sample_hold(dac_sr):
cfg = {}
if dac_sr == 75_000_000:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 100_000_000
cfg["buffer_size"] = 20_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 1024 * 8
cfg["offset"] = 0
elif dac_sr == 7_500_000:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 100_000_000
cfg["buffer_size"] = 30_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 1024
cfg["offset"] = 0
elif dac_sr == 750_000:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 10_000_000
cfg["buffer_size"] = 30_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 1024
cfg["offset"] = 0
elif dac_sr == 75_000:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 1_000_000
cfg["buffer_size"] = 30_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 1024
cfg["offset"] = 0
elif dac_sr == 7_500:
cfg["dac_sr"] = dac_sr
cfg["adc_sr"] = 1_000_000
cfg["buffer_size"] = 30_000
cfg["trig_threshold"] = 2.9
cfg["amplitude"] = 5
cfg["samples_per_period"] = 128
cfg["offset"] = 0
# 750 Hz ommited to avoid long test duration
else:
raise ValueError("Invalid DAC sample rate.")
return cfg

def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn):
assert lower_bound < upper_bound, "Invalid bounds"
is_CH0_in_range = np.all((lower_bound <= data[0]) & (data[0] <= upper_bound))
is_CH1_in_range = np.all((lower_bound <= data[1]) & (data[1] <= upper_bound))
if chn is None:
return is_CH0_in_range and is_CH1_in_range
elif chn == libm2k.ANALOG_IN_CHANNEL_1:
return is_CH0_in_range
elif chn == libm2k.ANALOG_IN_CHANNEL_2:
return is_CH1_in_range
else:
raise ValueError(f"Unknown channel: {chn}")

def test_last_sample_hold(
ain: libm2k.M2kAnalogIn,
aout: libm2k.M2kAnalogOut,
trig: libm2k.M2kHardwareTrigger,
ctx: libm2k.M2k,
cfg, channel
):
def step_ramp_rising(aout_chn, trig_chn, buffer_ramp_up):
set_trig(trig, trig_chn, 8192, libm2k.RISING_EDGE_ANALOG, -cfg.get("trig_threshold"))
ain.startAcquisition(cfg.get("buffer_size"))
if aout_chn is None:
aout.push([buffer_ramp_up, buffer_ramp_up])
else:
aout.push(aout_chn, buffer_ramp_up)
data = np.array(ain.getSamples(cfg.get("buffer_size")))
# Flush values from previous buffer
ain.stopAcquisition()
return data

def step_ramp_falling(aout_chn, trig_chn, buffer_ramp_down):
set_trig(trig, trig_chn, 8192, libm2k.FALLING_EDGE_ANALOG, cfg.get("trig_threshold"))
ain.startAcquisition(cfg.get("buffer_size"))
if aout_chn is None:
aout.push([buffer_ramp_down, buffer_ramp_down])
else:
aout.push(aout_chn, buffer_ramp_down)
data = np.array(ain.getSamples(cfg.get("buffer_size")))
# Flush values from previous buffer
ain.stopAcquisition()
return data

def check_for_glitch(data, threshold=0.3):
# The glitch is unwanted and happened in between the last sample of the previous buffer and the first sample of the new buffer.
# NOTE: At DAC_SR <= 7.5 KHz we see oscilations due to the response of the HDL filter
glitch_found = False
for chn_samples in data:
if any(abs(left - right) >= threshold for left, right in zip(chn_samples, chn_samples[1:])):
glitch_found = True
return glitch_found

file_name, dir_name, csv_path = get_result_files(gen_reports)
test_name = "sample_hold"
data_string = []

chn_str = "both_channels" if channel is None else f"CH{channel}"
sr_str = get_sample_rate_display_format(cfg.get("dac_sr"))
x_time, x_label = get_time_format(cfg.get("buffer_size"), cfg.get("adc_sr"))

if gen_reports:
subdir_name = f"{dir_name}/last_sample_hold/{chn_str}"
os.makedirs(subdir_name, exist_ok=True)

SLEEP = 0.15
glitched = False
is_last_sample_hold_ok = True # Assume it is ok until proven otherwise
is_idle_ok = True
assert channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2, None], "Invalid channel ... None means use both channels"
trig_chn = libm2k.ANALOG_IN_CHANNEL_1 if channel is None else channel

buffer_ramp_up = shape_gen(n=cfg["samples_per_period"],
amplitude=cfg["amplitude"],
offset=cfg["offset"])[Shape.RISING_RAMP.value]
buffer_ramp_down = shape_gen(n=cfg["samples_per_period"],
amplitude=cfg["amplitude"],
offset=cfg["offset"])[Shape.FALLING_RAMP.value]

ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True)
ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True)
ain.setSampleRate(cfg.get("adc_sr"))
ain.setRange(0, libm2k.PLUS_MINUS_25V)
ain.setRange(1, libm2k.PLUS_MINUS_25V)

aout.setSampleRate(0, cfg.get("dac_sr"))
aout.setSampleRate(1, cfg.get("dac_sr"))
aout.setKernelBuffersCount(0, 4)
aout.setKernelBuffersCount(1, 4)
aout.enableChannel(0, True)
aout.enableChannel(1, True)
aout.setCyclic(False)

# Alternate between rising and falling ramps: rising, falling, rising, falling
# NOTE: we selected an arbitraty number of samples from both ends to validate sample hold and reset functionality
# 1: Rising
data = step_ramp_rising(channel, trig_chn, buffer_ramp_up)
if channel is None:
# Both channels should idle at 0V before push due to being reset
is_idle_ok = is_idle_ok and are_values_within_range(data[:, :2000], -0.20, 0.20, channel)
assert is_idle_ok, "STEP1: Both channels should idle low before push due to being reset"
elif channel == libm2k.ANALOG_IN_CHANNEL_1:
# CH2 should idle at 0V if we are testing CH1
is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2)
assert is_idle_ok, "STEP1: CH2 should idle at 0V if we are testing CH1"
elif channel == libm2k.ANALOG_IN_CHANNEL_2:
is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1)
assert is_idle_ok, "STEP1: CH1 should idle at 0V if we are testing CH2"
# Shoud hold last sample from new buffer for current channel config
is_idle_ok = is_idle_ok and are_values_within_range(data[:, -2000:], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel)

if gen_reports:
plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Rising Ramp",
data=data[0],
data1=data[1],
x_data=x_time,
xlabel = x_label,
dir_name=subdir_name,
y_lim=(-6, 6),
filename=f"last_sample_hold_{chn_str}_{sr_str}_step1.png")
time.sleep(SLEEP) # wait for the DAC output to settle with last sample
# 2: Falling
data = step_ramp_falling(channel, trig_chn, buffer_ramp_down)
# Shoud start with last sample from previous buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel)
# Shoud hold last sample from new buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel)
if channel == libm2k.ANALOG_IN_CHANNEL_1:
# CH2 should idle at 0V if we are testing CH1
is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2)
assert is_idle_ok, "STEP2: CH2 should idle at 0V if we are testing CH1"
elif channel == libm2k.ANALOG_IN_CHANNEL_2:
is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1)
assert is_idle_ok, "STEP2: CH1 should idle at 0V if we are testing CH2"
glitched = glitched or check_for_glitch(data)
if gen_reports:
plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Falling Ramp",
data=data[0],
data1=data[1],
x_data=x_time,
xlabel = x_label,
dir_name=subdir_name,
y_lim=(-6, 6),
filename=f"last_sample_hold_{chn_str}_{sr_str}_step2.png")
time.sleep(SLEEP) # wait for the DAC output to settle with last sample
# 3: Rising
data = step_ramp_rising(channel, trig_chn, buffer_ramp_up)
# Shoud start with last sample from previous buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel)
# Shoud hold last sample from new buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel)
if channel == libm2k.ANALOG_IN_CHANNEL_1:
# CH2 should idle at 0V if we are testing CH1
is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2)
assert is_idle_ok, "STEP3: CH2 should idle at 0V if we are testing CH1"
elif channel == libm2k.ANALOG_IN_CHANNEL_2:
is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1)
assert is_idle_ok, "STEP3: CH1 should idle at 0V if we are testing CH2"
glitched = glitched or check_for_glitch(data)
if gen_reports:
plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Rising Ramp",
data=data[0],
data1=data[1],
x_data=x_time,
xlabel = x_label,
dir_name=subdir_name,
y_lim=(-6, 6),
filename=f"last_sample_hold_{chn_str}_{sr_str}_step3.png")
time.sleep(SLEEP) # wait for the DAC output to settle with last sample
# 4: Falling
data = step_ramp_falling(channel, trig_chn, buffer_ramp_down)
# Shoud start with last sample from previous buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel)
# Shoud hold last sample from new buffer
is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel)
if channel == libm2k.ANALOG_IN_CHANNEL_1:
# CH2 should idle at 0V if we are testing CH1
is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2)
assert is_idle_ok, "STEP4: CH2 should idle at 0V if we are testing CH1"
elif channel == libm2k.ANALOG_IN_CHANNEL_2:
is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1)
assert is_idle_ok, "STEP4: CH1 should idle at 0V if we are testing CH2"
glitched = glitched or check_for_glitch(data)
if gen_reports:
plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Falling Ramp",
data=data[0],
data1=data[1],
x_data=x_time,
xlabel = x_label,
dir_name=subdir_name,
y_lim=(-6, 6),
filename=f"last_sample_hold_{chn_str}_{sr_str}_step4.png")

aout.stop()
return glitched, is_last_sample_hold_ok, is_idle_ok
52 changes: 47 additions & 5 deletions tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

import numpy as np
from pandas import DataFrame
import matplotlib.pyplot as plt

Expand Down Expand Up @@ -27,7 +28,7 @@ def save_data_to_csv(csv_vals, csv_file):
return


def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None, data_marked=None):
def plot_to_file(title, data, dir_name, filename, xlabel=None, x_lim = None, ylabel=None, y_lim = None, data1=None, x_data = None, data_marked=None):
# Saves the plots in a separate folder
# Arguments:
# title -- Title of the plot
Expand All @@ -40,7 +41,8 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data
# data_marked -- Data that represents specific points on the plot(default: {None})
# plot the signals in a separate folder
plt.title(title)
if xlabel is not None: # if xlabel and ylabel are not specified there will be default values
# if xlabel and ylabel are not specified there will be default values
if xlabel is not None:
plt.xlabel(xlabel)
else:
plt.xlabel('Samples')
Expand All @@ -49,11 +51,51 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data
else:
plt.ylabel('Voltage [V]')
plt.grid(visible=True)
plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case)
# if x_data is not None, the plot will be displayed with the specified x_data
if x_data is not None:
plt.plot(x_data, data)
else:
plt.plot(data)
# if a second set of data must be printed (for ch0 and ch1 phase difference in this case)
if data1 is not None:
plt.plot(data1)
if x_data is not None:
plt.plot(x_data, data1)
else:
plt.plot(data1)
# Optional configurations
if x_lim is not None:
plt.xlim(*x_lim)
if y_lim is not None:
plt.ylim(*y_lim)
if data_marked is not None:
plt.plot(data_marked, data[data_marked], 'xr')
plt.savefig(dir_name + "/" + filename)
plt.close()
return
return


def get_time_format(samples, sample_rate):
x_time = np.linspace(0, samples/sample_rate, samples)

if x_time[-1] < 1e-6:
x_time *= 1e9
x_label = "Time [ns]"
elif x_time[-1] < 1e-3:
x_time *= 1e6
x_label = "Time [us]"
elif x_time[-1] < 1:
x_time *= 1e3
x_label = "Time [ms]"
else:
x_label = "Time [s]"
return x_time, x_label


def get_sample_rate_display_format(sample_rate):
if sample_rate < 1e3:
return f"{sample_rate:.2f} Hz"
if sample_rate < 1e6:
return f"{sample_rate/1e3:.2f} KHz"
if sample_rate < 1e9:
return f"{sample_rate/1e6:.2f} MHz"
return f"{sample_rate/1e9:.2f} GHz"
30 changes: 28 additions & 2 deletions tests/m2k_analog_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import libm2k

from shapefile import shape_gen, ref_shape_gen, shape_name
from analog_functions import test_amplitude, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \
from analog_functions import get_experiment_config_for_sample_hold, test_amplitude, test_last_sample_hold, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \
test_voltmeter_functionality, test_kernel_buffers, test_buffer_transition_glitch
from analog_functions import noncyclic_buffer_test, set_samplerates_for_shapetest, set_trig_for_cyclicbuffer_test, \
test_calibration
Expand All @@ -15,6 +15,8 @@
import logger
from repeat_test import repeat

from helpers import get_sample_rate_display_format


class A_AnalogTests(unittest.TestCase):
# Class Where are defined all test methods for AnalogIn, AnalogOut, AnalogTrigger
Expand Down Expand Up @@ -251,4 +253,28 @@ def test_buffer_transition_glitch(self):
num_glitches = test_buffer_transition_glitch(channel, ain, aout, trig, waveform)

with self.subTest(msg='Test buffer transition glitch: ' + waveform + ' on ch' + str(channel)):
self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel))
self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel))

@unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33',
'The sample and hold feature is available starting with firmware v0.33. Note: v0.32 had a glitch that is handled in this test.')
def test_last_sample_hold(self):
# Tests the last sample hold functionality for different channels and DAC sample rates.
# This test iterates over different channels (each channel individually and both channels together)
# and then tests the last sample hold functionality. When testing both channels together, 'None'
# is used to denote this case.
# It verifies that the last sample is held correctly and that there are no glitches in the output signal in between the last sample and a new push.

for channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2, None]:
for dac_sr in [75_000_000, 7_500_000, 750_000, 75_000, 7_500]:
cfg = get_experiment_config_for_sample_hold(dac_sr)
sr_format = get_sample_rate_display_format(cfg.get("dac_sr"))
chn_str = "both_channels" if channel is None else f"CH{channel}"
reset.analog_in(ain)
reset.analog_out(aout)
reset.trigger(trig)
has_glitch, is_last_sample_hold_ok, is_idle_ok = test_last_sample_hold(ain, aout, trig, ctx, cfg, channel)
with self.subTest(msg='Test last sample hold on ' + str(chn_str) + ' with DAC SR ' + str(cfg.get("dac_sr"))):
self.assertEqual(has_glitch, False, f'Found glitches on {chn_str} with DAC SR {sr_format}')
self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}')
self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}')
self.assertEqual(is_idle_ok, True, 'Test idle condition failed')
Loading

0 comments on commit 153794f

Please sign in to comment.