Compare commits

..

No commits in common. "eed036ca030187a1d7d3a2dcc464b44be4de4b56" and "ee32f9291447db243ae18fdb0987ce86afd61291" have entirely different histories.

7 changed files with 48 additions and 63 deletions

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "vban-cmd" name = "vban-cmd"
version = "2.4.4" version = "2.4.2"
description = "Python interface for the VBAN RT Packet Service (Sendtext)" description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = ["onyx-and-iris <code@onyxandiris.online>"] authors = ["onyx-and-iris <code@onyxandiris.online>"]
license = "MIT" license = "MIT"

View File

@ -102,7 +102,7 @@ class BusLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if not self._remote.stopped() and self._remote.event.ldirty: if self._remote.running and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]] for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]]

View File

@ -265,7 +265,7 @@ class VbanRtPacketHeader:
@dataclass @dataclass
class RequestHeader: class RequestHeader:
"""Represents the header of a REQUEST RT PACKET""" """Represents the header of an REQUEST RT PACKET"""
name: str name: str
bps_index: int bps_index: int

View File

@ -296,7 +296,7 @@ class StripLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if not self._remote.stopped() and self._remote.event.ldirty: if self._remote.running and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["strip_level"][ for i in self._remote.cache["strip_level"][

View File

@ -172,24 +172,32 @@ class VbanMidiOutstream(VbanOutstream):
def _make_stream_pair(remote, kind): def _make_stream_pair(remote, kind):
num_instream, num_outstream, num_midi, num_text = kind.vban num_instream, num_outstream, num_midi, num_text = kind.vban
def _make_cls(i, dir): def _generate_streams(i, dir):
match dir: """generator function for creating instream/outstream tuples"""
case "in": if dir == "in":
if i < num_instream: if i < num_instream:
return VbanAudioInstream(remote, i) yield VbanAudioInstream
elif i < num_instream + num_midi: elif i < num_instream + num_midi:
return VbanMidiInstream(remote, i) yield VbanMidiInstream
else:
yield VbanTextInstream
else: else:
return VbanTextInstream(remote, i)
case "out":
if i < num_outstream: if i < num_outstream:
return VbanAudioOutstream(remote, i) yield VbanAudioOutstream
else: else:
return VbanMidiOutstream(remote, i) yield VbanMidiOutstream
return ( return (
tuple(_make_cls(i, "in") for i in range(num_instream + num_midi + num_text)), tuple(
tuple(_make_cls(i, "out") for i in range(num_outstream + num_midi)), cls(remote, i)
for i in range(num_instream + num_midi + num_text)
for cls in _generate_streams(i, "in")
),
tuple(
cls(remote, i)
for i in range(num_outstream + num_midi)
for cls in _generate_streams(i, "out")
),
) )

View File

@ -1,6 +1,5 @@
import logging import logging
import socket import socket
import threading
import time import time
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from pathlib import Path from pathlib import Path
@ -50,7 +49,6 @@ class VbanCmd(metaclass=ABCMeta):
self._pdirty = False self._pdirty = False
self._ldirty = False self._ldirty = False
self._script = str() self._script = str()
self.stop_event = None
@abstractmethod @abstractmethod
def __str__(self): def __str__(self):
@ -90,17 +88,16 @@ class VbanCmd(metaclass=ABCMeta):
def login(self) -> None: def login(self) -> None:
"""Starts the subscriber and updater threads (unless in outbound mode)""" """Starts the subscriber and updater threads (unless in outbound mode)"""
if not self.outbound: if not self.outbound:
self.running = True
self.event.info() self.event.info()
self.stop_event = threading.Event() self.subscriber = Subscriber(self)
self.stop_event.clear()
self.subscriber = Subscriber(self, self.stop_event)
self.subscriber.start() self.subscriber.start()
queue = Queue() queue = Queue()
self.updater = Updater(self, queue) self.updater = Updater(self, queue)
self.updater.start() self.updater.start()
self.producer = Producer(self, queue, self.stop_event) self.producer = Producer(self, queue)
self.producer.start() self.producer.start()
self.logger.info( self.logger.info(
@ -109,9 +106,6 @@ class VbanCmd(metaclass=ABCMeta):
) )
) )
def stopped(self):
return self.stop_event is None or self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]): def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network.""" """Sends a string request command over a network."""
self.socks[Socket.request].sendto( self.socks[Socket.request].sendto(
@ -219,10 +213,8 @@ class VbanCmd(metaclass=ABCMeta):
self.logger.info(f"Profile '{name}' applied!") self.logger.info(f"Profile '{name}' applied!")
def logout(self) -> None: def logout(self) -> None:
if not self.stopped(): self.running = False
self.logger.debug("events thread shutdown started") time.sleep(0.2)
self.stop_event.set()
self.subscriber.join() # wait for subscriber thread to complete cycle
[sock.close() for sock in self.socks] [sock.close() for sock in self.socks]
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}") self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")

View File

@ -14,15 +14,14 @@ logger = logging.getLogger(__name__)
class Subscriber(threading.Thread): class Subscriber(threading.Thread):
"""fire a subscription packet every 10 seconds""" """fire a subscription packet every 10 seconds"""
def __init__(self, remote, stop_event): def __init__(self, remote):
super().__init__(name="subscriber", daemon=False) super().__init__(name="subscriber", daemon=True)
self._remote = remote self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet = SubscribeHeader() self.packet = SubscribeHeader()
def run(self): def run(self):
while not self.stopped(): while self._remote.running:
try: try:
self._remote.socks[Socket.register].sendto( self._remote.socks[Socket.register].sendto(
self.packet.header, self.packet.header,
@ -31,39 +30,23 @@ class Subscriber(threading.Thread):
self.packet.framecounter = ( self.packet.framecounter = (
int.from_bytes(self.packet.framecounter, "little") + 1 int.from_bytes(self.packet.framecounter, "little") + 1
).to_bytes(4, "little") ).to_bytes(4, "little")
self.wait_until_stopped(10) time.sleep(10)
except socket.gaierror as e: except socket.gaierror as e:
self.logger.exception(f"{type(e).__name__}: {e}") self.logger.exception(f"{type(e).__name__}: {e}")
raise VBANCMDConnectionError( raise VBANCMDConnectionError(
f"unable to resolve hostname {self._remote.ip}" f"unable to resolve hostname {self._remote.ip}"
) from e ) from e
self.logger.debug(f"terminating {self.name} thread")
def stopped(self):
return self.stop_event.is_set()
def wait_until_stopped(self, timeout, period=0.2):
must_end = time.time() + timeout
while time.time() < must_end:
if self.stopped():
break
time.sleep(period)
class Producer(threading.Thread): class Producer(threading.Thread):
"""Continously send job queue to the Updater thread at a rate of self._remote.ratelimit.""" """Continously send job queue to the Updater thread at a rate of self._remote.ratelimit."""
def __init__(self, remote, queue, stop_event): def __init__(self, remote, queue):
super().__init__(name="producer", daemon=False) super().__init__(name="producer", daemon=True)
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet_expected = VbanRtPacketHeader() self.packet_expected = VbanRtPacketHeader()
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._public_packet = self._get_rt() self._remote._public_packet = self._get_rt()
( (
self._remote.cache["strip_level"], self._remote.cache["strip_level"],
@ -77,6 +60,7 @@ class Producer(threading.Thread):
data = None data = None
while not data: while not data:
data = self._fetch_rt_packet() data = self._fetch_rt_packet()
time.sleep(self._remote.DELAY)
return data return data
return fget() return fget()
@ -84,10 +68,10 @@ class Producer(threading.Thread):
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
try: try:
data, _ = self._remote.socks[Socket.response].recvfrom(2048) data, _ = self._remote.socks[Socket.response].recvfrom(2048)
# do we have packet data? # check for packet data
if len(data) > HEADER_SIZE: if len(data) > HEADER_SIZE:
# is the packet of type VBAN RT response? # check if packet is of type rt packet response
if self.packet_expected.header == data[:HEADER_SIZE]: if self.packet_expected.header == data[: HEADER_SIZE - 4]:
return VbanRtPacket( return VbanRtPacket(
_kind=self._remote.kind, _kind=self._remote.kind,
_voicemeeterType=data[28:29], _voicemeeterType=data[28:29],
@ -119,11 +103,8 @@ class Producer(threading.Thread):
f"timeout waiting for RtPacket from {self._remote.ip}" f"timeout waiting for RtPacket from {self._remote.ip}"
) from e ) from e
def stopped(self):
return self.stop_event.is_set()
def run(self): def run(self):
while not self.stopped(): while self._remote.running:
_pp = self._get_rt() _pp = self._get_rt()
pdirty = _pp.pdirty(self._remote.public_packet) pdirty = _pp.pdirty(self._remote.public_packet)
ldirty = _pp.ldirty( ldirty = _pp.ldirty(
@ -156,6 +137,10 @@ class Updater(threading.Thread):
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels) self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels)
self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels) self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels)