Skip to content

Fast channel reads via context manager #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions adafruit_ads1x15/ads1x15.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ def mode(self, mode):
raise ValueError("Unsupported mode.")
self._mode = mode

def read(self, pin, is_differential=False):
def read(self, pin, is_differential=False, fast=False):
"""I2C Interface for ADS1x15-based ADCs reads.

params:
:param pin: individual or differential pin.
:param bool is_differential: single-ended or differential read.
"""
pin = pin if is_differential else pin + 0x04
return self._read(pin)
return self._read(pin, fast)

def _data_rate_default(self):
"""Retrieve the default data rate for this ADC (in samples per second).
Expand All @@ -147,21 +147,22 @@ def _conversion_value(self, raw_adc):
"""
raise NotImplementedError('Subclass must implement _conversion_value function!')

def _read(self, pin):
def _read(self, pin, fast=False):
"""Perform an ADC read. Returns the signed integer result of the read."""
config = _ADS1X15_CONFIG_OS_SINGLE
config |= (pin & 0x07) << _ADS1X15_CONFIG_MUX_OFFSET
config |= _ADS1X15_CONFIG_GAIN[self.gain]
config |= self.mode
config |= self.rate_config[self.data_rate]
config |= _ADS1X15_CONFIG_COMP_QUE_DISABLE
self._write_register(_ADS1X15_POINTER_CONFIG, config)
if not fast:
config = _ADS1X15_CONFIG_OS_SINGLE
config |= (pin & 0x07) << _ADS1X15_CONFIG_MUX_OFFSET
config |= _ADS1X15_CONFIG_GAIN[self.gain]
config |= self.mode
config |= self.rate_config[self.data_rate]
config |= _ADS1X15_CONFIG_COMP_QUE_DISABLE
self._write_register(_ADS1X15_POINTER_CONFIG, config)

if self.mode == Mode.SINGLE:
while not self._conversion_complete():
pass
if self.mode == Mode.SINGLE:
while not self._conversion_complete():
pass

return self.get_last_result()
return self._conversion_value(self.get_last_result(fast))

def _conversion_complete(self):
"""Return status of ADC conversion."""
Expand All @@ -170,11 +171,13 @@ def _conversion_complete(self):
# OS = 1: Device is not currently performing a conversion
return self._read_register(_ADS1X15_POINTER_CONFIG) & 0x8000

def get_last_result(self):
def get_last_result(self, fast=False):
"""Read the last conversion result when in continuous conversion mode.
Will return a signed integer value.
Will return a signed integer value. If fast is True, the register
pointer is not updated as part of the read. This reduces I2C traffic
and increases possible read rate.
"""
return self._conversion_value(self._read_register(_ADS1X15_POINTER_CONVERSION))
return self._read_register(_ADS1X15_POINTER_CONVERSION, fast)

def _write_register(self, reg, value):
"""Write 16 bit value to register."""
Expand All @@ -184,10 +187,12 @@ def _write_register(self, reg, value):
with self.i2c_device as i2c:
i2c.write(self.buf)

def _read_register(self, reg):
def _read_register(self, reg, fast=False):
"""Read 16 bit register value."""
self.buf[0] = reg
with self.i2c_device as i2c:
i2c.write(self.buf, end=1, stop=False)
i2c.readinto(self.buf, end=2)
if fast:
i2c.readinto(self.buf, end=2)
else:
i2c.write_then_readinto(bytearray([reg]), self.buf, in_end=2, stop=False)
return self.buf[0] << 8 | self.buf[1]
22 changes: 20 additions & 2 deletions adafruit_ads1x15/analog_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
class AnalogIn():
"""AnalogIn Mock Implementation for ADC Reads."""

fast_read_channel = None

def __init__(self, ads, positive_pin, negative_pin=None):
"""AnalogIn

Expand All @@ -70,12 +72,28 @@ def __init__(self, ads, positive_pin, negative_pin=None):
@property
def value(self):
"""Returns the value of an ADC pin as an integer."""
return self._ads.read(self._pin_setting,
is_differential=self.is_differential) << (16 - self._ads.bits)
if AnalogIn.fast_read_channel is None:
fast = False
elif AnalogIn.fast_read_channel == self:
fast = True
else:
raise RuntimeError("Fast read in use by another channel.")
return self._ads.read(self._pin_setting, is_differential=self.is_differential, fast=fast)

@property
def voltage(self):
"""Returns the voltage from the ADC pin as a floating point value."""
raw = self.value
volts = raw * (_ADS1X15_PGA_RANGE[self._ads.gain] / (2**(self._ads.bits-1) - 1))
return volts

def __enter__(self):
if AnalogIn.fast_read_channel is not None:
raise RuntimeError("Fast read in use by another channel.")
AnalogIn.fast_read_channel = self
# do a throw away read to set pointer register
self._ads.read(self._pin_setting, is_differential=self.is_differential)
return self

def __exit__(self, *exc):
AnalogIn.fast_read_channel = None
44 changes: 44 additions & 0 deletions examples/ads1x15_fast_read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import time
import board
import busio
import adafruit_ads1x15.ads1015 as ADS
from adafruit_ads1x15.ads1x15 import Mode
from adafruit_ads1x15.analog_in import AnalogIn

# Data collection setup
RATE = 3300
SAMPLES = 1000

# Create the I2C bus
# Set frequency high to reduce time spent with I2C comms
i2c = busio.I2C(board.SCL, board.SDA, frequency=1000000)

# Create the ADC object using the I2C bus
ads = ADS.ADS1015(i2c)

# Create single-ended input on channel 0
chan0 = AnalogIn(ads, ADS.P0)

# ADC Configuration
ads.mode = Mode.CONTINUOUS
ads.data_rate = RATE

# Create list to store data samples
data = [None]*SAMPLES

start = time.monotonic()

# Use a context manager to utilize fast reading of a channel
with chan0 as chan:
# no other channels can be accessed inside here
for i in range(SAMPLES):
data[i] = chan.value
# some form of conversion complete synchronization
# should go here, but currently there is no efficient
# way to do this

end = time.monotonic()
total_time = end - start

print("Time of capture: {}s".format(total_time))
print("Sample rate requested={} actual={}".format(RATE, SAMPLES / total_time))