Skip to content

Commit

Permalink
Improve serial port performance by threading
Browse files Browse the repository at this point in the history
  • Loading branch information
luoja committed Nov 13, 2024
1 parent 9a766ce commit 11dbd6e
Showing 1 changed file with 99 additions and 75 deletions.
174 changes: 99 additions & 75 deletions can/interfaces/slcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import io
import logging
import time
from typing import Any, Optional, Tuple

from typing import Any, Optional, Tuple, List
from queue import Queue, Empty
import threading
from serial.tools import list_ports
from can import BusABC, CanProtocol, Message, typechecking

from ..exceptions import (
Expand All @@ -22,8 +24,7 @@
import serial
except ImportError:
logger.warning(
"You won't be able to use the slcan can backend without "
"the serial module installed!"
"You won't be able to use the slcan can backend without " "the serial module installed!"
)
serial = None

Expand Down Expand Up @@ -132,6 +133,10 @@ def __init__(
rtscts=False,
**kwargs,
)
self._timestamp_offset = time.time() - time.perf_counter()
self.queue_read = Queue()
self.event_read = threading.Event()
threading.Thread(None, target=self._read_can_thread, args=(self.event_read,)).start()

def set_bitrate(self, bitrate: int) -> None:
"""
Expand Down Expand Up @@ -164,33 +169,9 @@ def _write(self, string: str) -> None:
self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
self.serialPortOrig.flush()

def _read(self, timeout: Optional[float]) -> Optional[str]:
_timeout = serial.Timeout(timeout)

with error_check("Could not read from serial device"):
while True:
# Due to accessing `serialPortOrig.in_waiting` too often will reduce the performance.
# We read the `serialPortOrig.in_waiting` only once here.
in_waiting = self.serialPortOrig.in_waiting
for _ in range(max(1, in_waiting)):
new_byte = self.serialPortOrig.read(size=1)
if new_byte:
self._buffer.extend(new_byte)
else:
break

if new_byte in (self._ERROR, self._OK):
string = self._buffer.decode()
self._buffer.clear()
return string

if _timeout.expired():
break

return None

def flush(self) -> None:
self._buffer.clear()
self.queue_read = Queue()
with error_check("Could not flush"):
self.serialPortOrig.reset_input_buffer()

Expand All @@ -203,55 +184,81 @@ def open(self) -> None:
def close(self) -> None:
self._write("C")

def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[Message], bool]:
def _recv_internal(self, timeout: Optional[float]):
try:
return self.queue_read.get(timeout=timeout), False
except Empty:
return None, False

def _read_can_thread(self, event_recv_send_batch_zlg):
while not event_recv_send_batch_zlg.is_set():
msgs = self._read_can()
for i in msgs:
self.queue_read.put(i)
time.sleep(0.005)

def _read_can(self) -> List[Message]:
canId = None
remote = False
extended = False
data = None
msgs = []

string = self._read(timeout)
with error_check("Could not read from serial device"):
# Due to accessing `serialPortOrig.in_waiting` too often will reduce the performance.
# We read the `serialPortOrig.in_waiting` only once here.
in_waiting = self.serialPortOrig.in_waiting
for _ in range(in_waiting):
new_byte = self.serialPortOrig.read(size=1)
if new_byte:
self._buffer.extend(new_byte)
else:
break

if not string:
pass
elif string[0] in (
"T",
"x", # x is an alternative extended message identifier for CANDapter
):
# extended frame
canId = int(string[1:9], 16)
dlc = int(string[9])
extended = True
data = bytearray.fromhex(string[10 : 10 + dlc * 2])
elif string[0] == "t":
# normal frame
canId = int(string[1:4], 16)
dlc = int(string[4])
data = bytearray.fromhex(string[5 : 5 + dlc * 2])
elif string[0] == "r":
# remote frame
canId = int(string[1:4], 16)
dlc = int(string[4])
remote = True
elif string[0] == "R":
# remote extended frame
canId = int(string[1:9], 16)
dlc = int(string[9])
extended = True
remote = True

if canId is not None:
msg = Message(
arbitration_id=canId,
is_extended_id=extended,
timestamp=time.time(), # Better than nothing...
is_remote_frame=remote,
dlc=dlc,
data=data,
)
return msg, False
return None, False
if new_byte in (self._ERROR, self._OK):
string = self._buffer.decode()
self._buffer.clear()

if not string:
pass
elif string[0] in (
"T",
"x", # x is an alternative extended message identifier for CANDapter
):
# extended frame
canId = int(string[1:9], 16)
dlc = int(string[9])
extended = True
data = bytearray.fromhex(string[10 : 10 + dlc * 2])
elif string[0] == "t":
# normal frame
canId = int(string[1:4], 16)
dlc = int(string[4])
data = bytearray.fromhex(string[5 : 5 + dlc * 2])
elif string[0] == "r":
# remote frame
canId = int(string[1:4], 16)
dlc = int(string[4])
remote = True
elif string[0] == "R":
# remote extended frame
canId = int(string[1:9], 16)
dlc = int(string[9])
extended = True
remote = True

if canId is not None:
msg = Message(
arbitration_id=canId,
is_extended_id=extended,
timestamp=self._timestamp_offset
+ time.perf_counter(), # Better than nothing...
is_remote_frame=remote,
dlc=dlc,
data=data,
)
msgs.append(msg)
return msgs

def send(self, msg: Message, timeout: Optional[float] = None) -> None:
if timeout != self.serialPortOrig.write_timeout:
Expand All @@ -271,6 +278,7 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:

def shutdown(self) -> None:
super().shutdown()
self.event_read.set()
self.close()
with error_check("Could not close serial socket"):
self.serialPortOrig.close()
Expand All @@ -285,9 +293,7 @@ def fileno(self) -> int:
except Exception as exception:
raise CanOperationError("Cannot fetch fileno") from exception

def get_version(
self, timeout: Optional[float]
) -> Tuple[Optional[int], Optional[int]]:
def get_version(self, timeout: Optional[float]) -> Tuple[Optional[int], Optional[int]]:
"""Get HW and SW version of the slcan interface.
:param timeout:
Expand Down Expand Up @@ -334,3 +340,21 @@ def get_serial_number(self, timeout: Optional[float]) -> Optional[str]:
return serial_number

return None

@staticmethod
def _detect_available_configs():
"""
Identify slcan devices
"""
ports = []

for p in list_ports.comports():
ports.append((p.device, p.description))
return [
{
"interface": "slcan",
"channel": port,
"name": des,
}
for port, des in ports
]

0 comments on commit 11dbd6e

Please sign in to comment.