diff --git a/can/notifier.py b/can/notifier.py index 5d0642ee6..679af384d 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -4,6 +4,12 @@ This module contains the implementation of :class:`~can.Notifier`. """ +from typing import Iterable, List, Optional, Union + +from can.bus import BusABC +from can.listener import Listener +from can.message import Message + import threading import logging import time @@ -13,7 +19,13 @@ class Notifier: - def __init__(self, bus, listeners, timeout=1.0, loop=None): + def __init__( + self, + bus: BusABC, + listeners: Iterable[Listener], + timeout: float = 1.0, + loop: Optional[asyncio.AbstractEventLoop] = None, + ): """Manages the distribution of :class:`can.Message` instances to listeners. Supports multiple buses and listeners. @@ -24,37 +36,40 @@ def __init__(self, bus, listeners, timeout=1.0, loop=None): many listeners carry out flush operations to persist data. - :param can.BusABC bus: A :ref:`bus` or a list of buses to listen to. - :param list listeners: An iterable of :class:`~can.Listener` - :param float timeout: An optional maximum number of seconds to wait for any message. - :param asyncio.AbstractEventLoop loop: - An :mod:`asyncio` event loop to schedule listeners in. + :param bus: A :ref:`bus` or a list of buses to listen to. + :param listeners: An iterable of :class:`~can.Listener` + :param timeout: An optional maximum number of seconds to wait for any message. + :param loop: An :mod:`asyncio` event loop to schedule listeners in. """ - self.listeners = listeners + self.listeners = list(listeners) self.bus = bus self.timeout = timeout self._loop = loop #: Exception raised in thread - self.exception = None + self.exception: Optional[Exception] = None self._running = True self._lock = threading.Lock() - self._readers = [] + self._readers: List[Union[int, threading.Thread]] = [] buses = self.bus if isinstance(self.bus, list) else [self.bus] for bus in buses: self.add_bus(bus) - def add_bus(self, bus): + def add_bus(self, bus: BusABC): """Add a bus for notification. - :param can.BusABC bus: + :param bus: CAN bus instance. """ - if self._loop is not None and hasattr(bus, "fileno") and bus.fileno() >= 0: + if ( + self._loop is not None + and hasattr(bus, "fileno") + and bus.fileno() >= 0 # type: ignore + ): # Use file descriptor to watch for messages - reader = bus.fileno() + reader = bus.fileno() # type: ignore self._loop.add_reader(reader, self._on_message_available, bus) else: reader = threading.Thread( @@ -66,11 +81,11 @@ def add_bus(self, bus): reader.start() self._readers.append(reader) - def stop(self, timeout=5): + def stop(self, timeout: float = 5): """Stop notifying Listeners when new :class:`~can.Message` objects arrive and call :meth:`~can.Listener.stop` on each Listener. - :param float timeout: + :param timeout: Max time in seconds to wait for receive threads to finish. Should be longer than timeout given at instantiation. """ @@ -81,14 +96,14 @@ def stop(self, timeout=5): now = time.time() if now < end_time: reader.join(end_time - now) - else: + elif self._loop: # reader is a file descriptor self._loop.remove_reader(reader) for listener in self.listeners: if hasattr(listener, "stop"): listener.stop() - def _rx_thread(self, bus): + def _rx_thread(self, bus: BusABC): msg = None try: while self._running: @@ -109,40 +124,38 @@ def _rx_thread(self, bus): self._on_error(exc) raise - def _on_message_available(self, bus): + def _on_message_available(self, bus: BusABC): msg = bus.recv(0) if msg is not None: self._on_message_received(msg) - def _on_message_received(self, msg): + def _on_message_received(self, msg: Message): for callback in self.listeners: res = callback(msg) if self._loop is not None and asyncio.iscoroutine(res): # Schedule coroutine self._loop.create_task(res) - def _on_error(self, exc): + def _on_error(self, exc: Exception): for listener in self.listeners: if hasattr(listener, "on_error"): listener.on_error(exc) - def add_listener(self, listener): + def add_listener(self, listener: Listener): """Add new Listener to the notification list. If it is already present, it will be called two times each time a message arrives. - :param can.Listener listener: Listener to be added to - the list to be notified + :param listener: Listener to be added to the list to be notified """ self.listeners.append(listener) - def remove_listener(self, listener): + def remove_listener(self, listener: Listener): """Remove a listener from the notification list. This method trows an exception if the given listener is not part of the stored listeners. - :param can.Listener listener: Listener to be removed from - the list to be notified + :param listener: Listener to be removed from the list to be notified :raises ValueError: if `listener` was never added to this notifier """ self.listeners.remove(listener)