From f16882509a5bbf57930906862855425a2188aef3 Mon Sep 17 00:00:00 2001 From: Dhinak G <17605561+dhinakg@users.noreply.github.com> Date: Fri, 1 Jul 2022 00:00:29 -0400 Subject: [PATCH 1/5] Initial Linux info --- Linux.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 Linux.py diff --git a/Linux.py b/Linux.py new file mode 100644 index 0000000..57af5a7 --- /dev/null +++ b/Linux.py @@ -0,0 +1,52 @@ +from pathlib import Path +import subprocess + + +def quick_read(path: Path): + return path.read_text().strip() + + +def quick_read_2(path: Path, name: str): + return quick_read(path / Path(name)) + + +controller_paths = [] + + +for bus_path in Path("/sys/bus/usb/devices").iterdir(): + if not bus_path.stem.startswith("usb"): + continue + + controller_paths.append(bus_path.resolve().parent) + +for controller_path in controller_paths: + print(f"Processing controller {controller_path}") + + lspci_output = subprocess.run(["lspci", "-vvvmm", "-s", controller_path.stem], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode() + lspci_output = {i.partition(":\t")[0]: i.partition("\t")[2] for i in lspci_output.splitlines() if i} + + controller = { + "name": lspci_output["Device"], + "identifiers": { + "bdf": [int(i, 16) for i in [controller_path.name[5:7], controller_path.name[8:10], controller_path.suffix[1]]], + "pci_id": [quick_read_2(controller_path, "vendor")[2:], quick_read_2(controller_path, "device")[2:]], + }, + } + + if (controller_path / Path("subsystem_vendor")).exists() and (controller_path / Path("subsystem_device")).exists(): + controller["identifiers"]["pci_id"] += [quick_read_2(controller_path, "subsystem_vendor")[2:], quick_read_2(controller_path, "subsystem_device")[2:]] + + if (controller_path / Path("firmware_node/path")).exists(): + controller["identifiers"]["acpi_path"] = quick_read_2(controller_path, "firmware_node/path") + print(controller) + + + for f in controller_path.iterdir(): + if not f.is_file(): + continue + print(f) + try: + print(f.read_text()) + except: + print(f"{f} failed") + break From 7ffec45eaf4d2ccac658f48a635dd4026465aad7 Mon Sep 17 00:00:00 2001 From: Dhinak G <17605561+dhinakg@users.noreply.github.com> Date: Thu, 11 Aug 2022 10:41:30 -0400 Subject: [PATCH 2/5] More initial work --- Linux.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/Linux.py b/Linux.py index 57af5a7..6918392 100644 --- a/Linux.py +++ b/Linux.py @@ -1,5 +1,7 @@ from pathlib import Path import subprocess +from Scripts import iokit, shared +from base import BaseUSBMap def quick_read(path: Path): @@ -10,13 +12,15 @@ def quick_read_2(path: Path, name: str): return quick_read(path / Path(name)) -controller_paths = [] +controller_paths: list[Path] = [] for bus_path in Path("/sys/bus/usb/devices").iterdir(): + # Only look at buses if not bus_path.stem.startswith("usb"): continue + # The parent of the bus is the controller controller_paths.append(bus_path.resolve().parent) for controller_path in controller_paths: @@ -31,22 +35,20 @@ def quick_read_2(path: Path, name: str): "bdf": [int(i, 16) for i in [controller_path.name[5:7], controller_path.name[8:10], controller_path.suffix[1]]], "pci_id": [quick_read_2(controller_path, "vendor")[2:], quick_read_2(controller_path, "device")[2:]], }, + "ports": [] } + if (controller_path / Path("revision")).exists(): + controller["identifiers"]["pci_revision"] = int(quick_read(controller_path / Path("revision")), 16) + if (controller_path / Path("subsystem_vendor")).exists() and (controller_path / Path("subsystem_device")).exists(): controller["identifiers"]["pci_id"] += [quick_read_2(controller_path, "subsystem_vendor")[2:], quick_read_2(controller_path, "subsystem_device")[2:]] if (controller_path / Path("firmware_node/path")).exists(): controller["identifiers"]["acpi_path"] = quick_read_2(controller_path, "firmware_node/path") - print(controller) - - - for f in controller_path.iterdir(): - if not f.is_file(): - continue - print(f) - try: - print(f.read_text()) - except: - print(f"{f} failed") - break + + controller["class"] = shared.USBControllerTypes(int(quick_read(controller_path / Path("class")), 16) & 0xff) + + # Enumerate the buses + for hub in controller_path.glob("usb*"): + From dabafb50eff3cf4dde96c3e10916d19038e56353 Mon Sep 17 00:00:00 2001 From: Dhinak G <17605561+dhinakg@users.noreply.github.com> Date: Fri, 12 Aug 2022 23:23:31 -0400 Subject: [PATCH 3/5] Add in the port detection --- Linux.py | 88 ++++++++++++++++++++++++++++++++++++++--------- Scripts/shared.py | 28 ++++++++++++--- 2 files changed, 95 insertions(+), 21 deletions(-) diff --git a/Linux.py b/Linux.py index 6918392..a7ddba8 100644 --- a/Linux.py +++ b/Linux.py @@ -1,54 +1,108 @@ -from pathlib import Path +import re import subprocess -from Scripts import iokit, shared +from operator import itemgetter +from pathlib import Path + from base import BaseUSBMap +from Scripts import shared def quick_read(path: Path): - return path.read_text().strip() + return path.read_text().strip("\x00").strip() def quick_read_2(path: Path, name: str): - return quick_read(path / Path(name)) + return quick_read(path / name) -controller_paths: list[Path] = [] +controller_paths: set[Path] = set() -for bus_path in Path("/sys/bus/usb/devices").iterdir(): +for bus_path in Path("../utb-samples/gwy/sys/bus/usb/devices").iterdir(): # Only look at buses if not bus_path.stem.startswith("usb"): continue # The parent of the bus is the controller - controller_paths.append(bus_path.resolve().parent) + controller_paths.add(bus_path.resolve().parent) for controller_path in controller_paths: print(f"Processing controller {controller_path}") - lspci_output = subprocess.run(["lspci", "-vvvmm", "-s", controller_path.stem], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode() - lspci_output = {i.partition(":\t")[0]: i.partition("\t")[2] for i in lspci_output.splitlines() if i} + # lspci_output = subprocess.run(["lspci", "-vvvmm", "-s", controller_path.stem], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode() + # lspci_output = {i.partition(":\t")[0]: i.partition("\t")[2] for i in lspci_output.splitlines() if i} controller = { - "name": lspci_output["Device"], + # "name": lspci_output["Device"], "identifiers": { "bdf": [int(i, 16) for i in [controller_path.name[5:7], controller_path.name[8:10], controller_path.suffix[1]]], "pci_id": [quick_read_2(controller_path, "vendor")[2:], quick_read_2(controller_path, "device")[2:]], }, - "ports": [] + "ports": [], } - if (controller_path / Path("revision")).exists(): - controller["identifiers"]["pci_revision"] = int(quick_read(controller_path / Path("revision")), 16) + if (controller_path / "revision").exists(): + controller["identifiers"]["pci_revision"] = int(quick_read(controller_path / "revision"), 16) - if (controller_path / Path("subsystem_vendor")).exists() and (controller_path / Path("subsystem_device")).exists(): + if (controller_path / "subsystem_vendor").exists() and (controller_path / "subsystem_device").exists(): controller["identifiers"]["pci_id"] += [quick_read_2(controller_path, "subsystem_vendor")[2:], quick_read_2(controller_path, "subsystem_device")[2:]] - if (controller_path / Path("firmware_node/path")).exists(): + if (controller_path / "firmware_node/path").exists(): controller["identifiers"]["acpi_path"] = quick_read_2(controller_path, "firmware_node/path") - controller["class"] = shared.USBControllerTypes(int(quick_read(controller_path / Path("class")), 16) & 0xff) + controller["class"] = shared.USBControllerTypes(int(quick_read_2(controller_path, "class"), 16) & 0xFF) # Enumerate the buses for hub in controller_path.glob("usb*"): - + # maxchild, speed, version + bus_number = quick_read_2(hub, "busnum") + hub_info = { + "hub_name": hub.name, + "port_count": int(quick_read_2(hub, "maxchild")), + "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(hub, "speed"))), + "version": quick_read_2(hub, "version"), + "ports": [], + } + print(hub_info) + # Get the ports + ports = hub.glob(f"{bus_number}-0:1.0/usb{bus_number}-port*") + for port in sorted(ports, key=lambda x: int(x.name.replace(f"usb{bus_number}-port", ""))): + port_info = { + "name": quick_read_2(port, "firmware_node/path").split(".")[-1], + "comment": None, + "index": int(port.name.replace(f"usb{bus_number}-port", "")), # Need to parse it from the name. I hate linux + "class": hub_info["speed"], # tbd + "type": None, + "guessed": None, # tbd + "connect_type": quick_read_2(port, "connect_type"), + "devices": [], + "type_c": False, + } + + if (port / "peer").exists(): + port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", (port / "peer").resolve().name).groupdict() + else: + port_info["companion_info"] = {"hub": "", "port": ""} + + if (port / "connector").exists(): + # I think this is only USB-C + port_info["type_c"] = True + other_ports = [i for i in (port / "connector").glob("usb*-port*") if i.resolve() != port.resolve()] + assert len(other_ports) == 1 + if (port / "peer").exists(): + assert port_info["companion_info"] == re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() + port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() + + if (port / "device").exists(): + device = port / "device" + device_info = { + # TODO: Use lsusb? + "name": f"{quick_read_2(device, 'manufacturer')} {quick_read_2(device, 'product')}" if (device / "manufacturer").exists() else "Unknown Device", + "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(device, "speed"))), + "devices": [], # I'm not dealing with this rn + } + port_info["devices"].append(device_info) + + hub_info["ports"].append(port_info) + hub_info["ports"].sort(key=itemgetter("index")) + print(hub_info) diff --git a/Scripts/shared.py b/Scripts/shared.py index 2ce13c2..cefc2ea 100644 --- a/Scripts/shared.py +++ b/Scripts/shared.py @@ -1,5 +1,6 @@ # pylint: disable=invalid-name import enum +from numbers import Number import sys from time import time from typing import Callable @@ -33,6 +34,21 @@ def __str__(self) -> str: def __bool__(self) -> bool: return True + @classmethod + def from_speed(cls, speed): + if speed == 1.5: + return USBDeviceSpeeds.LowSpeed + elif speed == 12: + return USBDeviceSpeeds.FullSpeed + elif speed == 480: + return USBDeviceSpeeds.HighSpeed + elif speed == 5000: + return USBDeviceSpeeds.SuperSpeed + elif speed == 10000: + return USBDeviceSpeeds.SuperSpeedPlus + else: + return USBDeviceSpeeds.Unknown + class USBPhysicalPortTypes(enum.IntEnum): USBTypeA = 0 @@ -56,10 +72,11 @@ def __bool__(self) -> bool: class USBControllerTypes(enum.IntEnum): - UHCI = int("0x00", 16) - OHCI = int("0x10", 16) - EHCI = int("0x20", 16) - XHCI = int("0x30", 16) + UHCI = 0x00 + OHCI = 0x10 + EHCI = 0x20 + XHCI = 0x30 + USB4 = 0x40 # Futureproofing Unknown = 9999 def __str__(self) -> str: @@ -122,12 +139,15 @@ def time_it(func: Callable, text: str, *args, **kwargs): input(f"{text} took {end - start}, press enter to continue".strip()) return result + debugging = False + def debug(str): if debugging: input(f"DEBUG: {str}\nPress enter to continue") + test_mode = False and debugging if test_mode: debug_dump_path = Path(input("Debug dump path: ").strip().replace("'", "").replace('"', "")) From 6a321fce9055aca7868ba89c0e87458525aad216 Mon Sep 17 00:00:00 2001 From: Dhinak G <17605561+dhinakg@users.noreply.github.com> Date: Sat, 13 Aug 2022 23:48:32 -0400 Subject: [PATCH 4/5] Move hub enumeration to a different function --- .flake8 | 3 +- Linux.py | 116 ++++++++++++++++++++++++--------------------- Scripts/usbdump.py | 1 + 3 files changed, 66 insertions(+), 54 deletions(-) diff --git a/.flake8 b/.flake8 index 59a1801..e7b7337 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,3 @@ [flake8] -ignore = E501, E203 \ No newline at end of file +extend-ignore = E203 +max-line-length = 200 diff --git a/Linux.py b/Linux.py index a7ddba8..0c162be 100644 --- a/Linux.py +++ b/Linux.py @@ -26,7 +26,67 @@ def quick_read_2(path: Path, name: str): # The parent of the bus is the controller controller_paths.add(bus_path.resolve().parent) -for controller_path in controller_paths: + +def enumerate_hub(hub: Path): + bus_number = quick_read_2(hub, "busnum") + hub_info = { + "hub_name": hub.name, + "port_count": int(quick_read_2(hub, "maxchild")), + "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(hub, "speed"))), + "version": quick_read_2(hub, "version"), + "ports": [], + } + print(hub_info) + # Get the ports + ports = hub.glob(f"{bus_number}-0:1.0/usb{bus_number}-port*") + for port in sorted(ports, key=lambda x: int(x.name.replace(f"usb{bus_number}-port", ""))): + port_info = { + "name": quick_read_2(port, "firmware_node/path").split(".")[-1], + "comment": None, + "index": int(port.name.replace(f"usb{bus_number}-port", "")), # Need to parse it from the name. I hate linux + "class": hub_info["speed"], # tbd + "type": None, + "guessed": None, # tbd + "connect_type": quick_read_2(port, "connect_type"), + "devices": [], + "type_c": False, + } + + if (port / "peer").exists(): + port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", (port / "peer").resolve().name).groupdict() + else: + port_info["companion_info"] = {"hub": "", "port": ""} + + if (port / "connector").exists(): + # I think this is only USB-C + port_info["type_c"] = True + other_ports = [i for i in (port / "connector").glob("usb*-port*") if i.resolve() != port.resolve()] + assert len(other_ports) == 1 + if (port / "peer").exists(): + assert port_info["companion_info"] == re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() + port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() + + if (port / "device").exists(): + device = port / "device" + device_info = { + # TODO: Use lsusb? + "name": f"{quick_read_2(device, 'manufacturer')} {quick_read_2(device, 'product')}" if (device / "manufacturer").exists() else "Unknown Device", + "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(device, "speed"))), + "devices": [], # I'm not dealing with this rn + } + + if int(quick_read_2(device, "bDeviceClass")) == 9 or (device/"maxchild").exists(): + # This is a hub. Enumerate. + device_info["devices"] = enumerate_hub(device) + + port_info["devices"].append(device_info) + + hub_info["ports"].append(port_info) + hub_info["ports"].sort(key=itemgetter("index")) + return hub_info + + +for controller_path in sorted(controller_paths): print(f"Processing controller {controller_path}") # lspci_output = subprocess.run(["lspci", "-vvvmm", "-s", controller_path.stem], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode() @@ -53,56 +113,6 @@ def quick_read_2(path: Path, name: str): controller["class"] = shared.USBControllerTypes(int(quick_read_2(controller_path, "class"), 16) & 0xFF) # Enumerate the buses - for hub in controller_path.glob("usb*"): + for hub in sorted(controller_path.glob("usb*")): # maxchild, speed, version - bus_number = quick_read_2(hub, "busnum") - hub_info = { - "hub_name": hub.name, - "port_count": int(quick_read_2(hub, "maxchild")), - "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(hub, "speed"))), - "version": quick_read_2(hub, "version"), - "ports": [], - } - print(hub_info) - # Get the ports - ports = hub.glob(f"{bus_number}-0:1.0/usb{bus_number}-port*") - for port in sorted(ports, key=lambda x: int(x.name.replace(f"usb{bus_number}-port", ""))): - port_info = { - "name": quick_read_2(port, "firmware_node/path").split(".")[-1], - "comment": None, - "index": int(port.name.replace(f"usb{bus_number}-port", "")), # Need to parse it from the name. I hate linux - "class": hub_info["speed"], # tbd - "type": None, - "guessed": None, # tbd - "connect_type": quick_read_2(port, "connect_type"), - "devices": [], - "type_c": False, - } - - if (port / "peer").exists(): - port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", (port / "peer").resolve().name).groupdict() - else: - port_info["companion_info"] = {"hub": "", "port": ""} - - if (port / "connector").exists(): - # I think this is only USB-C - port_info["type_c"] = True - other_ports = [i for i in (port / "connector").glob("usb*-port*") if i.resolve() != port.resolve()] - assert len(other_ports) == 1 - if (port / "peer").exists(): - assert port_info["companion_info"] == re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() - port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() - - if (port / "device").exists(): - device = port / "device" - device_info = { - # TODO: Use lsusb? - "name": f"{quick_read_2(device, 'manufacturer')} {quick_read_2(device, 'product')}" if (device / "manufacturer").exists() else "Unknown Device", - "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(device, "speed"))), - "devices": [], # I'm not dealing with this rn - } - port_info["devices"].append(device_info) - - hub_info["ports"].append(port_info) - hub_info["ports"].sort(key=itemgetter("index")) - print(hub_info) + print(enumerate_hub(hub)) diff --git a/Scripts/usbdump.py b/Scripts/usbdump.py index ed85a4e..8ee8d9d 100644 --- a/Scripts/usbdump.py +++ b/Scripts/usbdump.py @@ -101,6 +101,7 @@ def get_hub_type(port): def get_hub_by_name(name): return hub_map.get(name) + # TODO: Figure out how to deal with the hub name not matching def get_companion_port(port): return ([i for i in hub_map.get(port["companion_info"]["hub"], {"ports": []})["ports"] if i["index"] == port["companion_info"]["port"]] or [None])[0] From f7d1de0d73909702cdbf4c4ab7ac7d8774e43605 Mon Sep 17 00:00:00 2001 From: Dhinak G <17605561+dhinakg@users.noreply.github.com> Date: Wed, 12 Jul 2023 11:44:16 -0400 Subject: [PATCH 5/5] Somewhat working --- Linux.py | 189 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 103 insertions(+), 86 deletions(-) diff --git a/Linux.py b/Linux.py index 0c162be..e8511e3 100644 --- a/Linux.py +++ b/Linux.py @@ -1,3 +1,4 @@ +import copy import re import subprocess from operator import itemgetter @@ -15,104 +16,120 @@ def quick_read_2(path: Path, name: str): return quick_read(path / name) -controller_paths: set[Path] = set() - - -for bus_path in Path("../utb-samples/gwy/sys/bus/usb/devices").iterdir(): - # Only look at buses - if not bus_path.stem.startswith("usb"): - continue - - # The parent of the bus is the controller - controller_paths.add(bus_path.resolve().parent) - - -def enumerate_hub(hub: Path): - bus_number = quick_read_2(hub, "busnum") - hub_info = { - "hub_name": hub.name, - "port_count": int(quick_read_2(hub, "maxchild")), - "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(hub, "speed"))), - "version": quick_read_2(hub, "version"), - "ports": [], - } - print(hub_info) - # Get the ports - ports = hub.glob(f"{bus_number}-0:1.0/usb{bus_number}-port*") - for port in sorted(ports, key=lambda x: int(x.name.replace(f"usb{bus_number}-port", ""))): - port_info = { - "name": quick_read_2(port, "firmware_node/path").split(".")[-1], - "comment": None, - "index": int(port.name.replace(f"usb{bus_number}-port", "")), # Need to parse it from the name. I hate linux - "class": hub_info["speed"], # tbd - "type": None, - "guessed": None, # tbd - "connect_type": quick_read_2(port, "connect_type"), - "devices": [], - "type_c": False, +class LinuxUSBMap(BaseUSBMap): + def enumerate_hub(self, hub: Path): + bus_number = quick_read_2(hub, "busnum") + hub_info = { + "hub_name": hub.name, + "port_count": int(quick_read_2(hub, "maxchild")), + "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(hub, "speed"))), + "version": quick_read_2(hub, "version"), + "ports": [], } - if (port / "peer").exists(): - port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", (port / "peer").resolve().name).groupdict() - else: - port_info["companion_info"] = {"hub": "", "port": ""} + # Get the ports + ports = hub.glob(f"{bus_number}-0:1.0/usb{bus_number}-port*") + for i, port in enumerate(sorted(ports, key=lambda x: int(x.name.replace(f"usb{bus_number}-port", "")))): + port_info = { + "name": quick_read_2(port, "firmware_node/path").split(".")[-1] if (port / "firmware_node/path").exists() else f"Port {i}", + "comment": None, + "index": int(port.name.replace(f"usb{bus_number}-port", "")), # Need to parse it from the name. I hate linux + "class": hub_info["speed"], # tbd + "type": None, + "guessed": None, # tbd + "connect_type": quick_read_2(port, "connect_type"), + "devices": [], + "type_c": False, + "path": str(port), + } - if (port / "connector").exists(): - # I think this is only USB-C - port_info["type_c"] = True - other_ports = [i for i in (port / "connector").glob("usb*-port*") if i.resolve() != port.resolve()] - assert len(other_ports) == 1 if (port / "peer").exists(): - assert port_info["companion_info"] == re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() - port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() - - if (port / "device").exists(): - device = port / "device" - device_info = { - # TODO: Use lsusb? - "name": f"{quick_read_2(device, 'manufacturer')} {quick_read_2(device, 'product')}" if (device / "manufacturer").exists() else "Unknown Device", - "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(device, "speed"))), - "devices": [], # I'm not dealing with this rn + port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", (port / "peer").resolve().name).groupdict() + else: + port_info["companion_info"] = {"hub": "", "port": ""} + + if (port / "connector").exists(): + # I think this is only USB-C + port_info["type_c"] = True + other_ports = [i for i in (port / "connector").glob("usb*-port*") if i.resolve() != port.resolve()] + assert len(other_ports) == 1 + if (port / "peer").exists(): + assert port_info["companion_info"] == re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() + port_info["companion_info"] = re.match(r"(?Pusb\d+)-port(?P\d+)", other_ports[0].resolve().name).groupdict() + + if (port / "device").exists(): + device = port / "device" + device_info = { + # TODO: Use lsusb? + "name": f"{quick_read_2(device, 'manufacturer')} {quick_read_2(device, 'product')}" if (device / "manufacturer").exists() else "Unknown Device", + "speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(device, "speed"))), + "devices": [], # I'm not dealing with this rn + } + + if int(quick_read_2(device, "bDeviceClass"), 16) == 9 or (device / "maxchild").exists(): + # This is a hub. Enumerate. + device_info["devices"] = self.enumerate_hub(device) + + port_info["devices"].append(device_info) + + hub_info["ports"].append(port_info) + hub_info["ports"].sort(key=itemgetter("index")) + return hub_info + + def get_controllers(self): + controller_paths: set[Path] = set() + + for bus_path in Path("/sys/bus/usb/devices").iterdir(): + # Only look at buses + if not bus_path.stem.startswith("usb"): + continue + + # The parent of the bus is the controller + controller_paths.add(bus_path.resolve().parent) + + controllers = [] + + for controller_path in sorted(controller_paths): + print(f"Processing controller {controller_path}") + + lspci_output = subprocess.run(["lspci", "-vvvmm", "-s", controller_path.stem], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode() + lspci_output = {i.partition(":\t")[0]: i.partition("\t")[2] for i in lspci_output.splitlines() if i} + + controller = { + "name": lspci_output["Device"], + "identifiers": { + "bdf": [int(i, 16) for i in [controller_path.name[5:7], controller_path.name[8:10], controller_path.suffix[1]]], + "pci_id": [quick_read_2(controller_path, "vendor")[2:], quick_read_2(controller_path, "device")[2:]], + }, + "ports": [], } - if int(quick_read_2(device, "bDeviceClass")) == 9 or (device/"maxchild").exists(): - # This is a hub. Enumerate. - device_info["devices"] = enumerate_hub(device) - - port_info["devices"].append(device_info) + if (controller_path / "revision").exists(): + controller["identifiers"]["pci_revision"] = int(quick_read(controller_path / "revision"), 16) - hub_info["ports"].append(port_info) - hub_info["ports"].sort(key=itemgetter("index")) - return hub_info + if (controller_path / "subsystem_vendor").exists() and (controller_path / "subsystem_device").exists(): + controller["identifiers"]["pci_id"] += [quick_read_2(controller_path, "subsystem_vendor")[2:], quick_read_2(controller_path, "subsystem_device")[2:]] + if (controller_path / "firmware_node/path").exists(): + controller["identifiers"]["acpi_path"] = quick_read_2(controller_path, "firmware_node/path") -for controller_path in sorted(controller_paths): - print(f"Processing controller {controller_path}") + controller["class"] = shared.USBControllerTypes(int(quick_read_2(controller_path, "class"), 16) & 0xFF) - # lspci_output = subprocess.run(["lspci", "-vvvmm", "-s", controller_path.stem], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode() - # lspci_output = {i.partition(":\t")[0]: i.partition("\t")[2] for i in lspci_output.splitlines() if i} + # Enumerate the buses + for hub in sorted(controller_path.glob("usb*")): + # maxchild, speed, version + controller |= self.enumerate_hub(hub) - controller = { - # "name": lspci_output["Device"], - "identifiers": { - "bdf": [int(i, 16) for i in [controller_path.name[5:7], controller_path.name[8:10], controller_path.suffix[1]]], - "pci_id": [quick_read_2(controller_path, "vendor")[2:], quick_read_2(controller_path, "device")[2:]], - }, - "ports": [], - } + controllers.append(controller) - if (controller_path / "revision").exists(): - controller["identifiers"]["pci_revision"] = int(quick_read(controller_path / "revision"), 16) - - if (controller_path / "subsystem_vendor").exists() and (controller_path / "subsystem_device").exists(): - controller["identifiers"]["pci_id"] += [quick_read_2(controller_path, "subsystem_vendor")[2:], quick_read_2(controller_path, "subsystem_device")[2:]] + self.controllers = controllers + if not self.controllers_historical: + self.controllers_historical = copy.deepcopy(self.controllers) + else: + self.merge_controllers(self.controllers_historical, self.controllers) - if (controller_path / "firmware_node/path").exists(): - controller["identifiers"]["acpi_path"] = quick_read_2(controller_path, "firmware_node/path") + def update_devices(self): + self.get_controllers() - controller["class"] = shared.USBControllerTypes(int(quick_read_2(controller_path, "class"), 16) & 0xFF) - # Enumerate the buses - for hub in sorted(controller_path.glob("usb*")): - # maxchild, speed, version - print(enumerate_hub(hub)) +e = LinuxUSBMap()