Skip to content

addressing feedback from @jepler #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 49 additions & 46 deletions adafruit_mcp2515/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
`adafruit_mcp2515`
================================================================================

A CircuitPython library for working with the MCP2515 CAN bus controller
A CircuitPython library for working with the MCP2515 CAN bus controller using the
CircuitPython `canio` API


* Author(s): Bryan Siepert
Expand All @@ -20,8 +21,7 @@
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases

# * Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice
# * Adafruit's Register library: https://github.com/adafruit/Adafruit_CircuitPython_Register
* Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice
"""

from collections import namedtuple
Expand All @@ -30,6 +30,7 @@
from micropython import const
import adafruit_bus_device.spi_device as spi_device
from .canio import *
from .timer import Timer

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MCP2515.git"
Expand Down Expand Up @@ -162,12 +163,11 @@
"TransmitBuffer",
["CTRL_REG", "STD_ID_REG", "INT_FLAG_MASK", "LOAD_CMD", "SEND_CMD"],
)
# perhaps this will be stateful later? #TODO : dedup with above
FilterMask = namedtuple(
"FilterMask", ["CTRL_REG", "STD_ID_REG", "INT_FLAG_MASK", "LOAD_CMD", "SEND_CMD"],
)

# This is magic, don't disturb the dragon
# expects a 16Mhz crystal
_BAUD_RATES = {
# CNF1, CNF2, CNF3
1000000: (0x00, 0xD0, 0x82),
500000: (0x00, 0xF0, 0x86),
250000: (0x41, 0xF1, 0x85),
Expand Down Expand Up @@ -209,7 +209,7 @@ def _tx_buffer_status_decode(status_byte):


class MCP2515: # pylint:disable=too-many-instance-attributes
"""Library for the MCP2515 CANbus controller"""
"""A common shared-bus protocol."""

def __init__(
self,
Expand All @@ -222,29 +222,35 @@ def __init__(
auto_restart: bool = False,
debug: bool = False
):
"""[summary]

Args:
spi_bus (busio.SPI): The SPI bus used to communicate with the MCP2515
cs_pin (digitalio.DigitalInOut): SPI bus enable pin
baudrate (int, optional): The bit rate of the bus in Hz. All devices on the bus must \
agree on this value. Defaults to 250000.
loopback (bool, optional): When True the rx pin’s value is ignored, and the device \
receives the packets it sends. Defaults to False.
silent (bool, optional): When True the tx pin is always driven to the high logic level.\
This mode can be used to “sniff” a CAN bus without interfering.. Defaults to False.
auto_restart (bool, optional): If True, will restart communications after entering \
bus-off state. Defaults to False.
debug (bool, optional): If True, will enable printing debug information. Defaults to \
False.
"""A common shared-bus protocol.

:param ~busio.SPI spi: The SPI bus used to communicate with the MCP2515
:param ~digitalio.DigitalInOut cs_pin: SPI bus enable pin
:param int baudrate: The bit rate of the bus in Hz, using a 16Mhz crystal. All devices on\
the bus must agree on this value. Defaults to 250000.
:param bool loopback: Receive only packets sent from this device, and send only to this\
device. Requires that `silent` is also set to `True`, but only prevents transmission to\
other devices. Otherwise the send/receive behavior is normal.
:param bool silent: When `True` the controller does not transmit and all messages are\
received, ignoring errors and filters. This mode can be used to “sniff” a CAN bus without\
interfering. Defaults to `False`.
:param bool auto_restart: **Not supported by hardware. An `AttributeError` will be raised\
if `auto_restart` is set to `True`** If `True`, will restart communications after entering\
bus-off state. Defaults to `False`.

:param bool debug: If `True`, will enable printing debug information. Defaults to `False`.
"""
if loopback and silent:
raise AttributeError("only loopback or silent mode can bet set, not both")

