Compare commits

...

7 Commits

Author SHA1 Message Date
eed036ca03 patch bump 2023-08-05 14:06:47 +01:00
55211b9b19 replace generator function with factory function 2023-08-05 14:06:39 +01:00
4af7c0f694 initialize stop_event to None
in case outbound mode enabled
2023-08-05 14:05:18 +01:00
f082fa8ac5 reword 2023-08-05 13:40:32 +01:00
cbcca14481 rename until_stopped() to wait_until_stopped() 2023-08-05 13:36:36 +01:00
f584d53835 patch bump 2023-08-05 13:34:56 +01:00
72d182a488 use Threading.Event object to terminate threads
until_stopped() added to Subscriber thread
2023-08-04 23:13:58 +01:00
7 changed files with 63 additions and 48 deletions

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "vban-cmd" name = "vban-cmd"
version = "2.4.2" version = "2.4.4"
description = "Python interface for the VBAN RT Packet Service (Sendtext)" description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = ["onyx-and-iris <code@onyxandiris.online>"] authors = ["onyx-and-iris <code@onyxandiris.online>"]
license = "MIT" license = "MIT"

View File

@ -102,7 +102,7 @@ class BusLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty: if not self._remote.stopped() and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]] for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]]

View File

@ -265,7 +265,7 @@ class VbanRtPacketHeader:
@dataclass @dataclass
class RequestHeader: class RequestHeader:
"""Represents the header of an REQUEST RT PACKET""" """Represents the header of a REQUEST RT PACKET"""
name: str name: str
bps_index: int bps_index: int

View File

