From 182756bc1fedccf649973b7f3fe34083f70e0145 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 15:12:58 +0200 Subject: [PATCH 01/16] refactor: initial Python class definition --- spidev/__init__.py | 9 ++++++ spidev_module.c => spidev/_cspi.c | 32 +------------------ spidev/_cspi.pyi | 48 +++++++++++++++++++++++++++++ spidev/_spi.py | 51 +++++++++++++++++++++++++++++++ 4 files changed, 109 insertions(+), 31 deletions(-) create mode 100644 spidev/__init__.py rename spidev_module.c => spidev/_cspi.c (98%) create mode 100644 spidev/_cspi.pyi create mode 100644 spidev/_spi.py diff --git a/spidev/__init__.py b/spidev/__init__.py new file mode 100644 index 0000000..a1a9875 --- /dev/null +++ b/spidev/__init__.py @@ -0,0 +1,9 @@ +""" +TODO +""" + +from ._spi import SpiDev + +__version__ = "" + +__all__ = ("SpiDev",) diff --git a/spidev_module.c b/spidev/_cspi.c similarity index 98% rename from spidev_module.c rename to spidev/_cspi.c index 6793266..921656f 100644 --- a/spidev_module.c +++ b/spidev/_cspi.c @@ -1,5 +1,5 @@ /* - * spidev_module.c - Python bindings for Linux SPI access through spidev + * _cspi.c - Python bindings for Linux SPI access through spidev * * MIT License * @@ -1433,32 +1433,6 @@ PyDoc_STRVAR(SpiDevObjectType_doc, "Return a new SPI object that is (optionally) connected to the\n" "specified SPI device interface.\n"); -static -PyObject *SpiDev_enter(PyObject *self, PyObject *args) -{ - if (!PyArg_ParseTuple(args, "")) - return NULL; - - Py_INCREF(self); - return self; -} - -static -PyObject *SpiDev_exit(SpiDevObject *self, PyObject *args) -{ - - PyObject *exc_type = 0; - PyObject *exc_value = 0; - PyObject *traceback = 0; - if (!PyArg_UnpackTuple(args, "__exit__", 3, 3, &exc_type, &exc_value, - &traceback)) { - return 0; - } - - SpiDev_close(self); - Py_RETURN_FALSE; -} - static PyMethodDef SpiDev_methods[] = { {"open", (PyCFunction)SpiDev_open, METH_VARARGS | METH_KEYWORDS, SpiDev_open_doc}, @@ -1480,10 +1454,6 @@ static PyMethodDef SpiDev_methods[] = { SpiDev_xfer2_doc}, {"xfer3", (PyCFunction)SpiDev_xfer3, METH_VARARGS, SpiDev_xfer3_doc}, - {"__enter__", (PyCFunction)SpiDev_enter, METH_VARARGS, - NULL}, - {"__exit__", (PyCFunction)SpiDev_exit, METH_VARARGS, - NULL}, {NULL}, }; diff --git a/spidev/_cspi.pyi b/spidev/_cspi.pyi new file mode 100644 index 0000000..6a2533c --- /dev/null +++ b/spidev/_cspi.pyi @@ -0,0 +1,48 @@ +# I generated this with ChatGPT and think it's mostly correct. It is not meant +# to be permanent and is their to help with uplifting most of the C module's +# code to a .py file while making sure the properties and method signature stay +# the same. +from typing import Any, Sequence, List, Tuple, Union + +class SpiDev: + def __init__(self, bus: int | None = ..., client: int | None = ...) -> None: ... + def open(self, bus: int, device: int) -> None: ... + def open_path(self, path: str) -> None: ... + def close(self) -> None: ... + def fileno(self) -> int: ... + def readbytes(self, length: int) -> List[int]: ... + def writebytes(self, values: Sequence[int]) -> None: ... + def writebytes2(self, values: Union[Sequence[int], bytes, bytearray, memoryview]) -> None: ... + def xfer( + self, + values: Sequence[int], + speed_hz: int | None = ..., + delay_usecs: int | None = ..., + bits_per_word: int | None = ... + ) -> List[int]: ... + def xfer2( + self, + values: Sequence[int], + speed_hz: int | None = ..., + delay_usecs: int | None = ..., + bits_per_word: int | None = ... + ) -> List[int]: ... + def xfer3( + self, + values: Sequence[int], + speed_hz: int | None = ..., + delay_usecs: int | None = ..., + bits_per_word: int | None = ... + ) -> Tuple[int, ...]: ... + + mode: int + cshigh: bool + threewire: bool + lsbfirst: bool + loop: bool + no_cs: bool + bits_per_word: int + max_speed_hz: int + read0: bool + +__version__: str diff --git a/spidev/_spi.py b/spidev/_spi.py new file mode 100644 index 0000000..2536197 --- /dev/null +++ b/spidev/_spi.py @@ -0,0 +1,51 @@ +from . import _cspi + +from types import TracebackType +from typing import Self + + +class SpiDev(_cspi.SpiDev): + def __init__( + self, + bus: int | None = None, + device: int | None = None, + *, + path: StrPath | None = None, + mode: int | None = None, + bits_per_word: int | None = None, + max_speed_hz: int | None = None, + ): + super().__init__(bus, device) + + self.bus = bus + self.device = device + super().__init__(bus, client) + + def __enter__(self) -> Self: + """ + Warning: The `bus` and `device` attributes have to be set to open the + connection: + + ``` + spi = SpiDev(bus=0, device=1) + # or + spi = SpiDev() + spi.bus = 0 + spi.device = 1 + # or + with spi: + spi.open(0, 1) + ... + """ + if self.bus and self.device: + super().open(self.bus, self.device) + + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + super().close() From 02ea45ab8d262ccd2b33361e49e8acbc134b0794 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 16:07:37 +0200 Subject: [PATCH 02/16] refactor(fileno)!: raise ValueError if connection not open This is the same behavior as other Python I/O stream's `fileno()` method. BREAKING CHANGE: ValueError instead of -1 if the SPI device connection is not open. --- spidev/_cspi.pyi | 78 +++++++++++++++++++++++++----------------------- spidev/_spi.py | 14 +++++++++ 2 files changed, 54 insertions(+), 38 deletions(-) diff --git a/spidev/_cspi.pyi b/spidev/_cspi.pyi index 6a2533c..da4bd80 100644 --- a/spidev/_cspi.pyi +++ b/spidev/_cspi.pyi @@ -5,44 +5,46 @@ from typing import Any, Sequence, List, Tuple, Union class SpiDev: - def __init__(self, bus: int | None = ..., client: int | None = ...) -> None: ... - def open(self, bus: int, device: int) -> None: ... - def open_path(self, path: str) -> None: ... - def close(self) -> None: ... - def fileno(self) -> int: ... - def readbytes(self, length: int) -> List[int]: ... - def writebytes(self, values: Sequence[int]) -> None: ... - def writebytes2(self, values: Union[Sequence[int], bytes, bytearray, memoryview]) -> None: ... - def xfer( - self, - values: Sequence[int], - speed_hz: int | None = ..., - delay_usecs: int | None = ..., - bits_per_word: int | None = ... - ) -> List[int]: ... - def xfer2( - self, - values: Sequence[int], - speed_hz: int | None = ..., - delay_usecs: int | None = ..., - bits_per_word: int | None = ... - ) -> List[int]: ... - def xfer3( - self, - values: Sequence[int], - speed_hz: int | None = ..., - delay_usecs: int | None = ..., - bits_per_word: int | None = ... - ) -> Tuple[int, ...]: ... + def __init__(self, bus: int | None = ..., client: int | None = ...) -> None: ... + def open(self, bus: int, device: int) -> None: ... + def open_path(self, path: str) -> None: ... + def close(self) -> None: ... + def fileno(self) -> int: ... + def readbytes(self, length: int) -> List[int]: ... + def writebytes(self, values: Sequence[int]) -> None: ... + def writebytes2( + self, values: Union[Sequence[int], bytes, bytearray, memoryview] + ) -> None: ... + def xfer( + self, + values: Sequence[int], + speed_hz: int | None = ..., + delay_usecs: int | None = ..., + bits_per_word: int | None = ..., + ) -> List[int]: ... + def xfer2( + self, + values: Sequence[int], + speed_hz: int | None = ..., + delay_usecs: int | None = ..., + bits_per_word: int | None = ..., + ) -> List[int]: ... + def xfer3( + self, + values: Sequence[int], + speed_hz: int | None = ..., + delay_usecs: int | None = ..., + bits_per_word: int | None = ..., + ) -> Tuple[int, ...]: ... - mode: int - cshigh: bool - threewire: bool - lsbfirst: bool - loop: bool - no_cs: bool - bits_per_word: int - max_speed_hz: int - read0: bool + mode: int + cshigh: bool + threewire: bool + lsbfirst: bool + loop: bool + no_cs: bool + bits_per_word: int + max_speed_hz: int + read0: bool __version__: str diff --git a/spidev/_spi.py b/spidev/_spi.py index 2536197..c01950f 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -21,6 +21,20 @@ def __init__( self.device = device super().__init__(bus, client) + def fileno(self) -> int: + """Return the file descriptor if it exists. + + Returns: + int: File descriptor number. + + Raises: + ValueError: if the connection is not open. + """ + fd = super().fileno() + if fd < 0: + raise ValueError("I/O operation on closed file") + return fd + def __enter__(self) -> Self: """ Warning: The `bus` and `device` attributes have to be set to open the From 296a5dc93f4911f246b2a0f1cc7e430d236e7ff5 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 17:16:57 +0200 Subject: [PATCH 03/16] refactor(mode)!: validate mode property in Python class BREAKING CHANGE: Raises ValueError if mode is not between 0 and 3. --- spidev/_cspi.c | 22 +--------------------- spidev/_spi.py | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/spidev/_cspi.c b/spidev/_cspi.c index 921656f..2bcc4d4 100644 --- a/spidev/_cspi.c +++ b/spidev/_cspi.c @@ -1012,28 +1012,8 @@ SpiDev_set_mode(SpiDevObject *self, PyObject *val, void *closure) "Cannot delete attribute"); return -1; } -#if PY_MAJOR_VERSION < 3 - if (PyInt_Check(val)) { - mode = PyInt_AS_LONG(val); - } else -#endif - { - if (PyLong_Check(val)) { - mode = PyLong_AS_LONG(val); - } else { - PyErr_SetString(PyExc_TypeError, - "The mode attribute must be an integer"); - return -1; - } - } - - if ( mode > 3 ) { - PyErr_SetString(PyExc_TypeError, - "The mode attribute must be an integer" - "between 0 and 3."); - return -1; - } + mode = PyLong_AS_LONG(val); // clean and set CPHA and CPOL bits tmp = ( self->mode & ~(SPI_CPHA | SPI_CPOL) ) | mode ; diff --git a/spidev/_spi.py b/spidev/_spi.py index c01950f..d00e9e1 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -19,7 +19,29 @@ def __init__( self.bus = bus self.device = device - super().__init__(bus, client) + if mode is not None: + self.mode = mode + + @property + def mode(self) -> int: + """SPI mode. + + A two bit pattern of clock polarity and phase [CPOL|CPHA], + min: 0b00 = 0, max: 0b11 = 3 + """ + return super().mode + + @mode.setter + def mode(self, value: int) -> None: + try: + v = int(value) + except (TypeError, ValueError): + raise TypeError(f"mode must be an integer, but is {value}") + + if not 0 <= v <= 3: + raise ValueError(f"mode must be between 0 and 3, but is {v}") + + super().__setattr__("mode", v) def fileno(self) -> int: """Return the file descriptor if it exists. From 8cde1bfd9f16731a7de30a34116d50e4e937ed58 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 17:17:57 +0200 Subject: [PATCH 04/16] feat: add I/O-like methods closed, readable, read, writeable, write --- spidev/_cspi.pyi | 5 ++--- spidev/_spi.py | 51 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/spidev/_cspi.pyi b/spidev/_cspi.pyi index da4bd80..b985fec 100644 --- a/spidev/_cspi.pyi +++ b/spidev/_cspi.pyi @@ -2,6 +2,7 @@ # to be permanent and is their to help with uplifting most of the C module's # code to a .py file while making sure the properties and method signature stay # the same. +from collections.abc import Buffer from typing import Any, Sequence, List, Tuple, Union class SpiDev: @@ -12,9 +13,7 @@ class SpiDev: def fileno(self) -> int: ... def readbytes(self, length: int) -> List[int]: ... def writebytes(self, values: Sequence[int]) -> None: ... - def writebytes2( - self, values: Union[Sequence[int], bytes, bytearray, memoryview] - ) -> None: ... + def writebytes2(self, values: Union[Sequence[int], Buffer]) -> None: ... def xfer( self, values: Sequence[int], diff --git a/spidev/_spi.py b/spidev/_spi.py index d00e9e1..14a9f7f 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -1,7 +1,11 @@ from . import _cspi -from types import TracebackType -from typing import Self +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from types import TracebackType + from typing import Self, Union, Sequence + from collections.abc import Buffer class SpiDev(_cspi.SpiDev): @@ -43,6 +47,14 @@ def mode(self, value: int) -> None: super().__setattr__("mode", v) + def closed(self) -> bool: + """True if the connection is closed.""" + try: + self.fileno() + return True + except ValueError: + return False + def fileno(self) -> int: """Return the file descriptor if it exists. @@ -57,10 +69,40 @@ def fileno(self) -> int: raise ValueError("I/O operation on closed file") return fd + def read(self, size: int | None = None, /) -> list[int]: + """Read and return up to _size_ bytes. + + If size is omitted or negative, 1 byte is read. + + Returns: + list[int]: _size_ number of bytes. + """ + if not self.readable(): + raise OSError("SPI device not readable") + # TODO: negative size generally means "read as much as possible". How + # can we mimic this behavior? + if size is None or size < 1: + size = 1 + return super().readbytes(size) + + def readable(self) -> bool: + """True if the SPI connection is currently open.""" + return not self.closed() + + def writeable(self) -> bool: + """True if the SPI connection is currently open.""" + return not self.closed() + + def write(self, b: Sequence[int] | Buffer, /) -> None: + if not self.writeable(): + raise OSError("SPI device not writeable") + # TODO: return number of bytes written + super().writebytes2(b) + def __enter__(self) -> Self: """ - Warning: The `bus` and `device` attributes have to be set to open the - connection: + Warning: The `bus` and `device` attributes must be set to open the + connection automatically: ``` spi = SpiDev(bus=0, device=1) @@ -72,6 +114,7 @@ def __enter__(self) -> Self: with spi: spi.open(0, 1) ... + ``` """ if self.bus and self.device: super().open(self.bus, self.device) From b003ea96b053206774362dda462446eca8f6e321 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 17:53:01 +0200 Subject: [PATCH 05/16] refactor(open): open() can use instance attributes bus/device or path Calling `open()` without arguments can use the instance attributes `bus` and `device` or `path` to call `open(bus, device)` or `open_path(path` from the super class respectively. Thus the following options are available: ```python SpiDev().open(0, 1) SpiDev(bus=0, device=1).open() SpiDev(bus=1, device=1).open(0, 0) # args to open() overwrite attributes SpiDev(path='/dev/myspi').open() # calls open_path('/dev/myspi') ``` --- spidev/_cspi.c | 30 +----------------------- spidev/_cspi.pyi | 1 - spidev/_spi.py | 59 +++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 57 insertions(+), 33 deletions(-) diff --git a/spidev/_cspi.c b/spidev/_cspi.c index 2bcc4d4..2675633 100644 --- a/spidev/_cspi.c +++ b/spidev/_cspi.c @@ -1366,27 +1366,7 @@ SpiDev_open_path(SpiDevObject *self, PyObject *args, PyObject *kwds) } -PyDoc_STRVAR(SpiDev_open_doc, - "open(bus, device)\n\n" - "Connects the object to the specified SPI device.\n" - "open(X,Y) will open /dev/spidev.\n"); - -static PyObject * -SpiDev_open(SpiDevObject *self, PyObject *args, PyObject *kwds) -{ - int bus, device; - char path[SPIDEV_MAXPATH]; - static char *kwlist[] = {"bus", "device", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "ii:open", kwlist, &bus, &device)) - return NULL; - if (snprintf(path, SPIDEV_MAXPATH, "/dev/spidev%d.%d", bus, device) >= SPIDEV_MAXPATH) { - PyErr_SetString(PyExc_OverflowError, - "Bus and/or device number is invalid."); - return NULL; - } - return SpiDev_open_dev(self, path); -} - +// TODO: not necessary anymore? static int SpiDev_init(SpiDevObject *self, PyObject *args, PyObject *kwds) { @@ -1398,12 +1378,6 @@ SpiDev_init(SpiDevObject *self, PyObject *args, PyObject *kwds) kwlist, &bus, &client)) return -1; - if (bus >= 0) { - SpiDev_open(self, args, kwds); - if (PyErr_Occurred()) - return -1; - } - return 0; } @@ -1414,8 +1388,6 @@ PyDoc_STRVAR(SpiDevObjectType_doc, "specified SPI device interface.\n"); static PyMethodDef SpiDev_methods[] = { - {"open", (PyCFunction)SpiDev_open, METH_VARARGS | METH_KEYWORDS, - SpiDev_open_doc}, {"open_path", (PyCFunction)SpiDev_open_path, METH_VARARGS | METH_KEYWORDS, SpiDev_open_path_doc}, {"close", (PyCFunction)SpiDev_close, METH_NOARGS, diff --git a/spidev/_cspi.pyi b/spidev/_cspi.pyi index b985fec..e678434 100644 --- a/spidev/_cspi.pyi +++ b/spidev/_cspi.pyi @@ -7,7 +7,6 @@ from typing import Any, Sequence, List, Tuple, Union class SpiDev: def __init__(self, bus: int | None = ..., client: int | None = ...) -> None: ... - def open(self, bus: int, device: int) -> None: ... def open_path(self, path: str) -> None: ... def close(self) -> None: ... def fileno(self) -> int: ... diff --git a/spidev/_spi.py b/spidev/_spi.py index 14a9f7f..3909205 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -3,10 +3,13 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: + from os import PathLike from types import TracebackType - from typing import Self, Union, Sequence + from typing import Self, Union, Sequence, overload from collections.abc import Buffer + StrPath = Union[str, PathLike[str]] + class SpiDev(_cspi.SpiDev): def __init__( @@ -26,6 +29,31 @@ def __init__( if mode is not None: self.mode = mode + if path and (bus or device): + raise ValueError( + "both path and bus/device number of SPI device are provided" + ) + self.path = path + + if mode is not None: + self.mode = mode + if bits_per_word is not None: + self.bits_per_word = bits_per_word + if max_speed_hz is not None: + self.max_speed_hz = max_speed_hz + if read0 is not None: + super().__setattr__("read0", read0) + + # TODO: open() here? It's what the original implementation did + self.open() + + def _resolve_path(self) -> str: + if self.bus is not None and self.device is not None: + return "/dev/spidev{:d}.{:d}".format(self.bus, self.device) + elif self.path is not None: + return str(self.path) + raise ValueError("bus/device or path not set") + @property def mode(self) -> int: """SPI mode. @@ -69,6 +97,29 @@ def fileno(self) -> int: raise ValueError("I/O operation on closed file") return fd + @overload + def open(self) -> None: ... + @overload + def open(self, bus: int, device: int) -> None: ... + def open(self, bus: int | None = None, device: int | None = None) -> None: + """Connect to the SPI device special file. + + If bus and device are provided it opens "/dev/spidev". If + path is provided it opens the SPI device at given path. Symbolic links + are followed. + + Raises: + ValueError: If bus/device or path is not provided. + """ + if bus: + self.bus = bus + if device: + self.device = device + + path = self._resolve_path() + super().open_path(path) + # TODO: return and set fd + def read(self, size: int | None = None, /) -> list[int]: """Read and return up to _size_ bytes. @@ -116,8 +167,10 @@ def __enter__(self) -> Self: ... ``` """ - if self.bus and self.device: - super().open(self.bus, self.device) + try: + self.open() + except ValueError: + pass return self From a35b0ded88835e19adcdc82532f43c76f9ce5fc8 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 18:59:54 +0200 Subject: [PATCH 06/16] refactor(bits_per_word)!: validate property in Python class BREAKING CHANGE: bits_per_word raises a ValueError instead of a TypeError when its value is not between 8 and 32. --- spidev/_cspi.c | 22 +--------------------- spidev/_spi.py | 23 +++++++++++++++++++++-- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/spidev/_cspi.c b/spidev/_cspi.c index 2675633..0018886 100644 --- a/spidev/_cspi.c +++ b/spidev/_cspi.c @@ -1182,33 +1182,13 @@ SpiDev_get_bits_per_word(SpiDevObject *self, void *closure) static int SpiDev_set_bits_per_word(SpiDevObject *self, PyObject *val, void *closure) { - uint8_t bits; - if (val == NULL) { PyErr_SetString(PyExc_TypeError, "Cannot delete attribute"); return -1; } -#if PY_MAJOR_VERSION < 3 - if (PyInt_Check(val)) { - bits = PyInt_AS_LONG(val); - } else -#endif - { - if (PyLong_Check(val)) { - bits = PyLong_AS_LONG(val); - } else { - PyErr_SetString(PyExc_TypeError, - "The bits_per_word attribute must be an integer"); - return -1; - } - } - if (bits < 8 || bits > 32) { - PyErr_SetString(PyExc_TypeError, - "invalid bits_per_word (8 to 32)"); - return -1; - } + uint8_t bits = PyLong_AS_LONG(val); if (self->bits_per_word != bits) { if (ioctl(self->fd, SPI_IOC_WR_BITS_PER_WORD, &bits) == -1) { diff --git a/spidev/_spi.py b/spidev/_spi.py index 3909205..80cdaa5 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -28,6 +28,8 @@ def __init__( self.device = device if mode is not None: self.mode = mode + if bits_per_word is not None: + self.bits_per_word = bits_per_word if path and (bus or device): raise ValueError( @@ -64,17 +66,34 @@ def mode(self) -> int: return super().mode @mode.setter - def mode(self, value: int) -> None: + def mode(self, value: int, /) -> None: try: v = int(value) except (TypeError, ValueError): - raise TypeError(f"mode must be an integer, but is {value}") + raise TypeError(f"mode must be an integer, but is {type(value)}") if not 0 <= v <= 3: raise ValueError(f"mode must be between 0 and 3, but is {v}") super().__setattr__("mode", v) + @property + def bits_per_word(self) -> int: + """Bits per word used in the xfer methods.""" + return super().bits_per_word + + @bits_per_word.setter + def bits_per_word(self, value: int, /) -> None: + try: + v = int(value) + except (TypeError, ValueError): + raise TypeError(f"bits_per_word must be an integer, but is {type(value)}") + + if not (8 <= value <= 32): + raise ValueError(f"bits_per_word must be between 8 and 32, but is {v}") + + super().__setattr__("bits_per_word", v) + def closed(self) -> bool: """True if the connection is closed.""" try: From d585d9f4ae8bb27bda7b700fd8ed74b58de58963 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 19:08:53 +0200 Subject: [PATCH 07/16] refactor(max_speed_hz): validate property in Python class --- spidev/_cspi.c | 17 ++--------------- spidev/_spi.py | 35 +++++++++++++++++++++-------------- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/spidev/_cspi.c b/spidev/_cspi.c index 0018886..895cb83 100644 --- a/spidev/_cspi.c +++ b/spidev/_cspi.c @@ -1211,27 +1211,14 @@ SpiDev_get_max_speed_hz(SpiDevObject *self, void *closure) static int SpiDev_set_max_speed_hz(SpiDevObject *self, PyObject *val, void *closure) { - uint32_t max_speed_hz; if (val == NULL) { PyErr_SetString(PyExc_TypeError, "Cannot delete attribute"); return -1; } -#if PY_MAJOR_VERSION < 3 - if (PyInt_Check(val)) { - max_speed_hz = PyInt_AS_LONG(val); - } else -#endif - { - if (PyLong_Check(val)) { - max_speed_hz = PyLong_AS_LONG(val); - } else { - PyErr_SetString(PyExc_TypeError, - "The max_speed_hz attribute must be an integer"); - return -1; - } - } + + uint32_t max_speed_hz = PyLong_AS_LONG(val); if (self->max_speed_hz != max_speed_hz) { if (ioctl(self->fd, SPI_IOC_WR_MAX_SPEED_HZ, &max_speed_hz) == -1) { diff --git a/spidev/_spi.py b/spidev/_spi.py index 80cdaa5..d917d3b 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -5,12 +5,20 @@ if TYPE_CHECKING: from os import PathLike from types import TracebackType - from typing import Self, Union, Sequence, overload + from typing import Self, Union, Sequence, overload, Any from collections.abc import Buffer StrPath = Union[str, PathLike[str]] +def as_int(val: Any, name: str = "Value") -> int: + try: + v = int(val) + except (TypeError, ValueError): + raise TypeError(f"{name} must be an integer, but is {type(val)}") + return v + + class SpiDev(_cspi.SpiDev): def __init__( self, @@ -26,11 +34,6 @@ def __init__( self.bus = bus self.device = device - if mode is not None: - self.mode = mode - if bits_per_word is not None: - self.bits_per_word = bits_per_word - if path and (bus or device): raise ValueError( "both path and bus/device number of SPI device are provided" @@ -67,10 +70,7 @@ def mode(self) -> int: @mode.setter def mode(self, value: int, /) -> None: - try: - v = int(value) - except (TypeError, ValueError): - raise TypeError(f"mode must be an integer, but is {type(value)}") + v = as_int(value, "mode") if not 0 <= v <= 3: raise ValueError(f"mode must be between 0 and 3, but is {v}") @@ -84,16 +84,23 @@ def bits_per_word(self) -> int: @bits_per_word.setter def bits_per_word(self, value: int, /) -> None: - try: - v = int(value) - except (TypeError, ValueError): - raise TypeError(f"bits_per_word must be an integer, but is {type(value)}") + v = as_int(value, "bits_per_word") if not (8 <= value <= 32): raise ValueError(f"bits_per_word must be between 8 and 32, but is {v}") super().__setattr__("bits_per_word", v) + @property + def max_speed_hz(self) -> int: + """Max speed (in Hertz).""" + return super().max_speed_hz + + @max_speed_hz.setter + def max_speed_hz(self, value: int, /) -> None: + v = as_int(value, "max_speed_hz") + super().__setattr__("max_speed_hz", v) + def closed(self) -> bool: """True if the connection is closed.""" try: From 3e0913468bc953fdb54672e96f4313912e2853fa Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 19:58:14 +0200 Subject: [PATCH 08/16] fix: Small fixes --- spidev/_spi.py | 48 ++++++++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/spidev/_spi.py b/spidev/_spi.py index d917d3b..4f99955 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -13,13 +13,20 @@ def as_int(val: Any, name: str = "Value") -> int: try: - v = int(val) + return int(val) except (TypeError, ValueError): raise TypeError(f"{name} must be an integer, but is {type(val)}") - return v class SpiDev(_cspi.SpiDev): + """TODO. + + Examples: + >>> SpiDev(0, 1) # connect to /dev/spidev0.1 + + >>> SpiDev(path='/dev/myspi') # connect to /dev/myspi + """ + def __init__( self, bus: int | None = None, @@ -29,6 +36,7 @@ def __init__( mode: int | None = None, bits_per_word: int | None = None, max_speed_hz: int | None = None, + read0: bool | None = None, ): super().__init__(bus, device) @@ -75,6 +83,7 @@ def mode(self, value: int, /) -> None: if not 0 <= v <= 3: raise ValueError(f"mode must be between 0 and 3, but is {v}") + # TODO: needs investigation if this works super().__setattr__("mode", v) @property @@ -89,6 +98,7 @@ def bits_per_word(self, value: int, /) -> None: if not (8 <= value <= 32): raise ValueError(f"bits_per_word must be between 8 and 32, but is {v}") + # TODO: needs investigation if this works super().__setattr__("bits_per_word", v) @property @@ -99,6 +109,7 @@ def max_speed_hz(self) -> int: @max_speed_hz.setter def max_speed_hz(self, value: int, /) -> None: v = as_int(value, "max_speed_hz") + # TODO: needs investigation if this works super().__setattr__("max_speed_hz", v) def closed(self) -> bool: @@ -125,8 +136,10 @@ def fileno(self) -> int: @overload def open(self) -> None: ... + @overload def open(self, bus: int, device: int) -> None: ... + def open(self, bus: int | None = None, device: int | None = None) -> None: """Connect to the SPI device special file. @@ -156,8 +169,8 @@ def read(self, size: int | None = None, /) -> list[int]: """ if not self.readable(): raise OSError("SPI device not readable") - # TODO: negative size generally means "read as much as possible". How - # can we mimic this behavior? + # TODO: negative size in BaseIO.read() means "read as much as + # possible". How can we mimic this behavior? if size is None or size < 1: size = 1 return super().readbytes(size) @@ -178,20 +191,8 @@ def write(self, b: Sequence[int] | Buffer, /) -> None: def __enter__(self) -> Self: """ - Warning: The `bus` and `device` attributes must be set to open the - connection automatically: - - ``` - spi = SpiDev(bus=0, device=1) - # or - spi = SpiDev() - spi.bus = 0 - spi.device = 1 - # or - with spi: - spi.open(0, 1) - ... - ``` + Warning: If `bus` and `device` attributes or `path` attribute is not set, + the file has to be opened manually using the `open()` method. """ try: self.open() @@ -207,3 +208,14 @@ def __exit__( traceback: TracebackType | None, ) -> None: super().close() + + def __str__(self) -> str: + if self.bus is not None and self.device is not None: + return f"{self.__class__.__name__}({self.bus}, {self.device})" + elif self.path is not None: + return f"{self.__class__.__name__}({self.path})" + else: + return f"{self.__class__.__name__}()" + + # def __repr__(self): + # args = [f'' for a in ('bus', 'device', 'path', ) From 862aff1cdea897aec50c2a610334c21a1dc30f8c Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 20:27:35 +0200 Subject: [PATCH 09/16] feat: add __repr__ and __eq__ --- spidev/_spi.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/spidev/_spi.py b/spidev/_spi.py index 4f99955..17e3a9d 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -217,5 +217,29 @@ def __str__(self) -> str: else: return f"{self.__class__.__name__}()" - # def __repr__(self): - # args = [f'' for a in ('bus', 'device', 'path', ) + def __repr__(self) -> str: + # SpiDev(bus=0, device=1, bits_per_word=8) + args = ", ".join( + f"{a}={getattr(self, a)}" + for a in ( + "bus", + "device", + "path", + "mode", + "bits_per_word", + "max_speed_hz", + "read0", + ) + if getattr(self, a) is not None + ) + return f"{self.__class__.__name__}({args})" + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, type(self)): + return False + + try: + return self._resolve_path() == other._resolve_path() + except ValueError: + # return False if one of the instances is uninitiated + return False From 7b2d353c7bbba148065250300edb0a5baa3677ec Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 23:13:25 +0200 Subject: [PATCH 10/16] refactor: don't use old class as super class Posed some issues with setting attributes of the super class and it's harder to refactor later on when C implementation will be cleaned up. --- spidev/__init__.py | 2 +- spidev/_spi.py | 119 ++++++++++++++++++++++++++++++++------------- 2 files changed, 87 insertions(+), 34 deletions(-) diff --git a/spidev/__init__.py b/spidev/__init__.py index a1a9875..1c7b5de 100644 --- a/spidev/__init__.py +++ b/spidev/__init__.py @@ -4,6 +4,6 @@ from ._spi import SpiDev -__version__ = "" +__version__ = "4.0.0" __all__ = ("SpiDev",) diff --git a/spidev/_spi.py b/spidev/_spi.py index 17e3a9d..1aec899 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -2,23 +2,23 @@ from typing import TYPE_CHECKING -if TYPE_CHECKING: - from os import PathLike - from types import TracebackType - from typing import Self, Union, Sequence, overload, Any - from collections.abc import Buffer +from os import PathLike +from types import TracebackType +from typing import Self, Union, Sequence, overload, Any, TypeVar, Callable +from collections.abc import Buffer - StrPath = Union[str, PathLike[str]] +StrPath = Union[str, PathLike[str]] +T = TypeVar("T") -def as_int(val: Any, name: str = "Value") -> int: +def try_convert(val: Any, typename: Callable[[Any], T], varname: str = "Value") -> T: try: - return int(val) + return typename(val) except (TypeError, ValueError): - raise TypeError(f"{name} must be an integer, but is {type(val)}") + raise TypeError(f"{varname} must have type {typename}, but is {type(val)}") -class SpiDev(_cspi.SpiDev): +class SpiDev: """TODO. Examples: @@ -38,7 +38,7 @@ def __init__( max_speed_hz: int | None = None, read0: bool | None = None, ): - super().__init__(bus, device) + self._cmod = _cspi.SpiDev(bus, device) self.bus = bus self.device = device @@ -55,7 +55,7 @@ def __init__( if max_speed_hz is not None: self.max_speed_hz = max_speed_hz if read0 is not None: - super().__setattr__("read0", read0) + self.read0 = read0 # TODO: open() here? It's what the original implementation did self.open() @@ -74,46 +74,56 @@ def mode(self) -> int: A two bit pattern of clock polarity and phase [CPOL|CPHA], min: 0b00 = 0, max: 0b11 = 3 """ - return super().mode + return self._cmod.mode @mode.setter def mode(self, value: int, /) -> None: - v = as_int(value, "mode") + v = try_convert(value, int, "mode") if not 0 <= v <= 3: raise ValueError(f"mode must be between 0 and 3, but is {v}") - # TODO: needs investigation if this works - super().__setattr__("mode", v) + self._cmod.mode = v @property def bits_per_word(self) -> int: """Bits per word used in the xfer methods.""" - return super().bits_per_word + return self._cmod.bits_per_word @bits_per_word.setter def bits_per_word(self, value: int, /) -> None: - v = as_int(value, "bits_per_word") + v = try_convert(value, int, "bits_per_word") if not (8 <= value <= 32): raise ValueError(f"bits_per_word must be between 8 and 32, but is {v}") - # TODO: needs investigation if this works - super().__setattr__("bits_per_word", v) + self._cmod.bits_per_word = v @property def max_speed_hz(self) -> int: """Max speed (in Hertz).""" - return super().max_speed_hz + return self._cmod.max_speed_hz @max_speed_hz.setter def max_speed_hz(self, value: int, /) -> None: - v = as_int(value, "max_speed_hz") - # TODO: needs investigation if this works - super().__setattr__("max_speed_hz", v) + v = try_convert(value, int, "max_speed_hz") + self._cmod.max_speed_hz = v + + @property + def read0(self) -> bool: + """Read 0 bytes after transfer to lower CS if cshigh is set.""" + return self._cmod.read0 + + @read0.setter + def read0(self, value: bool, /) -> None: + v = try_convert(value, bool, "read0") + self._cmod.read0 = v + + def close(self) -> None: + self._cmod.close() def closed(self) -> bool: - """True if the connection is closed.""" + """True if the connection is not opened.""" try: self.fileno() return True @@ -129,7 +139,7 @@ def fileno(self) -> int: Raises: ValueError: if the connection is not open. """ - fd = super().fileno() + fd = self._cmod.fileno() if fd < 0: raise ValueError("I/O operation on closed file") return fd @@ -155,10 +165,14 @@ def open(self, bus: int | None = None, device: int | None = None) -> None: if device: self.device = device - path = self._resolve_path() - super().open_path(path) + self.open_path() # TODO: return and set fd + def open_path(self, path: StrPath | None = None) -> None: + if path: + self.path = path + self._cmod.open_path(self._resolve_path()) + def read(self, size: int | None = None, /) -> list[int]: """Read and return up to _size_ bytes. @@ -173,12 +187,15 @@ def read(self, size: int | None = None, /) -> list[int]: # possible". How can we mimic this behavior? if size is None or size < 1: size = 1 - return super().readbytes(size) + return self._cmod.readbytes(size) def readable(self) -> bool: """True if the SPI connection is currently open.""" return not self.closed() + def readbytes(self, length: int) -> list[int]: + return self._cmod.readbytes(length) + def writeable(self) -> bool: """True if the SPI connection is currently open.""" return not self.closed() @@ -187,7 +204,40 @@ def write(self, b: Sequence[int] | Buffer, /) -> None: if not self.writeable(): raise OSError("SPI device not writeable") # TODO: return number of bytes written - super().writebytes2(b) + self._cmod.writebytes2(b) + + def writebytes(self, values: Sequence[int]) -> None: + self._cmod.writebytes(values) + + def writebytes2(self, values: Union[Sequence[int], Buffer]) -> None: + self._cmod.writebytes2(values) + + def xfer( + self, + values: Sequence[int], + speed_hz: int | None = None, + delay_usecs: int | None = None, + bits_per_word: int | None = None, + ) -> list[int]: + return self._cmod.xfer(values, speed_hz, delay_usecs, bits_per_word) + + def xfer2( + self, + values: Sequence[int], + speed_hz: int | None = None, + delay_usecs: int | None = None, + bits_per_word: int | None = None, + ) -> list[int]: + return self._cmod.xfer2(values, speed_hz, delay_usecs, bits_per_word) + + def xfer3( + self, + values: Sequence[int], + speed_hz: int | None = None, + delay_usecs: int | None = None, + bits_per_word: int | None = None, + ) -> tuple[int, ...]: + return self._cmod.xfer3(values, speed_hz, delay_usecs, bits_per_word) def __enter__(self) -> Self: """ @@ -207,20 +257,23 @@ def __exit__( exc_value: BaseException | None, traceback: TracebackType | None, ) -> None: - super().close() + self._cmod.close() def __str__(self) -> str: if self.bus is not None and self.device is not None: + # SpiDev(0, 1) return f"{self.__class__.__name__}({self.bus}, {self.device})" elif self.path is not None: - return f"{self.__class__.__name__}({self.path})" + # SpiDev('/dev/myspi') + return f"{self.__class__.__name__}({repr(self.path)})" else: + # SpiDev() return f"{self.__class__.__name__}()" def __repr__(self) -> str: # SpiDev(bus=0, device=1, bits_per_word=8) args = ", ".join( - f"{a}={getattr(self, a)}" + f"{a}={repr(getattr(self, a))}" for a in ( "bus", "device", From 66045c301dac0498f744183068001b9c3687d08e Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Mon, 29 Sep 2025 23:27:08 +0200 Subject: [PATCH 11/16] More documentation --- spidev/_cspi.pyi | 2 +- spidev/_spi.py | 21 +++++++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/spidev/_cspi.pyi b/spidev/_cspi.pyi index e678434..188c16f 100644 --- a/spidev/_cspi.pyi +++ b/spidev/_cspi.pyi @@ -3,7 +3,7 @@ # code to a .py file while making sure the properties and method signature stay # the same. from collections.abc import Buffer -from typing import Any, Sequence, List, Tuple, Union +from typing import Sequence, List, Tuple, Union class SpiDev: def __init__(self, bus: int | None = ..., client: int | None = ...) -> None: ... diff --git a/spidev/_spi.py b/spidev/_spi.py index 1aec899..aede93c 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -1,7 +1,5 @@ from . import _cspi -from typing import TYPE_CHECKING - from os import PathLike from types import TracebackType from typing import Self, Union, Sequence, overload, Any, TypeVar, Callable @@ -12,6 +10,7 @@ def try_convert(val: Any, typename: Callable[[Any], T], varname: str = "Value") -> T: + """Try to convert `val` to `typename`. Raise a TypeError if conversion fails.""" try: return typename(val) except (TypeError, ValueError): @@ -120,6 +119,7 @@ def read0(self, value: bool, /) -> None: self._cmod.read0 = v def close(self) -> None: + """Close the object from the interface.""" self._cmod.close() def closed(self) -> bool: @@ -157,6 +157,10 @@ def open(self, bus: int | None = None, device: int | None = None) -> None: path is provided it opens the SPI device at given path. Symbolic links are followed. + Args: + bus: Bus number. + device: Device number. + Raises: ValueError: If bus/device or path is not provided. """ @@ -169,6 +173,14 @@ def open(self, bus: int | None = None, device: int | None = None) -> None: # TODO: return and set fd def open_path(self, path: StrPath | None = None) -> None: + """Open SPI device at given path. + + Args: + path: Path to SPI device. + + Raises: + IOError + """ if path: self.path = path self._cmod.open_path(self._resolve_path()) @@ -190,7 +202,7 @@ def read(self, size: int | None = None, /) -> list[int]: return self._cmod.readbytes(size) def readable(self) -> bool: - """True if the SPI connection is currently open.""" + """True if the SPI device is currently open.""" return not self.closed() def readbytes(self, length: int) -> list[int]: @@ -242,7 +254,8 @@ def xfer3( def __enter__(self) -> Self: """ Warning: If `bus` and `device` attributes or `path` attribute is not set, - the file has to be opened manually using the `open()` method. + the file has to be manually opened using its `open()` or `open_path()` + method. """ try: self.open() From 3fbe5dec5d93518b9c51548cd82880575613ba4c Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Wed, 1 Oct 2025 14:42:31 +0200 Subject: [PATCH 12/16] Fix ruff errors --- pyproject.toml | 15 ++++++++ spidev/__init__.py | 4 +- spidev/_cspi.pyi | 13 +++---- spidev/_spi.py | 95 ++++++++++++++++++++++++++++------------------ 4 files changed, 80 insertions(+), 47 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 638dd9c..3a8d531 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,18 @@ [build-system] requires = ["setuptools>=61.0"] build-backend = "setuptools.build_meta" + +[tool.mypy] +strict = true + +[tool.ruff.lint] +select = [ + "E", # pycodestyle + "F", # Pyflakes + "A", # flake8-builtins + "UP", # pyupgrade + "FA", # flake8-future-annotations + "B", # flake8-bugbear + "SIM", # flake8-simplify + "I", # isort +] diff --git a/spidev/__init__.py b/spidev/__init__.py index 1c7b5de..e4fa517 100644 --- a/spidev/__init__.py +++ b/spidev/__init__.py @@ -1,6 +1,4 @@ -""" -TODO -""" +"""TODO.""" from ._spi import SpiDev diff --git a/spidev/_cspi.pyi b/spidev/_cspi.pyi index 188c16f..e9680e1 100644 --- a/spidev/_cspi.pyi +++ b/spidev/_cspi.pyi @@ -2,38 +2,37 @@ # to be permanent and is their to help with uplifting most of the C module's # code to a .py file while making sure the properties and method signature stay # the same. -from collections.abc import Buffer -from typing import Sequence, List, Tuple, Union +from collections.abc import Buffer, Sequence class SpiDev: def __init__(self, bus: int | None = ..., client: int | None = ...) -> None: ... def open_path(self, path: str) -> None: ... def close(self) -> None: ... def fileno(self) -> int: ... - def readbytes(self, length: int) -> List[int]: ... + def readbytes(self, length: int) -> list[int]: ... def writebytes(self, values: Sequence[int]) -> None: ... - def writebytes2(self, values: Union[Sequence[int], Buffer]) -> None: ... + def writebytes2(self, values: Sequence[int] | Buffer) -> None: ... def xfer( self, values: Sequence[int], speed_hz: int | None = ..., delay_usecs: int | None = ..., bits_per_word: int | None = ..., - ) -> List[int]: ... + ) -> list[int]: ... def xfer2( self, values: Sequence[int], speed_hz: int | None = ..., delay_usecs: int | None = ..., bits_per_word: int | None = ..., - ) -> List[int]: ... + ) -> list[int]: ... def xfer3( self, values: Sequence[int], speed_hz: int | None = ..., delay_usecs: int | None = ..., bits_per_word: int | None = ..., - ) -> Tuple[int, ...]: ... + ) -> tuple[int, ...]: ... mode: int cshigh: bool diff --git a/spidev/_spi.py b/spidev/_spi.py index aede93c..b054faa 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -1,20 +1,27 @@ +from __future__ import annotations + +from contextlib import suppress +from typing import TYPE_CHECKING, Any, Callable, Self, TypeVar, overload +from warnings import deprecated + from . import _cspi -from os import PathLike -from types import TracebackType -from typing import Self, Union, Sequence, overload, Any, TypeVar, Callable -from collections.abc import Buffer +if TYPE_CHECKING: + from collections.abc import Buffer, Sequence + from os import PathLike + from types import TracebackType -StrPath = Union[str, PathLike[str]] -T = TypeVar("T") + StrPath = str | PathLike[str] + T = TypeVar("T") -def try_convert(val: Any, typename: Callable[[Any], T], varname: str = "Value") -> T: +def try_convert(val: object, typename: Callable[[Any], T], varname: str = "Value") -> T: """Try to convert `val` to `typename`. Raise a TypeError if conversion fails.""" try: return typename(val) - except (TypeError, ValueError): - raise TypeError(f"{varname} must have type {typename}, but is {type(val)}") + except (TypeError, ValueError) as err: + msg = f"{varname} must have type {typename}, but is {type(val)}" + raise TypeError(msg) from err class SpiDev: @@ -24,9 +31,10 @@ class SpiDev: >>> SpiDev(0, 1) # connect to /dev/spidev0.1 >>> SpiDev(path='/dev/myspi') # connect to /dev/myspi + """ - def __init__( + def __init__( # noqa: PLR0913 self, bus: int | None = None, device: int | None = None, @@ -36,14 +44,14 @@ def __init__( bits_per_word: int | None = None, max_speed_hz: int | None = None, read0: bool | None = None, - ): + ) -> None: self._cmod = _cspi.SpiDev(bus, device) self.bus = bus self.device = device if path and (bus or device): raise ValueError( - "both path and bus/device number of SPI device are provided" + "both path and bus/device number of SPI device are provided", ) self.path = path @@ -60,9 +68,18 @@ def __init__( self.open() def _resolve_path(self) -> str: + """Construct path from bus and device numbers or from given path. + + Returns: + str: Resolved path to the SPI device. + + Raises: + ValueError: if bus+device or path not set + + """ if self.bus is not None and self.device is not None: - return "/dev/spidev{:d}.{:d}".format(self.bus, self.device) - elif self.path is not None: + return "/dev/spidev{self.bus:d}.{self.device:d}" + if self.path is not None: return str(self.path) raise ValueError("bus/device or path not set") @@ -79,8 +96,10 @@ def mode(self) -> int: def mode(self, value: int, /) -> None: v = try_convert(value, int, "mode") - if not 0 <= v <= 3: - raise ValueError(f"mode must be between 0 and 3, but is {v}") + # more than two bits, can only be 0-3 + if v not in range(4): + msg = f"mode {v} has more than two bits" + raise ValueError(msg) self._cmod.mode = v @@ -93,8 +112,9 @@ def bits_per_word(self) -> int: def bits_per_word(self, value: int, /) -> None: v = try_convert(value, int, "bits_per_word") - if not (8 <= value <= 32): - raise ValueError(f"bits_per_word must be between 8 and 32, but is {v}") + if v not in (8, 16, 32): + msg = f"bits_per_word must be 8, 16 or 32, is {v}" + raise ValueError(msg) self._cmod.bits_per_word = v @@ -123,7 +143,7 @@ def close(self) -> None: self._cmod.close() def closed(self) -> bool: - """True if the connection is not opened.""" + """Return True if the connection is not opened.""" try: self.fileno() return True @@ -138,6 +158,7 @@ def fileno(self) -> int: Raises: ValueError: if the connection is not open. + """ fd = self._cmod.fileno() if fd < 0: @@ -163,6 +184,7 @@ def open(self, bus: int | None = None, device: int | None = None) -> None: Raises: ValueError: If bus/device or path is not provided. + """ if bus: self.bus = bus @@ -180,6 +202,7 @@ def open_path(self, path: StrPath | None = None) -> None: Raises: IOError + """ if path: self.path = path @@ -192,6 +215,7 @@ def read(self, size: int | None = None, /) -> list[int]: Returns: list[int]: _size_ number of bytes. + """ if not self.readable(): raise OSError("SPI device not readable") @@ -202,14 +226,14 @@ def read(self, size: int | None = None, /) -> list[int]: return self._cmod.readbytes(size) def readable(self) -> bool: - """True if the SPI device is currently open.""" + """Return True if the SPI device is currently open.""" return not self.closed() def readbytes(self, length: int) -> list[int]: return self._cmod.readbytes(length) def writeable(self) -> bool: - """True if the SPI connection is currently open.""" + """Return True if the SPI connection is currently open.""" return not self.closed() def write(self, b: Sequence[int] | Buffer, /) -> None: @@ -252,15 +276,9 @@ def xfer3( return self._cmod.xfer3(values, speed_hz, delay_usecs, bits_per_word) def __enter__(self) -> Self: - """ - Warning: If `bus` and `device` attributes or `path` attribute is not set, - the file has to be manually opened using its `open()` or `open_path()` - method. - """ - try: + # TODO: make open() idempotent and raise IOError if opening fails + with suppress(ValueError): self.open() - except ValueError: - pass return self @@ -276,17 +294,17 @@ def __str__(self) -> str: if self.bus is not None and self.device is not None: # SpiDev(0, 1) return f"{self.__class__.__name__}({self.bus}, {self.device})" - elif self.path is not None: + if self.path is not None: # SpiDev('/dev/myspi') - return f"{self.__class__.__name__}({repr(self.path)})" - else: - # SpiDev() - return f"{self.__class__.__name__}()" + return f"{self.__class__.__name__}({self.path!r})" + + # SpiDev() + return f"{self.__class__.__name__}()" def __repr__(self) -> str: - # SpiDev(bus=0, device=1, bits_per_word=8) + # e.g. SpiDev(bus=0, device=1, bits_per_word=8) args = ", ".join( - f"{a}={repr(getattr(self, a))}" + f"{a}={getattr(self, a)!r}" for a in ( "bus", "device", @@ -300,7 +318,7 @@ def __repr__(self) -> str: ) return f"{self.__class__.__name__}({args})" - def __eq__(self, other: Any) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, type(self)): return False @@ -309,3 +327,6 @@ def __eq__(self, other: Any) -> bool: except ValueError: # return False if one of the instances is uninitiated return False + + def __del__(self) -> None: + del self._cmod From 2c2c997f351839a3711d585e79d4f0424de62225 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Wed, 1 Oct 2025 14:43:39 +0200 Subject: [PATCH 13/16] fix!: mark some functions as deprecated - SpiDev().open(bus, device) --> SpiDev(bus, device).open() - SpiDev().open_path(path) --> SpiDev(path=path).open() - SpiDev().readbytes() --> SpiDev().read() - SpiDev().writebytes() --> SpiDev().write() - SpiDev().writebytes2() --> SpiDev().write() --- spidev/_spi.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/spidev/_spi.py b/spidev/_spi.py index b054faa..fafb3ec 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -169,6 +169,7 @@ def fileno(self) -> int: def open(self) -> None: ... @overload + @deprecated("use SpiDev(bus, device).open()") def open(self, bus: int, device: int) -> None: ... def open(self, bus: int | None = None, device: int | None = None) -> None: @@ -191,9 +192,10 @@ def open(self, bus: int | None = None, device: int | None = None) -> None: if device: self.device = device - self.open_path() + self._cmod.open_path(self._resolve_path()) # TODO: return and set fd + @deprecated("use SpiDev(path='...').open()") def open_path(self, path: StrPath | None = None) -> None: """Open SPI device at given path. @@ -229,6 +231,7 @@ def readable(self) -> bool: """Return True if the SPI device is currently open.""" return not self.closed() + @deprecated("use SpiDev().read()") def readbytes(self, length: int) -> list[int]: return self._cmod.readbytes(length) @@ -242,10 +245,12 @@ def write(self, b: Sequence[int] | Buffer, /) -> None: # TODO: return number of bytes written self._cmod.writebytes2(b) + @deprecated("use SpiDev().write()") def writebytes(self, values: Sequence[int]) -> None: self._cmod.writebytes(values) - def writebytes2(self, values: Union[Sequence[int], Buffer]) -> None: + @deprecated("use SpiDev().write()") + def writebytes2(self, values: Sequence[int] | Buffer) -> None: self._cmod.writebytes2(values) def xfer( From 06ccbf3d664154fe70e13fd568b8dd47db348cb3 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Wed, 1 Oct 2025 18:40:44 +0200 Subject: [PATCH 14/16] refactor(cmodule): rename to _spimodule.c More in line with Python conventions --- spidev/_spi.py | 4 ++-- spidev/{_cspi.c => _spimodule.c} | 21 +++++++++++++++++++++ spidev/{_cspi.pyi => _spimodule.pyi} | 0 3 files changed, 23 insertions(+), 2 deletions(-) rename spidev/{_cspi.c => _spimodule.c} (97%) rename spidev/{_cspi.pyi => _spimodule.pyi} (100%) diff --git a/spidev/_spi.py b/spidev/_spi.py index fafb3ec..e885e8d 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Any, Callable, Self, TypeVar, overload from warnings import deprecated -from . import _cspi +from . import _spimodule if TYPE_CHECKING: from collections.abc import Buffer, Sequence @@ -45,7 +45,7 @@ def __init__( # noqa: PLR0913 max_speed_hz: int | None = None, read0: bool | None = None, ) -> None: - self._cmod = _cspi.SpiDev(bus, device) + self._cmod = _spimodule.SpiDev(bus, device) self.bus = bus self.device = device diff --git a/spidev/_cspi.c b/spidev/_spimodule.c similarity index 97% rename from spidev/_cspi.c rename to spidev/_spimodule.c index 895cb83..29b632b 100644 --- a/spidev/_cspi.c +++ b/spidev/_spimodule.c @@ -1257,6 +1257,27 @@ SpiDev_set_read0(SpiDevObject *self, PyObject *val, void *closure) return 0; } +static PyObject * +SpiDev_get_settings(SpiDevObject *self, void *closure) +{ + PyObject* dict = PyDict_New(); + if (!dict) + return NULL; + + PyDict_SetItemString(dict, "mode", Py_BuildValue("i", (self->mode & (SPI_CPHA | SPI_CPOL)))); + PyDict_SetItemString(dict, "bits_per_word", Py_BuildValue("i", self->bits_per_word)); + PyDict_SetItemString(dict, "max_speed_hz", Py_BuildValue("i", self->max_speed_hz)); + PyDict_SetItemString(dict, "read0", (self->read0 == 1) ? Py_True : Py_False); + + PyDict_SetItemString(dict, "cshigh", (self->mode & SPI_CS_HIGH) ? Py_True : Py_False); + PyDict_SetItemString(dict, "threewire", (self->mode & SPI_3WIRE) ? Py_True : Py_False); + PyDict_SetItemString(dict, "lsbfirst", (self->mode & SPI_LSB_FIRST) ? Py_True : Py_False); + PyDict_SetItemString(dict, "loop", (self->mode & SPI_LOOP) ? Py_True : Py_False); + PyDict_SetItemString(dict, "no_cs", (self->mode & SPI_NO_CS) ? Py_True : Py_False); + + return dict; +} + static PyGetSetDef SpiDev_getset[] = { {"mode", (getter)SpiDev_get_mode, (setter)SpiDev_set_mode, "SPI mode as two bit pattern of \n" diff --git a/spidev/_cspi.pyi b/spidev/_spimodule.pyi similarity index 100% rename from spidev/_cspi.pyi rename to spidev/_spimodule.pyi From ec87bbb0f04471bb12ee76b919fdc62d7e23f770 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Wed, 1 Oct 2025 18:45:16 +0200 Subject: [PATCH 15/16] docs: docstrings and README updates --- README.md | 118 ++++++++++++++++++++----------------------------- spidev/_spi.py | 93 +++++++++++++++++++++++++++++++------- 2 files changed, 125 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index b47a31e..f6cfce2 100644 --- a/README.md +++ b/README.md @@ -1,33 +1,49 @@ # Python Spidev -This project contains a python module for interfacing with SPI devices from -user space via the spidev linux kernel driver. +Python interface for the spidev Linux kernel driver for connecting to SPI +devices. All code is MIT licensed unless explicitly stated otherwise. ## Usage ```python -import spidev -spi = spidev.SpiDev() -spi.open_path(spidev_devicefile_path) -to_send = [0x01, 0x02, 0x03] -spi.xfer(to_send) +from spidev import SpiDev + +# bus=0, device=1. Same as path="/dev/spidev0.1" +with SpiDev(0, 1) as spi: + spi.write([0x01, 0x02, 0x03]) + +# or directly from a path and manually opening/closing the file: + +spi = SpiDev(path="/dev/myspidev") +spi.open() + +print(spi.read(64)) + +spi.close() ``` + ## Settings ```python import spidev -spi = spidev.SpiDev() -spi.open_path("/dev/spidev0.0") -# Settings (for example) -spi.max_speed_hz = 5000 -spi.mode = 0b01 +# setting options during initialization +spi = spidev.SpiDev(0, 0, mode=0b01, max_speed_hz=5000) -... +with spi: + spi.read() + +# changing options on the go +spi.mode = 0b11 + +with spi: + spi.read() ``` +Full list of supported options: + * `bits_per_word` * `cshigh` * `loop` - Set the "SPI_LOOP" flag to enable loopback mode @@ -48,55 +64,17 @@ spi.mode = 0b01 ## Methods - open_path(filesystem_path) - -Connects to the specified SPI device special file, following symbolic links if -appropriate (see note on deterministic SPI bus numbering in the Linux kernel -below for why this can be advantageous in some configurations). - - open(bus, device) - -Equivalent to calling `open_path("/dev/spidev.")`. n.b. **Either** -`open_path` or `open` should be used. - - readbytes(n) - -Read n bytes from SPI device. +The `SpiDev` class is similar to Python's [I/O Base +Classes](https://docs.python.org/3/library/io.html#i-o-base-classes), and so +are its methods. The most important ones are: - writebytes(list of values) - -Writes a list of values to SPI device. - - writebytes2(list of values) - -Similar to `writebytes` but accepts arbitrary large lists. If list size exceeds -buffer size (which is read from `/sys/module/spidev/parameters/bufsiz`), data -will be split into smaller chunks and sent in multiple operations. - -Also, `writebytes2` understands [buffer -protocol](https://docs.python.org/3/c-api/buffer.html) so it can accept numpy -byte arrays for example without need to convert them with `tolist()` first. -This offers much better performance where you need to transfer frames to -SPI-connected displays for instance. - - xfer(list of values[, speed_hz, delay_usec, bits_per_word]) - -Performs an SPI transaction. Chip-select should be released and reactivated -between blocks. Delay specifies the delay in usec between blocks. - - xfer2(list of values[, speed_hz, delay_usec, bits_per_word]) - -Performs an SPI transaction. Chip-select should be held active between blocks. - - xfer3(list of values[, speed_hz, delay_usec, bits_per_word]) - -Similar to `xfer2` but accepts arbitrary large lists. If list size exceeds -buffer size (which is read from `/sys/module/spidev/parameters/bufsiz`), data -will be split into smaller chunks and sent in multiple operations. - - close() - -Disconnects from the SPI device. +* `close()`: disconnects from the SPI device. +* `closed`: True if the SPI device is closed. +* `fileno()`: Underlying file descriptor. `ValueError` if closed. +* `readable()`: True if not closed. +* `read(n)`: Read `n` bytes from the SPI device. +* `writeable()`: True if not closed. +* `write(b)`: Write bytes to SPI device. ## The Linux kernel and SPI bus numbering and the role of udev @@ -146,7 +124,7 @@ bus number and SPI chip select number to user space in the form of a POSIX A user space program (usually 'udev') listens for kernel device creation events, and creates a file system "device node" for user space software to interact with. By convention, for spidev, the device nodes are named -/dev/spidev. is (where the *bus* is the Linux kernel's internal +`/dev/spidev.` is (where the *bus* is the Linux kernel's internal SPI bus number (see below) and the *device* number corresponds to the SPI controller "chip select" output pin that is connected to the SPI *device* 'chip select' input pin. @@ -172,22 +150,22 @@ symbolic links to allow identification of block storage devices e.g. see the output of `ls -alR /dev/disk`. `99-local-spi-example-udev.rules` included with py-spidev includes example udev -rules for creating stable symlink device paths (for use with `open_path`). +rules for creating stable symlink device paths (for use with SpiDev's `path` +argument). -e.g. the following Python code could be used to communicate with an SPI device +E.g. the following Python code could be used to communicate with an SPI device attached to chip-select line 0 of an individual FTDI FT232H USB to SPI adapter which has the USB serial number "1A8FG636": -``` -#!/usr/bin/env python3 +```python import spidev -spi = spidev.SpiDev() -spi.open_path("/dev/spi/by-path/usb-sernum-1A8FG636-cs-0") -# TODO: Useful stuff here -spi.close() +spi = spidev.SpiDev(path="/dev/spi/by-path/usb-sernum-1A8FG636-cs-0") +with spi: + # TODO: Useful stuff here + ... ``` In the more general case, the example udev file should be modified as diff --git a/spidev/_spi.py b/spidev/_spi.py index e885e8d..1db3635 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -78,11 +78,20 @@ def _resolve_path(self) -> str: """ if self.bus is not None and self.device is not None: - return "/dev/spidev{self.bus:d}.{self.device:d}" + return f"/dev/spidev{self.bus:d}.{self.device:d}" if self.path is not None: return str(self.path) raise ValueError("bus/device or path not set") + @property + def closed(self) -> bool: + """Return True if the connection is not opened.""" + try: + self.fileno() + return True + except ValueError: + return False + @property def mode(self) -> int: """SPI mode. @@ -142,14 +151,6 @@ def close(self) -> None: """Close the object from the interface.""" self._cmod.close() - def closed(self) -> bool: - """Return True if the connection is not opened.""" - try: - self.fileno() - return True - except ValueError: - return False - def fileno(self) -> int: """Return the file descriptor if it exists. @@ -210,7 +211,7 @@ def open_path(self, path: StrPath | None = None) -> None: self.path = path self._cmod.open_path(self._resolve_path()) - def read(self, size: int | None = None, /) -> list[int]: + def read(self, size: int = -1, /) -> list[int]: """Read and return up to _size_ bytes. If size is omitted or negative, 1 byte is read. @@ -218,18 +219,21 @@ def read(self, size: int | None = None, /) -> list[int]: Returns: list[int]: _size_ number of bytes. + Raises: + OSError: If device is closed. + """ if not self.readable(): - raise OSError("SPI device not readable") + raise OSError("device closed") # TODO: negative size in BaseIO.read() means "read as much as # possible". How can we mimic this behavior? - if size is None or size < 1: + if size < 1: size = 1 return self._cmod.readbytes(size) def readable(self) -> bool: """Return True if the SPI device is currently open.""" - return not self.closed() + return not self.closed @deprecated("use SpiDev().read()") def readbytes(self, length: int) -> list[int]: @@ -237,12 +241,25 @@ def readbytes(self, length: int) -> list[int]: def writeable(self) -> bool: """Return True if the SPI connection is currently open.""" - return not self.closed() + return not self.closed def write(self, b: Sequence[int] | Buffer, /) -> None: + """Write bytes to SPI device. + + Accepts arbitrary large lists. If list size exceeds buffer size (read + from /sys/module/spidev/parameters/bufsiz), data will be + split into smaller chunks and sent in multiple operations. + + Args: + b: Sequence of bytes or Buffer to write. + + Raises: + OSError: If device is closed. + + """ if not self.writeable(): - raise OSError("SPI device not writeable") - # TODO: return number of bytes written + raise OSError("device closed") + # TODO: return number of bytes written like RawIOBase self._cmod.writebytes2(b) @deprecated("use SpiDev().write()") @@ -260,6 +277,20 @@ def xfer( delay_usecs: int | None = None, bits_per_word: int | None = None, ) -> list[int]: + """Performs an SPI transaction. + + NOTE: Chip-select should be released and reactivated between blocks. + + Args: + values: Bytes to write. + speed_hz: Speed to use. + delay_usecs: Delay in microseconds between blocks. + bits_per_word: Bits per word. + + Returns: + TODO + + """ return self._cmod.xfer(values, speed_hz, delay_usecs, bits_per_word) def xfer2( @@ -269,6 +300,20 @@ def xfer2( delay_usecs: int | None = None, bits_per_word: int | None = None, ) -> list[int]: + """Performs an SPI transaction. + + NOTE: Chip-select should be held active between blocks. + + Args: + values: Bytes to write. + speed_hz: Speed to use. + delay_usecs: Delay in microseconds between blocks. + bits_per_word: Bits per word. + + Returns: + TODO + + """ return self._cmod.xfer2(values, speed_hz, delay_usecs, bits_per_word) def xfer3( @@ -278,6 +323,22 @@ def xfer3( delay_usecs: int | None = None, bits_per_word: int | None = None, ) -> tuple[int, ...]: + """Performs an SPI transaction. + + Accepts arbitrary large lists. If list size exceeds buffer size (read + from /sys/module/spidev/parameters/bufsiz), data will be split + into smaller chunks and sent in multiple operations. + + Args: + values: Bytes to write. + speed_hz: Speed to use. + delay_usecs: Delay in microseconds between blocks. + bits_per_word: Bits per word. + + Returns: + TODO + + """ return self._cmod.xfer3(values, speed_hz, delay_usecs, bits_per_word) def __enter__(self) -> Self: From 5d02d3acddb5fea513415f593c972dca7418e806 Mon Sep 17 00:00:00 2001 From: Yochem van Rosmalen Date: Wed, 1 Oct 2025 22:37:27 +0200 Subject: [PATCH 16/16] feat: add missing properties --- spidev/_spi.py | 65 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git a/spidev/_spi.py b/spidev/_spi.py index 1db3635..8864154 100644 --- a/spidev/_spi.py +++ b/spidev/_spi.py @@ -25,7 +25,7 @@ def try_convert(val: object, typename: Callable[[Any], T], varname: str = "Value class SpiDev: - """TODO. + """Connect to a SPI device. Examples: >>> SpiDev(0, 1) # connect to /dev/spidev0.1 @@ -65,7 +65,8 @@ def __init__( # noqa: PLR0913 self.read0 = read0 # TODO: open() here? It's what the original implementation did - self.open() + if (bus and device) or path: + self.open() def _resolve_path(self) -> str: """Construct path from bus and device numbers or from given path. @@ -105,7 +106,7 @@ def mode(self) -> int: def mode(self, value: int, /) -> None: v = try_convert(value, int, "mode") - # more than two bits, can only be 0-3 + # two bits max, thus value can only be 0-3 if v not in range(4): msg = f"mode {v} has more than two bits" raise ValueError(msg) @@ -134,8 +135,7 @@ def max_speed_hz(self) -> int: @max_speed_hz.setter def max_speed_hz(self, value: int, /) -> None: - v = try_convert(value, int, "max_speed_hz") - self._cmod.max_speed_hz = v + self._cmod.max_speed_hz = try_convert(value, int, "max_speed_hz") @property def read0(self) -> bool: @@ -144,8 +144,50 @@ def read0(self) -> bool: @read0.setter def read0(self, value: bool, /) -> None: - v = try_convert(value, bool, "read0") - self._cmod.read0 = v + self._cmod.read0 = try_convert(value, bool, "read0") + + @property + def cshigh(self) -> bool: + return self._cmod.cshigh + + @cshigh.setter + def cshigh(self, value: bool, /) -> None: + self._cmod.cshigh = try_convert(value, bool, "cshigh") + + @property + def threewire(self) -> bool: + """SI/SO signals shared.""" + return self._cmod.threewire + + @threewire.setter + def threewire(self, value: bool, /) -> None: + self._cmod.threewire = try_convert(value, bool, "threewire") + + @property + def lsbfirst(self) -> bool: + return self._cmod.lsbfirst + + @lsbfirst.setter + def lsbfirst(self, value: bool, /) -> None: + self._cmod.lsbfirst = try_convert(value, bool, "lsbfirst") + + @property + def loop(self) -> bool: + """Sets the SPI_LOOP flag to enable loopback mode.""" + return self._cmod.loop + + @loop.setter + def loop(self, value: bool, /) -> None: + self._cmod.loop = try_convert(value, bool, "loop") + + @property + def no_cs(self) -> bool: + """Sets the SPI_NO_CS flag to disable use of the chip select.""" + return self._cmod.no_cs + + @no_cs.setter + def no_cs(self, value: bool, /) -> None: + self._cmod.no_cs = try_convert(value, bool, "no_cs") def close(self) -> None: """Close the object from the interface.""" @@ -193,8 +235,9 @@ def open(self, bus: int | None = None, device: int | None = None) -> None: if device: self.device = device + # TODO: If Cmod's open() would return the fd we can lift a lot of logic + # to here. self._cmod.open_path(self._resolve_path()) - # TODO: return and set fd @deprecated("use SpiDev(path='...').open()") def open_path(self, path: StrPath | None = None) -> None: @@ -225,8 +268,8 @@ def read(self, size: int = -1, /) -> list[int]: """ if not self.readable(): raise OSError("device closed") - # TODO: negative size in BaseIO.read() means "read as much as - # possible". How can we mimic this behavior? + # TODO: IOBase.read(-1) means "read as much as + # possible". Can we mimic this behavior? if size < 1: size = 1 return self._cmod.readbytes(size) @@ -259,7 +302,7 @@ def write(self, b: Sequence[int] | Buffer, /) -> None: """ if not self.writeable(): raise OSError("device closed") - # TODO: return number of bytes written like RawIOBase + # TODO: return number of bytes written like RawIOBase does self._cmod.writebytes2(b) @deprecated("use SpiDev().write()")