diff --git a/debian/changelog b/debian/changelog index 5c420e8..0ed1013 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,15 +1,14 @@ -wlanpi-core (1.0.5-3.3) UNRELEASED; urgency=medium +wlanpi-core (1.0.5-4.1) UNRELEASED; urgency=medium [ Michael Ketchel ] * Draft build of new wlan control features * Fixed an api definition typo * More tweaks, and a version bump to fire dev package deploy * Increased resistance to injection attacks - - [ _ ] * Version bump for build + * Add automatic default gateway configuration for wlan api - -- _ Mon, 11 Nov 2024 21:16:01 +0000 + -- Michael Ketchel Wed, 13 Nov 2024 21:01:24 +0000 wlanpi-core (1.0.5-1) unstable; urgency=high diff --git a/wlanpi_core/api/api_v1/endpoints/network_api.py b/wlanpi_core/api/api_v1/endpoints/network_api.py index ae532bf..c7919f7 100644 --- a/wlanpi_core/api/api_v1/endpoints/network_api.py +++ b/wlanpi_core/api/api_v1/endpoints/network_api.py @@ -21,22 +21,32 @@ log = logging.getLogger("uvicorn") -def validate_wlan_interface(interface: Optional[str], required: bool = True) -> None: +def validate_wlan_interface( + interface: Optional[str], required: bool = True, raise_on_invalid: bool = True +) -> bool: if (required or interface is not None) and interface not in list_wlan_interfaces(): - raise ValidationError( - f"Invalid/unavailable interface specified: #{interface}", status_code=400 - ) + if raise_on_invalid: + raise ValidationError( + f"Invalid/unavailable interface specified: #{interface}", + status_code=400, + ) + return False + return True def validate_ethernet_interface( - interface: Optional[str], required: bool = True -) -> None: + interface: Optional[str], required: bool = True, raise_on_invalid: bool = True +) -> bool: if ( required or interface is not None ) and interface not in list_ethernet_interfaces(): - raise ValidationError( - f"Invalid/unavailable interface specified: #{interface}", status_code=400 - ) + if raise_on_invalid: + raise ValidationError( + f"Invalid/unavailable interface specified: #{interface}", + status_code=400, + ) + return False + return True ################################ diff --git a/wlanpi_core/models/network/wlan/wlan_dbus_interface.py b/wlanpi_core/models/network/wlan/wlan_dbus_interface.py index e9d7372..80b23cd 100644 --- a/wlanpi_core/models/network/wlan/wlan_dbus_interface.py +++ b/wlanpi_core/models/network/wlan/wlan_dbus_interface.py @@ -31,7 +31,12 @@ from wlanpi_core.schemas.network.network import SupplicantNetwork from wlanpi_core.utils.g_lib_loop import GLibLoop from wlanpi_core.utils.general import byte_array_to_string -from wlanpi_core.utils.network import get_ip_address, renew_dhcp +from wlanpi_core.utils.network import ( + add_default_route, + get_ip_address, + remove_default_routes, + renew_dhcp, +) class WlanDBUSInterface: @@ -302,7 +307,9 @@ def properties_changed_callback(properties): time.sleep(2) # Is sleeping here really the answer? if self.interface_name: + remove_default_routes(interface=self.interface_name) renew_dhcp(self.interface_name) + add_default_route(interface=self.interface_name) ipaddr = get_ip_address(self.interface_name) connection_events.append( network.NetworkEvent( diff --git a/wlanpi_core/utils/network.py b/wlanpi_core/utils/network.py index ab13ccd..00588c5 100644 --- a/wlanpi_core/utils/network.py +++ b/wlanpi_core/utils/network.py @@ -6,6 +6,7 @@ from typing import Any, Optional, Union from wlanpi_core.models.runcommand_error import RunCommandError +from wlanpi_core.models.validation_error import ValidationError from wlanpi_core.utils.general import run_command @@ -73,6 +74,77 @@ def get_ip_address(interface): return None +def remove_default_routes(interface: str): + """ + Removes the default route for an interface. Primarily used if you used add_default_route for the interface. + @param interface: The interface to remove routes for + @return: None + """ + + # Get existing routes for this adapter + routes: list[dict[str, Any]] = run_command( # type: ignore + ["ip", "--json", "route"] + ).output_from_json() + for route in routes: + if route["dev"] == interface and route["dst"] == "default": + run_command(["ip", "route", "del", "default", "dev", interface]) + return routes + + +def add_default_route( + interface: str, + router_address: Optional[str] = None, + metric: Optional[int] = None, +) -> str: + """ + Adds a default route to an interface + @param interface: The interface (e.g. 'wlan0') to add the route for + @param router_address: Optionally specify which IP this route is via. If left blank, it will be grabbed from the dhclient lease file. + @param metric: Optional metric for the route. If left as none, a lowest-priority metric starting at 200 will be calculated unless there are no other default routes. + @return: A string representing the new default route. + """ + + # Validate the interface to protect against arbitrarily reading fs data. + if interface not in [*list_wlan_interfaces(), *list_ethernet_interfaces()]: + raise ValidationError( + f"Invalid/unavailable interface specified: #{interface}", + status_code=400, + ) + + # Obtain the router address if not manually provided + if router_address is None: + with open(f"/var/lib/dhcp/dhclient.{interface}.leases", "r") as lease_file: + lease_data = lease_file.readlines() + router_address = ( + next((s for s in lease_data if "option routers" in s)) + .strip() + .strip(";") + .split(" ", 2)[-1] + ) + + # Calculate a new metric if needed + if metric is None: + routes: list[dict[str, Any]] = run_command( # type: ignore + ["ip", "--json", "route"] + ).output_from_json() + default_routes = [x["metric"] or 0 for x in routes if x["dst"] == "default"] + if len(default_routes): + metric = max(default_routes) + if metric < 200: + metric = 200 + else: + metric += 1 + + # Generate and set new default route + new_route = f"default via {router_address} dev {interface}" + if metric: + new_route += f" metric {metric}" + + command = ["ip", "route", "add", *new_route.split(" ")] + run_command(command) + return new_route + + def renew_dhcp(interface) -> None: """ Uses dhclient to release and request a new DHCP lease @@ -329,4 +401,4 @@ def get_interface_mac(interface: str) -> str: if __name__ == "__main__": - print(list_wlan_interfaces()) + print(add_default_route("wlan0"))