diff --git a/AmplifiersList/Amplifiers.csv b/ControlApplication/AmplifiersList/Amplifiers.csv similarity index 100% rename from AmplifiersList/Amplifiers.csv rename to ControlApplication/AmplifiersList/Amplifiers.csv diff --git a/ControlApplication/Components.py b/ControlApplication/Components.py index 1ba8ba5..689d595 100644 --- a/ControlApplication/Components.py +++ b/ControlApplication/Components.py @@ -1,8 +1,10 @@ -from concurrent.futures import ThreadPoolExecutor import os import pandas import pickle import sys +from colorama import Fore, Style +from concurrent.futures import ThreadPoolExecutor +from Logger import * class BaseModel: def __init__(self, model_number, case_style, description): @@ -33,60 +35,81 @@ class ComponentsList: def __init__(self, model_type, models_dir, dump_filename, log_filename = None): self.model_type = model_type - self.dump_file_path = os.path.join(models_dir, dump_filename) - self.log_filename = log_filename + if ("--show-debug-info" in sys.argv) and log_filename: + self.logger = Logger(log_filename) + else: + self.logger = None - if os.path.exists(self.dump_file_path): - self.data = self.loadDump(self.dump_file_path) + dump_file_path = os.path.join(models_dir, dump_filename) + + if os.path.exists(dump_file_path): + self.data = self.__loadDump(dump_file_path) else: - self.data = self.getModelsList(models_dir) - self.saveDump(self.dump_file_path) + self.data = self.__getModelsList(models_dir) + if self.data is not None: + self.__saveDump(dump_file_path) + else: + init_error_info = ( + f"{Fore.RED}{model_type} components initialization error!{Style.RESET_ALL}\n" + f"Make sure that the {Fore.YELLOW}{models_dir}{Style.RESET_ALL} directory contains .csv files describing the available components!" + ) + print(init_error_info) + exit(1) - def getModelsList(self, models_dir): + def __getModelsList(self, models_dir): models = [] + file_list = [filename for filename in os.listdir(models_dir) if filename.endswith('.csv')] - def processCsvFile(csv_file_path): - df = pandas.read_csv(csv_file_path) - for _, row in df.iterrows(): - if self.model_type == self.FILTER: - model = Filter( - row['Model Number'], - row['Case Style'], - row['Description'], - row['Filter Type'], - row['Passband F1 (MHz)'], - row['Passband F2 (MHz)'], - row['Stopband F3 (MHz)'], - row['Stopband F4 (MHz)'] - ) - elif self.model_type == self.AMPLIFIER: - model = Amplifier( - row['Model Number'], - row['Case Style'], - row['Subcategories'], - row['F Low (MHz)'], - row['F High (MHz)'], - row['Gain (dB) Typ.'] - ) - - models.append(model) + if not file_list: + if self.logger: + self.logger.logMessage(f"{self.model_type} model .csv files are missing!", Logger.LogLevel.ERROR) + return None with ThreadPoolExecutor() as executor: - executor.map(processCsvFile, [os.path.join(models_dir, filename) for filename in file_list]) + for model_list in executor.map(self.__processCsvFile, [os.path.join(models_dir, filename) for filename in file_list]): + models.extend(model_list) return models - def loadDump(self, dump_file_path): + def __loadDump(self, dump_file_path): with open(dump_file_path, 'rb') as model_list_dump_file: - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: Dump loaded: {dump_file_path}\n") + if self.logger: + self.logger.logMessage(f"Dump loaded: {dump_file_path}", Logger.LogLevel.INFO) return pickle.load(model_list_dump_file) - def saveDump(self, dump_file_path): + def __processCsvFile(self, csv_file_path): + models_list = [] + + df = pandas.read_csv(csv_file_path) + for _, row in df.iterrows(): + if (self.model_type == self.FILTER): + model = Filter( + row['Model Number'], + row['Case Style'], + row['Description'], + row['Filter Type'], + row['Passband F1 (MHz)'], + row['Passband F2 (MHz)'], + row['Stopband F3 (MHz)'], + row['Stopband F4 (MHz)'] + ) + elif (self.model_type == self.AMPLIFIER): + model = Amplifier( + row['Model Number'], + row['Case Style'], + row['Subcategories'], + row['F Low (MHz)'], + row['F High (MHz)'], + row['Gain (dB) Typ.'] + ) + + models_list.append(model) + + return models_list + + def __saveDump(self, dump_file_path): with open(dump_file_path, 'wb') as model_list_dump_file: pickle.dump(self.data, model_list_dump_file) - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: Dump saved: {dump_file_path}\n") + if self.logger: + self.logger.logMessage(f"Dump saved: {dump_file_path}", Logger.LogLevel.INFO) diff --git a/ControlApplication/Device.py b/ControlApplication/Device.py index 6503613..31da627 100644 --- a/ControlApplication/Device.py +++ b/ControlApplication/Device.py @@ -2,7 +2,7 @@ class Device: # List of available device for operation - DEVICES_LIST = [ + SUPPORTED_DEVICES = [ "rpitx-expansion-board-SP3T", "rpitx-expansion-board-SP4T", "rpitx-expansion-board-SP6T", @@ -15,15 +15,18 @@ class Device: # : (, , ) DEVICE_TYPE_MAPPING = { # Boards without LNA - DEVICES_LIST[0]: (3, RFSwitch.SP3T_SWITCH_TRUTH_TABLE, None), - DEVICES_LIST[1]: (4, RFSwitch.SP4T_SWITCH_TRUTH_TABLE, None), - DEVICES_LIST[2]: (6, RFSwitch.SP6T_SWITCH_TRUTH_TABLE, None), + SUPPORTED_DEVICES[0]: (3, RFSwitch.SP3T_SWITCH_TRUTH_TABLE, None), + SUPPORTED_DEVICES[1]: (4, RFSwitch.SP4T_SWITCH_TRUTH_TABLE, None), + SUPPORTED_DEVICES[2]: (6, RFSwitch.SP6T_SWITCH_TRUTH_TABLE, None), # Boards with LNA - DEVICES_LIST[3]: (3, RFSwitch.SP3T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE), - DEVICES_LIST[4]: (4, RFSwitch.SP4T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE), - DEVICES_LIST[5]: (6, RFSwitch.SP6T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE) + SUPPORTED_DEVICES[3]: (3, RFSwitch.SP3T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE), + SUPPORTED_DEVICES[4]: (4, RFSwitch.SP4T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE), + SUPPORTED_DEVICES[5]: (6, RFSwitch.SP6T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE) } + FILTERS_SWITCH_TRUTH_TABLE = 1 + LNA_SWITCH_TRUTH_TABLE = 2 + def __init__(self, model_name, log_filename = None): self.model_name = model_name self.filters = [] @@ -32,15 +35,19 @@ def __init__(self, model_name, log_filename = None): self.lna_switch = None self.log_filename = log_filename - def initFilterRFSwitches(self, input_switch_pinout, output_switch_pinout, switch_truth_table): + def initFilterRFSwitches(self, input_switch_pinout, output_switch_pinout): if self.filter_switch is None: - self.filter_switch = FilterSwitch(input_switch_pinout, output_switch_pinout, switch_truth_table, self.log_filename) + self.filter_switch = FilterSwitch(input_switch_pinout, output_switch_pinout, + Device.DEVICE_TYPE_MAPPING[self.model_name][self.FILTERS_SWITCH_TRUTH_TABLE], + self.log_filename) - def initLNA(self, input_switch_pinout, output_switch_pinout, switch_truth_table): + def initLNA(self, input_switch_pinout, output_switch_pinout): + switch_truth_table = Device.DEVICE_TYPE_MAPPING[self.model_name][self.LNA_SWITCH_TRUTH_TABLE] if switch_truth_table and self.lna_switch is None: - self.lna_switch = LNASwitch(input_switch_pinout, output_switch_pinout, switch_truth_table, self.log_filename) + self.lna_switch = LNASwitch(input_switch_pinout, output_switch_pinout, + switch_truth_table, + self.log_filename) - def getConfigurationInfo(self): delimiter = "=" * 60 configuration_info = f"{delimiter}\nActive board configuration:\n" diff --git a/FiltersList/Filters_Band_Pass.csv b/ControlApplication/FiltersList/Filters_Band_Pass.csv similarity index 100% rename from FiltersList/Filters_Band_Pass.csv rename to ControlApplication/FiltersList/Filters_Band_Pass.csv diff --git a/FiltersList/Filters_Band_Stop.csv b/ControlApplication/FiltersList/Filters_Band_Stop.csv similarity index 100% rename from FiltersList/Filters_Band_Stop.csv rename to ControlApplication/FiltersList/Filters_Band_Stop.csv diff --git a/FiltersList/Filters_High_Pass.csv b/ControlApplication/FiltersList/Filters_High_Pass.csv similarity index 100% rename from FiltersList/Filters_High_Pass.csv rename to ControlApplication/FiltersList/Filters_High_Pass.csv diff --git a/FiltersList/Filters_Low_Pass.csv b/ControlApplication/FiltersList/Filters_Low_Pass.csv similarity index 100% rename from FiltersList/Filters_Low_Pass.csv rename to ControlApplication/FiltersList/Filters_Low_Pass.csv diff --git a/ControlApplication/Logger.py b/ControlApplication/Logger.py new file mode 100644 index 0000000..a9a5149 --- /dev/null +++ b/ControlApplication/Logger.py @@ -0,0 +1,26 @@ +import datetime +from enum import Enum + +class Logger: + + class LogLevel(Enum): + ERROR = 1, + INFO = 2 + + DELIMITER = "-" * 60 + + def __init__(self, log_filename): + self.log_filename = log_filename + + def logMessage(self, message, level = None, use_delimiter = False, timestamp = False): + log_message = message + + if timestamp: + log_message = f"[{datetime.datetime.now()}] " + log_message + if level: + log_message = f"[{level.name}] " + log_message + if use_delimiter: + log_message = f"{self.DELIMITER}\n" + log_message + f"\n{self.DELIMITER}" + + with open(self.log_filename, "a") as file: + file.write(f"{log_message}\n") \ No newline at end of file diff --git a/ControlApplication/RFSwitch.py b/ControlApplication/RFSwitch.py index 519ffd9..1bf9c83 100644 --- a/ControlApplication/RFSwitch.py +++ b/ControlApplication/RFSwitch.py @@ -2,6 +2,7 @@ from gpiozero.pins.mock import MockFactory from gpiozero import OutputDevice from gpiozero import BadPinFactory +from Logger import * import sys # Aliases for high and low logic levels @@ -38,13 +39,18 @@ class RFSwitch(): } def __init__(self, switch_pinout, switch_truth_table, log_filename = None): - self.log_filename = log_filename + + if ("--show-debug-info" in sys.argv) and log_filename: + self.logger = Logger(log_filename) + else: + self.logger = None + # Used BCM port numbering by default if "--use-mock-gpio" in sys.argv: used_pin_factory = MockFactory() - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: gpiozero used MockFactory for GPIO operation!\n") + + if self.logger: + self.logger.logMessage("gpiozero used MockFactory for GPIO operation!", Logger.LogLevel.INFO) else: used_pin_factory = None @@ -58,80 +64,73 @@ def __init__(self, switch_pinout, switch_truth_table, log_filename = None): for gpio_number in self.switch_pinout ] - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: RFSwitch initialized on GPIO: {switch_pinout}\n") + if self.logger: + self.logger.logMessage(f"RFSwitch initialized on GPIO: {switch_pinout}", Logger.LogLevel.INFO) except BadPinFactory: self.switch_control = None - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[ERROR]: RFSwitch not initialized on GPIO: {switch_pinout}\n") + if self.logger: + self.logger.logMessage(f"RFSwitch not initialized on GPIO: {switch_pinout}", Logger.LogLevel.ERROR) def activateRFOutput(self, rf_output): if (self.switch_control and (self.active_rf_output != rf_output or self.active_rf_output == None)): try: self.active_rf_output = rf_output - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"-------------------------------------------------\n") - file.write(f"[INFO]: RF path {rf_output} activated!\n") - file.write(f"-------------------------------------------------\n") - file.write(f"[INFO]: START OF CNANGING GPIO STATE PROCESS\n") + if self.logger: + self.logger.logMessage(f"RF path {rf_output} activated!", Logger.LogLevel.INFO, True) + self.logger.logMessage("START OF CNANGING GPIO STATE PROCESS", Logger.LogLevel.INFO) for output_gpio_obj, gpio_state in zip(self.switch_control, self.switch_truth_table[rf_output]): output_gpio_obj.value = gpio_state - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"{output_gpio_obj.pin}: {gpio_state}\n") + if self.logger: + self.logger.logMessage(f"{output_gpio_obj.pin}: {gpio_state}") except Exception: - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[ERROR]: Unable to set state {gpio_state} for {output_gpio_obj.pin}!") - file.write(f"[INFO]: END OF CHANGING GPIO STATE\n") - file.write(f"-------------------------------------------------\n") + if self.logger: + self.logger.logMessage(f"Unable to set state {gpio_state} for {output_gpio_obj.pin}!", + Logger.LogLevel.ERROR) + self.logger.logMessage("END OF CHANGING GPIO STATE PROCESS", Logger.LogLevel.INFO, True) return False - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: END OF CHANGING GPIO STATE PROCESS\n") - file.write(f"-------------------------------------------------\n") - + if self.logger: + self.logger.logMessage("END OF CHANGING GPIO STATE PROCESS", Logger.LogLevel.INFO, True) + return True elif (not self.switch_control): - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[ERROR]: RFSwitch not initialized! GPIO {self.switch_pinout} state not changed!\n") + if self.logger: + self.logger.logMessage(f"RFSwitch not initialized! GPIO {self.switch_pinout} state not changed!", + Logger.LogLevel.ERROR) return False elif (self.active_rf_output == rf_output): - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: Trying to activate already active RF path {rf_output}!\n") + if self.logger: + self.logger.logMessage(f"Trying to activate already active RF path {rf_output}!", Logger.LogLevel.INFO) return True -class RFSwitchWrapper(RFSwitch): +class RFSwitchWrapper(): def __init__(self, input_switch_pinout, output_switch_pinout, switch_truth_table, log_filename = None): self.input_switch = RFSwitch(input_switch_pinout, switch_truth_table, log_filename) self.output_switch = RFSwitch(output_switch_pinout, switch_truth_table, log_filename) - self.log_filename = log_filename + + if ("--show-debug-info" in sys.argv) and log_filename: + self.logger = Logger(log_filename) + else: + self.logger = None def activateRFPath(self, rf_path_index): # We activate two switches at the same time because we need to create a # path for the signal to pass through a particular filter. This is # achieved by sending the output signal to the input switch, passing # it through a filter and then exiting through the output switch - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: RFSwitchWrapper.activateRFPath() function called!\n") - file.write(f"[INFO]: Changing the state of the GPIO ports for each of the " - "RFSwitch is performed in a separate thread!\n") + if self.logger: + self.logger.logMessage("RFSwitchWrapper.activateRFPath() function called!", Logger.LogLevel.INFO) + self.logger.logMessage("Changing the state of the GPIO ports for each of the " + "RFSwitch is performed in a separate thread!", Logger.LogLevel.INFO) with ThreadPoolExecutor(max_workers=2) as executor: input_switch_state_change_thread = executor.submit(self.input_switch.activateRFOutput, rf_path_index) @@ -141,17 +140,11 @@ def activateRFPath(self, rf_path_index): class FilterSwitch(RFSwitchWrapper): - FILTER_INPUT_SWITCH_GPIO_PINS = [17, 27, 22] - FILTER_OUTPUT_SWITCH_GPIO_PINS = [0, 5, 6] - def enableFilter(self, filter_index): return self.activateRFPath(filter_index) class LNASwitch(RFSwitchWrapper): - LNA_INPUT_SWITCH_GPIO_PINS = [23, 24] - LNA_OUTPUT_SWITCH_GPIO_PINS = [16, 26] - def __init__(self, input_switch_pinout, output_switch_pinout, switch_truth_table, log_filename): super().__init__(input_switch_pinout, output_switch_pinout, switch_truth_table, log_filename) self.is_active = False diff --git a/ControlApplication/UserInterface.py b/ControlApplication/UserInterface.py index fe85de3..1027903 100644 --- a/ControlApplication/UserInterface.py +++ b/ControlApplication/UserInterface.py @@ -1,15 +1,20 @@ -import datetime -from Device import * import os import pickle +from Device import * +from Logger import * from whiptail import Whiptail +# Which button was pressed? OK_BUTTON = 0 CANCEL_BUTTON = 1 +# Are we reading the state of the button or the result of the execution? BUTTONS_STATE = 1 USER_CHOICE = 0 -CONFIGS_DIR = "./SavedConfiguration/" +# Absolute path to the directory with the program source files +APPLICATION_DIR = os.path.dirname(os.path.abspath(__file__)) +# Path to the directory where expansion board configuration files are saved +CONFIGS_DIR = f"{APPLICATION_DIR}/SavedConfiguration/" APPLICATION_TITLE = "rpitx-expansion-board control application" FAREWELL_MESSAGE = "Thanks for using rpitx-expansion-board project!" @@ -17,73 +22,65 @@ class UserInterface: - # List of actions available to perform for a specific device - CONFIGURATION_ACTIONS = [ - "Create a new device configuration", - "Load device configuration" - ] - - def __init__(self, devices_list, configuration_actions, log_filename = None): + def __init__(self, log_filename = None): self.whiptail_interface = Whiptail(title=APPLICATION_TITLE) - self.devices_list = devices_list - self.configuration_actions = configuration_actions self.log_filename = log_filename - if ("--show-debug-info" in sys.argv) and (log_filename != None): - self.displayInfo(f"Debug mode enabled!\nLogs will be writed to: {log_filename}") - with open(log_filename, "a") as file: - file.write(f"-------------------------------------------------\n") - file.write(f"[INFO]: Application running at: {datetime.datetime.now()}!\n") - file.write(f"-------------------------------------------------\n") + if ("--show-debug-info" in sys.argv) and log_filename: + self.logger = Logger(log_filename) + else: + self.logger = None + + if self.logger: + self.displayInfo(f"Debug mode enabled!\n\nLogs will be writed to: {log_filename}") + self.logger.logMessage(f"Application running!", Logger.LogLevel.INFO, True, True) - def chooseAction(self): - return self.whiptail_interface.menu("Choose an action:", self.configuration_actions) + def chooseItem(self, prompt, items, exit_if_cancel_pressed = False, cancel_message = None): + user_action = self.whiptail_interface.menu(prompt, items) + # button has been pressed + if (user_action[BUTTONS_STATE] == CANCEL_BUTTON): + if cancel_message: + self.displayInfo(cancel_message) + if exit_if_cancel_pressed: + self.displayFarewellMessageAndExit() + return None + + return user_action[USER_CHOICE] def displayInfo(self, info): self.whiptail_interface.msgbox(info, extra_args=["--scrolltext"]) - def chooseBoard(self): - selected_board = self.whiptail_interface.menu("Choose your board:", self.devices_list) - # button has been pressed - if (selected_board[BUTTONS_STATE] == CANCEL_BUTTON): - self.displayInfo(FAREWELL_MESSAGE) - - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"-------------------------------------------------\n") - file.write(f"[INFO]: Application stopped at: {datetime.datetime.now()}!\n") - file.write(f"-------------------------------------------------\n") - exit(0) - - return selected_board[USER_CHOICE] + def displayFarewellMessageAndExit(self): + self.displayInfo(FAREWELL_MESSAGE) + + if self.logger: + self.logger.logMessage(f"Application stopped!", Logger.LogLevel.INFO, True, True) + exit(0) - def loadDeviceConfiguration(self, selected_board): + def loadDeviceConfiguration(self): while True: - configuration_path = self.whiptail_interface.inputbox("Enter the path of the configuration file:", - os.path.join(CONFIGS_DIR, f"{selected_board}.pkl")) - - # button has been pressed - if(configuration_path[BUTTONS_STATE] == CANCEL_BUTTON): - self.displayInfo("Configuration not loaded! Please choose another board.") + configuration_files_list = [file for file in os.listdir(CONFIGS_DIR) if file.endswith(".pkl")] + + if not configuration_files_list: + self.displayInfo(f"No configuration files found in the directory: {CONFIGS_DIR}" + "\n\nPlease create a new device configuration!") + return None + + configuration_path = self.chooseItem( + "Select a configuration file:", configuration_files_list) + + if not configuration_path: return None - try: - with open(configuration_path[USER_CHOICE], 'rb') as device_configuration_file: - device = pickle.load(device_configuration_file) + with open(f"{CONFIGS_DIR}/{configuration_path}", 'rb') as device_configuration_file: + device = pickle.load(device_configuration_file) - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: Device configuration loaded: {configuration_path[USER_CHOICE]}\n") + if self.logger: + self.logger.logMessage(f"Device configuration loaded: {CONFIGS_DIR}/{configuration_path}", Logger.LogLevel.INFO) - self.displayInfo("Configuration loaded succesfully!") - return device + self.displayInfo("Configuration loaded succesfully!") - except FileNotFoundError: - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[ERROR]: Configuration file {configuration_path[USER_CHOICE]} not found!\n") - # You will be prompted to enter the new file path - self.displayInfo("Configuration file not found!") + return device def saveDeviceConfiguration(self, device): os.makedirs(CONFIGS_DIR, exist_ok=True) @@ -93,11 +90,10 @@ def saveDeviceConfiguration(self, device): with open(file_path, 'wb') as device_configuration_file: pickle.dump(device, device_configuration_file) - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"[INFO]: Device configuration info saved: {file_path}\n") + if self.logger: + self.logger.logMessage(f"Device configuration info saved: {file_path}", Logger.LogLevel.INFO) - self.displayInfo(f"Configuration saved!\nFile: {file_path}") + self.displayInfo(f"Configuration saved!\n\nFile: {file_path}") def createActionsList(self, device): actions_list = [(f"Activate filter {i + 1}", f"{filter_obj.model_number}, {filter_obj.description}") @@ -128,58 +124,40 @@ def chooseBoardAction(self, device): while True: board_status = self.updateBoardInfo(active_filter, is_lna_activated, device) - action_choice = self.whiptail_interface.menu(board_status, ACTIONS_LIST) - - if (action_choice[BUTTONS_STATE] == CANCEL_BUTTON): - # button has been pressed - self.displayInfo(FAREWELL_MESSAGE) - if ("--show-debug-info" in sys.argv) and (self.log_filename != None): - with open(self.log_filename, "a") as file: - file.write(f"-------------------------------------------------\n") - file.write(f"[INFO]: Application stopped at: {datetime.datetime.now()}!\n") - file.write(f"-------------------------------------------------\n") - exit(0) - else: - user_choice = action_choice[USER_CHOICE] - - if "Activate filter" in user_choice: - filter_id = int(user_choice[-1]) - device_filter = device.filters[filter_id - 1] - active_filter = f"{filter_id} - {device_filter.model_number}, {device_filter.description}" - - if device.filter_switch.enableFilter(filter_id): - self.displayInfo(f"Filter {active_filter} enabled!") - else: - self.displayInfo("Error in device configuration!") - - elif "Toggle LNA" in user_choice: - is_lna_activated = device.lna_switch.toggleLNA() - self.displayInfo("LNA enabled!" if is_lna_activated else "LNA disabled!") + user_choice = self.chooseItem(board_status, ACTIONS_LIST, True) - def selectComponent(self, components_list, prompt): - unique_case_styles = sorted(set(component.case_style for component in components_list)) - case_style_choice = self.whiptail_interface.menu(prompt, unique_case_styles) + if "Activate filter" in user_choice: + filter_id = int(user_choice[-1]) + device_filter = device.filters[filter_id - 1] + active_filter = f"{filter_id} - {device_filter.model_number}, {device_filter.description}" - if case_style_choice[BUTTONS_STATE] == CANCEL_BUTTON: - self.displayInfo(CONFIGURATION_CREATED_ABORTED) - return None + if device.filter_switch.enableFilter(filter_id): + self.displayInfo(f"Filter {active_filter} enabled!") + else: + self.displayInfo("Error in device configuration!") + + elif "Toggle LNA" in user_choice: + is_lna_activated = device.lna_switch.toggleLNA() + self.displayInfo("LNA enabled!" if is_lna_activated else "LNA disabled!") - selected_case_style = case_style_choice[USER_CHOICE] + def selectComponent(self, components_list, prompt): + unique_case_styles = sorted(set(component.case_style for component in components_list)) + selected_case_style = self.chooseItem(prompt, unique_case_styles, False, CONFIGURATION_CREATED_ABORTED) + + if not selected_case_style: + return None available_model_numbers = [component.model_number for component in components_list if component.case_style == selected_case_style] - model_choice = self.whiptail_interface.menu(f"Available models for '{selected_case_style}' case:", available_model_numbers) - - if model_choice[BUTTONS_STATE] == CANCEL_BUTTON: - self.displayInfo(CONFIGURATION_CREATED_ABORTED) + selected_model_number = self.chooseItem(f"Available models for '{selected_case_style}' case:", available_model_numbers, False, CONFIGURATION_CREATED_ABORTED) + + if not selected_model_number: return None - selected_model_number = model_choice[USER_CHOICE] - for component in components_list: if (component.model_number == selected_model_number) and (component.case_style == selected_case_style): return component - def createConfiguration(self, selected_board, filter_objects, amplifier_objects): + def createDeviceConfiguration(self, selected_board, filter_objects, amplifier_objects): device = Device(selected_board, self.log_filename) for i in range(device.DEVICE_TYPE_MAPPING[selected_board][0]): @@ -194,5 +172,4 @@ def createConfiguration(self, selected_board, filter_objects, amplifier_objects) return None device.lna.append(selected_amplifier) - return device - + return device \ No newline at end of file diff --git a/ControlApplication/main.py b/ControlApplication/main.py index cdd84c0..f9b269b 100644 --- a/ControlApplication/main.py +++ b/ControlApplication/main.py @@ -1,25 +1,64 @@ - from colorama import Fore, Style from concurrent.futures import ThreadPoolExecutor from Device import * -from RFSwitch import * from UserInterface import * from Components import * -FILTERS_SWITCH_TRUTH_TABLE = 1 -LNA_SWITCH_TRUTH_TABLE = 2 - -AMPLIFIER_MODELS_DIR = "./AmplifiersList" -FILTER_MODELS_DIR = "./FiltersList" +# Information related to the configuration of RF filter switches +FILTER_MODELS_DIR = f"{APPLICATION_DIR}/FiltersList" FILTER_DUMP_FILE = "FiltersListDump.pkl" +FILTER_INPUT_SWITCH_GPIO_PINS = [17, 27, 22] +FILTER_OUTPUT_SWITCH_GPIO_PINS = [0, 5, 6] + +# Information related to the configuration of LNA switches +AMPLIFIER_MODELS_DIR = f"{APPLICATION_DIR}/AmplifiersList" AMPLIFIER_DUMP_FILE = "AmplifierDump.pkl" +LNA_INPUT_SWITCH_GPIO_PINS = [23, 24] +LNA_OUTPUT_SWITCH_GPIO_PINS = [16, 26] -LOG_FILENAME = "./ControlApplication/DebugInfo.log" +# Log file save location +LOG_FILENAME = f"{APPLICATION_DIR}/DebugInfo.log" -APP_VERISON = 0.2 +# List of actions available to perform for a specific device +APPLICATION_ACTIONS = ["Create a new device configuration", "Load device configuration"] + +APP_VERISON = 0.3 # ----------------------------------------------------------- # Changelog: # ----------------------------------------------------------- +# Version 0.3: +# ----------------------------------------------------------- +# The FiltersList and AmplifiersList directories have been +# moved to the ControlApplication directory. Now, dumps of +# components and configurations of expansion boards are saved +# taking into account the parent directory in which the +# application source files are located. +# +# The application execution order has been changed. Now, when +# you start the program, you are asked to choose one of two +# possible actions: creating a new device configuration or +# loading an existing one. +# When creating a new configuration, you are prompted to +# select the type of device for which the configuration is +# being created. +# When loading an existing configuration, you are prompted to +# select a configuration file from the list (the board type is +# determined automatically). If the application does not find +# information about saved configurations, an information message +# is displayed, after which you will be returned to the main menu. +# +# Checking for the presence of .csv files of component models +# When the program starts, it checks whether .csv files exist in +# the FiltersList and AmplifiersList directories. These files are +# used to build a list of available components when creating a +# device configuration. If it is not possible to load the +# previously created .pkl dump of the list of components and the +# necessary .csv files to build the list of components are +# missing, an error message is displayed in the console and the +# program ends. +# +# Code refactoring. +# ----------------------------------------------------------- # Version 0.2: # ----------------------------------------------------------- # When the activateRFPath() function is called, @@ -29,13 +68,16 @@ # of the RFSwitch objects - first the input switch was switched, # then the output switch. Now, we switch the states of two # switches simultaneously. +# # The ComponentsList constructors are now called in separate # threads, which allows reading data about filters and # amplifiers in parallel rather than sequentially. # When creating a ComponentsList object, each of the .csv files # is now processed in a separate thread. +# # Added highlighting of application start and stop timestamp # using '-' signs. +# # whiptail_interface.msgbox() is now called with an additional # --scrolltext parameter # ----------------------------------------------------------- @@ -59,7 +101,7 @@ def main(): showHelpInfo(APP_VERISON, LOG_FILENAME) exit(0) - user_interface = UserInterface(Device.DEVICES_LIST, UserInterface.CONFIGURATION_ACTIONS, LOG_FILENAME) + user_interface = UserInterface(LOG_FILENAME) # Initializing available filter and amplifier models with ThreadPoolExecutor(max_workers=2) as executor: @@ -70,42 +112,36 @@ def main(): amplifiers_list = amplifiers_future.result() while True: - board = user_interface.chooseBoard() - action = user_interface.chooseAction() + user_action = user_interface.chooseItem("Choose an action:", APPLICATION_ACTIONS, True) - # button has been pressed - # You will be prompted to select your device again - if (action[BUTTONS_STATE] == CANCEL_BUTTON): - continue - # "Create a new device configuration" has been choosen - elif (action[USER_CHOICE] == UserInterface.CONFIGURATION_ACTIONS[0]): - device = user_interface.createConfiguration(board, filters_list.data, amplifiers_list.data) - if device == None: + if (user_action == APPLICATION_ACTIONS[0]): + board = user_interface.chooseItem("Choose your board:", Device.SUPPORTED_DEVICES) + # button has been pressed + if board is None: + continue + device = user_interface.createDeviceConfiguration(board, filters_list.data, amplifiers_list.data) + if device is None: continue user_interface.saveDeviceConfiguration(device) # "Load device configuration" has been choosen - if (action[USER_CHOICE] == UserInterface.CONFIGURATION_ACTIONS[1]): - device = user_interface.loadDeviceConfiguration(board) - if device == None: + elif (user_action == APPLICATION_ACTIONS[1]): + device = user_interface.loadDeviceConfiguration() + # button has been pressed or configuration files are missing + if device is None: continue + + # RF switches are initialized for all types of expansion boards + device.initFilterRFSwitches(FILTER_INPUT_SWITCH_GPIO_PINS, FILTER_OUTPUT_SWITCH_GPIO_PINS) - device.initFilterRFSwitches(FilterSwitch.FILTER_INPUT_SWITCH_GPIO_PINS, - FilterSwitch.FILTER_OUTPUT_SWITCH_GPIO_PINS, - Device.DEVICE_TYPE_MAPPING[board][FILTERS_SWITCH_TRUTH_TABLE]) - - # The LNA will only be initialized if the DEVICE_TYPE_MAPPING structure - # contains switch information to control the LNA - device.initLNA(LNASwitch.LNA_INPUT_SWITCH_GPIO_PINS, - LNASwitch.LNA_OUTPUT_SWITCH_GPIO_PINS, - Device.DEVICE_TYPE_MAPPING[board][LNA_SWITCH_TRUTH_TABLE]) + # The LNA will only be initialized if the currently selected expansion board supports it + device.initLNA(LNA_INPUT_SWITCH_GPIO_PINS, LNA_OUTPUT_SWITCH_GPIO_PINS) # Displaying text information about the active device configuration user_interface.displayInfo(device.getConfigurationInfo()) # The main application menu, allowing you to select and enable a specific filter or toogle LNA state user_interface.chooseBoardAction(device) - # End of while loop if __name__ == "__main__": main() \ No newline at end of file diff --git a/README.md b/README.md index fc6736e..736ed30 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ Expansion board for **Raspberry Pi 4 Model B** that eliminates the need for a di The expansion board is installed by connecting it to the 40-pin Raspberry Pi header and can be fixed additionally by connecting holes on the Raspberry Pi and the expansion board. ## Current development progress: -[![Progress](https://img.shields.io/badge/rpitx--expansion--board-not%20tested-red.svg?longCache=true&style=for-the-badge)](https://easyeda.com/IgrikXD/rpitx-expansion-board) [![Progress](https://img.shields.io/badge/app%20version-0.2-blue.svg?longCache=true&style=for-the-badge)](./ControlApplication) [![Progress](https://img.shields.io/badge/pcb%20version-0.0-blue.svg?longCache=true&style=for-the-badge)](./EasyEDA) +[![Progress](https://img.shields.io/badge/rpitx--expansion--board-not%20tested-red.svg?longCache=true&style=for-the-badge)](https://easyeda.com/IgrikXD/rpitx-expansion-board) [![Progress](https://img.shields.io/badge/app%20version-0.3-blue.svg?longCache=true&style=for-the-badge)](./ControlApplication) [![Progress](https://img.shields.io/badge/pcb%20version-0.0-blue.svg?longCache=true&style=for-the-badge)](./EasyEDA) ## Application usage: ```sh