Skip to content

Commit

Permalink
tests: validate the aout triggering
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Stanea <[email protected]>
  • Loading branch information
Adrian-Stanea authored and AlexandraTrifan committed Sep 20, 2024
1 parent 9ac6fc2 commit 1159040
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 8 deletions.
171 changes: 167 additions & 4 deletions tests/analog_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import random
import sys
import reset_def_values as reset
from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file
from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file, plot_to_file_multiline
from open_context import ctx_timeout, ctx
from create_files import results_file, results_dir, csv

Expand Down Expand Up @@ -1068,6 +1068,8 @@ def write_file(file, test_name, channel, data_string):
file.write("\n\nAmplitude test on channel " + str(channel) + ": \n")
elif test_name == "buffer_transition_glitch":
file.write("\n\nTest buffer transition glitch on channel " + str(channel) + ": \n")
elif test_name == "aout_triggering":
file.write("\n\nTest aout start with trigger event on channel = " + str(channel) + ": \n")
for i in range(len(data_string)):
file.write(str(data_string[i]) + '\n')

Expand Down Expand Up @@ -1317,7 +1319,7 @@ def get_experiment_config_for_sample_hold(dac_sr):
raise ValueError("Invalid DAC sample rate.")
return cfg

def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn):
def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn=None):
assert lower_bound < upper_bound, "Invalid bounds"
is_CH0_in_range = np.all((lower_bound <= data[0]) & (data[0] <= upper_bound))
is_CH1_in_range = np.all((lower_bound <= data[1]) & (data[1] <= upper_bound))
Expand All @@ -1329,7 +1331,6 @@ def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn):
return is_CH1_in_range
else:
raise ValueError(f"Unknown channel: {chn}")

def test_last_sample_hold(
ain: libm2k.M2kAnalogIn,
aout: libm2k.M2kAnalogOut,
Expand Down Expand Up @@ -1511,4 +1512,166 @@ def check_for_glitch(data, threshold=0.3):
filename=f"last_sample_hold_{chn_str}_{sr_str}_step4.png")

aout.stop()
return glitched, is_last_sample_hold_ok, is_idle_ok
return glitched, is_last_sample_hold_ok, is_idle_ok


def test_aout_triggering(
ain: libm2k.M2kAnalogIn,
aout: libm2k.M2kAnalogOut,
dig: libm2k.M2kDigital,
trig: libm2k.M2kHardwareTrigger,
ctx: libm2k.M2k,
auto_rearm : bool, isCyclic : bool, status
):
def configure_trigger(trig: libm2k.M2kHardwareTrigger,
dig: libm2k.M2kDigital,
trig_pin, status, delay):
trig.setAnalogDelay(-delay)
trig.setDigitalDelay(-delay)
trig.setDigitalSource(libm2k.SRC_NONE) # DigitalIn conditioned by internal trigger structure
trig.setDigitalCondition(trig_pin, libm2k.RISING_EDGE_DIGITAL)
trig.setAnalogOutTriggerSource(libm2k.TRIGGER_LA) # aout conditioned by the LA trigger
trig.setAnalogOutTriggerStatus(status)
file_name, dir_name, csv_path = get_result_files(gen_reports)
test_name = "aout_triggering"
data_string = []

TRIG_PIN = libm2k.DIO_CHANNEL_0
DELAY = 8_000
BUFFER_SIZE = 16_000
OVERSAMPLING = 1
KB_COUNT = 40
N_SAMPLES = 1024
AMPLITUDE = 5
OFFSET = 0
TIMEOUT = 10_000

ADC_SR = 100_000_000
DAC_SR = 75_000_000
SR_IN_DIG = 100_000_000
SR_OUT_DIG = 100_000_000

ctx.reset()
ctx.calibrateADC()
ctx.calibrateDAC()
ctx.setTimeout(TIMEOUT)

ain.setSampleRate(ADC_SR)
ain.setOversamplingRatio(OVERSAMPLING)
ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True)
ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True)
ain.setRange(libm2k.ANALOG_IN_CHANNEL_1, -10, 10)
ain.setRange(libm2k.ANALOG_IN_CHANNEL_2, -10, 10)
assert ain.getSampleRate() == ADC_SR, "Failed to set the sample rate for AnalogIn"

aout.setSampleRate(0, DAC_SR)
aout.setSampleRate(1, DAC_SR)
aout.enableChannel(0, True)
aout.enableChannel(1, True)
aout.setOversamplingRatio(0, 1)
aout.setOversamplingRatio(1, 1)
aout.setKernelBuffersCount(0, KB_COUNT)
aout.setKernelBuffersCount(1, KB_COUNT)
assert aout.getSampleRate(1) == DAC_SR, "Failed to set the sample rate for AnalogOut1"

