Skip to content

Add typing annotations for can.notifier #679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 15, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 39 additions & 26 deletions can/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
This module contains the implementation of :class:`~can.Notifier`.
"""

from typing import Iterable, List, Optional, Union

from can.bus import BusABC
from can.listener import Listener
from can.message import Message

import threading
import logging
import time
Expand All @@ -13,7 +19,13 @@


class Notifier:
def __init__(self, bus, listeners, timeout=1.0, loop=None):
def __init__(
self,
bus: BusABC,
listeners: Iterable[Listener],
timeout: float = 1.0,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
"""Manages the distribution of :class:`can.Message` instances to listeners.

Supports multiple buses and listeners.
Expand All @@ -24,37 +36,40 @@ def __init__(self, bus, listeners, timeout=1.0, loop=None):
many listeners carry out flush operations to persist data.


:param can.BusABC bus: A :ref:`bus` or a list of buses to listen to.
:param list listeners: An iterable of :class:`~can.Listener`
:param float timeout: An optional maximum number of seconds to wait for any message.
:param asyncio.AbstractEventLoop loop:
An :mod:`asyncio` event loop to schedule listeners in.
:param bus: A :ref:`bus` or a list of buses to listen to.
:param listeners: An iterable of :class:`~can.Listener`
:param timeout: An optional maximum number of seconds to wait for any message.
:param loop: An :mod:`asyncio` event loop to schedule listeners in.
"""
self.listeners = listeners
self.listeners = list(listeners)
self.bus = bus
self.timeout = timeout
self._loop = loop

#: Exception raised in thread
self.exception = None
self.exception: Optional[Exception] = None

self._running = True
self._lock = threading.Lock()

self._readers = []
self._readers: List[Union[int, threading.Thread]] = []
buses = self.bus if isinstance(self.bus, list) else [self.bus]
for bus in buses:
self.add_bus(bus)

def add_bus(self, bus):
def add_bus(self, bus: BusABC):
"""Add a bus for notification.

:param can.BusABC bus:
:param bus:
CAN bus instance.
"""
if self._loop is not None and hasattr(bus, "fileno") and bus.fileno() >= 0:
if (
self._loop is not None
and hasattr(bus, "fileno")
and bus.fileno() >= 0 # type: ignore
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is disabled here because mypy doesn't really deal well with hasattr checks. See the discussion here. We could either:

  1. Disable the type checker (like is done here)
  2. Perform the cast to Any
  3. Refactor the code to have fileno on the base class return None by default (which we can check for), and then subclasses provide their own implementation as needed

I'm personally inclined to 3, but that's a bit more involved than the other 2 (and perhaps should be kept out of this change). Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vote for 3, but it can return -1 instead of None.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd vote for 1. It's just a shortcoming of the type system, and no problem with our API.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that one makes sense for this PR, but 3 would be the best long term solution.

):
# Use file descriptor to watch for messages
reader = bus.fileno()
reader = bus.fileno() # type: ignore
self._loop.add_reader(reader, self._on_message_available, bus)
else:
reader = threading.Thread(
Expand All @@ -66,11 +81,11 @@ def add_bus(self, bus):
reader.start()
self._readers.append(reader)

def stop(self, timeout=5):
def stop(self, timeout: float = 5):
"""Stop notifying Listeners when new :class:`~can.Message` objects arrive
and call :meth:`~can.Listener.stop` on each Listener.

:param float timeout:
:param timeout:
Max time in seconds to wait for receive threads to finish.
Should be longer than timeout given at instantiation.
"""
Expand All @@ -81,14 +96,14 @@ def stop(self, timeout=5):
now = time.time()
if now < end_time:
reader.join(end_time - now)
else:
elif self._loop:
# reader is a file descriptor
self._loop.remove_reader(reader)
for listener in self.listeners:
if hasattr(listener, "stop"):
listener.stop()

def _rx_thread(self, bus):
def _rx_thread(self, bus: BusABC):
msg = None
try:
while self._running:
Expand All @@ -109,40 +124,38 @@ def _rx_thread(self, bus):
self._on_error(exc)
raise

def _on_message_available(self, bus):
def _on_message_available(self, bus: BusABC):
msg = bus.recv(0)
if msg is not None:
self._on_message_received(msg)

def _on_message_received(self, msg):
def _on_message_received(self, msg: Message):
for callback in self.listeners:
res = callback(msg)
if self._loop is not None and asyncio.iscoroutine(res):
# Schedule coroutine
self._loop.create_task(res)

def _on_error(self, exc):
def _on_error(self, exc: Exception):
for listener in self.listeners:
if hasattr(listener, "on_error"):
listener.on_error(exc)

def add_listener(self, listener):
def add_listener(self, listener: Listener):
"""Add new Listener to the notification list.
If it is already present, it will be called two times
each time a message arrives.

:param can.Listener listener: Listener to be added to
the list to be notified
:param listener: Listener to be added to the list to be notified
"""
self.listeners.append(listener)

def remove_listener(self, listener):
def remove_listener(self, listener: Listener):
"""Remove a listener from the notification list. This method
trows an exception if the given listener is not part of the
stored listeners.

:param can.Listener listener: Listener to be removed from
the list to be notified
:param listener: Listener to be removed from the list to be notified
:raises ValueError: if `listener` was never added to this notifier
"""
self.listeners.remove(listener)