diff --git a/pyproject.toml b/pyproject.toml index 107bf39..645c098 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "vban-cmd" -version = "1.3.1" +version = "1.3.2" description = "Python interface for the VBAN RT Packet Service (Sendtext)" authors = ["onyx-and-iris "] license = "MIT" diff --git a/vban_cmd/base.py b/vban_cmd/base.py index 80e35e1..a9ff3b5 100644 --- a/vban_cmd/base.py +++ b/vban_cmd/base.py @@ -1,22 +1,13 @@ import socket import time from abc import ABCMeta, abstractmethod -from enum import IntEnum -from threading import Thread from typing import Iterable, NoReturn, Optional, Union from .misc import Event -from .packet import ( - HEADER_SIZE, - RegisterRTHeader, - TextRequestHeader, - VBAN_VMRT_Packet_Data, - VBAN_VMRT_Packet_Header, -) +from .packet import TextRequestHeader from .subject import Subject -from .util import comp, script - -Socket = IntEnum("Socket", "register request response", start=0) +from .util import Socket, comp, script +from .worker import Subscriber, Updater class VbanCmd(metaclass=ABCMeta): @@ -40,13 +31,9 @@ class VbanCmd(metaclass=ABCMeta): bps_index=self.BPS_OPTS.index(self.bps), channel=self.channel, ) - self.register_header = RegisterRTHeader() - self.expected_packet = VBAN_VMRT_Packet_Header() - self.socks = tuple( socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for _ in Socket ) - self.running = True self.subject = Subject() self.cache = dict() self.event = Event(self.subs) @@ -61,73 +48,14 @@ class VbanCmd(metaclass=ABCMeta): return self def login(self): - """Start listening for RT Packets""" + """Starts the subscriber and updater threads""" + self.running = True - self.socks[Socket.response.value].bind( - (socket.gethostbyname(socket.gethostname()), self.port) - ) - worker = Thread(target=self._send_register_rt, daemon=True) - worker.start() - self._public_packet = self._get_rt() - worker2 = Thread(target=self._updates, daemon=True) - worker2.start() - time.sleep(0.1) + self.subscriber = Subscriber(self) + self.subscriber.start() - def _send_register_rt(self): - """Fires a subscription packet every 10 seconds""" - - while self.running: - self.socks[Socket.register.value].sendto( - self.register_header.header, - (socket.gethostbyname(self.ip), self.port), - ) - count = int.from_bytes(self.register_header.framecounter, "little") + 1 - self.register_header.framecounter = count.to_bytes(4, "little") - time.sleep(10) - - def _fetch_rt_packet(self) -> Optional[VBAN_VMRT_Packet_Data]: - """Returns a valid RT Data Packet or None""" - data, _ = self.socks[Socket.response.value].recvfrom(2048) - # check for packet data - if len(data) > HEADER_SIZE: - # check if packet is of type VBAN - if self.expected_packet.header == data[: HEADER_SIZE - 4]: - return VBAN_VMRT_Packet_Data( - _voicemeeterType=data[28:29], - _reserved=data[29:30], - _buffersize=data[30:32], - _voicemeeterVersion=data[32:36], - _optionBits=data[36:40], - _samplerate=data[40:44], - _inputLeveldB100=data[44:112], - _outputLeveldB100=data[112:240], - _TransportBit=data[240:244], - _stripState=data[244:276], - _busState=data[276:308], - _stripGaindB100Layer1=data[308:324], - _stripGaindB100Layer2=data[324:340], - _stripGaindB100Layer3=data[340:356], - _stripGaindB100Layer4=data[356:372], - _stripGaindB100Layer5=data[372:388], - _stripGaindB100Layer6=data[388:404], - _stripGaindB100Layer7=data[404:420], - _stripGaindB100Layer8=data[420:436], - _busGaindB100=data[436:452], - _stripLabelUTF8c60=data[452:932], - _busLabelUTF8c60=data[932:1412], - ) - - def _get_rt(self) -> VBAN_VMRT_Packet_Data: - """Attempt to fetch data packet until a valid one found""" - - def fget(): - data = False - while not data: - data = self._fetch_rt_packet() - time.sleep(self.DELAY) - return data - - return fget() + self.updater = Updater(self) + self.updater.start() def _set_rt( self, @@ -137,7 +65,7 @@ class VbanCmd(metaclass=ABCMeta): ): """Sends a string request command over a network.""" cmd = id_ if not param else f"{id_}.{param}={val}" - self.socks[Socket.request.value].sendto( + self.socks[Socket.request].sendto( self.text_header.header + cmd.encode(), (socket.gethostbyname(self.ip), self.port), ) @@ -187,30 +115,6 @@ class VbanCmd(metaclass=ABCMeta): while self.pdirty: pass - def _updates(self) -> NoReturn: - print(f"Listening for {', '.join(self.event.get())} events") - self.cache["strip_level"], self.cache["bus_level"] = self._get_levels( - self.public_packet - ) - - while self.running: - start = time.time() - self._pp = self._get_rt() - self._strip_buf, self._bus_buf = self._get_levels(self._pp) - self._pdirty = self._pp.pdirty(self.public_packet) - - if self.event.ldirty and self.ldirty: - self.cache["strip_level"] = self._strip_buf - self.cache["bus_level"] = self._bus_buf - self.subject.notify("ldirty") - if self.public_packet != self._pp: - self._public_packet = self._pp - if self.event.pdirty and self.pdirty: - self.subject.notify("pdirty") - elapsed = time.time() - start - if self.ratelimit - elapsed > 0: - time.sleep(self.ratelimit - elapsed) - def _get_levels(self, packet) -> Iterable: """ returns both level arrays (strip_levels, bus_levels) BEFORE math conversion diff --git a/vban_cmd/util.py b/vban_cmd/util.py index 2c92da5..a67793a 100644 --- a/vban_cmd/util.py +++ b/vban_cmd/util.py @@ -1,3 +1,4 @@ +from enum import IntEnum from typing import Iterator @@ -64,3 +65,6 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]: if b <= 9500: yield a == b yield True + + +Socket = IntEnum("Socket", "register request response", start=0) diff --git a/vban_cmd/worker.py b/vban_cmd/worker.py new file mode 100644 index 0000000..0115976 --- /dev/null +++ b/vban_cmd/worker.py @@ -0,0 +1,123 @@ +import socket +import threading +import time +from enum import IntEnum +from typing import Optional + +from .packet import ( + HEADER_SIZE, + RegisterRTHeader, + VBAN_VMRT_Packet_Data, + VBAN_VMRT_Packet_Header, +) +from .util import Socket + + +class Subscriber(threading.Thread): + """fire a subscription packet every 10 seconds""" + + def __init__(self, remote): + super().__init__(name="subscriber", target=self.register, daemon=True) + self._rem = remote + self.register_header = RegisterRTHeader() + + def register(self): + while self._rem.running: + try: + self._rem.socks[Socket.register].sendto( + self.register_header.header, + (socket.gethostbyname(self._rem.ip), self._rem.port), + ) + count = int.from_bytes(self.register_header.framecounter, "little") + 1 + self.register_header.framecounter = count.to_bytes(4, "little") + time.sleep(10) + except socket.gaierror as e: + print(f"Unable to resolve hostname {self._rem.ip}") + self._rem.socks[Socket.register].close() + raise e + + +class Updater(threading.Thread): + """ + continously updates the public packet + + notifies observers of event updates + """ + + def __init__(self, remote): + super().__init__(name="updater", target=self.update, daemon=True) + self._rem = remote + self._rem.socks[Socket.response].bind( + (socket.gethostbyname(socket.gethostname()), self._rem.port) + ) + self.expected_packet = VBAN_VMRT_Packet_Header() + self._rem._public_packet = self._get_rt() + + def _fetch_rt_packet(self) -> Optional[VBAN_VMRT_Packet_Data]: + """Returns a valid RT Data Packet or None""" + data, _ = self._rem.socks[Socket.response].recvfrom(2048) + # check for packet data + if len(data) > HEADER_SIZE: + # check if packet is of type VBAN + if self.expected_packet.header == data[: HEADER_SIZE - 4]: + return VBAN_VMRT_Packet_Data( + _voicemeeterType=data[28:29], + _reserved=data[29:30], + _buffersize=data[30:32], + _voicemeeterVersion=data[32:36], + _optionBits=data[36:40], + _samplerate=data[40:44], + _inputLeveldB100=data[44:112], + _outputLeveldB100=data[112:240], + _TransportBit=data[240:244], + _stripState=data[244:276], + _busState=data[276:308], + _stripGaindB100Layer1=data[308:324], + _stripGaindB100Layer2=data[324:340], + _stripGaindB100Layer3=data[340:356], + _stripGaindB100Layer4=data[356:372], + _stripGaindB100Layer5=data[372:388], + _stripGaindB100Layer6=data[388:404], + _stripGaindB100Layer7=data[404:420], + _stripGaindB100Layer8=data[420:436], + _busGaindB100=data[436:452], + _stripLabelUTF8c60=data[452:932], + _busLabelUTF8c60=data[932:1412], + ) + + def _get_rt(self) -> VBAN_VMRT_Packet_Data: + """Attempt to fetch data packet until a valid one found""" + + def fget(): + data = False + while not data: + data = self._fetch_rt_packet() + time.sleep(self._rem.DELAY) + return data + + return fget() + + def update(self): + print(f"Listening for {', '.join(self._rem.event.get())} events") + ( + self._rem.cache["strip_level"], + self._rem.cache["bus_level"], + ) = self._rem._get_levels(self._rem.public_packet) + + while self._rem.running: + start = time.time() + _pp = self._get_rt() + self._rem._strip_buf, self._rem._bus_buf = self._rem._get_levels(_pp) + self._rem._pdirty = _pp.pdirty(self._rem.public_packet) + + if self._rem.event.ldirty and self._rem.ldirty: + self._rem.cache["strip_level"] = self._rem._strip_buf + self._rem.cache["bus_level"] = self._rem._bus_buf + self._rem.subject.notify("ldirty") + if self._rem.public_packet != _pp: + self._rem._public_packet = _pp + if self._rem.event.pdirty and self._rem.pdirty: + self._rem.subject.notify("pdirty") + elapsed = time.time() - start + if self._rem.ratelimit - elapsed > 0: + time.sleep(self._rem.ratelimit - elapsed)