Skip to content

Commit

Permalink
tests: refactor duplicate code
Browse files Browse the repository at this point in the history
- Include helpers.py for common functions

Signed-off-by: Adrian Stanea <[email protected]>
  • Loading branch information
Adrian-Stanea committed Aug 28, 2024
1 parent b236ada commit 339e134
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 293 deletions.
193 changes: 25 additions & 168 deletions tests/analog_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import random
import sys
import reset_def_values as reset
from helpers import get_result_files, save_data_to_csv, plot_to_file
from open_context import ctx_timeout, ctx
from create_files import results_file, results_dir, csv

Expand Down Expand Up @@ -133,16 +134,8 @@ def test_amplitude(out_data, ref_data, n, ain, aout, channel, trig):
# corr_amplitude_min -- correlation coefficient between the vector that holds the minimum amplitude values in the
# input signal and the vector that holds the maximum amplitude values in the reference signal

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)


reset.analog_in(ain)
reset.analog_out(aout)
Expand Down Expand Up @@ -204,7 +197,7 @@ def test_amplitude(out_data, ref_data, n, ain, aout, channel, trig):
data_string.append("Minimum amplitude computed:")
data_string.append(str(min_in))
if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
aout.stop(channel)
return corr_amplitude_max, corr_amplitude_min

Expand All @@ -229,16 +222,7 @@ def test_shape(channel, out_data, ref_data, ain, aout, trig, ch_ratio, shapename
# phase_diff_vect-- vector that holds the phase difference between the input data and the reference data for each
# signal shape

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)

reset.analog_in(ain)
reset.analog_out(aout)
Expand Down Expand Up @@ -284,7 +268,7 @@ def test_shape(channel, out_data, ref_data, ain, aout, trig, ch_ratio, shapename
"Correlation coefficient between " + shapename[i] + "signal and its reference:" + str(corr_shape))
data_string.append("Phase difference between " + shapename[i] + "signal and its reference:" + str(phase_diff))
if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
aout.stop(channel)

return corr_shape_vect, phase_diff_vect
Expand All @@ -304,16 +288,8 @@ def phase_diff_ch0_ch1(aout, ain, trig):
# Returns:
# phase_diff_between_channels-- the phase difference between channels in degrees
#
if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []

file_name, dir_name, csv_path = get_result_files(gen_reports)

reset.analog_in(ain)
reset.analog_out(aout)
Expand Down Expand Up @@ -359,7 +335,7 @@ def phase_diff_ch0_ch1(aout, ain, trig):
phasediff_csv['Ch1, ADCsr=' + str(sr)] = input_data[libm2k.ANALOG_IN_CHANNEL_2]
if gen_reports:
channel = 0
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
save_data_to_csv(phasediff_csv, csv_path + 'ph_diff_channels.csv')
aout.stop()

Expand All @@ -380,16 +356,7 @@ def test_analog_trigger(channel, trig, aout, ain):
# trig_test -- Vector that holds 1 for each trigger condition fulfilled and 0 otherwise
# condition_name-- Vector that holds names of the trigger conditions

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)

reset.analog_in(ain)
reset.analog_out(aout)
Expand Down Expand Up @@ -531,7 +498,7 @@ def test_analog_trigger(channel, trig, aout, ain):
data_string.append("level set: " + str(high))
data_string.append("\nlevel read: " + str(input_data[delay]))
if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
aout.stop(channel)
return trig_test, condition_name

Expand All @@ -552,16 +519,7 @@ def test_offset(out_data, n, ain, aout, trig, channel):
# Returns
# corr_offset -- Correlation coefficient between the computed offset vector and the defined offset vector

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)

reset.analog_in(ain)
reset.analog_out(aout)
Expand Down Expand Up @@ -604,7 +562,7 @@ def test_offset(out_data, n, ain, aout, trig, channel):

data_string.append(str(in_offset))
if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
corr_offset, _ = pearsonr(offset, in_offset) # compare the original offset vector with the average values obtained

aout.stop(channel)
Expand All @@ -623,16 +581,7 @@ def test_voltmeter_functionality(channel, ain, aout, ctx):
# Returns:
# voltmeter_ -- Vector thet holds 1 if the read voltage is in specified range and 0 otherwise

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)

reset.analog_in(ain)
reset.analog_out(aout)
Expand Down Expand Up @@ -666,7 +615,7 @@ def test_voltmeter_functionality(channel, ain, aout, ctx):
else:
voltmeter_ = np.append(voltmeter_, 0) # the voltage is out of range
if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
aout.stop(channel)

return voltmeter_
Expand Down Expand Up @@ -699,16 +648,7 @@ def cyclic_buffer_test(aout, ain, channel, trig):
# Returns:
# cyclic_false -- Must be 1 if a single buffer was sent and succesfully recieved, 0 otherwise

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)
dac_sr = 75000
adc_sr = 10000
min_periods = 3
Expand Down Expand Up @@ -754,16 +694,7 @@ def noncyclic_buffer_test(aout, ain, channel, trig, ctx):
# Returns:
# cyclic_false -- Must be 1 if a single buffer was sent and succesfully recieved, 0 otherwise

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)

reset.analog_in(ain)
reset.analog_out(aout)
Expand Down Expand Up @@ -855,16 +786,7 @@ def compute_frequency(channel, ain, aout, trig):
# ofreqs-- Vector that holds the frequencies of the output buffers
# ifreqs-- Vector that holds the frequencies of the input buffers

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)

test_name = "freq"
data_string = []
Expand Down Expand Up @@ -924,7 +846,7 @@ def compute_frequency(channel, ain, aout, trig):
data_string.append("Out signal frequency:" + str(out_freq))
data_string.append("In singal frequency:" + str(in_freq))
if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)

return ofreqs, ifreqs

Expand Down Expand Up @@ -987,16 +909,7 @@ def test_oversampling_ratio(channel, ain, aout, trig):
# test_osr -- Must be 1 if the computed oversampling ratio is equal with the set oversampling ratio
# and 0 otherwise

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)

reset.analog_in(ain)
reset.analog_out(aout)
Expand Down Expand Up @@ -1044,7 +957,7 @@ def test_oversampling_ratio(channel, ain, aout, trig):

data_string.append("Oversampling ratios computed: \n" + str(verify_osr))
if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
test_osr = 1
for i in range(len(osr)):
if osr[i] != verify_osr[i]:
Expand All @@ -1053,44 +966,6 @@ def test_oversampling_ratio(channel, ain, aout, trig):
return test_osr


def plot_to_file(title, data, dir_name, filename, xlabel=None, ylabel=None, data1=None, data_marked=None):
# Saves the plots in a separate folder
# Arguments:
# title -- Title of the plot
# data -- Data to be plotted
# filename -- Name of the file with the plot
# Keyword Arguments:
# xlabel -- Label of x-Axis (default: {None})
# ylabel -- Label of y-Axis(default: {None})
# data1 -- Data that should be plotted on the same plot(default: {None})
# data_marked -- Data that represents specific points on the plot(default: {None})
# plot the signals in a separate folder
plt.title(title)
if xlabel is not None: # if xlabel and ylabel are not specified there will be default values
plt.xlabel(xlabel)
else:
plt.xlabel('Samples')
if ylabel is not None:
plt.ylabel(ylabel)
else:
plt.ylabel('Voltage [V]')
plt.grid(visible=True)
plt.plot(data) # if a second set of data must be printed (for ch0 and ch1 phase difference in this case)
if data1 is not None:
plt.plot(data1)
if data_marked is not None:
plt.plot(data_marked, data[data_marked], 'xr')
plt.savefig(dir_name + "/" + filename)
plt.close()
return


def save_data_to_csv(csv_vals, csv_file):
df = DataFrame(csv_vals)
df.to_csv(csv_file)
return


def channels_diff_in_samples(trig, channel, aout, ain):
# Find if there is a sample delay between channels for the same signal and trigger
# Arguments:
Expand All @@ -1107,16 +982,7 @@ def channels_diff_in_samples(trig, channel, aout, ain):
# dac_osr-- oversampling ratios set for Aout
# freq -- frequency of the signals used for the test

if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)
reset.analog_in(ain)
reset.analog_out(aout) # /len(samples_diff1[i])
reset.trigger(trig)
Expand Down Expand Up @@ -1172,7 +1038,7 @@ def channels_diff_in_samples(trig, channel, aout, ain):
data_string.append('Samples difference: ' + str(diff_osr))

if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
save_data_to_csv(diff_in_samples0, csv_path + 'diffSamples_ch0_trigSrc_' + str(channel) + '.csv')
save_data_to_csv(diff_in_samples1, csv_path + 'diffSamples_ch1_trigSrc_' + str(channel) + '.csv')
aout.stop()
Expand Down Expand Up @@ -1317,16 +1183,7 @@ def is_spike(data, peak, threshold = 0.25):
return percentage_dif > threshold

def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude=1):
if gen_reports:
from create_files import results_file, results_dir, csv, open_files_and_dirs
if results_file is None:
file, dir_name, csv_path = open_files_and_dirs()
else:
file = results_file
dir_name = results_dir
csv_path = csv
else:
file = []
file_name, dir_name, csv_path = get_result_files(gen_reports)

BUFFER_SIZE = 5_00_000

Expand Down Expand Up @@ -1401,7 +1258,7 @@ def test_buffer_transition_glitch(channel, ain, aout, trig, waveform, amplitude=
"Number of glitch peaks found in " + waveform + " signal :" + str(num_peaks))

if gen_reports:
write_file(file, test_name, channel, data_string)
write_file(file_name, test_name, channel, data_string)
plot_to_file(f'Buffer Glitch , channel{channel}',
data,
dir_name,
Expand Down
Loading

0 comments on commit 339e134

Please sign in to comment.