diff --git a/vban_cmd/packet.py b/vban_cmd/packet.py index 738acf1..2b29f34 100644 --- a/vban_cmd/packet.py +++ b/vban_cmd/packet.py @@ -1,5 +1,4 @@ from dataclasses import dataclass -from typing import Generator from .util import comp @@ -13,9 +12,30 @@ HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4 class VbanRtPacket: """Represents the body of a VBAN RT data packet""" - def __init__(self, **kwargs): - for k, v in kwargs.items(): - setattr(self, k, v) + def __init__(self, kind=None, data=None): + self._kind = kind + self._voicemeeterType = data[28:29] + self._reserved = data[29:30] + self._buffersize = data[30:32] + self._voicemeeterVersion = data[32:36] + self._optionBits = data[36:40] + self._samplerate = data[40:44] + self._inputLeveldB100 = data[44:112] + self._outputLeveldB100 = data[112:240] + self._TransportBit = data[240:244] + self._stripState = data[244:276] + self._busState = data[276:308] + self._stripGaindB100Layer1 = data[308:324] + self._stripGaindB100Layer2 = data[324:340] + self._stripGaindB100Layer3 = data[340:356] + self._stripGaindB100Layer4 = data[356:372] + self._stripGaindB100Layer5 = data[372:388] + self._stripGaindB100Layer6 = data[388:404] + self._stripGaindB100Layer7 = data[404:420] + self._stripGaindB100Layer8 = data[420:436] + self._busGaindB100 = data[436:452] + self._stripLabelUTF8c60 = data[452:932] + self._busLabelUTF8c60 = data[932:1412] self._strip_level = self._generate_levels(self._inputLeveldB100) self._bus_level = self._generate_levels(self._outputLeveldB100) diff --git a/vban_cmd/worker.py b/vban_cmd/worker.py index 03c287b..5ceb619 100644 --- a/vban_cmd/worker.py +++ b/vban_cmd/worker.py @@ -4,98 +4,52 @@ import threading import time from typing import Optional -from .error import VBANCMDError +from .error import VBANCMDConnectionError from .packet import HEADER_SIZE, SubscribeHeader, VbanRtPacket, VbanRtPacketHeader -from .util import Socket, comp +from .util import Socket + +logger = logging.getLogger(__name__) class Subscriber(threading.Thread): """fire a subscription packet every 10 seconds""" def __init__(self, remote): - super().__init__(name="subscriber", target=self.subscribe, daemon=True) + super().__init__(name="subscriber", daemon=True) self._remote = remote + self.logger = logger.getChild(self.__class__.__name__) self.packet = SubscribeHeader() - def subscribe(self): + def run(self): while self._remote.running: try: self._remote.socks[Socket.register].sendto( self.packet.header, (socket.gethostbyname(self._remote.ip), self._remote.port), ) - count = int.from_bytes(self.packet.framecounter, "little") + 1 - self.packet.framecounter = count.to_bytes(4, "little") + self.packet.framecounter = ( + int.from_bytes(self.packet.framecounter, "little") + 1 + ).to_bytes(4, "little") time.sleep(10) - except socket.gaierror: - err_msg = f"Unable to resolve hostname {self._remote.ip}" - print(err_msg) - raise VBANCMDError(err_msg) + except socket.gaierror as e: + self.logger.exception(f"{type(e).__name__}: {e}") + raise VBANCMDConnectionError(f"unable to resolve hostname {self._remote.ip}") from e -class Updater(threading.Thread): - """ - continously updates the public packet +class Producer(threading.Thread): + """Continously send job queue to the Updater thread at a rate of self._remote.ratelimit.""" - notifies observers of event updates - """ - - logger = logging.getLogger("worker.updater") - - def __init__(self, remote): - super().__init__(name="updater", target=self.update, daemon=True) + def __init__(self, remote, queue): + super().__init__(name="producer", daemon=True) self._remote = remote - self._remote.socks[Socket.response].settimeout(5) - self._remote.socks[Socket.response].bind( - (socket.gethostbyname(socket.gethostname()), self._remote.port) - ) + self.queue = queue + self.logger = logger.getChild(self.__class__.__name__) self.packet_expected = VbanRtPacketHeader() self._remote._public_packet = self._get_rt() ( self._remote.cache["strip_level"], self._remote.cache["bus_level"], ) = self._remote._get_levels(self._remote.public_packet) - p_in, v_in = self._remote.kind.ins - self._remote._strip_comp = [False] * (2 * p_in + 8 * v_in) - self._remote._bus_comp = [False] * (self._remote.kind.num_bus * 8) - - def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: - try: - data, _ = self._remote.socks[Socket.response].recvfrom(2048) - # check for packet data - if len(data) > HEADER_SIZE: - # check if packet is of type rt packet response - if self.packet_expected.header == data[: HEADER_SIZE - 4]: - self.logger.debug("valid packet received") - return VbanRtPacket( - _kind=self._remote.kind, - _voicemeeterType=data[28:29], - _reserved=data[29:30], - _buffersize=data[30:32], - _voicemeeterVersion=data[32:36], - _optionBits=data[36:40], - _samplerate=data[40:44], - _inputLeveldB100=data[44:112], - _outputLeveldB100=data[112:240], - _TransportBit=data[240:244], - _stripState=data[244:276], - _busState=data[276:308], - _stripGaindB100Layer1=data[308:324], - _stripGaindB100Layer2=data[324:340], - _stripGaindB100Layer3=data[340:356], - _stripGaindB100Layer4=data[356:372], - _stripGaindB100Layer5=data[372:388], - _stripGaindB100Layer6=data[388:404], - _stripGaindB100Layer7=data[404:420], - _stripGaindB100Layer8=data[420:436], - _busGaindB100=data[436:452], - _stripLabelUTF8c60=data[452:932], - _busLabelUTF8c60=data[932:1412], - ) - except TimeoutError: - err_msg = f"Unable to establish connection with {self._remote.ip}" - print(err_msg) - raise VBANCMDError(err_msg) def _get_rt(self) -> VbanRtPacket: """Attempt to fetch data packet until a valid one found""" @@ -109,9 +63,20 @@ class Updater(threading.Thread): return fget() - def update(self): + def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: + try: + data, _ = self._remote.socks[Socket.response].recvfrom(2048) + # check for packet data + if len(data) > HEADER_SIZE: + # check if packet is of type rt packet response + if self.packet_expected.header == data[: HEADER_SIZE - 4]: + return VbanRtPacket(kind=self._remote.kind, data=data) + except TimeoutError as e: + self.logger.exception(f"{type(e).__name__}: {e}") + raise VBANCMDConnectionError(f"timeout waiting for RtPacket from {self._remote.ip}") from e + + def run(self): while self._remote.running: - start = time.time() _pp = self._get_rt() pdirty = _pp.pdirty(self._remote.public_packet) ldirty = _pp.ldirty( @@ -119,24 +84,63 @@ class Updater(threading.Thread): ) if pdirty or ldirty: - self.logger.debug("dirty state, updating public packet") self._remote._public_packet = _pp - self._remote._pdirty = pdirty - self._remote._ldirty = ldirty + self._remote._pdirty = pdirty + self._remote._ldirty = ldirty - if self._remote.event.pdirty and self._remote.pdirty: - self._remote.subject.notify("pdirty") - if self._remote.event.ldirty and self._remote.ldirty: + if self._remote.event.pdirty: + self.queue.put("pdirty") + if self._remote.event.ldirty: + self.queue.put("ldirty") + time.sleep(self._remote.ratelimit) + self.logger.debug(f"terminating {self.getName()} thread") + self.queue.put(None) + + +class Updater(threading.Thread): + """ + continously updates the public packet + + notifies observers of event updates + """ + + def __init__(self, remote, queue): + super().__init__(name="updater", daemon=True) + self._remote = remote + self.queue = queue + self.logger = logger.getChild(self.__class__.__name__) + self._remote.socks[Socket.response].settimeout(self._remote.timeout) + self._remote.socks[Socket.response].bind( + (socket.gethostbyname(socket.gethostname()), self._remote.port) + ) + p_in, v_in = self._remote.kind.ins + self._remote._strip_comp = [False] * (2 * p_in + 8 * v_in) + self._remote._bus_comp = [False] * (self._remote.kind.num_bus * 8) + + def run(self): + """ + Continously update observers of dirty states. + + Generate _strip_comp, _bus_comp and update level cache if ldirty. + """ + while True: + event = self.queue.get() + if event is None: + self.logger.debug(f"terminating {self.getName()} thread") + break + + if event == "pdirty" and self._remote.pdirty: + self._remote.subject.notify(event) + elif event == "ldirty" and self._remote.ldirty: self._remote._strip_comp, self._remote._bus_comp = ( - _pp._strip_comp, - _pp._bus_comp, + self._remote._public_packet._strip_comp, + self._remote._public_packet._bus_comp, ) - self._remote.cache["strip_level"], self._remote.cache["bus_level"] = ( - _pp.inputlevels, - _pp.outputlevels, + ( + self._remote.cache["strip_level"], + self._remote.cache["bus_level"], + ) = ( + self._remote._public_packet.inputlevels, + self._remote._public_packet.outputlevels, ) - self._remote.subject.notify("ldirty") - - elapsed = time.time() - start - if self._remote.ratelimit - elapsed > 0: - time.sleep(self._remote.ratelimit - elapsed) + self._remote.subject.notify(event)