Skip to content

Commit

Permalink
[Application] rpitx-control-0.3 application release (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
IgrikXD authored Oct 31, 2023
1 parent 6163918 commit 17f28fa
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 238 deletions.
File renamed without changes.
107 changes: 65 additions & 42 deletions ControlApplication/Components.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from concurrent.futures import ThreadPoolExecutor
import os
import pandas
import pickle
import sys
from colorama import Fore, Style
from concurrent.futures import ThreadPoolExecutor
from Logger import *

class BaseModel:
def __init__(self, model_number, case_style, description):
Expand Down Expand Up @@ -33,60 +35,81 @@ class ComponentsList:

def __init__(self, model_type, models_dir, dump_filename, log_filename = None):
self.model_type = model_type
self.dump_file_path = os.path.join(models_dir, dump_filename)
self.log_filename = log_filename
if ("--show-debug-info" in sys.argv) and log_filename:
self.logger = Logger(log_filename)
else:
self.logger = None

if os.path.exists(self.dump_file_path):
self.data = self.loadDump(self.dump_file_path)
dump_file_path = os.path.join(models_dir, dump_filename)

if os.path.exists(dump_file_path):
self.data = self.__loadDump(dump_file_path)
else:
self.data = self.getModelsList(models_dir)
self.saveDump(self.dump_file_path)
self.data = self.__getModelsList(models_dir)
if self.data is not None:
self.__saveDump(dump_file_path)
else:
init_error_info = (
f"{Fore.RED}{model_type} components initialization error!{Style.RESET_ALL}\n"
f"Make sure that the {Fore.YELLOW}{models_dir}{Style.RESET_ALL} directory contains .csv files describing the available components!"
)
print(init_error_info)
exit(1)

def getModelsList(self, models_dir):
def __getModelsList(self, models_dir):
models = []

file_list = [filename for filename in os.listdir(models_dir) if filename.endswith('.csv')]

def processCsvFile(csv_file_path):
df = pandas.read_csv(csv_file_path)
for _, row in df.iterrows():
if self.model_type == self.FILTER:
model = Filter(
row['Model Number'],
row['Case Style'],
row['Description'],
row['Filter Type'],
row['Passband F1 (MHz)'],
row['Passband F2 (MHz)'],
row['Stopband F3 (MHz)'],
row['Stopband F4 (MHz)']
)
elif self.model_type == self.AMPLIFIER:
model = Amplifier(
row['Model Number'],
row['Case Style'],
row['Subcategories'],
row['F Low (MHz)'],
row['F High (MHz)'],
row['Gain (dB) Typ.']
)

models.append(model)
if not file_list:
if self.logger:
self.logger.logMessage(f"{self.model_type} model .csv files are missing!", Logger.LogLevel.ERROR)
return None

with ThreadPoolExecutor() as executor:
executor.map(processCsvFile, [os.path.join(models_dir, filename) for filename in file_list])
for model_list in executor.map(self.__processCsvFile, [os.path.join(models_dir, filename) for filename in file_list]):
models.extend(model_list)

return models

def loadDump(self, dump_file_path):
def __loadDump(self, dump_file_path):
with open(dump_file_path, 'rb') as model_list_dump_file:
if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[INFO]: Dump loaded: {dump_file_path}\n")
if self.logger:
self.logger.logMessage(f"Dump loaded: {dump_file_path}", Logger.LogLevel.INFO)
return pickle.load(model_list_dump_file)

def saveDump(self, dump_file_path):
def __processCsvFile(self, csv_file_path):
models_list = []

df = pandas.read_csv(csv_file_path)
for _, row in df.iterrows():
if (self.model_type == self.FILTER):
model = Filter(
row['Model Number'],
row['Case Style'],
row['Description'],
row['Filter Type'],
row['Passband F1 (MHz)'],
row['Passband F2 (MHz)'],
row['Stopband F3 (MHz)'],
row['Stopband F4 (MHz)']
)
elif (self.model_type == self.AMPLIFIER):
model = Amplifier(
row['Model Number'],
row['Case Style'],
row['Subcategories'],
row['F Low (MHz)'],
row['F High (MHz)'],
row['Gain (dB) Typ.']
)

models_list.append(model)

return models_list

def __saveDump(self, dump_file_path):
with open(dump_file_path, 'wb') as model_list_dump_file:
pickle.dump(self.data, model_list_dump_file)
if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[INFO]: Dump saved: {dump_file_path}\n")
if self.logger:
self.logger.logMessage(f"Dump saved: {dump_file_path}", Logger.LogLevel.INFO)
31 changes: 19 additions & 12 deletions ControlApplication/Device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

class Device:
# List of available device for operation
DEVICES_LIST = [
SUPPORTED_DEVICES = [
"rpitx-expansion-board-SP3T",
"rpitx-expansion-board-SP4T",
"rpitx-expansion-board-SP6T",
Expand All @@ -15,15 +15,18 @@ class Device:
# <DEVICE> : (<NUMBER OF AVAILABLE FILTERS>, <FILTER SWITCH TRUTH TABLE>, <LNA SWITCH THRUTH TABLE>)
DEVICE_TYPE_MAPPING = {
# Boards without LNA
DEVICES_LIST[0]: (3, RFSwitch.SP3T_SWITCH_TRUTH_TABLE, None),
DEVICES_LIST[1]: (4, RFSwitch.SP4T_SWITCH_TRUTH_TABLE, None),
DEVICES_LIST[2]: (6, RFSwitch.SP6T_SWITCH_TRUTH_TABLE, None),
SUPPORTED_DEVICES[0]: (3, RFSwitch.SP3T_SWITCH_TRUTH_TABLE, None),
SUPPORTED_DEVICES[1]: (4, RFSwitch.SP4T_SWITCH_TRUTH_TABLE, None),
SUPPORTED_DEVICES[2]: (6, RFSwitch.SP6T_SWITCH_TRUTH_TABLE, None),
# Boards with LNA
DEVICES_LIST[3]: (3, RFSwitch.SP3T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE),
DEVICES_LIST[4]: (4, RFSwitch.SP4T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE),
DEVICES_LIST[5]: (6, RFSwitch.SP6T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE)
SUPPORTED_DEVICES[3]: (3, RFSwitch.SP3T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE),
SUPPORTED_DEVICES[4]: (4, RFSwitch.SP4T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE),
SUPPORTED_DEVICES[5]: (6, RFSwitch.SP6T_SWITCH_TRUTH_TABLE, RFSwitch.SPDT_SWITCH_TRUTH_TABLE)
}

FILTERS_SWITCH_TRUTH_TABLE = 1
LNA_SWITCH_TRUTH_TABLE = 2

def __init__(self, model_name, log_filename = None):
self.model_name = model_name
self.filters = []
Expand All @@ -32,15 +35,19 @@ def __init__(self, model_name, log_filename = None):
self.lna_switch = None
self.log_filename = log_filename

def initFilterRFSwitches(self, input_switch_pinout, output_switch_pinout, switch_truth_table):
def initFilterRFSwitches(self, input_switch_pinout, output_switch_pinout):
if self.filter_switch is None:
self.filter_switch = FilterSwitch(input_switch_pinout, output_switch_pinout, switch_truth_table, self.log_filename)
self.filter_switch = FilterSwitch(input_switch_pinout, output_switch_pinout,
Device.DEVICE_TYPE_MAPPING[self.model_name][self.FILTERS_SWITCH_TRUTH_TABLE],
self.log_filename)

def initLNA(self, input_switch_pinout, output_switch_pinout, switch_truth_table):
def initLNA(self, input_switch_pinout, output_switch_pinout):
switch_truth_table = Device.DEVICE_TYPE_MAPPING[self.model_name][self.LNA_SWITCH_TRUTH_TABLE]
if switch_truth_table and self.lna_switch is None:
self.lna_switch = LNASwitch(input_switch_pinout, output_switch_pinout, switch_truth_table, self.log_filename)
self.lna_switch = LNASwitch(input_switch_pinout, output_switch_pinout,
switch_truth_table,
self.log_filename)


def getConfigurationInfo(self):
delimiter = "=" * 60
configuration_info = f"{delimiter}\nActive board configuration:\n"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
26 changes: 26 additions & 0 deletions ControlApplication/Logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import datetime
from enum import Enum

class Logger:

class LogLevel(Enum):
ERROR = 1,
INFO = 2

DELIMITER = "-" * 60

def __init__(self, log_filename):
self.log_filename = log_filename

def logMessage(self, message, level = None, use_delimiter = False, timestamp = False):
log_message = message

if timestamp:
log_message = f"[{datetime.datetime.now()}] " + log_message
if level:
log_message = f"[{level.name}] " + log_message
if use_delimiter:
log_message = f"{self.DELIMITER}\n" + log_message + f"\n{self.DELIMITER}"

with open(self.log_filename, "a") as file:
file.write(f"{log_message}\n")
89 changes: 41 additions & 48 deletions ControlApplication/RFSwitch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from gpiozero.pins.mock import MockFactory
from gpiozero import OutputDevice
from gpiozero import BadPinFactory
from Logger import *
import sys

# Aliases for high and low logic levels
Expand Down Expand Up @@ -38,13 +39,18 @@ class RFSwitch():
}

def __init__(self, switch_pinout, switch_truth_table, log_filename = None):
self.log_filename = log_filename

if ("--show-debug-info" in sys.argv) and log_filename:
self.logger = Logger(log_filename)
else:
self.logger = None

# Used BCM port numbering by default
if "--use-mock-gpio" in sys.argv:
used_pin_factory = MockFactory()
if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[INFO]: gpiozero used MockFactory for GPIO operation!\n")

if self.logger:
self.logger.logMessage("gpiozero used MockFactory for GPIO operation!", Logger.LogLevel.INFO)
else:
used_pin_factory = None

Expand All @@ -58,80 +64,73 @@ def __init__(self, switch_pinout, switch_truth_table, log_filename = None):
for gpio_number in self.switch_pinout
]

if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[INFO]: RFSwitch initialized on GPIO: {switch_pinout}\n")
if self.logger:
self.logger.logMessage(f"RFSwitch initialized on GPIO: {switch_pinout}", Logger.LogLevel.INFO)

except BadPinFactory:
self.switch_control = None

if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[ERROR]: RFSwitch not initialized on GPIO: {switch_pinout}\n")
if self.logger:
self.logger.logMessage(f"RFSwitch not initialized on GPIO: {switch_pinout}", Logger.LogLevel.ERROR)

def activateRFOutput(self, rf_output):
if (self.switch_control and (self.active_rf_output != rf_output or self.active_rf_output == None)):
try:
self.active_rf_output = rf_output

if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"-------------------------------------------------\n")
file.write(f"[INFO]: RF path {rf_output} activated!\n")
file.write(f"-------------------------------------------------\n")
file.write(f"[INFO]: START OF CNANGING GPIO STATE PROCESS\n")
if self.logger:
self.logger.logMessage(f"RF path {rf_output} activated!", Logger.LogLevel.INFO, True)
self.logger.logMessage("START OF CNANGING GPIO STATE PROCESS", Logger.LogLevel.INFO)