@ -296,7 +296,7 @@ class StripLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty: if not self._remote.stopped() and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["strip_level"][ for i in self._remote.cache["strip_level"][

View File

@ -172,32 +172,24 @@ class VbanMidiOutstream(VbanOutstream):
def _make_stream_pair(remote, kind): def _make_stream_pair(remote, kind):
num_instream, num_outstream, num_midi, num_text = kind.vban num_instream, num_outstream, num_midi, num_text = kind.vban
def _generate_streams(i, dir): def _make_cls(i, dir):
"""generator function for creating instream/outstream tuples""" match dir:
if dir == "in": case "in":
if i < num_instream: if i < num_instream:
yield VbanAudioInstream return VbanAudioInstream(remote, i)
elif i < num_instream + num_midi: elif i < num_instream + num_midi:
yield VbanMidiInstream return VbanMidiInstream(remote, i)
else: else:
yield VbanTextInstream return VbanTextInstream(remote, i)
else: case "out":
if i < num_outstream: if i < num_outstream:
yield VbanAudioOutstream return VbanAudioOutstream(remote, i)
else: else:
yield VbanMidiOutstream return VbanMidiOutstream(remote, i)
return ( return (
tuple( tuple(_make_cls(i, "in") for i in range(num_instream + num_midi + num_text)),
cls(remote, i) tuple(_make_cls(i, "out") for i in range(num_outstream + num_midi)),
for i in range(num_instream + num_midi + num_text)
for cls in _generate_streams(i, "in")
),
tuple(
cls(remote, i)
for i in range(num_outstream + num_midi)
for cls in _generate_streams(i, "out")
),
) )

View File

@ -1,5 +1,6 @@
import logging import logging
import socket import socket
import threading
import time import time
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from pathlib import Path from pathlib import Path
@ -49,6 +50,7 @@ class VbanCmd(metaclass=ABCMeta):
self._pdirty = False self._pdirty = False
self._ldirty = False self._ldirty = False
self._script = str() self._script = str()
self.stop_event = None
@abstractmethod @abstractmethod
def __str__(self): def __str__(self):
@ -88,16 +90,17 @@ class VbanCmd(metaclass=ABCMeta):
def login(self) -> None: def login(self) -> None:
"""Starts the subscriber and updater threads (unless in outbound mode)""" """Starts the subscriber and updater threads (unless in outbound mode)"""
if not self.outbound: if not self.outbound:
self.running = True
self.event.info() self.event.info()
self.subscriber = Subscriber(self) self.stop_event = threading.Event()
self.stop_event.clear()
self.subscriber = Subscriber(self, self.stop_event)
self.subscriber.start() self.subscriber.start()
queue = Queue() queue = Queue()
self.updater = Updater(self, queue) self.updater = Updater(self, queue)
self.updater.start() self.updater.start()
self.producer = Producer(self, queue) self.producer = Producer(self, queue, self.stop_event)
self.producer.start() self.producer.start()
self.logger.info( self.logger.info(
@ -106,6 +109,9 @@ class VbanCmd(metaclass=ABCMeta):
) )
) )
def stopped(self):
return self.stop_event is None or self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]): def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network.""" """Sends a string request command over a network."""
self.socks[Socket.request].sendto( self.socks[Socket.request].sendto(
@ -213,8 +219,10 @@ class VbanCmd(metaclass=ABCMeta):
self.logger.info(f"Profile '{name}' applied!") self.logger.info(f"Profile '{name}' applied!")
def logout(self) -> None: def logout(self) -> None:
self.running = False if not self.stopped():
time.sleep(0.2) self.logger.debug("events thread shutdown started")
self.stop_event.set()
self.subscriber.join() # wait for subscriber thread to complete cycle
[sock.close() for sock in self.socks] [sock.close() for sock in self.socks]
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}") self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")

View File

@ -14,14 +14,15 @@ logger = logging.getLogger(__name__)
class Subscriber(threading.Thread): class Subscriber(threading.Thread):
"""fire a subscription packet every 10 seconds""" """fire a subscription packet every 10 seconds"""
def __init__(self, remote): def __init__(self, remote, stop_event):
super().__init__(name="subscriber", daemon=True) super().__init__(name="subscriber", daemon=False)
self._remote = remote self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet = SubscribeHeader() self.packet = SubscribeHeader()
def run(self): def run(self):
while self._remote.running: while not self.stopped():
try: try:
self._remote.socks[Socket.register].sendto( self._remote.socks[Socket.register].sendto(
self.packet.header, self.packet.header,
@ -30,23 +31,39 @@ class Subscriber(threading.Thread):
self.packet.framecounter = ( self.packet.framecounter = (
int.from_bytes(self.packet.framecounter, "little") + 1 int.from_bytes(self.packet.framecounter, "little") + 1
).to_bytes(4, "little") ).to_bytes(4, "little")
time.sleep(10) self.wait_until_stopped(10)
except socket.gaierror as e: except socket.gaierror as e:
self.logger.exception(f"{type(e).__name__}: {e}") self.logger.exception(f"{type(e).__name__}: {e}")
raise VBANCMDConnectionError( raise VBANCMDConnectionError(
f"unable to resolve hostname {self._remote.ip}" f"unable to resolve hostname {self._remote.ip}"
) from e ) from e
self.logger.debug(f"terminating {self.name} thread")
def stopped(self):
return self.stop_event.is_set()
def wait_until_stopped(self, timeout, period=0.2):
must_end = time.time() + timeout
while time.time() < must_end:
if self.stopped():
break
time.sleep(period)
class Producer(threading.Thread): class Producer(threading.Thread):
"""Continously send job queue to the Updater thread at a rate of self._remote.ratelimit.""" """Continously send job queue to the Updater thread at a rate of self._remote.ratelimit."""
def __init__(self, remote, queue): def __init__(self, remote, queue, stop_event):
super().__init__(name="producer", daemon=True) super().__init__(name="producer", daemon=False)
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet_expected = VbanRtPacketHeader() self.packet_expected = VbanRtPacketHeader()
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._public_packet = self._get_rt() self._remote._public_packet = self._get_rt()
( (
self._remote.cache["strip_level"], self._remote.cache["strip_level"],
@ -60,7 +77,6 @@ class Producer(threading.Thread):
data = None data = None
while not data: while not data:
data = self._fetch_rt_packet() data = self._fetch_rt_packet()
time.sleep(self._remote.DELAY)
return data return data
return fget() return fget()
@ -68,10 +84,10 @@ class Producer(threading.Thread):
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
try: try:
data, _ = self._remote.socks[Socket.response].recvfrom(2048) data, _ = self._remote.socks[Socket.response].recvfrom(2048)
# check for packet data # do we have packet data?
if len(data) > HEADER_SIZE: if len(data) > HEADER_SIZE:
# check if packet is of type rt packet response # is the packet of type VBAN RT response?
if self.packet_expected.header == data[: HEADER_SIZE - 4]: if self.packet_expected.header == data[:HEADER_SIZE]:
return VbanRtPacket( return VbanRtPacket(
_kind=self._remote.kind, _kind=self._remote.kind,
_voicemeeterType=data[28:29], _voicemeeterType=data[28:29],
@ -103,8 +119,11 @@ class Producer(threading.Thread):
f"timeout waiting for RtPacket from {self._remote.ip}" f"timeout waiting for RtPacket from {self._remote.ip}"
) from e ) from e
def stopped(self):
return self.stop_event.is_set()
def run(self): def run(self):
while self._remote.running: while not self.stopped():
_pp = self._get_rt() _pp = self._get_rt()
pdirty = _pp.pdirty(self._remote.public_packet) pdirty = _pp.pdirty(self._remote.public_packet)
ldirty = _pp.ldirty( ldirty = _pp.ldirty(
@ -137,10 +156,6 @@ class Updater(threading.Thread):
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels) self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels)
self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels) self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels)