Skip to content

Add ZLG interface #1209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
785b53b
Add CanBitRateError
keelung-yang Jan 12, 2022
7970dad
Add ZLG interface
keelung-yang Jan 12, 2022
c8ed9c0
Merge branch 'hardbyte:develop' into develop
keelung-yang Jan 18, 2022
48e6299
Add ZlgBitTiming.bitrates property
keelung-yang Jan 18, 2022
3532684
Merge branch 'hardbyte:develop' into develop
keelung-yang Jan 19, 2022
0135c00
Merge branch 'hardbyte:develop' into develop
keelung-yang Jan 24, 2022
07bcfae
Remove ZCAN_DEVICE_TYPE / ZCAN_DEVICE_INDEX / ZCAN_CHANNEL
keelung-yang Jan 24, 2022
d7a6942
Add bus padding support
keelung-yang Jan 26, 2022
80285a9
Determine CAN-FD by data_bitrate
keelung-yang Jan 27, 2022
e03ad0f
Merge branch 'hardbyte:develop' into develop
keelung-yang Jan 27, 2022
c53458f
Merge branch 'hardbyte:develop' into develop
keelung-yang Jan 28, 2022
9602053
Remove bus padding since it should be handled in upper layer
keelung-yang Jan 28, 2022
bc68483
Balance receiving CAN/CAN-FD messages
keelung-yang Feb 6, 2022
d401057
Use ctypes only in vci module
keelung-yang Feb 8, 2022
4584868
Implement software timeout
keelung-yang Feb 9, 2022
9c6a615
Raise CanInitializationError if not in Linux
keelung-yang Feb 9, 2022
d38ab5d
Improve accuracy of software timeout
keelung-yang Feb 10, 2022
93309ae
Merge branch 'hardbyte:develop' into develop
keelung-yang Feb 19, 2022
62211e5
Merge branch 'hardbyte:develop' into develop
keelung-yang Sep 21, 2022
652a96b
Add build-in 5Mbps support
keelung-yang Sep 23, 2022
d1db8e0
Fix conflicts while merging v4.2
keelung-yang Apr 24, 2023
31631cf
Discard changes to avoid conflicts
keelung-yang Apr 24, 2023
4514014
Merge branch 'develop' into develop
keelung-yang Apr 24, 2023
471d7fc
Format code with black
keelung-yang Apr 24, 2023
fad194a
Restore changes to avoid conflicts
keelung-yang Apr 24, 2023
38439f1
Merge branch 'hardbyte:develop' into develop
keelung-yang Apr 27, 2023
c29257b
Merge branch 'develop' into develop
zariiii9003 Oct 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions can/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
+-- CanInitializationError
+-- CanOperationError
+-- CanTimeoutError
+-- CanBitRateError

Keep in mind that some functions and methods may raise different exceptions.
For example, validating typical arguments and parameters might result in a
Expand Down Expand Up @@ -111,6 +112,15 @@ class CanTimeoutError(CanError, TimeoutError):
"""


class CanBitRateError(CanError):
"""Indicates the invalid / unsupported bitrate.

Example scenarios:
- Invalid / unsupported bitrate on bus
- Can't convert between bit timing and bitrate
"""


@contextmanager
def error_check(
error_message: Optional[str] = None,
Expand Down
1 change: 1 addition & 0 deletions can/interfaces/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"neousys": ("can.interfaces.neousys", "NeousysBus"),
"etas": ("can.interfaces.etas", "EtasBus"),
"socketcand": ("can.interfaces.socketcand", "SocketCanDaemonBus"),
"zlg": ("can.interfaces.zlg", "ZlgCanBus"),
}


Expand Down
16 changes: 16 additions & 0 deletions can/interfaces/zlg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
Unofficial ZLG USBCAN/USBCANFD implementation for Linux
"""

__version__ = "2.0"
__date__ = "20220209"
__author__ = "[email protected]"
__history__ = """
1. Initial creation, [email protected], 2022-01-06
2. Determine CAN-FD by data_bitrate, [email protected], 2022-01-27
3. Remove bus padding since it should be handled in upper layer, [email protected], 2022-01-28
4. Balance receiving CAN/CAN-FD messages, [email protected], 2022-02-06
5. Implement software timeout, [email protected], 2022-02-09
"""

from can.interfaces.zlg.bus import ZlgCanBus
238 changes: 238 additions & 0 deletions can/interfaces/zlg/bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import time
import ctypes
import platform

