Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linux support #44

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[flake8]
ignore = E501, E203
extend-ignore = E203
max-line-length = 200
135 changes: 135 additions & 0 deletions Linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import copy
import re
import subprocess
from operator import itemgetter
from pathlib import Path

from base import BaseUSBMap
from Scripts import shared


def quick_read(path: Path):
return path.read_text().strip("\x00").strip()


def quick_read_2(path: Path, name: str):
return quick_read(path / name)


class LinuxUSBMap(BaseUSBMap):
def enumerate_hub(self, hub: Path):
bus_number = quick_read_2(hub, "busnum")
hub_info = {
"hub_name": hub.name,
"port_count": int(quick_read_2(hub, "maxchild")),
"speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(hub, "speed"))),
"version": quick_read_2(hub, "version"),
"ports": [],
}

# Get the ports
ports = hub.glob(f"{bus_number}-0:1.0/usb{bus_number}-port*")
for i, port in enumerate(sorted(ports, key=lambda x: int(x.name.replace(f"usb{bus_number}-port", "")))):
port_info = {
"name": quick_read_2(port, "firmware_node/path").split(".")[-1] if (port / "firmware_node/path").exists() else f"Port {i}",
"comment": None,
"index": int(port.name.replace(f"usb{bus_number}-port", "")), # Need to parse it from the name. I hate linux
"class": hub_info["speed"], # tbd
"type": None,
"guessed": None, # tbd
"connect_type": quick_read_2(port, "connect_type"),
"devices": [],
"type_c": False,
"path": str(port),
}

if (port / "peer").exists():
port_info["companion_info"] = re.match(r"(?P<hub>usb\d+)-port(?P<port>\d+)", (port / "peer").resolve().name).groupdict()
else:
port_info["companion_info"] = {"hub": "", "port": ""}

if (port / "connector").exists():
# I think this is only USB-C
port_info["type_c"] = True
other_ports = [i for i in (port / "connector").glob("usb*-port*") if i.resolve() != port.resolve()]
assert len(other_ports) == 1
if (port / "peer").exists():
assert port_info["companion_info"] == re.match(r"(?P<hub>usb\d+)-port(?P<port>\d+)", other_ports[0].resolve().name).groupdict()
port_info["companion_info"] = re.match(r"(?P<hub>usb\d+)-port(?P<port>\d+)", other_ports[0].resolve().name).groupdict()

if (port / "device").exists():
device = port / "device"
device_info = {
# TODO: Use lsusb?
"name": f"{quick_read_2(device, 'manufacturer')} {quick_read_2(device, 'product')}" if (device / "manufacturer").exists() else "Unknown Device",
"speed": shared.USBDeviceSpeeds.from_speed(int(quick_read_2(device, "speed"))),
"devices": [], # I'm not dealing with this rn
}

if int(quick_read_2(device, "bDeviceClass"), 16) == 9 or (device / "maxchild").exists():
# This is a hub. Enumerate.
device_info["devices"] = self.enumerate_hub(device)

port_info["devices"].append(device_info)

hub_info["ports"].append(port_info)
hub_info["ports"].sort(key=itemgetter("index"))
return hub_info

def get_controllers(self):
controller_paths: set[Path] = set()

for bus_path in Path("/sys/bus/usb/devices").iterdir():
# Only look at buses
if not bus_path.stem.startswith("usb"):
continue

# The parent of the bus is the controller
controller_paths.add(bus_path.resolve().parent)

controllers = []

for controller_path in sorted(controller_paths):
print(f"Processing controller {controller_path}")

lspci_output = subprocess.run(["lspci", "-vvvmm", "-s", controller_path.stem], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode()
lspci_output = {i.partition(":\t")[0]: i.partition("\t")[2] for i in lspci_output.splitlines() if i}

controller = {
"name": lspci_output["Device"],
"identifiers": {
"bdf": [int(i, 16) for i in [controller_path.name[5:7], controller_path.name[8:10], controller_path.suffix[1]]],
"pci_id": [quick_read_2(controller_path, "vendor")[2:], quick_read_2(controller_path, "device")[2:]],
},
"ports": [],
}

if (controller_path / "revision").exists():
controller["identifiers"]["pci_revision"] = int(quick_read(controller_path / "revision"), 16)

if (controller_path / "subsystem_vendor").exists() and (controller_path / "subsystem_device").exists():
controller["identifiers"]["pci_id"] += [quick_read_2(controller_path, "subsystem_vendor")[2:], quick_read_2(controller_path, "subsystem_device")[2:]]

if (controller_path / "firmware_node/path").exists():
controller["identifiers"]["acpi_path"] = quick_read_2(controller_path, "firmware_node/path")

controller["class"] = shared.USBControllerTypes(int(quick_read_2(controller_path, "class"), 16) & 0xFF)

# Enumerate the buses
for hub in sorted(controller_path.glob("usb*")):
# maxchild, speed, version
controller |= self.enumerate_hub(hub)

controllers.append(controller)

self.controllers = controllers
if not self.controllers_historical:
self.controllers_historical = copy.deepcopy(self.controllers)
else:
self.merge_controllers(self.controllers_historical, self.controllers)

def update_devices(self):
self.get_controllers()


e = LinuxUSBMap()
28 changes: 24 additions & 4 deletions Scripts/shared.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# pylint: disable=invalid-name
import enum
from numbers import Number
import sys
from time import time
from typing import Callable
Expand Down Expand Up @@ -33,6 +34,21 @@ def __str__(self) -> str:
def __bool__(self) -> bool:
return True

@classmethod
def from_speed(cls, speed):
if speed == 1.5:
return USBDeviceSpeeds.LowSpeed
elif speed == 12:
return USBDeviceSpeeds.FullSpeed
elif speed == 480:
return USBDeviceSpeeds.HighSpeed
elif speed == 5000:
return USBDeviceSpeeds.SuperSpeed
elif speed == 10000:
return USBDeviceSpeeds.SuperSpeedPlus
else:
return USBDeviceSpeeds.Unknown


class USBPhysicalPortTypes(enum.IntEnum):
USBTypeA = 0
Expand All @@ -56,10 +72,11 @@ def __bool__(self) -> bool:


class USBControllerTypes(enum.IntEnum):
UHCI = int("0x00", 16)
OHCI = int("0x10", 16)
EHCI = int("0x20", 16)
XHCI = int("0x30", 16)
UHCI = 0x00
OHCI = 0x10
EHCI = 0x20
XHCI = 0x30
USB4 = 0x40 # Futureproofing
Unknown = 9999

def __str__(self) -> str:
Expand Down Expand Up @@ -122,12 +139,15 @@ def time_it(func: Callable, text: str, *args, **kwargs):
input(f"{text} took {end - start}, press enter to continue".strip())
return result


debugging = False


def debug(str):
if debugging:
input(f"DEBUG: {str}\nPress enter to continue")


test_mode = False and debugging
if test_mode:
debug_dump_path = Path(input("Debug dump path: ").strip().replace("'", "").replace('"', ""))
Expand Down
1 change: 1 addition & 0 deletions Scripts/usbdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def get_hub_type(port):
def get_hub_by_name(name):
return hub_map.get(name)


# TODO: Figure out how to deal with the hub name not matching
def get_companion_port(port):
return ([i for i in hub_map.get(port["companion_info"]["hub"], {"ports": []})["ports"] if i["index"] == port["companion_info"]["port"]] or [None])[0]
Expand Down