-
Notifications
You must be signed in to change notification settings - Fork 604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ZLG-CAN and Tosun CAN device #1351
Changes from all commits
e53a373
5f52ae5
b2568b7
cd9b777
6a0eceb
30268e9
eef9383
b437c0b
0f2c57a
5b14b4f
2d6e999
c97966a
8214c06
9633fd4
2b56cd7
6dbf790
c62a286
8bdc7f7
063ead5
a8e406f
57433e3
6ea0e40
b3663ef
af50bd3
c64aaab
076c08d
4e1c19e
938c80c
97846e9
2b88882
b2fae2f
42af86c
717d139
42ce4a0
6ecb50a
87aebc7
6da9752
cbda76c
e4235b8
8e41eb1
6ca69aa
0962085
670e263
298b1a0
5e468f4
2b1e5ac
2422f95
e8a894e
e542eb0
59da217
6c02949
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,274 @@ | ||
import collections | ||
import time | ||
import warnings | ||
from typing import List, Optional, Tuple, Union, Deque, Any | ||
|
||
import can | ||
import can.typechecking | ||
import ctypes | ||
from can.bus import LOG | ||
from can.exceptions import ( | ||
CanError, | ||
CanInterfaceNotImplementedError, | ||
CanOperationError, | ||
CanInitializationError, | ||
) | ||
|
||
from tosun import TSCanMessage, TSCanFdMessage, TSMasterException, TosunDevice, TSMasterMessageType | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The tosun package is planing to be refactored by using rust |
||
|
||
|
||
def tosun_convert_msg(msg): | ||
if isinstance(msg, TSCanMessage): | ||
return can.Message( | ||
timestamp=msg.FTimeUs / 1000, | ||
arbitration_id=msg.FIdentifier, | ||
is_extended_id=bool(msg.FProperties & 0x04), | ||
is_remote_frame=bool(msg.FProperties & 0x02), | ||
channel=msg.FIdxChn, | ||
dlc=msg.FDLC, | ||
data=bytes(msg.FData), | ||
is_fd=False, | ||
is_rx=not bool(msg.FProperties & 0x01), # False if msg.FProperties & 0x01 else True, | ||
) | ||
elif isinstance(msg, TSCanFdMessage): | ||
return can.Message( | ||
timestamp=msg.FTimeUs / 1000, | ||
arbitration_id=msg.FIdentifier, | ||
is_extended_id=bool(msg.FProperties & 0x04), | ||
is_remote_frame=bool(msg.FProperties & 0x02), | ||
channel=msg.FIdxChn, | ||
dlc=can.util.dlc2len(msg.FDLC), | ||
data=bytes(msg.FData)[:can.util.dlc2len(msg.FDLC)], | ||
is_fd=bool(msg.FFDProperties & 0x01), | ||
is_rx=not bool(msg.FProperties & 0x01), # False if msg.FProperties & 0x01 else True, | ||
bitrate_switch=bool(msg.FFDProperties & 0x02), | ||
error_state_indicator=bool(msg.FFDProperties & 0x04), | ||
is_error_frame=bool(msg.FProperties & 0x80) | ||
) | ||
elif isinstance(msg, can.Message): | ||
if msg.is_fd: | ||
result = TSCanFdMessage() | ||
result.FFDProperties = 0x00 | (0x02 if msg.bitrate_switch else 0x00) | \ | ||
(0x04 if msg.error_state_indicator else 0x00) | ||
else: | ||
result = TSCanMessage() | ||
result.FIdxChn = msg.channel | ||
result.FProperties = 0x00 | (0x00 if msg.is_rx else 0x01) | \ | ||
(0x02 if msg.is_remote_frame else 0x00) | \ | ||
(0x04 if msg.is_extended_id else 0x00) | ||
result.FDLC = can.util.len2dlc(msg.dlc) | ||
result.FIdentifier = msg.arbitration_id | ||
result.FTimeUs = int(msg.timestamp * 1000) | ||
for index, item in enumerate(msg.data): | ||
result.FData[index] = item | ||
return result | ||
else: | ||
raise TSMasterException(f'Unknown message type: {type(msg)}') | ||
|
||
|
||
class TosunBus(can.BusABC): | ||
|
||
def __init__(self, channel: Any = None, *, | ||
mappings: list[dict], | ||
configs: Union[List[dict], Tuple[dict]], | ||
fifo_status: str = 'enable', | ||
turbo_enable: bool = True, | ||
receive_own_messages=True, | ||
rx_queue_size: Optional[int] = None, | ||
can_filters: Optional[can.typechecking.CanFilters] = None, | ||
**kwargs: object): | ||
super().__init__(channel, can_filters, **kwargs) | ||
self.receive_own_messages = receive_own_messages | ||
|
||
if isinstance(channel, list): | ||
self.channels = channel | ||
else: | ||
self.channels = [] | ||
if 'with_com' not in kwargs: | ||
kwargs['with_com'] = False | ||
try: | ||
self.device = TosunDevice(self.channels, **kwargs) | ||
self.device.disconnect() | ||
count = self.device.channel_count('can', len(mappings)) | ||
assert count == len(mappings) | ||
self.available = [] | ||
for _mapping in mappings: | ||
_mapping = self.device.mapping_instance(**_mapping) | ||
self.device.set_mapping(_mapping) | ||
if _mapping.FMappingDisabled is False: | ||
chl_index = _mapping.FAppChannelIndex | ||
if isinstance(chl_index, ctypes.c_int): | ||
self.available.append(chl_index.value) | ||
else: | ||
self.available.append(chl_index) | ||
|
||
for index, chl in enumerate(self.available): | ||
try: | ||
config: dict = configs[index] | ||
except IndexError: | ||
LOG.warning(f'TOSUN-CAN - channel:{chl} not initialized.') | ||
continue | ||
|
||
bitrate = config.get('bitrate', None) | ||
if bitrate is None: | ||
raise CanInitializationError('TOSUN-CAN - bitrate is required.') | ||
# data_bitrate | ||
del config['bitrate'] | ||
bitrate = int(bitrate / 1000) | ||
config['kbaudrate'] = bitrate | ||
|
||
data_bitrate = config.get('data_bitrate', None) | ||
if data_bitrate is None: | ||
data_bitrate = bitrate | ||
else: | ||
del config['data_bitrate'] | ||
data_bitrate = int(data_bitrate / 1000) | ||
config['db_kbaudrate'] = data_bitrate | ||
|
||
self.device.configure_baudrate(chl, **config) | ||
|
||
self.device.turbo_mode(turbo_enable) | ||
self.device.connect() | ||
try: | ||
self.device.set_receive_fifo_status(fifo_status) | ||
except TSMasterException: | ||
self.device.set_receive_fifo_status(fifo_status) | ||
|
||
self.rx_queue = collections.deque( | ||
maxlen=rx_queue_size | ||
) # type: Deque[Any] # channel, raw_msg | ||
except TSMasterException as e: | ||
self.device.finalize() | ||
raise can.CanOperationError(str(e)) | ||
|
||
def _recv_from_queue(self) -> Tuple[can.Message, bool]: | ||
"""Return a message from the internal receive queue""" | ||
raw_msg = self.rx_queue.popleft() | ||
if isinstance(raw_msg, can.Message): | ||
return raw_msg, False | ||
return tosun_convert_msg(raw_msg), False | ||
|
||
def poll_received_messages(self, timeout): | ||
try: | ||
for channel in self.available: | ||
canfd_num = self.device.fifo_read_buffer_count(channel, TSMasterMessageType.CAN_FD) | ||
if self.device.com_enabled: | ||
LOG.debug(f'TOSUN-CAN - canfd message received: {canfd_num}.') | ||
while canfd_num > 0: | ||
canfd_num -= 1 | ||
success, chl_index, is_remote, is_extend, is_edl, is_brs, dlc, can_id, timestamp, data = \ | ||
self.device.fifo_receive_msg(channel, self.receive_own_messages, TSMasterMessageType.CAN_FD) | ||
if success: | ||
self.rx_queue.append( | ||
can.Message( | ||
timestamp=timestamp, | ||
arbitration_id=can_id, | ||
is_extended_id=is_extend, | ||
is_remote_frame=is_remote, | ||
channel=chl_index, | ||
dlc=dlc, | ||
data=[int(i) for i in data.split(',')], | ||
is_fd=True, | ||
bitrate_switch=is_brs, | ||
|
||
) | ||
) | ||
else: | ||
if canfd_num: | ||
can_msgfd, canfd_num = self.device.tsfifo_receive_msgs(channel, canfd_num, | ||
self.receive_own_messages, | ||
TSMasterMessageType.CAN_FD) | ||
LOG.debug(f'TOSUN-CAN - canfd message received: {canfd_num}.') | ||
self.rx_queue.extend( | ||
can_msgfd[i] for i in range(canfd_num) | ||
) | ||
except TSMasterException as e: | ||
raise can.CanOperationError(str(e)) | ||
|
||
def clear_rx_buffer(self, channel=None): | ||
if channel: | ||
self.device.fifo_clear_receive_buffers(channel, TSMasterMessageType.CAN) | ||
self.device.fifo_clear_receive_buffers(channel, TSMasterMessageType.CAN_FD) | ||
else: | ||
for channel in self.available: | ||
self.device.fifo_clear_receive_buffers(channel, TSMasterMessageType.CAN) | ||
self.device.fifo_clear_receive_buffers(channel, TSMasterMessageType.CAN_FD) | ||
|
||
def _recv_internal(self, timeout: Optional[float]) -> Tuple[Optional[can.Message], bool]: | ||
|
||
if self.rx_queue: | ||
return self._recv_from_queue() | ||
|
||
deadline = None | ||
while deadline is None or time.time() < deadline: | ||
if deadline is None and timeout is not None: | ||
deadline = time.time() + timeout | ||
|
||
self.poll_received_messages(timeout) | ||
|
||
if self.rx_queue: | ||
return self._recv_from_queue() | ||
|
||
return None, False | ||
|
||
def send(self, msg: can.Message, timeout: Optional[float] = 50, sync: bool = True) -> None: | ||
try: | ||
if len(self.available) > 0 and msg.channel is None: | ||
msg.channel = self.available[0] | ||
if msg.channel not in self.available: | ||
raise CanOperationError(f'Channel: {msg.channel} not in {self.available}') | ||
msg = tosun_convert_msg(msg) | ||
self.device.transmit(msg, sync, timeout=timeout) | ||
except TSMasterException as e: | ||
raise can.CanOperationError(e) | ||
|
||
@staticmethod | ||
def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: | ||
warnings.warn('Not supported by Tosun device.', DeprecationWarning, 2) | ||
|
||
def fileno(self) -> int: | ||
warnings.warn('Not supported by Tosun device.', DeprecationWarning, 2) | ||
|
||
# def cyclic_send(self, msg: can.Message, period: int): | ||
# pass | ||
# | ||
# def del_cyclic_send(self, msg: can.Message): | ||
# pass | ||
# | ||
# def configure_can_regs(self): | ||
# self.device.tsapp_configure_can_register() | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def shutdown(self) -> None: | ||
LOG.debug('TSMaster - shutdown.') | ||
super().shutdown() | ||
self.device.finalize() | ||
|
||
|
||
if __name__ == '__main__': | ||
from tosun import TSChannelIndex, TSAppChannelType, TSDeviceType, TSDeviceSubType | ||
import time | ||
mapping = {'app_name': 'TSMaster', | ||
'app_chl_idx': TSChannelIndex.CHN1, | ||
'app_chl_type': TSAppChannelType.APP_CAN, | ||
'hw_type': TSDeviceType.TS_USB_DEVICE, | ||
'hw_idx': 0, | ||
'hw_chl_idx': TSChannelIndex.CHN1, | ||
'hw_subtype': TSDeviceSubType.TC1016, | ||
'hw_name': 'TC1016'} | ||
with TosunBus(mappings=[mapping, ], configs=[ | ||
{'bitrate': 500_000, 'initenal_resistance': 1} | ||
], | ||
# with_com=True | ||
) as bus: | ||
# while True: | ||
# print(bus.device.tsfifo_receive_msgs(0, 100, TSReadTxRxMode.TX_RX_MESSAGES, TSMasterMessageType.CAN)) | ||
# time.sleep(0.5) | ||
while True: | ||
print(bus.recv()) | ||
time.sleep(0.01) | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest adding an entry for the optional dependency
tosun
inextras_require
insetup.py
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ZLG-CAN device has been refactored by using rust and upload to pypi.
https://pypi.org/project/zlgcan-driver-py/
New pull requests at #1784