from can import BusABC, BusState, Message
from can import CanInitializationError, CanOperationError, CanTimeoutError

from .vci import *
from .timing import ZlgBitTiming


DeviceType = ZCAN_DEVICE.USBCANFD_200U


class ZlgCanBus(BusABC):
def __init__(self, channel, device=0, tres=True, **kwargs):
"""
:param channel: channel index, [0, 1,,,]
:param device: device index, [0, 1,,,]
:param tres: enable/disable termination resistor on specified channel
"""
if platform.system() != "Linux":
raise CanInitializationError(f"Only Linux is supported currently")
self.bitrate = kwargs.get("bitrate", 500000)
self.data_bitrate = kwargs.get("data_bitrate", None)
self.channel_info = (
f"{self.__class__.__name__}{device}:{channel}@{self.bitrate}"
)
if self.data_bitrate:
self.channel_info += f"/{self.data_bitrate}"
self._dev_type = DeviceType.value
self._dev_index = int(device)
self._dev_channel = int(channel)
self._dev_timestamp = time.time()
self.is_opened = self.open()
if not self.is_opened:
raise CanInitializationError(f"Failed to open {self.channel_info}")
self.tres = bool(tres)
super().__init__(int(channel), **kwargs)

@property
def tres(self):
"""Termination resistor"""
return self._tres

@tres.setter
def tres(self, value):
self._tres = bool(value)
if not vci_channel_enable_tres(
self._dev_type, self._dev_index, self._dev_channel, self._tres
):
raise CanOperationError(
f'Failed to {"enable" if value else "disable"} '
f"termination resistor for {self.channel_info} !"
)

@property
def state(self):
err_msg = ZCAN_ERR_MSG()
if not vci_channel_read_info(
self._dev_type, self._dev_index, self._dev_channel, err_msg
):
raise CanOperationError(f"Failed to read CAN{self._dev_channel} status!")
if err_msg.info.err:
if err_msg.info.est:
return BusState.ERROR
else:
return BusState.ACTIVE
return None # https://github.com/hardbyte/python-can/issues/736

@state.setter
def state(self, value):
raise NotImplementedError()

def _from_raw(self, raw):
return Message(
timestamp=raw.header.ts,
arbitration_id=raw.header.id,
is_fd=bool(raw.header.info.fmt),
is_extended_id=bool(raw.header.info.sef),
is_remote_frame=bool(raw.header.info.sdf),
is_error_frame=bool(raw.header.info.err),
bitrate_switch=bool(raw.header.info.brs),
dlc=raw.header.len,
data=bytes(raw.dat[: raw.header.len]),
channel=raw.header.chn,
)

def _to_raw(self, msg):
info = ZCAN_MSG_INFO(
txm=False,
fmt=msg.is_fd,
sdf=msg.is_remote_frame,
sef=msg.is_extended_id,
err=msg.is_error_frame,
brs=msg.bitrate_switch,
est=msg.error_state_indicator,
# pad
)
header = ZCAN_MSG_HDR(
ts=(int(time.time() - self._dev_timestamp) * 1000) & 0xFFFF_FFFF,
id=msg.arbitration_id,
info=info,
chn=self._dev_channel,
len=msg.dlc,
)
if msg.is_fd:
raw = ZCAN_FD_MSG(header=header)
else:
raw = ZCAN_20_MSG(header=header)
ctypes.memmove(raw.dat, bytes(msg.data), msg.dlc)
return raw

def _available_msgs(self) -> tuple[bool, int]:
"""Return (is_fd, buffered_msg_count)"""
can_msg_count = vci_can_get_recv_num(
self._dev_type, self._dev_index, self._dev_channel
)
canfd_msg_count = vci_canfd_get_recv_num(
self._dev_type, self._dev_index, self._dev_channel
)
if can_msg_count > 0 and canfd_msg_count > 0:
if canfd_msg_count > can_msg_count:
return True, canfd_msg_count
else:
return False, can_msg_count
elif can_msg_count == 0 and canfd_msg_count == 0:
return bool(self.data_bitrate), 0
else:
return canfd_msg_count > 0, canfd_msg_count or can_msg_count