if loopback and not silent:
raise AttributeError("Loopback mode requires silent to be set")
if auto_restart:
raise AttributeError("`auto-restart` is not supported by hardware")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be useful to emulate this by restarting at any in_waiting(), receive() or send() call? is the checking expensive (I could believe that)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to defer implementation until its absence is raised as an issue


self._auto_restart = auto_restart
self._debug = debug
self._bus_device_obj = spi_device.SPIDevice(spi_bus, cs_pin)
self._cs_pin = cs_pin
self._buffer = bytearray(255)
self._buffer = bytearray(20)
self._id_buffer = bytearray(4)
self._unread_message_queue = []
self._timer = Timer()
Expand Down Expand Up @@ -324,33 +330,20 @@ def initialize(self):

self._set_mode(new_mode)

def send(self, message_obj, wait_sent=True):
def send(self, message_obj):
"""Send a message on the bus with the given data and id. If the message could not be sent
due to a full fifo or a bus error condition, RuntimeError is raised.

Args:
message (canio.Message): The message to send. Must be a valid `canio.Message`
"""

# TODO: Timeout
tx_buff = self._get_tx_buffer() # info = addr.
if tx_buff is None:
raise RuntimeError("No transmit buffer available to send")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK this does implement what the core docs say. It's not what core does. sigh. We can let some actual experience inform us as to what is preferable, and then implement it across the various chips, I don't want it to hold things back at this point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we should wait and see how things go.


# TODO: set buffer priority
self._write_message(tx_buff, message_obj)
if not wait_sent:
return True
sleep(0.010)

send_confirmed = False
self._timer.rewind_to(0.005)
while not self._timer.expired:
# the status register address is whatever tx_buff_n is, minus one?
tx_buff_status = self._read_register(tx_buff.CTRL_REG)
self._dbg(_tx_buffer_status_decode(tx_buff_status))
send_confirmed = (tx_buff_status & _TXB_TXREQ_M) == 0
if send_confirmed:
return True

raise RuntimeError("Timeout occoured waiting for transmit confirmation")
return self._write_message(tx_buff, message_obj)

@property
def unread_message_count(self):
Expand All @@ -375,6 +368,9 @@ def read_message(self):
return self._unread_message_queue.pop(0)

def _read_rx_buffer(self, read_command):
for i in len(self._buffer):
self._buffer[i] = 0

# read from buffer
with self._bus_device_obj as spi:
self._buffer[0] = read_command
Expand Down Expand Up @@ -407,7 +403,6 @@ def _read_rx_buffer(self, read_command):
data=bytes(self._buffer[5 : 5 + message_length]),
extended=extended,
)

self._unread_message_queue.append(frame_obj)

def _read_from_rx_buffers(self):
Expand All @@ -427,6 +422,8 @@ def _read_from_rx_buffers(self):

def _write_message(self, tx_buffer, message_obj):

if tx_buffer is None:
raise RuntimeError("No transmit buffer available to send")
if isinstance(message_obj, RemoteTransmissionRequest):
dlc = message_obj.length
else:
Expand Down Expand Up @@ -471,6 +468,7 @@ def _write_message(self, tx_buffer, message_obj):

# send the frame based on the current buffers
self._start_transmit(tx_buffer)
return True

# TODO: Priority
def _start_transmit(self, tx_buffer):
Expand Down Expand Up @@ -830,7 +828,7 @@ def listen(self, matches=None, *, timeout: float = 10):

An empty filter list causes all messages to be accepted.

Timeout dictates how long ``receive()`` and ``next()`` will block.
Timeout dictates how long ``receive()`` will block.

Args:
match (Optional[Sequence[Match]], optional): [description]. Defaults to None.
Expand All @@ -841,6 +839,11 @@ def listen(self, matches=None, *, timeout: float = 10):
"""
if matches is None:
matches = []
elif self.silent and not self.loopback:
raise AttributeError(
"Hardware does not support setting `matches` in when\
`silent`==`True` and `loopback` == `False`"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting restriction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they imagine that if you're in the monitoring mode that you want to "hear" everything?

)

for match in matches:
self._dbg("match:", match)
Expand Down
34 changes: 4 additions & 30 deletions adafruit_mcp2515/canio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
"""Python implementation of the CircuitPython core `canio` API"""
# pylint:disable=too-few-public-methods, invalid-name, redefined-builtin
import time
from ..timer import Timer


class Message:
"""A class representing a message to send on a `canio` bus
"""
"""A class representing a CANbus data frame"""

# pylint:disable=too-many-arguments,invalid-name,redefined-builtin
def __init__(self, id, data, extended=False):
Expand Down Expand Up @@ -50,10 +50,10 @@ def data(self, new_data):


class RemoteTransmissionRequest:
"""RRTTTTRRRR """
"""A class representing a CANbus remote frame"""

def __init__(self, id: int, length: int, *, extended: bool = False):
"""Construct a Message to send on a CAN bus
"""Construct a RemoteTransmissionRequest to send on a CAN bus

Args:
id (int): The numeric ID of the requested message
Expand Down Expand Up @@ -205,29 +205,3 @@ def __init__(self, address: int, *, mask: int = 0, extended: bool = False):
self.address = address
self.mask = mask
self.extended = extended


############# non-api classes and methods
class Timer:
"""A class to track timeouts, like an egg timer
"""

def __init__(self, timeout=0.0):
self._timeout = None
self._start_time = None
if timeout:
self.rewind_to(timeout)

@property
def expired(self):
"""Returns the expiration status of the timer

Returns:
bool: True if more than `timeout` seconds has past since it was set
"""
return (time.monotonic() - self._start_time) > self._timeout

def rewind_to(self, new_timeout):
"""Re-wind the timer to a new timeout and start ticking"""
self._timeout = float(new_timeout)
self._start_time = time.monotonic()
29 changes: 29 additions & 0 deletions adafruit_mcp2515/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: Copyright (c) 2020 Bryan Siepert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""Provides a simple timer class; see `Timer`"""
from time import monotonic


class Timer:
"""A reusable class to track timeouts, like an egg timer"""

def __init__(self, timeout=0.0):
self._timeout = None
self._start_time = None
if timeout:
self.rewind_to(timeout)

@property
def expired(self):
"""Returns the expiration status of the timer

Returns:
bool: True if more than `timeout` seconds has past since it was set
"""
return (monotonic() - self._start_time) > self._timeout

def rewind_to(self, new_timeout):
"""Re-wind the timer to a new timeout and start ticking"""
self._timeout = float(new_timeout)
self._start_time = monotonic()
12 changes: 11 additions & 1 deletion examples/canio_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# SPDX-FileCopyrightText: Copyright (c) 2020 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT

###################### Board/ Config selection #################
# This is is only on way to make this test work for other boards.
# As an alternative, CAN_TYPE could be set manually and
# board-specific expectations can be if/else'd, and support for a
# new CAN controller or peripheral would start with defining the
# controller-specifc bits

CAN_TYPE = None
try:
from canio import (
Expand Down Expand Up @@ -37,8 +45,10 @@ def builtin_bus_factory():
def builtin_bus_factory():
cs = DigitalInOut(board.D5)
cs.switch_to_output()
return CAN(board.SPI(), cs, baudrate=1000000, loopback=True)
return CAN(board.SPI(), cs, baudrate=1000000, loopback=True, silent=True)


################################################################

max_standard_id = 0x7FF
max_extended_id = 0x1FFFFFFF
Expand Down
2 changes: 1 addition & 1 deletion examples/mcp2515_loopback_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
def bus():
cs = DigitalInOut(CS_PIN)
cs.switch_to_output()
return CAN(SPI(), cs, loopback=True)
return CAN(SPI(), cs, loopback=True, silent=True)


mb1 = [0xDE, 0xAD, 0xBE, 0xEF]
Expand Down
Loading