diff --git a/vban_cmd/bus.py b/vban_cmd/bus.py index 49a8e5d..05b75a7 100644 --- a/vban_cmd/bus.py +++ b/vban_cmd/bus.py @@ -102,7 +102,7 @@ class BusLevel(IRemote): def fget(i): return round((((1 << 16) - 1) - i) * -0.01, 1) - if self._remote.running and self._remote.event.ldirty: + if not self._remote.stopped() and self._remote.event.ldirty: return tuple( fget(i) for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]] diff --git a/vban_cmd/strip.py b/vban_cmd/strip.py index e4b1af5..73e98b9 100644 --- a/vban_cmd/strip.py +++ b/vban_cmd/strip.py @@ -296,7 +296,7 @@ class StripLevel(IRemote): def fget(i): return round((((1 << 16) - 1) - i) * -0.01, 1) - if self._remote.running and self._remote.event.ldirty: + if not self._remote.stopped() and self._remote.event.ldirty: return tuple( fget(i) for i in self._remote.cache["strip_level"][ diff --git a/vban_cmd/vbancmd.py b/vban_cmd/vbancmd.py index f55ec71..b0afd23 100644 --- a/vban_cmd/vbancmd.py +++ b/vban_cmd/vbancmd.py @@ -1,5 +1,6 @@ import logging import socket +import threading import time from abc import ABCMeta, abstractmethod from pathlib import Path @@ -88,16 +89,17 @@ class VbanCmd(metaclass=ABCMeta): def login(self) -> None: """Starts the subscriber and updater threads (unless in outbound mode)""" if not self.outbound: - self.running = True self.event.info() - self.subscriber = Subscriber(self) + self.stop_event = threading.Event() + self.stop_event.clear() + self.subscriber = Subscriber(self, self.stop_event) self.subscriber.start() queue = Queue() self.updater = Updater(self, queue) self.updater.start() - self.producer = Producer(self, queue) + self.producer = Producer(self, queue, self.stop_event) self.producer.start() self.logger.info( @@ -106,6 +108,9 @@ class VbanCmd(metaclass=ABCMeta): ) ) + def stopped(self): + return self.stop_event.is_set() + def _set_rt(self, cmd: str, val: Union[str, float]): """Sends a string request command over a network.""" self.socks[Socket.request].sendto( @@ -213,8 +218,10 @@ class VbanCmd(metaclass=ABCMeta): self.logger.info(f"Profile '{name}' applied!") def logout(self) -> None: - self.running = False - time.sleep(0.2) + if not self.stopped(): + self.logger.debug("events thread shutdown started") + self.stop_event.set() + self.subscriber.join() # wait for subscriber thread to complete cycle [sock.close() for sock in self.socks] self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}") diff --git a/vban_cmd/worker.py b/vban_cmd/worker.py index d5b7724..4d45e0a 100644 --- a/vban_cmd/worker.py +++ b/vban_cmd/worker.py @@ -14,14 +14,15 @@ logger = logging.getLogger(__name__) class Subscriber(threading.Thread): """fire a subscription packet every 10 seconds""" - def __init__(self, remote): - super().__init__(name="subscriber", daemon=True) + def __init__(self, remote, stop_event): + super().__init__(name="subscriber", daemon=False) self._remote = remote + self.stop_event = stop_event self.logger = logger.getChild(self.__class__.__name__) self.packet = SubscribeHeader() def run(self): - while self._remote.running: + while not self.stopped(): try: self._remote.socks[Socket.register].sendto( self.packet.header, @@ -30,23 +31,39 @@ class Subscriber(threading.Thread): self.packet.framecounter = ( int.from_bytes(self.packet.framecounter, "little") + 1 ).to_bytes(4, "little") - time.sleep(10) + self.until_stopped(10) except socket.gaierror as e: self.logger.exception(f"{type(e).__name__}: {e}") raise VBANCMDConnectionError( f"unable to resolve hostname {self._remote.ip}" ) from e + self.logger.debug(f"terminating {self.name} thread") + + def stopped(self): + return self.stop_event.is_set() + + def until_stopped(self, timeout, period=0.2): + must_end = time.time() + timeout + while time.time() < must_end: + if self.stopped(): + break + time.sleep(period) class Producer(threading.Thread): """Continously send job queue to the Updater thread at a rate of self._remote.ratelimit.""" - def __init__(self, remote, queue): - super().__init__(name="producer", daemon=True) + def __init__(self, remote, queue, stop_event): + super().__init__(name="producer", daemon=False) self._remote = remote self.queue = queue + self.stop_event = stop_event self.logger = logger.getChild(self.__class__.__name__) self.packet_expected = VbanRtPacketHeader() + self._remote.socks[Socket.response].settimeout(self._remote.timeout) + self._remote.socks[Socket.response].bind( + (socket.gethostbyname(socket.gethostname()), self._remote.port) + ) self._remote._public_packet = self._get_rt() ( self._remote.cache["strip_level"], @@ -60,7 +77,6 @@ class Producer(threading.Thread): data = None while not data: data = self._fetch_rt_packet() - time.sleep(self._remote.DELAY) return data return fget() @@ -68,10 +84,10 @@ class Producer(threading.Thread): def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: try: data, _ = self._remote.socks[Socket.response].recvfrom(2048) - # check for packet data + # do we have packet data? if len(data) > HEADER_SIZE: - # check if packet is of type rt packet response - if self.packet_expected.header == data[: HEADER_SIZE - 4]: + # is the packet of type VBAN RT response? + if self.packet_expected.header == data[:HEADER_SIZE]: return VbanRtPacket( _kind=self._remote.kind, _voicemeeterType=data[28:29], @@ -103,8 +119,11 @@ class Producer(threading.Thread): f"timeout waiting for RtPacket from {self._remote.ip}" ) from e + def stopped(self): + return self.stop_event.is_set() + def run(self): - while self._remote.running: + while not self.stopped(): _pp = self._get_rt() pdirty = _pp.pdirty(self._remote.public_packet) ldirty = _pp.ldirty( @@ -137,10 +156,6 @@ class Updater(threading.Thread): self._remote = remote self.queue = queue self.logger = logger.getChild(self.__class__.__name__) - self._remote.socks[Socket.response].settimeout(self._remote.timeout) - self._remote.socks[Socket.response].bind( - (socket.gethostbyname(socket.gethostname()), self._remote.port) - ) self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels) self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels)