From 68e406ee851e2086f445f073368c4b3b46b6281b Mon Sep 17 00:00:00 2001 From: Nick Hehr Date: Tue, 31 Dec 2024 14:09:21 -0500 Subject: [PATCH] fix: initialize hardware during reconfigure, move out of get_readings (#10) --- src/mcp300x.py | 59 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/src/mcp300x.py b/src/mcp300x.py index 85ce64d..fb25cc5 100644 --- a/src/mcp300x.py +++ b/src/mcp300x.py @@ -1,4 +1,4 @@ -from typing import ClassVar, Mapping, Any, Dict, Optional, List, cast +from typing import ClassVar, Mapping, Any, Optional from typing_extensions import Self from viam.module.types import Reconfigurable @@ -17,6 +17,7 @@ LOGGER = getLogger(__name__) + class mcp3xxx(Sensor, Reconfigurable): # Defines new model's colon-delimited-triplet MODEL: ClassVar[Model] = Model(ModelFamily("viamlabs", "sensor"), "mcp300x") @@ -26,7 +27,9 @@ class mcp3xxx(Sensor, Reconfigurable): # Constructor @classmethod - def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self: + def new( + cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase] + ) -> Self: sensor = cls(config.name) sensor.reconfigure(config, dependencies) return sensor @@ -41,45 +44,65 @@ def validate(cls, config: ComponentConfig): raise NameError("A sensor_pin must be defined") if channel_map == "": - raise NameError("Channel map must be defined, refers to sensor type and which channel it connects to") + raise NameError( + "Channel map must be defined, refers to sensor type and which channel it connects to" + ) return # Handles attribute reconfiguration - def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]): + def reconfigure( + self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase] + ): # Initializes the resource instance self.sensor_pin = int(config.attributes.fields["sensor_pin"].number_value) self.channel_map = dict(config.attributes.fields["channel_map"].struct_value) - LOGGER.info(f"Channel map is {self.channel_map}") + LOGGER.debug(f"Channel map is {self.channel_map}") + + if getattr(self, "spi") is not None: + self.spi.deinit() + + if getattr(self, "cs") is not None: + self.cs.deinit() + + # Sensor Pin Logic + # Creates the SPI bus + self.spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI) + + # Creates the cs (chip select) with a gpio pin variable, we are using 24 GPIO 8 (SPI Chip Select 0), so 8 for the config + cs_pin = f"D{self.sensor_pin}" + self.cs = digitalio.DigitalInOut(getattr(board, cs_pin)) + + # Creates the MCP3008 object, works with MCP3002 and MCP3004 since it is all encompassing + self.mcp = MCP.MCP3008(self.spi, self.cs) return """ Implements the methods the Viam RDK defines for the Sensor API (rdk:components:sensor) """ + # Implements the Viam Sensor API's get_readings() method async def get_readings( - self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, **kwargs + self, + *, + extra: Optional[Mapping[str, Any]] = None, + timeout: Optional[float] = None, + **kwargs, ) -> Mapping[str, Any]: """ Obtains the measurements/data specific to this sensor. Returns: Mapping[str, Any]: The measurements. Can be of any type. """ - # Sensor Pin Logic - # Creates the SPI bus - spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI) - - # Creates the cs (chip select) with a gpio pin variable, we are using 24 GPIO 8 (SPI Chip Select 0), so 8 for the config - my_pin = f"D{self.sensor_pin}" - cs = digitalio.DigitalInOut(getattr(board, my_pin)) - - # Creates the MCP3008 object, works with MCP3002 and MCP3004 since it is all encompassing - mcp = MCP.MCP3008(spi, cs) readings = {} # Iterates over values for label, channel in self.channel_map.items(): - LOGGER.info(f"loop channel is {channel} and loop label is {label}") + LOGGER.debug(f"loop channel is {channel} and loop label is {label}") # Create an analog input channel chan = int(channel) - readings[label] = mcp.read(chan) + readings[label] = self.mcp.read(chan) return readings + + async def close(self): + self.spi.deinit() + self.cs.deinit()