def _recv_one(self, fd) -> Message:
delay = 1 # ZLG cann't comfirm what's happen if delay == 0
rx_buf = (ZCAN_FD_MSG * 1)() if fd else (ZCAN_20_MSG * 1)()
if fd:
ret = vci_canfd_recv(
self._dev_type,
self._dev_index,
self._dev_channel,
rx_buf,
len(rx_buf),
delay,
)
else:
ret = vci_can_recv(
self._dev_type,
self._dev_index,
self._dev_channel,
rx_buf,
len(rx_buf),
delay,
)
if ret > 0:
return self._from_raw(rx_buf[0])
else:
return None

def _recv_internal(self, timeout):
t1 = time.time()
while True:
is_fd, msg_count = self._available_msgs()
if msg_count:
if msg := self._recv_one(is_fd):
return msg, self.filters is None
else:
raise CanOperationError(f"Failed to receive!")
elif timeout is None:
time.sleep(0.001)
elif timeout < 0.001:
return None, self.filters is None
elif (time.time() - t1) < timeout:
time.sleep(0.001)
else:
raise CanTimeoutError(f"Receive timeout!")

def _send_one(self, msg) -> int:
tx_buf = (ZCAN_FD_MSG * 1)() if msg.is_fd else (ZCAN_20_MSG * 1)()
tx_buf[0] = self._to_raw(msg)
if msg.is_fd:
return vci_canfd_send(
self._dev_type, self._dev_index, self._dev_channel, tx_buf, len(tx_buf)
)
else:
return vci_can_send(
self._dev_type, self._dev_index, self._dev_channel, tx_buf, len(tx_buf)
)

def _send_internal(self, msg, timeout=None) -> None:
while timeout is None:
if self._send_one(msg):
return
else:
t1 = time.time()
while (time.time() - t1) < timeout or timeout < 0.001:
if self._send_one(msg):
return
elif timeout < 0.001:
return
else:
raise CanTimeoutError(f"Send message {msg.arbitration_id:03X} timeout!")

def send(self, msg, timeout=None) -> None:
# The maximum tx timeout is 4000ms, limited by firmware, as explained officially
dev_timeout = 4000 if timeout is None else 10
vci_channel_set_tx_timeout(
self._dev_type, self._dev_index, self._dev_channel, dev_timeout
)
if self.data_bitrate: # Force FD if data_bitrate
msg.is_fd = True
self._send_internal(msg, timeout)

def open(self) -> bool:
timing = ZlgBitTiming(self._dev_type)
clock = timing.f_clock
bitrate = timing.timing(self.bitrate)
if self.data_bitrate:
data_bitrate = timing.timing(self.data_bitrate)
else:
data_bitrate = bitrate
if not vci_device_open(self._dev_type, self._dev_index):
return False
if not vci_channel_open(
self._dev_type,
self._dev_index,
self._dev_channel,
clock,
bitrate,
data_bitrate,
):
vci_device_close(self._dev_type, self._dev_index)
return False
else:
return True

def shutdown(self) -> None:
super().shutdown()
vci_channel_close(self._dev_type, self._dev_index, self._dev_channel)
vci_device_close(self._dev_type, self._dev_index)
44 changes: 44 additions & 0 deletions can/interfaces/zlg/timing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from can import BitTiming, CanInitializationError, CanBitRateError
from .vci import ZCAN_BIT_TIMING, ZCAN_DEVICE


# Official timing calculation
# https://manual.zlg.cn/web/#/188/6981
class ZlgBitTiming(BitTiming):
def __init__(self, device, **kwargs):
try:
self._device = ZCAN_DEVICE(device)
except:
raise CanInitializationError(f"Unsupported ZLG CAN device {device} !")
if self._device in (
ZCAN_DEVICE.USBCAN,
ZCAN_DEVICE.USBCANFD_200U,
):
kwargs.setdefault("f_clock", 60000000)
self.speeds = {
125_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=31),
250_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=15),
500_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=7),
1_000_000: ZCAN_BIT_TIMING(tseg1=46, tseg2=11, sjw=3, brp=0),
2_000_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=1),
4_000_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=0),
5_000_000: ZCAN_BIT_TIMING(tseg1=7, tseg2=2, sjw=2, brp=0),
}
super().__init__(**kwargs)

def timing(self, bitrate=None, force=False) -> ZCAN_BIT_TIMING:
if bitrate in self.speeds:
return self.speeds[bitrate]
elif force:
return ZCAN_BIT_TIMING(
tseg1=self.tseg1,
tseg2=self.tseg2,
sjw=self.sjw,
brp=self.brp,
)
else:
raise CanBitRateError(f"Unsupported {bitrate=}")

@property
def bitrates(self) -> list[int]:
return self.speeds.keys()
Loading