diff --git a/tests/analog_functions.py b/tests/analog_functions.py index 7ce4e43d..6a3e6de6 100644 --- a/tests/analog_functions.py +++ b/tests/analog_functions.py @@ -13,10 +13,12 @@ import random import sys import reset_def_values as reset -from helpers import get_result_files, save_data_to_csv, plot_to_file +from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file from open_context import ctx_timeout, ctx from create_files import results_file, results_dir, csv +from shapefile import shape_gen, Shape + # dicts that will be saved to csv files shape_csv_vals = {} ampl_csv_vals = {} @@ -1265,4 +1267,248 @@ def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude= f'buffer_glitch_plot_ch{channel}_{waveform}.png', data_marked=filtered_peaks) - return num_peaks \ No newline at end of file + return num_peaks + + +def get_experiment_config_for_sample_hold(dac_sr): + cfg = {} + if dac_sr == 75_000_000: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 100_000_000 + cfg["buffer_size"] = 20_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 1024 * 8 + cfg["offset"] = 0 + elif dac_sr == 7_500_000: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 100_000_000 + cfg["buffer_size"] = 30_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 1024 + cfg["offset"] = 0 + elif dac_sr == 750_000: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 10_000_000 + cfg["buffer_size"] = 30_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 1024 + cfg["offset"] = 0 + elif dac_sr == 75_000: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 1_000_000 + cfg["buffer_size"] = 30_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 1024 + cfg["offset"] = 0 + elif dac_sr == 7_500: + cfg["dac_sr"] = dac_sr + cfg["adc_sr"] = 1_000_000 + cfg["buffer_size"] = 30_000 + cfg["trig_threshold"] = 2.9 + cfg["amplitude"] = 5 + cfg["samples_per_period"] = 128 + cfg["offset"] = 0 + # 750 Hz ommited to avoid long test duration + else: + raise ValueError("Invalid DAC sample rate.") + return cfg + +def are_values_within_range(data: np.ndarray, lower_bound, upper_bound, chn): + assert lower_bound < upper_bound, "Invalid bounds" + is_CH0_in_range = np.all((lower_bound <= data[0]) & (data[0] <= upper_bound)) + is_CH1_in_range = np.all((lower_bound <= data[1]) & (data[1] <= upper_bound)) + if chn is None: + return is_CH0_in_range and is_CH1_in_range + elif chn == libm2k.ANALOG_IN_CHANNEL_1: + return is_CH0_in_range + elif chn == libm2k.ANALOG_IN_CHANNEL_2: + return is_CH1_in_range + else: + raise ValueError(f"Unknown channel: {chn}") + +def test_last_sample_hold( + ain: libm2k.M2kAnalogIn, + aout: libm2k.M2kAnalogOut, + trig: libm2k.M2kHardwareTrigger, + ctx: libm2k.M2k, + cfg, channel +): + def step_ramp_rising(aout_chn, trig_chn, buffer_ramp_up): + set_trig(trig, trig_chn, 8192, libm2k.RISING_EDGE_ANALOG, -cfg.get("trig_threshold")) + ain.startAcquisition(cfg.get("buffer_size")) + if aout_chn is None: + aout.push([buffer_ramp_up, buffer_ramp_up]) + else: + aout.push(aout_chn, buffer_ramp_up) + data = np.array(ain.getSamples(cfg.get("buffer_size"))) + # Flush values from previous buffer + ain.stopAcquisition() + return data + + def step_ramp_falling(aout_chn, trig_chn, buffer_ramp_down): + set_trig(trig, trig_chn, 8192, libm2k.FALLING_EDGE_ANALOG, cfg.get("trig_threshold")) + ain.startAcquisition(cfg.get("buffer_size")) + if aout_chn is None: + aout.push([buffer_ramp_down, buffer_ramp_down]) + else: + aout.push(aout_chn, buffer_ramp_down) + data = np.array(ain.getSamples(cfg.get("buffer_size"))) + # Flush values from previous buffer + ain.stopAcquisition() + return data + + def check_for_glitch(data, threshold=0.3): + # The glitch is unwanted and happened in between the last sample of the previous buffer and the first sample of the new buffer. + # NOTE: At DAC_SR <= 7.5 KHz we see oscilations due to the response of the HDL filter + glitch_found = False + for chn_samples in data: + if any(abs(left - right) >= threshold for left, right in zip(chn_samples, chn_samples[1:])): + glitch_found = True + return glitch_found + + file_name, dir_name, csv_path = get_result_files(gen_reports) + test_name = "sample_hold" + data_string = [] + + chn_str = "both_channels" if channel is None else f"CH{channel}" + sr_str = get_sample_rate_display_format(cfg.get("dac_sr")) + x_time, x_label = get_time_format(cfg.get("buffer_size"), cfg.get("adc_sr")) + + if gen_reports: + subdir_name = f"{dir_name}/last_sample_hold/{chn_str}" + os.makedirs(subdir_name, exist_ok=True) + + SLEEP = 0.15 + glitched = False + is_last_sample_hold_ok = True # Assume it is ok until proven otherwise + is_idle_ok = True + assert channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2, None], "Invalid channel ... None means use both channels" + trig_chn = libm2k.ANALOG_IN_CHANNEL_1 if channel is None else channel + + buffer_ramp_up = shape_gen(n=cfg["samples_per_period"], + amplitude=cfg["amplitude"], + offset=cfg["offset"])[Shape.RISING_RAMP.value] + buffer_ramp_down = shape_gen(n=cfg["samples_per_period"], + amplitude=cfg["amplitude"], + offset=cfg["offset"])[Shape.FALLING_RAMP.value] + + ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True) + ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True) + ain.setSampleRate(cfg.get("adc_sr")) + ain.setRange(0, libm2k.PLUS_MINUS_25V) + ain.setRange(1, libm2k.PLUS_MINUS_25V) + + aout.setSampleRate(0, cfg.get("dac_sr")) + aout.setSampleRate(1, cfg.get("dac_sr")) + aout.setKernelBuffersCount(0, 4) + aout.setKernelBuffersCount(1, 4) + aout.enableChannel(0, True) + aout.enableChannel(1, True) + aout.setCyclic(False) + + # Alternate between rising and falling ramps: rising, falling, rising, falling + # NOTE: we selected an arbitraty number of samples from both ends to validate sample hold and reset functionality + # 1: Rising + data = step_ramp_rising(channel, trig_chn, buffer_ramp_up) + if channel is None: + # Both channels should idle at 0V before push due to being reset + is_idle_ok = is_idle_ok and are_values_within_range(data[:, :2000], -0.20, 0.20, channel) + assert is_idle_ok, "STEP1: Both channels should idle low before push due to being reset" + elif channel == libm2k.ANALOG_IN_CHANNEL_1: + # CH2 should idle at 0V if we are testing CH1 + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2) + assert is_idle_ok, "STEP1: CH2 should idle at 0V if we are testing CH1" + elif channel == libm2k.ANALOG_IN_CHANNEL_2: + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1) + assert is_idle_ok, "STEP1: CH1 should idle at 0V if we are testing CH2" + # Shoud hold last sample from new buffer for current channel config + is_idle_ok = is_idle_ok and are_values_within_range(data[:, -2000:], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel) + + if gen_reports: + plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Rising Ramp", + data=data[0], + data1=data[1], + x_data=x_time, + xlabel = x_label, + dir_name=subdir_name, + y_lim=(-6, 6), + filename=f"last_sample_hold_{chn_str}_{sr_str}_step1.png") + time.sleep(SLEEP) # wait for the DAC output to settle with last sample + # 2: Falling + data = step_ramp_falling(channel, trig_chn, buffer_ramp_down) + # Shoud start with last sample from previous buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel) + # Shoud hold last sample from new buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel) + if channel == libm2k.ANALOG_IN_CHANNEL_1: + # CH2 should idle at 0V if we are testing CH1 + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2) + assert is_idle_ok, "STEP2: CH2 should idle at 0V if we are testing CH1" + elif channel == libm2k.ANALOG_IN_CHANNEL_2: + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1) + assert is_idle_ok, "STEP2: CH1 should idle at 0V if we are testing CH2" + glitched = glitched or check_for_glitch(data) + if gen_reports: + plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Falling Ramp", + data=data[0], + data1=data[1], + x_data=x_time, + xlabel = x_label, + dir_name=subdir_name, + y_lim=(-6, 6), + filename=f"last_sample_hold_{chn_str}_{sr_str}_step2.png") + time.sleep(SLEEP) # wait for the DAC output to settle with last sample + # 3: Rising + data = step_ramp_rising(channel, trig_chn, buffer_ramp_up) + # Shoud start with last sample from previous buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel) + # Shoud hold last sample from new buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel) + if channel == libm2k.ANALOG_IN_CHANNEL_1: + # CH2 should idle at 0V if we are testing CH1 + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2) + assert is_idle_ok, "STEP3: CH2 should idle at 0V if we are testing CH1" + elif channel == libm2k.ANALOG_IN_CHANNEL_2: + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1) + assert is_idle_ok, "STEP3: CH1 should idle at 0V if we are testing CH2" + glitched = glitched or check_for_glitch(data) + if gen_reports: + plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Rising Ramp", + data=data[0], + data1=data[1], + x_data=x_time, + xlabel = x_label, + dir_name=subdir_name, + y_lim=(-6, 6), + filename=f"last_sample_hold_{chn_str}_{sr_str}_step3.png") + time.sleep(SLEEP) # wait for the DAC output to settle with last sample + # 4: Falling + data = step_ramp_falling(channel, trig_chn, buffer_ramp_down) + # Shoud start with last sample from previous buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, :2000], cfg["amplitude"] * 0.85, cfg["amplitude"] * 1.15, channel) + # Shoud hold last sample from new buffer + is_last_sample_hold_ok = is_last_sample_hold_ok and are_values_within_range(data[:, -2000:], -cfg["amplitude"] * 1.15, -cfg["amplitude"] * 0.85, channel) + if channel == libm2k.ANALOG_IN_CHANNEL_1: + # CH2 should idle at 0V if we are testing CH1 + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_2) + assert is_idle_ok, "STEP4: CH2 should idle at 0V if we are testing CH1" + elif channel == libm2k.ANALOG_IN_CHANNEL_2: + is_idle_ok = is_idle_ok and are_values_within_range(data, -0.20, 0.20, libm2k.ANALOG_IN_CHANNEL_1) + assert is_idle_ok, "STEP4: CH1 should idle at 0V if we are testing CH2" + glitched = glitched or check_for_glitch(data) + if gen_reports: + plot_to_file(title=f"Last Sample Hold: {chn_str} - {sr_str} - Falling Ramp", + data=data[0], + data1=data[1], + x_data=x_time, + xlabel = x_label, + dir_name=subdir_name, + y_lim=(-6, 6), + filename=f"last_sample_hold_{chn_str}_{sr_str}_step4.png") + + aout.stop() + return glitched, is_last_sample_hold_ok, is_idle_ok \ No newline at end of file diff --git a/tests/helpers.py b/tests/helpers.py index ea52f605..700a8ec1 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -1,4 +1,5 @@ +import numpy as np from pandas import DataFrame import matplotlib.pyplot as plt @@ -27,7 +28,7 @@ def save_data_to_csv(csv_vals, csv_file): return -def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None, data_marked=None): +def plot_to_file(title, data, dir_name, filename, xlabel=None, x_lim = None, ylabel=None, y_lim = None, data1=None, x_data = None, data_marked=None): # Saves the plots in a separate folder # Arguments: # title -- Title of the plot @@ -40,7 +41,8 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data # data_marked -- Data that represents specific points on the plot(default: {None}) # plot the signals in a separate folder plt.title(title) - if xlabel is not None: # if xlabel and ylabel are not specified there will be default values + # if xlabel and ylabel are not specified there will be default values + if xlabel is not None: plt.xlabel(xlabel) else: plt.xlabel('Samples') @@ -49,11 +51,51 @@ def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data else: plt.ylabel('Voltage [V]') plt.grid(visible=True) - plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case) + # if x_data is not None, the plot will be displayed with the specified x_data + if x_data is not None: + plt.plot(x_data, data) + else: + plt.plot(data) + # if a second set of data must be printed (for ch0 and ch1 phase difference in this case) if data1 is not None: - plt.plot(data1) + if x_data is not None: + plt.plot(x_data, data1) + else: + plt.plot(data1) + # Optional configurations + if x_lim is not None: + plt.xlim(*x_lim) + if y_lim is not None: + plt.ylim(*y_lim) if data_marked is not None: plt.plot(data_marked, data[data_marked], 'xr') plt.savefig(dir_name + "/" + filename) plt.close() - return \ No newline at end of file + return + + +def get_time_format(samples, sample_rate): + x_time = np.linspace(0, samples/sample_rate, samples) + + if x_time[-1] < 1e-6: + x_time *= 1e9 + x_label = "Time [ns]" + elif x_time[-1] < 1e-3: + x_time *= 1e6 + x_label = "Time [us]" + elif x_time[-1] < 1: + x_time *= 1e3 + x_label = "Time [ms]" + else: + x_label = "Time [s]" + return x_time, x_label + + +def get_sample_rate_display_format(sample_rate): + if sample_rate < 1e3: + return f"{sample_rate:.2f} Hz" + if sample_rate < 1e6: + return f"{sample_rate/1e3:.2f} KHz" + if sample_rate < 1e9: + return f"{sample_rate/1e6:.2f} MHz" + return f"{sample_rate/1e9:.2f} GHz" diff --git a/tests/m2k_analog_test.py b/tests/m2k_analog_test.py index 79bb8203..04b7a29f 100644 --- a/tests/m2k_analog_test.py +++ b/tests/m2k_analog_test.py @@ -3,7 +3,7 @@ import libm2k from shapefile import shape_gen, ref_shape_gen, shape_name -from analog_functions import test_amplitude, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \ +from analog_functions import get_experiment_config_for_sample_hold, test_amplitude, test_last_sample_hold, test_shape, phase_diff_ch0_ch1, test_offset, test_analog_trigger, \ test_voltmeter_functionality, test_kernel_buffers, test_buffer_transition_glitch from analog_functions import noncyclic_buffer_test, set_samplerates_for_shapetest, set_trig_for_cyclicbuffer_test, \ test_calibration @@ -15,6 +15,8 @@ import logger from repeat_test import repeat +from helpers import get_sample_rate_display_format + class A_AnalogTests(unittest.TestCase): # Class Where are defined all test methods for AnalogIn, AnalogOut, AnalogTrigger @@ -251,4 +253,28 @@ def test_buffer_transition_glitch(self): num_glitches = test_buffer_transition_glitch(channel, ain, aout, trig, waveform) with self.subTest(msg='Test buffer transition glitch: ' + waveform + ' on ch' + str(channel)): - self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel)) \ No newline at end of file + self.assertEqual(num_glitches, 0, 'Found ' + str(num_glitches) + ' glitches on channel ' + str(channel)) + + @unittest.skipIf(ctx.getFirmwareVersion() < 'v0.33', + 'The sample and hold feature is available starting with firmware v0.33. Note: v0.32 had a glitch that is handled in this test.') + def test_last_sample_hold(self): + # Tests the last sample hold functionality for different channels and DAC sample rates. + # This test iterates over different channels (each channel individually and both channels together) + # and then tests the last sample hold functionality. When testing both channels together, 'None' + # is used to denote this case. + # It verifies that the last sample is held correctly and that there are no glitches in the output signal in between the last sample and a new push. + + for channel in [libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG_IN_CHANNEL_2, None]: + for dac_sr in [75_000_000, 7_500_000, 750_000, 75_000, 7_500]: + cfg = get_experiment_config_for_sample_hold(dac_sr) + sr_format = get_sample_rate_display_format(cfg.get("dac_sr")) + chn_str = "both_channels" if channel is None else f"CH{channel}" + reset.analog_in(ain) + reset.analog_out(aout) + reset.trigger(trig) + has_glitch, is_last_sample_hold_ok, is_idle_ok = test_last_sample_hold(ain, aout, trig, ctx, cfg, channel) + with self.subTest(msg='Test last sample hold on ' + str(chn_str) + ' with DAC SR ' + str(cfg.get("dac_sr"))): + self.assertEqual(has_glitch, False, f'Found glitches on {chn_str} with DAC SR {sr_format}') + self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}') + self.assertEqual(is_last_sample_hold_ok, True, f'Last sample hold failed on {chn_str} with DAC SR {sr_format}') + self.assertEqual(is_idle_ok, True, 'Test idle condition failed') \ No newline at end of file diff --git a/tests/main.py b/tests/main.py index f27d301e..2c441078 100644 --- a/tests/main.py +++ b/tests/main.py @@ -74,7 +74,8 @@ def wait_(): "test_shapes_ch0\n" "test_shapes_ch1\n" "test_voltmeter\n" - "test_buffer_transition_glitch\n") + "test_buffer_transition_glitch\n" + "test_last_sample_hold\n") print("\n ===== class B_TriggerTests ===== \n") print(" ===== tests ====== \n") print("test_1_trigger_object\n" diff --git a/tests/shapefile.py b/tests/shapefile.py index ce5c2949..12b4f561 100644 --- a/tests/shapefile.py +++ b/tests/shapefile.py @@ -1,3 +1,4 @@ +from enum import Enum import numpy as np import math @@ -6,7 +7,7 @@ # signals that will be sent to output buffer -def shape_gen(n): +def shape_gen(n, amplitude: float = 1.0, offset: float = 0.0): # Generates different signal shapes that will be sent to the output # Arguments: # n -- Number of samples in the output buffer @@ -15,18 +16,18 @@ def shape_gen(n): shape = [[]] # generate sine wave - sine = np.sin(np.linspace(-np.pi, np.pi, n)) + sine = amplitude*(np.sin(np.linspace(-np.pi, np.pi, n))) + offset # generate square wave - square = np.append(np.linspace(-1, -1, int(n / 2)), np.linspace(1, 1, int(n / 2))) + square = amplitude * np.append(np.linspace(-1, -1, int(n / 2)), np.linspace(1, 1, int(n / 2))) + offset # generate triangle - triangle = np.append(np.linspace(-1, 1, int(n / 2)), np.linspace(1, -1, int(n / 2))) + triangle = amplitude * np.append(np.linspace(-1, 1, int(n / 2)), np.linspace(1, -1, int(n / 2))) + offset # generate rising ramp - rising_ramp = np.linspace(-1, 1, n) + rising_ramp = amplitude * np.linspace(-1, 1, n) + offset # generate falling ramp - falling_ramp = np.linspace(1, -1, n) + falling_ramp = amplitude * np.linspace(1, -1, n) + offset # shape and reference shape buffers shape = [sine, square, triangle, rising_ramp, falling_ramp] @@ -58,3 +59,11 @@ def shape_name(): shape_name = ['Sine', 'Square', 'Triangle', 'Rising_ramp', 'Falling_ramp'] return shape_name + + +class Shape(Enum): + SINE = 0 + SQUARE = 1 + TRIANGLE = 2 + RISING_RAMP = 3 + FALLING_RAMP = 4 \ No newline at end of file