for output_gpio_obj, gpio_state in zip(self.switch_control, self.switch_truth_table[rf_output]):
output_gpio_obj.value = gpio_state

if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"{output_gpio_obj.pin}: {gpio_state}\n")
if self.logger:
self.logger.logMessage(f"{output_gpio_obj.pin}: {gpio_state}")

except Exception:
if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[ERROR]: Unable to set state {gpio_state} for {output_gpio_obj.pin}!")
file.write(f"[INFO]: END OF CHANGING GPIO STATE\n")
file.write(f"-------------------------------------------------\n")
if self.logger:
self.logger.logMessage(f"Unable to set state {gpio_state} for {output_gpio_obj.pin}!",
Logger.LogLevel.ERROR)
self.logger.logMessage("END OF CHANGING GPIO STATE PROCESS", Logger.LogLevel.INFO, True)

return False

if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[INFO]: END OF CHANGING GPIO STATE PROCESS\n")
file.write(f"-------------------------------------------------\n")

if self.logger:
self.logger.logMessage("END OF CHANGING GPIO STATE PROCESS", Logger.LogLevel.INFO, True)

return True

elif (not self.switch_control):
if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[ERROR]: RFSwitch not initialized! GPIO {self.switch_pinout} state not changed!\n")
if self.logger:
self.logger.logMessage(f"RFSwitch not initialized! GPIO {self.switch_pinout} state not changed!",
Logger.LogLevel.ERROR)
return False

elif (self.active_rf_output == rf_output):
if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[INFO]: Trying to activate already active RF path {rf_output}!\n")
if self.logger:
self.logger.logMessage(f"Trying to activate already active RF path {rf_output}!", Logger.LogLevel.INFO)
return True

class RFSwitchWrapper(RFSwitch):
class RFSwitchWrapper():
def __init__(self, input_switch_pinout, output_switch_pinout, switch_truth_table, log_filename = None):
self.input_switch = RFSwitch(input_switch_pinout, switch_truth_table, log_filename)
self.output_switch = RFSwitch(output_switch_pinout, switch_truth_table, log_filename)
self.log_filename = log_filename

if ("--show-debug-info" in sys.argv) and log_filename:
self.logger = Logger(log_filename)
else:
self.logger = None

def activateRFPath(self, rf_path_index):
# We activate two switches at the same time because we need to create a
# path for the signal to pass through a particular filter. This is
# achieved by sending the output signal to the input switch, passing
# it through a filter and then exiting through the output switch
if ("--show-debug-info" in sys.argv) and (self.log_filename != None):
with open(self.log_filename, "a") as file:
file.write(f"[INFO]: RFSwitchWrapper.activateRFPath() function called!\n")
file.write(f"[INFO]: Changing the state of the GPIO ports for each of the "
"RFSwitch is performed in a separate thread!\n")
if self.logger:
self.logger.logMessage("RFSwitchWrapper.activateRFPath() function called!", Logger.LogLevel.INFO)
self.logger.logMessage("Changing the state of the GPIO ports for each of the "
"RFSwitch is performed in a separate thread!", Logger.LogLevel.INFO)

with ThreadPoolExecutor(max_workers=2) as executor:
input_switch_state_change_thread = executor.submit(self.input_switch.activateRFOutput, rf_path_index)
Expand All @@ -141,17 +140,11 @@ def activateRFPath(self, rf_path_index):

class FilterSwitch(RFSwitchWrapper):

FILTER_INPUT_SWITCH_GPIO_PINS = [17, 27, 22]
FILTER_OUTPUT_SWITCH_GPIO_PINS = [0, 5, 6]

def enableFilter(self, filter_index):
return self.activateRFPath(filter_index)

class LNASwitch(RFSwitchWrapper):

LNA_INPUT_SWITCH_GPIO_PINS = [23, 24]
LNA_OUTPUT_SWITCH_GPIO_PINS = [16, 26]

def __init__(self, input_switch_pinout, output_switch_pinout, switch_truth_table, log_filename):
super().__init__(input_switch_pinout, output_switch_pinout, switch_truth_table, log_filename)
self.is_active = False
Expand Down
Loading

0 comments on commit 17f28fa

Please sign in to comment.