dig.setDirection(TRIG_PIN, libm2k.DIO_OUTPUT)
dig.setOutputMode(TRIG_PIN, libm2k.DIO_PUSHPULL)
dig.enableChannel(TRIG_PIN, True)
dig.setCyclic(False)
dig.setValueRaw(TRIG_PIN, libm2k.LOW)
dig.setSampleRateIn(SR_IN_DIG)
dig.setSampleRateOut(SR_OUT_DIG)
assert dig.getSampleRateIn() == SR_IN_DIG , "Failed to set the sample rate for DigitalIn"
assert dig.getSampleRateOut() == SR_OUT_DIG , "Failed to set the sample rate for DigitalOut"

# LA trigger will determine an action for the aout based on the provided status
configure_trigger(trig, dig, TRIG_PIN, status, DELAY)
aout.setCyclic(isCyclic)
aout.setBufferRearmOnTrigger(auto_rearm)

# Configure Aout Signal
buf = shape_gen(n=N_SAMPLES, amplitude=AMPLITUDE, offset=OFFSET)[Shape.FALLING_RAMP.value]
aout.push([buf, buf])

ctx.startMixedSignalAcquisition(BUFFER_SIZE)

dig.setValueRaw(TRIG_PIN, libm2k.HIGH) # Trigger event -> should start the AOUT
analog_data = np.array(ain.getSamples(BUFFER_SIZE))
digital_data = np.array(dig.getSamples(BUFFER_SIZE))
mask = 0x0001 << TRIG_PIN
digital_data_chn = (digital_data & mask) >> TRIG_PIN

ctx.stopMixedSignalAcquisition()

# Validate test
peaks_CH0, _ = find_peaks(analog_data[0], prominence=1, height=1, distance = 100)
peaks_CH1, _ = find_peaks(analog_data[1], prominence=1, height=1, distance = 100)

CH0_left = analog_data[0][:DELAY]
CH0_right = analog_data[0][DELAY:]
peaks_CH0_left, _ = find_peaks(CH0_left, prominence=1, height=1, distance = 100)
peaks_CH0_right, _ = find_peaks(CH0_right, prominence=1, height=1, distance = 100)
CH1_left = analog_data[1][:DELAY]
CH1_right = analog_data[1][DELAY:]
peaks_CH1_left, _ = find_peaks(CH1_left, prominence=1, height=1, distance = 100)
peaks_CH1_right, _ = find_peaks(CH1_right, prominence=1, height=1, distance = 100)

status_str = "START" if status == libm2k.START else "STOP"
isCyclic_str = "Cyclic" if isCyclic else "Non-Cyclic"
rearm_str = "Ream" if auto_rearm else "No-Rearm"
data_string.append(f"Configuration: status={status_str} \t isCyclic={isCyclic_str} \t auto_rearm={rearm_str}")
data_string.append(f"\tPeaks before trigger: CH0={len(peaks_CH0_left)} CH1={len(peaks_CH1_left)}")
data_string.append(f"\tPeaks after trigger: CH0={len(peaks_CH0_right)} CH1={len(peaks_CH1_right)}")

result = True
# NOTE: auto_rearm only has effect on START status
# Case 1, 2, 4
if ((status == libm2k.START) and (not isCyclic) and (not auto_rearm)) or \
((status == libm2k.START) and (not isCyclic) and (auto_rearm)) or \
((status == libm2k.START) and (isCyclic) and (auto_rearm)):
# Should IDLE before trigger at 0V because the channel was reset
result = are_values_within_range(analog_data[:, :DELAY - 500], -0.2, 0.2)
# result = result and (len(peaks_CH0_left) == 0) and (len(peaks_CH1_left) == 0)
# Should output exactly 1 period after trigger
result = result and (len(peaks_CH0_right) == 1) and (len(peaks_CH1_right) == 1)
# Case 3
if (status == libm2k.START) and (isCyclic) and (not auto_rearm):
# Should IDLE before trigger at 0V because the channel was reset
result = are_values_within_range(analog_data[:, :DELAY ], -0.2, 0.2)
# Should output multiple period after trigger
result = result and (len(peaks_CH0_right) > 1) and (len(peaks_CH1_right) > 1)
# Case 5 and 6
if ((status == libm2k.STOP) and (not isCyclic) and (not auto_rearm)) or \
((status == libm2k.STOP) and (not isCyclic) and (auto_rearm)):
# The channels are in the last sample hold state and STOP is not available for non-cyclic buffers due to HDL limitations
# We expect both channels to hold last sample for the entire duration
result = result and are_values_within_range(analog_data, -AMPLITUDE * 1.2, -AMPLITUDE * 0.8)
result = result and (len(peaks_CH0_left) == 0) and (len(peaks_CH1_left) == 0)
result = result and (len(peaks_CH0_right) == 0) and (len(peaks_CH1_right) == 0)
# Case 7 and 8
if ((status == libm2k.STOP) and (isCyclic) and (not auto_rearm)) or \
((status == libm2k.STOP) and (isCyclic) and (auto_rearm)):
# Should be generating cyclic signal before trigger
result = result and (len(peaks_CH0_left) > 1) and (len(peaks_CH1_left) > 1)
# Should stop generating signal after trigger
# TODO: might need aditional delay since the channel takes some time untill it stops from when the trigger event occurs
result = result and are_values_within_range(analog_data[:, -DELAY + 500:], -0.2, 0.2)

if gen_reports:
write_file(file_name, test_name, "Both Channels", data_string)
filename_str = f"aout_triggering_{status_str}_{isCyclic_str}_{rearm_str}.png"
plot_to_file_multiline(
title="AOUT Triggering",
datasets=[
(None, digital_data_chn, {"label":"Digital"}),
(None, analog_data[0], {"label" : "Analog CH0"}),
(peaks_CH0, analog_data[0], {"label" : "Peaks CH0", "marker" : "x"}),
(None, analog_data[1],{"label" : "Analog CH1"}),
(peaks_CH1, analog_data[1], {"label" : "Peaks CH1", "marker" : "x"}),
],
dir_name=dir_name,
filename=filename_str,
ylim=(-6, 6),
)
aout.stop()
return result
34 changes: 34 additions & 0 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,40 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, x_lim = None, yla
plt.close()
return

def plot_to_file_multiline(
title,
datasets,
dir_name,
filename,
xlabel="Samples", ylabel="Voltage [V]",
xlim = None, ylim = None,
):
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.grid(visible=True)

for data in datasets:
xdata, ydata, fmt = data
if xdata is not None:
if "marker" in fmt:
# Mark scattered points
plt.plot(xdata, ydata[xdata], linestyle="None", **fmt)
else:
plt.plot(xdata, ydata, **fmt)
else:
plt.plot(ydata, **fmt)

if xlim is not None:
plt.xlim(*xlim)
if ylim is not None:
plt.ylim(*ylim)
plt.legend()

plt.savefig(f"{dir_name}/{filename}")
plt.close()
return


def get_time_format(samples, sample_rate):
x_time = np.linspace(0, samples/sample_rate, samples)
Expand Down
22 changes: 19 additions & 3 deletions tests/m2k_analog_test.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import itertools
import sys
import unittest
import libm2k

from shapefile import shape_gen, ref_shape_gen, shape_name
from analog_functions import get_experiment_config_for_sample_hold, test_amplitude, test_last_sample_hold, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \
from analog_functions import get_experiment_config_for_sample_hold, test_amplitude, test_aout_triggering, test_last_sample_hold, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \
test_voltmeter_functionality, test_kernel_buffers, test_buffer_transition_glitch
from analog_functions import noncyclic_buffer_test, set_samplerates_for_shapetest, set_trig_for_cyclicbuffer_test, \
test_calibration
from analog_functions import compare_in_out_frequency, test_oversampling_ratio, channels_diff_in_samples, test_timeout, \
cyclic_buffer_test
import reset_def_values as reset
from open_context import ctx, ain, aout, trig, create_dir
from open_context import ctx, ain, aout, dig, trig, create_dir
from create_files import results_dir, csv, results_file
import logger
from repeat_test import repeat
Expand Down Expand Up @@ -277,4 +278,19 @@ def test_last_sample_hold(self):
self.assertEqual(has_glitch, False, f'Found glitches on {chn_str} with DAC SR {sr_format}')
self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}')
self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}')
self.assertEqual(is_idle_ok, True, 'Test idle condition failed')
self.assertEqual(is_idle_ok, True, 'Test idle condition failed')

@unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33', 'DAC triggering is available starting with firmware v0.33')
def test_aout_triggering(self):
# Test the triggering functionality of the M2kAnalogOut.
# The test looks for patterns before and after the trigger event for 8 different combinations.
autorearm = [False, True]
isCyclic = [False, True]
status = [libm2k.START, libm2k.STOP]
combinations = list(itertools.product(autorearm, isCyclic, status))
for combination in combinations:
autorearm, isCyclic, status = combination
test_result = test_aout_triggering(ain, aout, dig, trig, ctx, autorearm, isCyclic, status)
status_str = "START" if status == libm2k.START else "STOP"
with self.subTest(msg=f'Test aout start with trigger for: status={status_str}, isCyclic={isCyclic}, autorearm={autorearm} '):
self.assertEqual(test_result, True, msg=f'Specification not met')
3 changes: 2 additions & 1 deletion tests/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def wait_():
"test_shapes_ch1\n"
"test_voltmeter\n"
"test_buffer_transition_glitch\n"
"test_last_sample_hold\n")
"test_last_sample_hold\n"
"test_aout_triggering\n")
print("\n ===== class B_TriggerTests ===== \n")
print(" ===== tests ====== \n")
print("test_1_trigger_object\n"
Expand Down

0 comments on commit 1159040

Please sign in to comment.