diff --git a/can/interfaces/__init__.py b/can/interfaces/__init__.py index 3065e9bfd..b40649342 100644 --- a/can/interfaces/__init__.py +++ b/can/interfaces/__init__.py @@ -30,6 +30,8 @@ "neousys": ("can.interfaces.neousys", "NeousysBus"), "etas": ("can.interfaces.etas", "EtasBus"), "socketcand": ("can.interfaces.socketcand", "SocketCanDaemonBus"), + "zlgcan": ("can.interfaces.zlgcan", "ZCanBus"), + "tosun": ("can.interfaces.tosun", "TosunBus"), } if sys.version_info >= (3, 8): diff --git a/can/interfaces/tosun.py b/can/interfaces/tosun.py new file mode 100644 index 000000000..f1727f583 --- /dev/null +++ b/can/interfaces/tosun.py @@ -0,0 +1,274 @@ +import collections +import time +import warnings +from typing import List, Optional, Tuple, Union, Deque, Any + +import can +import can.typechecking +import ctypes +from can.bus import LOG +from can.exceptions import ( + CanError, + CanInterfaceNotImplementedError, + CanOperationError, + CanInitializationError, +) + +from tosun import TSCanMessage, TSCanFdMessage, TSMasterException, TosunDevice, TSMasterMessageType + + +def tosun_convert_msg(msg): + if isinstance(msg, TSCanMessage): + return can.Message( + timestamp=msg.FTimeUs / 1000, + arbitration_id=msg.FIdentifier, + is_extended_id=bool(msg.FProperties & 0x04), + is_remote_frame=bool(msg.FProperties & 0x02), + channel=msg.FIdxChn, + dlc=msg.FDLC, + data=bytes(msg.FData), + is_fd=False, + is_rx=not bool(msg.FProperties & 0x01), # False if msg.FProperties & 0x01 else True, + ) + elif isinstance(msg, TSCanFdMessage): + return can.Message( + timestamp=msg.FTimeUs / 1000, + arbitration_id=msg.FIdentifier, + is_extended_id=bool(msg.FProperties & 0x04), + is_remote_frame=bool(msg.FProperties & 0x02), + channel=msg.FIdxChn, + dlc=can.util.dlc2len(msg.FDLC), + data=bytes(msg.FData)[:can.util.dlc2len(msg.FDLC)], + is_fd=bool(msg.FFDProperties & 0x01), + is_rx=not bool(msg.FProperties & 0x01), # False if msg.FProperties & 0x01 else True, + bitrate_switch=bool(msg.FFDProperties & 0x02), + error_state_indicator=bool(msg.FFDProperties & 0x04), + is_error_frame=bool(msg.FProperties & 0x80) + ) + elif isinstance(msg, can.Message): + if msg.is_fd: + result = TSCanFdMessage() + result.FFDProperties = 0x00 | (0x02 if msg.bitrate_switch else 0x00) | \ + (0x04 if msg.error_state_indicator else 0x00) + else: + result = TSCanMessage() + result.FIdxChn = msg.channel + result.FProperties = 0x00 | (0x00 if msg.is_rx else 0x01) | \ + (0x02 if msg.is_remote_frame else 0x00) | \ + (0x04 if msg.is_extended_id else 0x00) + result.FDLC = can.util.len2dlc(msg.dlc) + result.FIdentifier = msg.arbitration_id + result.FTimeUs = int(msg.timestamp * 1000) + for index, item in enumerate(msg.data): + result.FData[index] = item + return result + else: + raise TSMasterException(f'Unknown message type: {type(msg)}') + + +class TosunBus(can.BusABC): + + def __init__(self, channel: Any = None, *, + mappings: list[dict], + configs: Union[List[dict], Tuple[dict]], + fifo_status: str = 'enable', + turbo_enable: bool = True, + receive_own_messages=True, + rx_queue_size: Optional[int] = None, + can_filters: Optional[can.typechecking.CanFilters] = None, + **kwargs: object): + super().__init__(channel, can_filters, **kwargs) + self.receive_own_messages = receive_own_messages + + if isinstance(channel, list): + self.channels = channel + else: + self.channels = [] + if 'with_com' not in kwargs: + kwargs['with_com'] = False + try: + self.device = TosunDevice(self.channels, **kwargs) + self.device.disconnect() + count = self.device.channel_count('can', len(mappings)) + assert count == len(mappings) + self.available = [] + for _mapping in mappings: + _mapping = self.device.mapping_instance(**_mapping) + self.device.set_mapping(_mapping) + if _mapping.FMappingDisabled is False: + chl_index = _mapping.FAppChannelIndex + if isinstance(chl_index, ctypes.c_int): + self.available.append(chl_index.value) + else: + self.available.append(chl_index) + + for index, chl in enumerate(self.available): + try: + config: dict = configs[index] + except IndexError: + LOG.warning(f'TOSUN-CAN - channel:{chl} not initialized.') + continue + + bitrate = config.get('bitrate', None) + if bitrate is None: + raise CanInitializationError('TOSUN-CAN - bitrate is required.') + # data_bitrate + del config['bitrate'] + bitrate = int(bitrate / 1000) + config['kbaudrate'] = bitrate + + data_bitrate = config.get('data_bitrate', None) + if data_bitrate is None: + data_bitrate = bitrate + else: + del config['data_bitrate'] + data_bitrate = int(data_bitrate / 1000) + config['db_kbaudrate'] = data_bitrate + + self.device.configure_baudrate(chl, **config) + + self.device.turbo_mode(turbo_enable) + self.device.connect() + try: + self.device.set_receive_fifo_status(fifo_status) + except TSMasterException: + self.device.set_receive_fifo_status(fifo_status) + + self.rx_queue = collections.deque( + maxlen=rx_queue_size + ) # type: Deque[Any] # channel, raw_msg + except TSMasterException as e: + self.device.finalize() + raise can.CanOperationError(str(e)) + + def _recv_from_queue(self) -> Tuple[can.Message, bool]: + """Return a message from the internal receive queue""" + raw_msg = self.rx_queue.popleft() + if isinstance(raw_msg, can.Message): + return raw_msg, False + return tosun_convert_msg(raw_msg), False + + def poll_received_messages(self, timeout): + try: + for channel in self.available: + canfd_num = self.device.fifo_read_buffer_count(channel, TSMasterMessageType.CAN_FD) + if self.device.com_enabled: + LOG.debug(f'TOSUN-CAN - canfd message received: {canfd_num}.') + while canfd_num > 0: + canfd_num -= 1 + success, chl_index, is_remote, is_extend, is_edl, is_brs, dlc, can_id, timestamp, data = \ + self.device.fifo_receive_msg(channel, self.receive_own_messages, TSMasterMessageType.CAN_FD) + if success: + self.rx_queue.append( + can.Message( + timestamp=timestamp, + arbitration_id=can_id, + is_extended_id=is_extend, + is_remote_frame=is_remote, + channel=chl_index, + dlc=dlc, + data=[int(i) for i in data.split(',')], + is_fd=True, + bitrate_switch=is_brs, + + ) + ) + else: + if canfd_num: + can_msgfd, canfd_num = self.device.tsfifo_receive_msgs(channel, canfd_num, + self.receive_own_messages, + TSMasterMessageType.CAN_FD) + LOG.debug(f'TOSUN-CAN - canfd message received: {canfd_num}.') + self.rx_queue.extend( + can_msgfd[i] for i in range(canfd_num) + ) + except TSMasterException as e: + raise can.CanOperationError(str(e)) + + def clear_rx_buffer(self, channel=None): + if channel: + self.device.fifo_clear_receive_buffers(channel, TSMasterMessageType.CAN) + self.device.fifo_clear_receive_buffers(channel, TSMasterMessageType.CAN_FD) + else: + for channel in self.available: + self.device.fifo_clear_receive_buffers(channel, TSMasterMessageType.CAN) + self.device.fifo_clear_receive_buffers(channel, TSMasterMessageType.CAN_FD) + + def _recv_internal(self, timeout: Optional[float]) -> Tuple[Optional[can.Message], bool]: + + if self.rx_queue: + return self._recv_from_queue() + + deadline = None + while deadline is None or time.time() < deadline: + if deadline is None and timeout is not None: + deadline = time.time() + timeout + + self.poll_received_messages(timeout) + + if self.rx_queue: + return self._recv_from_queue() + + return None, False + + def send(self, msg: can.Message, timeout: Optional[float] = 50, sync: bool = True) -> None: + try: + if len(self.available) > 0 and msg.channel is None: + msg.channel = self.available[0] + if msg.channel not in self.available: + raise CanOperationError(f'Channel: {msg.channel} not in {self.available}') + msg = tosun_convert_msg(msg) + self.device.transmit(msg, sync, timeout=timeout) + except TSMasterException as e: + raise can.CanOperationError(e) + + @staticmethod + def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: + warnings.warn('Not supported by Tosun device.', DeprecationWarning, 2) + + def fileno(self) -> int: + warnings.warn('Not supported by Tosun device.', DeprecationWarning, 2) + + # def cyclic_send(self, msg: can.Message, period: int): + # pass + # + # def del_cyclic_send(self, msg: can.Message): + # pass + # + # def configure_can_regs(self): + # self.device.tsapp_configure_can_register() + + def __enter__(self): + return self + + def shutdown(self) -> None: + LOG.debug('TSMaster - shutdown.') + super().shutdown() + self.device.finalize() + + +if __name__ == '__main__': + from tosun import TSChannelIndex, TSAppChannelType, TSDeviceType, TSDeviceSubType + import time + mapping = {'app_name': 'TSMaster', + 'app_chl_idx': TSChannelIndex.CHN1, + 'app_chl_type': TSAppChannelType.APP_CAN, + 'hw_type': TSDeviceType.TS_USB_DEVICE, + 'hw_idx': 0, + 'hw_chl_idx': TSChannelIndex.CHN1, + 'hw_subtype': TSDeviceSubType.TC1016, + 'hw_name': 'TC1016'} + with TosunBus(mappings=[mapping, ], configs=[ + {'bitrate': 500_000, 'initenal_resistance': 1} + ], + # with_com=True + ) as bus: + # while True: + # print(bus.device.tsfifo_receive_msgs(0, 100, TSReadTxRxMode.TX_RX_MESSAGES, TSMasterMessageType.CAN)) + # time.sleep(0.5) + while True: + print(bus.recv()) + time.sleep(0.01) + + + diff --git a/can/interfaces/zlgcan.py b/can/interfaces/zlgcan.py new file mode 100644 index 000000000..7d9d6321e --- /dev/null +++ b/can/interfaces/zlgcan.py @@ -0,0 +1,410 @@ +import collections +import ctypes +import logging +import platform +import time +import warnings + +from can.bus import LOG + +import can.typechecking +from can.exceptions import ( + CanError, + CanInterfaceNotImplementedError, + CanOperationError, + CanInitializationError, +) +from typing import Optional, Tuple, Sequence, Union, Deque, Any +from can import BusABC, Message +from zlgcan import ZCAN, ZCANDeviceType, ZCANException, ZCANMessageType, ZCANCanTransType + +logger = logging.getLogger(__name__) +_os = platform.system() + + +def zlg_convert_msg(msg, **kwargs): + if _os.lower() == 'windows': + return _zlg_convert_msg_win(msg, **kwargs) + elif _os.lower() == 'linux': + return _zlg_convert_msg_linux(msg, **kwargs) + else: + raise ZCANException(f'Unsupported platform: {_os}') + + +def _zlg_convert_msg_linux(msg, **kwargs): + from zlgcan.linux import ZCAN_CAN_FRAME, ZCAN_CANFD_FRAME, ZCAN_MSG_HEADER, ZCAN_MSG_INFO + if isinstance(msg, Message): + trans_type = kwargs.get('trans_type', ZCANCanTransType.NORMAL if kwargs.get('resend', False) else ZCANCanTransType.SINGLE) + if msg.is_fd: + result = (ZCAN_CANFD_FRAME * 1)() + result[0].data = (ctypes.c_ubyte * 64)(*msg.data) + else: + result = (ZCAN_CAN_FRAME * 1)() + result[0].data = (ctypes.c_ubyte * msg.dlc)(*msg.data) + info = ZCAN_MSG_INFO() + info.mode = trans_type + info.is_fd = int(msg.is_fd) + info.is_remote = int(msg.is_remote_frame) + info.is_extend = int(msg.is_extended_id) + info.is_error = int(msg.is_error_frame) + info.bitrate_switch = int(msg.bitrate_switch) + info.error_status = int(msg.error_state_indicator) + + header = ZCAN_MSG_HEADER() + header.id = msg.arbitration_id + header.info = info + header.channel = msg.channel + header.dlc = msg.dlc + + result[0].header = header + + return result + elif isinstance(msg, (ZCAN_CAN_FRAME, ZCAN_CANFD_FRAME)): + header = msg.header + info = header.info + return Message( + timestamp=header.timestamp / 1000, + arbitration_id=header.id, + is_extended_id=bool(info.is_extend), + is_remote_frame=bool(info.is_remote), + is_error_frame=bool(info.is_error), + channel=header.channel, + dlc=header.dlc, + data=bytes(msg.data), + is_fd=bool(info.is_fd), + is_rx=True, + bitrate_switch=bool(info.bitrate_switch), + error_state_indicator=bool(info.error_status), + ) + else: + raise ZCANException(f'Unknown message type: {type(msg)}') + + +def _zlg_convert_msg_win(msg, **kwargs): # channel=None, trans_type=0, is_merge=False, **kwargs): + + from zlgcan.windows import ZCAN_Transmit_Data, ZCAN_TransmitFD_Data, ZCANDataObj, ZCAN_Receive_Data, ZCAN_ReceiveFD_Data + ZCAN_Transmit_Data_1 = (ZCAN_Transmit_Data * 1) + ZCAN_TransmitFD_Data_1 = (ZCAN_TransmitFD_Data * 1) + ZCANDataObj_1 = (ZCANDataObj * 1) + + if isinstance(msg, Message): # 发送报文转换 + is_merge = kwargs.get('is_merge', None) + trans_type = kwargs.get('trans_type', ZCANCanTransType.NORMAL if kwargs.get('resend', False) else ZCANCanTransType.SINGLE) + assert is_merge is not None, 'is_merge required when convert to ZLG.' + # assert trans_type is not None, 'trans_type required when convert to ZLG.' + if not is_merge: + if msg.is_fd: + result = ZCAN_TransmitFD_Data_1() + result[0].frame.len = msg.dlc + result[0].frame.brs = msg.bitrate_switch + result[0].frame.data = (ctypes.c_ubyte * 64)(*msg.data) + else: + result = ZCAN_Transmit_Data_1() + result[0].frame.can_dlc = msg.dlc + result[0].frame.data = (ctypes.c_ubyte * msg.dlc)(*msg.data) + result[0].transmit_type = trans_type + + result[0].frame.can_id = msg.arbitration_id + result[0].frame.err = msg.is_error_frame + result[0].frame.rtr = msg.is_remote_frame + result[0].frame.eff = msg.is_extended_id + return result + else: + channel = kwargs.get('channel', None) + assert channel is not None, 'channel required when merge send recv.' + result = ZCANDataObj_1() + result[0].dataType = 1 # can device always equal 1 + assert channel is not None + result[0].chnl = channel + result[0].data.zcanCANFDData.frame.can_id = msg.arbitration_id + result[0].data.zcanCANFDData.frame.err = msg.is_error_frame + result[0].data.zcanCANFDData.frame.rtr = msg.is_remote_frame + result[0].data.zcanCANFDData.frame.eff = msg.is_extended_id + + result[0].data.zcanCANFDData.flag.transmitType = trans_type + echo = kwargs.get('is_echo', False) + result[0].data.zcanCANFDData.flag.txEchoRequest = echo + delay = kwargs.get('delay_mode', 0) + if delay: + result[0].data.zcanCANFDData.flag.txDelay = delay + result[0].data.zcanCANFDData.timeStamp = kwargs['delay_time'] + return result + elif isinstance(msg, ZCAN_Receive_Data): # 接收CAN报文转换 + channel = kwargs.get('channel', None) + assert channel is not None, 'channel required when convert ZLG CAN msg to std msg.' + return Message( + timestamp=msg.timestamp / 1000, + arbitration_id=msg.frame.can_id, + is_extended_id=msg.frame.eff, + is_remote_frame=msg.frame.rtr, + is_error_frame=msg.frame.err, + channel=channel, + dlc=msg.frame.can_dlc, + data=bytes(msg.frame.data), + ) + elif isinstance(msg, ZCAN_ReceiveFD_Data): # 接收CANFD报文转换 + channel = kwargs.get('channel', None) + assert channel is not None, 'channel required when convert ZLG CANFD msg to std msg.' + return Message( + timestamp=msg.timestamp / 1000, + arbitration_id=msg.frame.can_id, + is_extended_id=msg.frame.eff, + is_remote_frame=msg.frame.rtr, + is_error_frame=msg.frame.err, + channel=channel, + dlc=msg.frame.len, + data=bytes(msg.frame.data), + is_fd=True, + # is_rx=True, + bitrate_switch=msg.frame.brs, + error_state_indicator=msg.frame.esi, + ) + elif isinstance(msg, ZCANDataObj): # 合并接收CAN|CANFD报文转换 + data = msg.data.zcanCANFDData + return Message( + timestamp=data.timeStamp / 1000, + arbitration_id=data.frame.can_id, + is_extended_id=data.frame.eff, + is_remote_frame=data.frame.rtr, + is_error_frame=data.frame.err, + channel=msg.chnl, + dlc=data.frame.len, + data=bytes(data.frame.data), + is_fd=data.flag.frameType, + bitrate_switch=data.frame.brs, + error_state_indicator=data.frame.esi, + ), data.flag.txEchoed + else: + raise ZCANException(f'Unknown message type: {type(msg)}') + + +class ZCanBus(BusABC): + + def __init__(self, + channel: Union[int, Sequence[int], str] = None, *, + resend: bool = False, + device_type: ZCANDeviceType, + device_index: int = 0, + rx_queue_size: Optional[int] = None, + configs: Union[list, tuple] = None, + can_filters: Optional[can.typechecking.CanFilters] = None, + **kwargs: object): + """ + Init ZLG CAN Bus device. + :param device_type: The device type in ZCANDeviceType. + :param resend: Auto resend when transmit fail until success if True + :param channel: A channel list(such as [0, 1]) or str split by ","(such as "0, 1") index cont from 0. + :param device_index: The ZLG device index, default 0. + :param rx_queue_size: The size of received queue. + :param configs: The channel configs, is a list of dict: + The index 0 is configuration for channel 0, index 1 is configuration for channel 1, and so on. + When the system is Windows, the config key is: + clock: [Optional] The clock of channel. + bitrate: [Must] The arbitration phase baudrate. + data_bitrate: [Optional] The data phase baudrate, default is baudrate. + initenal_resistance: [Optional] the terminal resistance enable status, optional value{1:enable|0:disable}, default: 1 + mode: [Optional] The can mode, defined in ZCANCanMode, default is NORMAL + filter: [Optional] The filter mode, defined in ZCANCanFilter, default is DOUBLE + acc_code: [Optional] The frame filter acceptance code of SJA1000. + acc_mask: [Optional] The frame mask code of SJA1000. + brp: [Optional] The bit rate prescaler + abit_timing: [Optional] The arbitration phase timing, ignored. + dbit_timing: [Optional] The data phase timing, ignored. + Other property value: please see: https://manual.zlg.cn/web/#/152/6364->设备属性, with out head "n/". + When the system is Linux, the config key is: + clock: [Must] The clock of channel. + initenal_resistance: [Optional] the terminal resistance enable status, optional value{1:enable|0:disable}, default: 1 + arb_tseg1: [Must] The phase buffer time segment1 of arbitration phase. + arb_tseg2: [Must] The phase buffer time segment2 of arbitration phase. + arb_sjw: [Must] The synchronization jump width of arbitration phase. + arb_smp: [Optional] Sample rate of arbitration phase, default is 0, Ignored. + arb_brp: [Must] The bit rate prescaler of arbitration phase. + data_tseg1: [Optional] The phase buffer time segment1 of data phase, default is arb_tseg1. + data_tseg2: [Optional] The phase buffer time segment2 of data phase, default is arb_tseg2. + data_sjw: [Optional] The synchronization jump width of data phase, default is arb_sjw. + data_smp: [Optional] Sample rate of data phase, default is arb_smp, Ignored. + data_brp: [Optional] The bit rate prescaler of data phase, default is arb_brp. + :param can_filters: Not used. + :param kwargs: Not used. + """ + super().__init__(channel=channel, can_filters=can_filters, **kwargs) + + cfg_length = len(configs) + if cfg_length == 0: + raise CanInitializationError('ZLG-CAN - Configuration list or tuple of dict is required.') + + self.rx_queue = collections.deque( + maxlen=rx_queue_size + ) # type: Deque[Tuple[int, Any]] # channel, raw_msg + try: + self.device = ZCAN(resend) + self.device.OpenDevice(device_type, device_index) + self.channels = self.device.channels + self.available = [] + self.channel_info = f"ZLG-CAN - device {device_index}, channels {self.channels}" + # {'mode': 0|1(NORMAL|LISTEN_ONLY), 'filter': 0|1(DOUBLE|SINGLE), 'acc_code': 0x0, 'acc_mask': 0xFFFFFFFF, + # 'brp': 0, 'abit_timing': 0, 'dbit_timing': 0} + + for index, channel in enumerate(self.channels): + try: + config: dict = configs[index] + except IndexError: + LOG.warning(f'ZLG-CAN - channel:{channel} not initialized.') + continue + init_config = {} + if _os.lower() == 'windows': + mode = config.get('mode', None) + if mode is not None: + init_config['mode'] = mode + del config['mode'] + filter = config.get('filter', None) + if filter is not None: + init_config['filter'] = filter + del config['filter'] + acc_code = config.get('acc_code', None) + if acc_code is not None: + init_config['acc_code'] = acc_code + del config['acc_code'] + acc_mask = config.get('acc_mask', None) + if acc_mask is not None: + init_config['acc_mask'] = acc_mask + del config['acc_mask'] + brp = config.get('brp', None) + if brp is not None: + init_config['brp'] = brp + del config['brp'] + abit_timing = config.get('abit_timing', None) + if abit_timing is not None: + init_config['abit_timing'] = abit_timing + del config['abit_timing'] + dbit_timing = config.get('dbit_timing', None) + if dbit_timing is not None: + init_config['dbit_timing'] = dbit_timing + del config['dbit_timing'] + + bitrate = config.get('bitrate', None) + if bitrate is None: + raise CanInitializationError('ZLG-CAN - bitrate is required.') + del config['bitrate'] + config['canfd_abit_baud_rate'] = bitrate + + data_bitrate = config.get('data_bitrate', None) + if data_bitrate is None: + config['canfd_dbit_baud_rate'] = bitrate + else: + del config['data_bitrate'] + config['canfd_dbit_baud_rate'] = data_bitrate + if hasattr(self.device, 'SetValue'): + # try: + # self.device.SetValue(channel, **config) + # except ZCANException: + # pass + self.device.SetValue(channel, **config) + elif _os.lower() == 'linux': + init_config = config + self.device.InitCAN(channel, **init_config) + self.device.StartCAN(channel) + self.available.append(channel) + except ZCANException as e: + raise CanInitializationError(str(e)) + + # def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None: + # pass + + def _recv_from_queue(self) -> Tuple[Message, bool]: + """Return a message from the internal receive queue""" + channel, raw_msg = self.rx_queue.popleft() + + return zlg_convert_msg(raw_msg, channel=channel), False + + def poll_received_messages(self, timeout): + try: + for channel in self.available: + can_num = self.device.GetReceiveNum(channel, ZCANMessageType.CAN) + canfd_num = self.device.GetReceiveNum(channel, ZCANMessageType.CANFD) + if can_num: + LOG.debug(f'ZLG-CAN - can message received: {can_num}.') + self.rx_queue.extend( + (channel, raw_msg) for raw_msg in self.device.Receive(channel, can_num, timeout) + ) + if canfd_num: + LOG.debug(f'ZLG-CAN - canfd message received: {canfd_num}.') + self.rx_queue.extend( + (channel, raw_msg) for raw_msg in self.device.ReceiveFD(channel, canfd_num, timeout) + ) + except ZCANException as e: + raise CanOperationError(str(e)) + + def _recv_internal(self, timeout: Optional[float]) -> Tuple[Optional[Message], bool]: + + if self.rx_queue: + return self._recv_from_queue() + + deadline = None + while deadline is None or time.time() < deadline: + if deadline is None and timeout is not None: + deadline = time.time() + timeout + + self.poll_received_messages(timeout) + + if self.rx_queue: + return self._recv_from_queue() + + return None, False + + def send(self, msg: Message, timeout: Optional[float] = None, **kwargs) -> None: + try: + channel = msg.channel + # if channel not in self.available: + # if len(self.available) == 0: + # raise CanOperationError(f'Channel: {channel} not in {self.available}') + if len(self.available) > 0 and channel is None: + channel = self.available[0] + if channel not in self.available: + raise CanOperationError(f'Channel: {channel} not in {self.available}') + is_merge = self.device.MergeEnabled() if hasattr(self.device, 'MergeEnabled') else False + if is_merge: + return self.device.TransmitData(zlg_convert_msg(msg, channel=channel, resend=self.device.resend, is_merge=is_merge, **kwargs), 1) + else: + if msg.is_fd: + return self.device.TransmitFD(channel, zlg_convert_msg(msg, resend=self.device.resend, channel=channel, is_merge=is_merge, **kwargs), 1) + return self.device.Transmit(channel, zlg_convert_msg(msg, resend=self.device.resend, channel=channel, is_merge=is_merge, **kwargs), 1) + except ZCANException as e: + raise CanOperationError(str(e)) + + @staticmethod + def _detect_available_configs(): # -> List[can.typechecking.AutoDetectedConfig]: + warnings.warn('Not supported by ZLG-CAN device.', DeprecationWarning, 2) + + def fileno(self): + warnings.warn('Not supported by ZLG-CAN device.', DeprecationWarning, 2) + + def shutdown(self) -> None: + LOG.debug('ZLG-CAN - shutdown.') + super().shutdown() + self.device.CloseDevice() + + def clear_rx_buffer(self, channel=None): + if channel: + assert channel in self.available + self.device.ClearBuffer(channel) + else: + for channel in self.available: + self.device.ClearBuffer(channel) + + def set_hardware_filters(self, channel, filters: Optional[can.typechecking.CanFilters]): + assert channel in self.available, f'channel: {channel} is not initialized!' + _filters = [] + for flt in filters: + can_id = flt.get('can_id') + can_mask = flt.get('can_mask') + extended = False + if isinstance(flt, can.typechecking.CanFilterExtended): + extended = flt.get('extended') + end = ~(can_id ^ can_mask) + if end < 0: + end = -end + LOG.debug(f'~(can_id ^ can_mask): {bin(end)}') + _filters.append((1 if extended else 0, can_id, end)) + + self.device.SetFilters(channel, _filters) diff --git a/setup.cfg b/setup.cfg index 2f3ee032f..70d48aa1c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,4 +32,6 @@ exclude = |^can/interfaces/udp_multicast |^can/interfaces/usb2can |^can/interfaces/virtual + |^can/interfaces/tosun + |^can/interfaces/zlgcan )