diff --git a/debian/changelog b/debian/changelog index c1f81bb..2b922f3 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,10 @@ +wlanpi-core (1.0.5-1.2) UNRELEASED; urgency=medium + + [ Michael Ketchel ] + * Draft build of new wlan control features + + -- Michael Ketchel Wed, 06 Nov 2024 22:33:31 +0000 + wlanpi-core (1.0.5-1) unstable; urgency=high * API hardening by blocking iptables rule added by pi-gen diff --git a/wlanpi_core/api/api_v1/endpoints/network_api.py b/wlanpi_core/api/api_v1/endpoints/network_api.py index 1884df4..c894d83 100644 --- a/wlanpi_core/api/api_v1/endpoints/network_api.py +++ b/wlanpi_core/api/api_v1/endpoints/network_api.py @@ -408,3 +408,26 @@ async def disconnect_wireless_network(interface: str, network_id: int): except Exception as ex: log.error(ex) return Response(content="Internal Server Error", status_code=500) + + +@router.get( + "/wlan/{interface}/phy", + response_model=Optional[dict[str, dict[str, any]]], + response_model_exclude_none=True, +) +@router.get( + "/wlan/phys", + response_model=Optional[dict[str, dict[str, any]]], + response_model_exclude_none=True, +) +async def get_interface_details(interface: Optional[str] = None): + """ + Gets interface details via iw. + """ + try: + return await network_service.interface_details(interface) + except ValidationError as ve: + return Response(content=ve.error_msg, status_code=ve.status_code) + except Exception as ex: + log.error(ex) + return Response(content="Internal Server Error", status_code=500) diff --git a/wlanpi_core/services/network_service.py b/wlanpi_core/services/network_service.py index f90012a..dc0d042 100644 --- a/wlanpi_core/services/network_service.py +++ b/wlanpi_core/services/network_service.py @@ -1,3 +1,4 @@ +import json import logging from enum import Enum from typing import Optional @@ -11,6 +12,9 @@ from wlanpi_core.models.validation_error import ValidationError from wlanpi_core.schemas import network from wlanpi_core.schemas.network.network import SupplicantNetwork +from wlanpi_core.utils.network import ( + get_interface_details, +) """ These are the functions used to deliver the API @@ -225,3 +229,9 @@ async def current_network( ) except ValueError as error: raise ValidationError(f"{error}", status_code=400) + + +async def interface_details( + interface: Optional[str], +) -> Optional[dict[str, dict[str, any]]]: + return get_interface_details(interface) diff --git a/wlanpi_core/utils/network.py b/wlanpi_core/utils/network.py index 7b2840e..29c66e3 100644 --- a/wlanpi_core/utils/network.py +++ b/wlanpi_core/utils/network.py @@ -1,6 +1,9 @@ +import json import logging +import re import time -from typing import Any, Optional +from collections import deque +from typing import Any, Optional, Union from wlanpi_core.models.runcommand_error import RunCommandError from wlanpi_core.utils.general import run_command @@ -85,3 +88,222 @@ def renew_dhcp(interface) -> None: f"Failed to renew DHCP. Code:{err.return_code}, Error: {err.error_msg}" ) return None + + +class WlanChannelInfo: + def __init__( + self, + band, + frequency, + channel_number, + max_tx_power, + channel_widths, + disabled=False, + radar_detection=False, + dfs_state=None, + dfs_cac_time=None, + no_ir=False, + ): + self.band = band + self.frequency = frequency + self.channel_number = channel_number + self.max_tx_power = max_tx_power + self.channel_widths = channel_widths + self.disabled = disabled + self.radar_detection = radar_detection + self.dfs_state = dfs_state + self.dfs_cac_time = dfs_cac_time + self.no_ir = no_ir + + def __repr__(self): + return f"Band {self.band}: {self.frequency} MHz [{self.channel_number}]" + + def to_json(self): + """Returns a JSON representation of the channel information.""" + return json.dumps(self.__dict__, indent=2) # Use __dict__ to get all attributes + + +def parse_iw_phy_output(output): + """Parses the output of 'iw phy channels' into a list of ChannelInfo objects.""" + + channels = [] + current_band = None + for line in output.splitlines(): + line = line.strip() + + if line.startswith("Band"): + current_band = int(line.split(":")[0].split()[1]) + continue + + if line.startswith("*"): + match = re.match(r"\* (\d+) MHz \[(\d+)\](?: \((.*?)\))?", line) + if match: + frequency = int(match.group(1)) + channel_number = int(match.group(2)) + disabled = False + if match.group(3) and "disabled" in match.group(3): + disabled = True + + channel_info = WlanChannelInfo( + current_band, frequency, channel_number, None, [], disabled + ) + + channels.append(channel_info) + continue + + if "Maximum TX power:" in line: + channels[-1].max_tx_power = float(line.split(":")[1].strip().split()[0]) + if "Channel widths:" in line: + channels[-1].channel_widths = line.split(":")[1].strip().split() + if "Radar detection" in line: + channels[-1].radar_detection = True + if "DFS state:" in line: + channels[-1].dfs_state = line.split(":")[1].strip() + if "DFS CAC time:" in line: + channels[-1].dfs_cac_time = int(line.split(":")[1].strip().split()[0]) + if "No IR" in line: + channels[-1].no_ir = True + + return channels + + +def get_interface_phy_num(interface: str) -> Optional[int]: + lines = run_command(["iw", "dev", interface, "info"]).grep_stdout_for_string( + "wiphy", split=True + ) + if lines: + return int(lines[0].strip().split(" ")[1]) + return None + + +def get_phy_interface_name(phy_num: int) -> Optional[str]: + res = run_command( + ["ls", f"/sys/class/ieee80211/phy{phy_num}/device/net/"], raise_on_fail=False + ) + if res.success: + return res.stdout.strip() + else: + return None + + +def get_wlan_channels(interface: str) -> list[WlanChannelInfo]: + phy = get_interface_phy_num(interface) + if phy is None: + return [] + return parse_iw_phy_output( + run_command(["iw", "phy", f"phy{phy}", "channels"]).stdout + ) + + +def parse_indented_output(lines: Union[str, list]): + """Parses command output based on indentation, creating nested dicts/lists.""" + + def process_lines(lines_deque: deque[str], current_indent=0) -> Union[dict, list]: + """Recursively processes lines based on indentation.""" + pairs = [] + + while len(lines_deque): + # Bail out if the next line is a higher level. + next_indent = len(lines_deque[0]) - len(lines_deque[0].lstrip()) + if next_indent < current_indent: + break + if next_indent == current_indent: + line = lines_deque.popleft() + next_indent = len(lines_deque) and len(lines_deque[0]) - len( + lines_deque[0].lstrip() + ) + if next_indent > current_indent: + # This line has a sublevel, so we recurse to get the value. + sub_result = process_lines(lines_deque, next_indent) + pairs.append([line.strip(), sub_result]) + else: + pairs.append([line.strip(), None]) + return dict(pairs) + + if lines is str: + lines = lines.split("\n") + return process_lines(deque(lines)) + + +def parse_iw_list(lines: Union[str, list]): + """Parses iw list output based on indentation, creating nested dicts/lists.""" + + def process_lines(lines_deque: deque[str], current_indent=0) -> Union[dict, list]: + """Recursively processes lines based on indentation.""" + pairs = [] + + while len(lines_deque): + # Bail out if the next line is a higher level. + next_indent = len(lines_deque[0]) - len(lines_deque[0].lstrip()) + if next_indent < current_indent: + break + if next_indent == current_indent: + line = lines_deque.popleft() + # Handle an annoying multiline output case + if line.lstrip().startswith("*"): + while len(lines_deque) and ( + len(lines_deque[0]) - len(lines_deque[0].lstrip()) + > current_indent + ): + if not lines_deque[0].strip().startswith("*"): + line += " " + lines_deque.popleft().strip() + + next_indent = len(lines_deque) and len(lines_deque[0]) - len( + lines_deque[0].lstrip() + ) + if next_indent > current_indent: + # This line has a sublevel, so we recurse to get the value. + sub_result = process_lines(lines_deque, next_indent) + pairs.append([line.strip(), sub_result]) + else: + pairs.append([line.strip(), None]) + + # Detect dict-like structure + if any( + ": " in pair[0] or pair[0].rstrip().endswith(":") or pair[1] is not None + for pair in pairs + ): + data = {"flags": []} + for pair in pairs: + pair[0] = pair[0].lstrip("*").lstrip() + # We already have key-value data, so it must be a pair. + if pair[1] is not None: + data[pair[0].rstrip(":")] = pair[1] + elif ": " in pair[0]: + key, value = pair[0].split(": ", maxsplit=1) + if value: + data[key] = value.strip() + else: + data["flags"].append(pair[0]) + return data + # Almost definitely a list + else: + return [pair[0].lstrip("*").lstrip() for pair in pairs] + + if lines is str: + lines = lines.split("\n") + return process_lines(deque(lines)) + + +def get_interface_details( + interface: Optional[str] = None, +) -> Optional[dict[str, dict[str, any]]]: + if interface: + phy_num = get_interface_phy_num(interface=interface) + if phy_num is None: + return None + iw_list_data = parse_iw_list( + run_command(["iw", "phy", f"phy{phy_num}", "info"]).stdout.split("\n") + ) + else: + iw_list_data = parse_iw_list(run_command(["iw", "list"]).stdout.split("\n")) + + return { + get_phy_interface_name(k.split(" ")[1].split("phy")[1]): v + for k, v in iw_list_data.items() + if "phy" in k + } + + +if __name__ == "__main__": + print(json.dumps(get_interface_details()))