Skip to content

Commit

Permalink
Draft of ap radio info stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Ketchel committed Nov 6, 2024
1 parent 9308302 commit cc45c05
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 1 deletion.
7 changes: 7 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
wlanpi-core (1.0.5-1.2) UNRELEASED; urgency=medium

[ Michael Ketchel ]
* Draft build of new wlan control features

-- Michael Ketchel <[email protected]> Wed, 06 Nov 2024 22:33:31 +0000

wlanpi-core (1.0.5-1) unstable; urgency=high

* API hardening by blocking iptables rule added by pi-gen
Expand Down
23 changes: 23 additions & 0 deletions wlanpi_core/api/api_v1/endpoints/network_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,26 @@ async def disconnect_wireless_network(interface: str, network_id: int):
except Exception as ex:
log.error(ex)
return Response(content="Internal Server Error", status_code=500)


@router.get(
"/wlan/{interface}/phy",
response_model=Optional[dict[str, dict[str, any]]],
response_model_exclude_none=True,
)
@router.get(
"/wlan/phys",
response_model=Optional[dict[str, dict[str, any]]],
response_model_exclude_none=True,
)
async def get_interface_details(interface: Optional[str] = None):
"""
Gets interface details via iw.
"""
try:
return await network_service.interface_details(interface)
except ValidationError as ve:
return Response(content=ve.error_msg, status_code=ve.status_code)
except Exception as ex:
log.error(ex)
return Response(content="Internal Server Error", status_code=500)
10 changes: 10 additions & 0 deletions wlanpi_core/services/network_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
from enum import Enum
from typing import Optional
Expand All @@ -11,6 +12,9 @@
from wlanpi_core.models.validation_error import ValidationError
from wlanpi_core.schemas import network
from wlanpi_core.schemas.network.network import SupplicantNetwork
from wlanpi_core.utils.network import (
get_interface_details,
)

"""
These are the functions used to deliver the API
Expand Down Expand Up @@ -225,3 +229,9 @@ async def current_network(
)
except ValueError as error:
raise ValidationError(f"{error}", status_code=400)


async def interface_details(
interface: Optional[str],
) -> Optional[dict[str, dict[str, any]]]:
return get_interface_details(interface)
224 changes: 223 additions & 1 deletion wlanpi_core/utils/network.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import logging
import re
import time
from typing import Any, Optional
from collections import deque
from typing import Any, Optional, Union

from wlanpi_core.models.runcommand_error import RunCommandError
from wlanpi_core.utils.general import run_command
Expand Down Expand Up @@ -85,3 +88,222 @@ def renew_dhcp(interface) -> None:
f"Failed to renew DHCP. Code:{err.return_code}, Error: {err.error_msg}"
)
return None


class WlanChannelInfo:
def __init__(
self,
band,
frequency,
channel_number,
max_tx_power,
channel_widths,
disabled=False,
radar_detection=False,
dfs_state=None,
dfs_cac_time=None,
no_ir=False,
):
self.band = band
self.frequency = frequency
self.channel_number = channel_number
self.max_tx_power = max_tx_power
self.channel_widths = channel_widths
self.disabled = disabled
self.radar_detection = radar_detection
self.dfs_state = dfs_state
self.dfs_cac_time = dfs_cac_time
self.no_ir = no_ir

def __repr__(self):
return f"Band {self.band}: {self.frequency} MHz [{self.channel_number}]"

def to_json(self):
"""Returns a JSON representation of the channel information."""
return json.dumps(self.__dict__, indent=2) # Use __dict__ to get all attributes


def parse_iw_phy_output(output):
"""Parses the output of 'iw phy <phy> channels' into a list of ChannelInfo objects."""

channels = []
current_band = None
for line in output.splitlines():
line = line.strip()

if line.startswith("Band"):
current_band = int(line.split(":")[0].split()[1])
continue

if line.startswith("*"):
match = re.match(r"\* (\d+) MHz \[(\d+)\](?: \((.*?)\))?", line)
if match:
frequency = int(match.group(1))
channel_number = int(match.group(2))
disabled = False
if match.group(3) and "disabled" in match.group(3):
disabled = True

channel_info = WlanChannelInfo(
current_band, frequency, channel_number, None, [], disabled
)

channels.append(channel_info)
continue

if "Maximum TX power:" in line:
channels[-1].max_tx_power = float(line.split(":")[1].strip().split()[0])
if "Channel widths:" in line:
channels[-1].channel_widths = line.split(":")[1].strip().split()
if "Radar detection" in line:
channels[-1].radar_detection = True
if "DFS state:" in line:
channels[-1].dfs_state = line.split(":")[1].strip()
if "DFS CAC time:" in line:
channels[-1].dfs_cac_time = int(line.split(":")[1].strip().split()[0])
if "No IR" in line:
channels[-1].no_ir = True

return channels


def get_interface_phy_num(interface: str) -> Optional[int]:
lines = run_command(["iw", "dev", interface, "info"]).grep_stdout_for_string(
"wiphy", split=True
)
if lines:
return int(lines[0].strip().split(" ")[1])
return None


def get_phy_interface_name(phy_num: int) -> Optional[str]:
res = run_command(
["ls", f"/sys/class/ieee80211/phy{phy_num}/device/net/"], raise_on_fail=False
)
if res.success:
return res.stdout.strip()
else:
return None


def get_wlan_channels(interface: str) -> list[WlanChannelInfo]:
phy = get_interface_phy_num(interface)
if phy is None:
return []
return parse_iw_phy_output(
run_command(["iw", "phy", f"phy{phy}", "channels"]).stdout
)


def parse_indented_output(lines: Union[str, list]):
"""Parses command output based on indentation, creating nested dicts/lists."""

def process_lines(lines_deque: deque[str], current_indent=0) -> Union[dict, list]:
"""Recursively processes lines based on indentation."""
pairs = []

while len(lines_deque):
# Bail out if the next line is a higher level.
next_indent = len(lines_deque[0]) - len(lines_deque[0].lstrip())
if next_indent < current_indent:
break
if next_indent == current_indent:
line = lines_deque.popleft()
next_indent = len(lines_deque) and len(lines_deque[0]) - len(
lines_deque[0].lstrip()
)
if next_indent > current_indent:
# This line has a sublevel, so we recurse to get the value.
sub_result = process_lines(lines_deque, next_indent)
pairs.append([line.strip(), sub_result])
else:
pairs.append([line.strip(), None])
return dict(pairs)

if lines is str:
lines = lines.split("\n")
return process_lines(deque(lines))


def parse_iw_list(lines: Union[str, list]):
"""Parses iw list output based on indentation, creating nested dicts/lists."""

def process_lines(lines_deque: deque[str], current_indent=0) -> Union[dict, list]:
"""Recursively processes lines based on indentation."""
pairs = []

while len(lines_deque):
# Bail out if the next line is a higher level.
next_indent = len(lines_deque[0]) - len(lines_deque[0].lstrip())
if next_indent < current_indent:
break
if next_indent == current_indent:
line = lines_deque.popleft()
# Handle an annoying multiline output case
if line.lstrip().startswith("*"):
while len(lines_deque) and (
len(lines_deque[0]) - len(lines_deque[0].lstrip())
> current_indent
):
if not lines_deque[0].strip().startswith("*"):
line += " " + lines_deque.popleft().strip()

next_indent = len(lines_deque) and len(lines_deque[0]) - len(
lines_deque[0].lstrip()
)
if next_indent > current_indent:
# This line has a sublevel, so we recurse to get the value.
sub_result = process_lines(lines_deque, next_indent)
pairs.append([line.strip(), sub_result])
else:
pairs.append([line.strip(), None])

# Detect dict-like structure
if any(
": " in pair[0] or pair[0].rstrip().endswith(":") or pair[1] is not None
for pair in pairs
):
data = {"flags": []}
for pair in pairs:
pair[0] = pair[0].lstrip("*").lstrip()
# We already have key-value data, so it must be a pair.
if pair[1] is not None:
data[pair[0].rstrip(":")] = pair[1]
elif ": " in pair[0]:
key, value = pair[0].split(": ", maxsplit=1)
if value:
data[key] = value.strip()
else:
data["flags"].append(pair[0])
return data
# Almost definitely a list
else:
return [pair[0].lstrip("*").lstrip() for pair in pairs]

if lines is str:
lines = lines.split("\n")
return process_lines(deque(lines))


def get_interface_details(
interface: Optional[str] = None,
) -> Optional[dict[str, dict[str, any]]]:
if interface:
phy_num = get_interface_phy_num(interface=interface)
if phy_num is None:
return None
iw_list_data = parse_iw_list(
run_command(["iw", "phy", f"phy{phy_num}", "info"]).stdout.split("\n")
)
else:
iw_list_data = parse_iw_list(run_command(["iw", "list"]).stdout.split("\n"))

return {
get_phy_interface_name(k.split(" ")[1].split("phy")[1]): v
for k, v in iw_list_data.items()
if "phy" in k
}


if __name__ == "__main__":
print(json.dumps(get_interface_details()))

0 comments on commit cc45c05

Please sign in to comment.