use Threading.Event object to terminate threads

until_stopped() added to Subscriber thread
This commit is contained in:
onyx-and-iris 2023-08-04 23:13:58 +01:00
parent ee32f92914
commit 72d182a488
4 changed files with 44 additions and 22 deletions

View File

@ -102,7 +102,7 @@ class BusLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty: if not self._remote.stopped() and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]] for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]]

View File

@ -296,7 +296,7 @@ class StripLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty: if not self._remote.stopped() and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["strip_level"][ for i in self._remote.cache["strip_level"][

View File

@ -1,5 +1,6 @@
import logging import logging
import socket import socket
import threading
import time import time
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from pathlib import Path from pathlib import Path
@ -88,16 +89,17 @@ class VbanCmd(metaclass=ABCMeta):
def login(self) -> None: def login(self) -> None:
"""Starts the subscriber and updater threads (unless in outbound mode)""" """Starts the subscriber and updater threads (unless in outbound mode)"""
if not self.outbound: if not self.outbound:
self.running = True
self.event.info() self.event.info()
self.subscriber = Subscriber(self) self.stop_event = threading.Event()
self.stop_event.clear()
self.subscriber = Subscriber(self, self.stop_event)
self.subscriber.start() self.subscriber.start()
queue = Queue() queue = Queue()
self.updater = Updater(self, queue) self.updater = Updater(self, queue)
self.updater.start() self.updater.start()
self.producer = Producer(self, queue) self.producer = Producer(self, queue, self.stop_event)
self.producer.start() self.producer.start()
self.logger.info( self.logger.info(
@ -106,6 +108,9 @@ class VbanCmd(metaclass=ABCMeta):
) )
) )
def stopped(self):
return self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]): def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network.""" """Sends a string request command over a network."""
self.socks[Socket.request].sendto( self.socks[Socket.request].sendto(
@ -213,8 +218,10 @@ class VbanCmd(metaclass=ABCMeta):
self.logger.info(f"Profile '{name}' applied!") self.logger.info(f"Profile '{name}' applied!")
def logout(self) -> None: def logout(self) -> None:
self.running = False if not self.stopped():
time.sleep(0.2) self.logger.debug("events thread shutdown started")
self.stop_event.set()
self.subscriber.join() # wait for subscriber thread to complete cycle
[sock.close() for sock in self.socks] [sock.close() for sock in self.socks]
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}") self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")

View File

@ -14,14 +14,15 @@ logger = logging.getLogger(__name__)
class Subscriber(threading.Thread): class Subscriber(threading.Thread):
"""fire a subscription packet every 10 seconds""" """fire a subscription packet every 10 seconds"""
def __init__(self, remote): def __init__(self, remote, stop_event):
super().__init__(name="subscriber", daemon=True) super().__init__(name="subscriber", daemon=False)
self._remote = remote self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet = SubscribeHeader() self.packet = SubscribeHeader()
def run(self): def run(self):
while self._remote.running: while not self.stopped():
try: try:
self._remote.socks[Socket.register].sendto( self._remote.socks[Socket.register].sendto(
self.packet.header, self.packet.header,
@ -30,23 +31,39 @@ class Subscriber(threading.Thread):
self.packet.framecounter = ( self.packet.framecounter = (
int.from_bytes(self.packet.framecounter, "little") + 1 int.from_bytes(self.packet.framecounter, "little") + 1
).to_bytes(4, "little") ).to_bytes(4, "little")
time.sleep(10) self.until_stopped(10)
except socket.gaierror as e: except socket.gaierror as e:
self.logger.exception(f"{type(e).__name__}: {e}") self.logger.exception(f"{type(e).__name__}: {e}")
raise VBANCMDConnectionError( raise VBANCMDConnectionError(
f"unable to resolve hostname {self._remote.ip}" f"unable to resolve hostname {self._remote.ip}"
) from e ) from e
self.logger.debug(f"terminating {self.name} thread")
def stopped(self):
return self.stop_event.is_set()
def until_stopped(self, timeout, period=0.2):
must_end = time.time() + timeout
while time.time() < must_end:
if self.stopped():
break
time.sleep(period)
class Producer(threading.Thread): class Producer(threading.Thread):
"""Continously send job queue to the Updater thread at a rate of self._remote.ratelimit.""" """Continously send job queue to the Updater thread at a rate of self._remote.ratelimit."""
def __init__(self, remote, queue): def __init__(self, remote, queue, stop_event):
super().__init__(name="producer", daemon=True) super().__init__(name="producer", daemon=False)
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet_expected = VbanRtPacketHeader() self.packet_expected = VbanRtPacketHeader()
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._public_packet = self._get_rt() self._remote._public_packet = self._get_rt()
( (
self._remote.cache["strip_level"], self._remote.cache["strip_level"],
@ -60,7 +77,6 @@ class Producer(threading.Thread):
data = None data = None
while not data: while not data:
data = self._fetch_rt_packet() data = self._fetch_rt_packet()
time.sleep(self._remote.DELAY)
return data return data
return fget() return fget()
@ -68,10 +84,10 @@ class Producer(threading.Thread):
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
try: try:
data, _ = self._remote.socks[Socket.response].recvfrom(2048) data, _ = self._remote.socks[Socket.response].recvfrom(2048)
# check for packet data # do we have packet data?
if len(data) > HEADER_SIZE: if len(data) > HEADER_SIZE:
# check if packet is of type rt packet response # is the packet of type VBAN RT response?
if self.packet_expected.header == data[: HEADER_SIZE - 4]: if self.packet_expected.header == data[:HEADER_SIZE]:
return VbanRtPacket( return VbanRtPacket(
_kind=self._remote.kind, _kind=self._remote.kind,
_voicemeeterType=data[28:29], _voicemeeterType=data[28:29],
@ -103,8 +119,11 @@ class Producer(threading.Thread):
f"timeout waiting for RtPacket from {self._remote.ip}" f"timeout waiting for RtPacket from {self._remote.ip}"
) from e ) from e
def stopped(self):
return self.stop_event.is_set()
def run(self): def run(self):
while self._remote.running: while not self.stopped():
_pp = self._get_rt() _pp = self._get_rt()
pdirty = _pp.pdirty(self._remote.public_packet) pdirty = _pp.pdirty(self._remote.public_packet)
ldirty = _pp.ldirty( ldirty = _pp.ldirty(
@ -137,10 +156,6 @@ class Updater(threading.Thread):
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels) self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels)
self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels) self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels)