mirror of
https://github.com/onyx-and-iris/vban-cmd-python.git
synced 2024-11-15 17:10:46 +00:00
6971feb398
misc module added with Event class. base class observable thread now checks for currently registered events.
261 lines
8.6 KiB
Python
261 lines
8.6 KiB
Python
import socket
|
|
import time
|
|
from abc import ABCMeta, abstractmethod
|
|
from enum import IntEnum
|
|
from threading import Thread
|
|
from typing import Iterable, NoReturn, Optional, Union
|
|
|
|
from .misc import Event
|
|
from .packet import (
|
|
HEADER_SIZE,
|
|
RegisterRTHeader,
|
|
TextRequestHeader,
|
|
VBAN_VMRT_Packet_Data,
|
|
VBAN_VMRT_Packet_Header,
|
|
)
|
|
from .subject import Subject
|
|
from .util import comp, script
|
|
|
|
Socket = IntEnum("Socket", "register request response", start=0)
|
|
|
|
|
|
class VbanCmd(metaclass=ABCMeta):
|
|
"""Base class responsible for communicating over VBAN RT Service"""
|
|
|
|
DELAY = 0.001
|
|
# fmt: off
|
|
BPS_OPTS = [
|
|
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
|
|
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
|
|
1000000, 1500000, 2000000, 3000000,
|
|
]
|
|
# fmt: on
|
|
|
|
def __init__(self, **kwargs):
|
|
for attr, val in kwargs.items():
|
|
setattr(self, attr, val)
|
|
|
|
self.text_header = TextRequestHeader(
|
|
name=self.streamname,
|
|
bps_index=self.BPS_OPTS.index(self.bps),
|
|
channel=self.channel,
|
|
)
|
|
self.register_header = RegisterRTHeader()
|
|
self.expected_packet = VBAN_VMRT_Packet_Header()
|
|
|
|
self.socks = tuple(
|
|
socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for _ in Socket
|
|
)
|
|
self.running = True
|
|
self.subject = Subject()
|
|
self.cache = dict()
|
|
self.event = Event(self.subs)
|
|
|
|
@abstractmethod
|
|
def __str__(self):
|
|
"""Ensure subclasses override str magic method"""
|
|
pass
|
|
|
|
def __enter__(self):
|
|
self.login()
|
|
return self
|
|
|
|
def login(self):
|
|
"""Start listening for RT Packets"""
|
|
|
|
self.socks[Socket.response.value].bind(
|
|
(socket.gethostbyname(socket.gethostname()), self.port)
|
|
)
|
|
worker = Thread(target=self._send_register_rt, daemon=True)
|
|
worker.start()
|
|
self._public_packet = self._get_rt()
|
|
worker2 = Thread(target=self._updates, daemon=True)
|
|
worker2.start()
|
|
time.sleep(0.1)
|
|
|
|
def _send_register_rt(self):
|
|
"""Fires a subscription packet every 10 seconds"""
|
|
|
|
while self.running:
|
|
self.socks[Socket.register.value].sendto(
|
|
self.register_header.header,
|
|
(socket.gethostbyname(self.ip), self.port),
|
|
)
|
|
count = int.from_bytes(self.register_header.framecounter, "little") + 1
|
|
self.register_header.framecounter = count.to_bytes(4, "little")
|
|
time.sleep(10)
|
|
|
|
def _fetch_rt_packet(self) -> Optional[VBAN_VMRT_Packet_Data]:
|
|
"""Returns a valid RT Data Packet or None"""
|
|
data, _ = self.socks[Socket.response.value].recvfrom(2048)
|
|
# check for packet data
|
|
if len(data) > HEADER_SIZE:
|
|
# check if packet is of type VBAN
|
|
if self.expected_packet.header == data[: HEADER_SIZE - 4]:
|
|
return VBAN_VMRT_Packet_Data(
|
|
_voicemeeterType=data[28:29],
|
|
_reserved=data[29:30],
|
|
_buffersize=data[30:32],
|
|
_voicemeeterVersion=data[32:36],
|
|
_optionBits=data[36:40],
|
|
_samplerate=data[40:44],
|
|
_inputLeveldB100=data[44:112],
|
|
_outputLeveldB100=data[112:240],
|
|
_TransportBit=data[240:244],
|
|
_stripState=data[244:276],
|
|
_busState=data[276:308],
|
|
_stripGaindB100Layer1=data[308:324],
|
|
_stripGaindB100Layer2=data[324:340],
|
|
_stripGaindB100Layer3=data[340:356],
|
|
_stripGaindB100Layer4=data[356:372],
|
|
_stripGaindB100Layer5=data[372:388],
|
|
_stripGaindB100Layer6=data[388:404],
|
|
_stripGaindB100Layer7=data[404:420],
|
|
_stripGaindB100Layer8=data[420:436],
|
|
_busGaindB100=data[436:452],
|
|
_stripLabelUTF8c60=data[452:932],
|
|
_busLabelUTF8c60=data[932:1412],
|
|
)
|
|
|
|
def _get_rt(self) -> VBAN_VMRT_Packet_Data:
|
|
"""Attempt to fetch data packet until a valid one found"""
|
|
|
|
def fget():
|
|
data = False
|
|
while not data:
|
|
data = self._fetch_rt_packet()
|
|
time.sleep(self.DELAY)
|
|
return data
|
|
|
|
return fget()
|
|
|
|
def _set_rt(
|
|
self,
|
|
id_: str,
|
|
param: Optional[str] = None,
|
|
val: Optional[Union[int, float]] = None,
|
|
):
|
|
"""Sends a string request command over a network."""
|
|
cmd = id_ if not param else f"{id_}.{param}={val}"
|
|
self.socks[Socket.request.value].sendto(
|
|
self.text_header.header + cmd.encode(),
|
|
(socket.gethostbyname(self.ip), self.port),
|
|
)
|
|
count = int.from_bytes(self.text_header.framecounter, "little") + 1
|
|
self.text_header.framecounter = count.to_bytes(4, "little")
|
|
if param:
|
|
self.cache[f"{id_}.{param}"] = val
|
|
if self.sync:
|
|
time.sleep(0.02)
|
|
|
|
@script
|
|
def sendtext(self, cmd):
|
|
"""Sends a multiple parameter string over a network."""
|
|
self._set_rt(cmd)
|
|
time.sleep(self.DELAY)
|
|
|
|
@property
|
|
def type(self) -> str:
|
|
"""Returns the type of Voicemeeter installation."""
|
|
return self.public_packet.voicemeetertype
|
|
|
|
@property
|
|
def version(self) -> str:
|
|
"""Returns Voicemeeter's version as a string"""
|
|
v1, v2, v3, v4 = self.public_packet.voicemeeterversion
|
|
return f"{v1}.{v2}.{v3}.{v4}"
|
|
|
|
@property
|
|
def pdirty(self):
|
|
"""True iff a parameter has changed"""
|
|
return self._pdirty
|
|
|
|
@property
|
|
def ldirty(self):
|
|
"""True iff a level value has changed."""
|
|
self._strip_comp, self._bus_comp = (
|
|
tuple(not x for x in comp(self.cache["strip_level"], self._strip_buf)),
|
|
tuple(not x for x in comp(self.cache["bus_level"], self._bus_buf)),
|
|
)
|
|
return any(any(l) for l in (self._strip_comp, self._bus_comp))
|
|
|
|
@property
|
|
def public_packet(self):
|
|
return self._public_packet
|
|
|
|
def clear_dirty(self):
|
|
while self.pdirty:
|
|
pass
|
|
|
|
def _updates(self) -> NoReturn:
|
|
print(f"Listening for {', '.join(self.event.get())} events")
|
|
self.cache["strip_level"], self.cache["bus_level"] = self._get_levels(
|
|
self.public_packet
|
|
)
|
|
|
|
while self.running:
|
|
start = time.time()
|
|
self._pp = self._get_rt()
|
|
self._strip_buf, self._bus_buf = self._get_levels(self._pp)
|
|
self._pdirty = self._pp.pdirty(self.public_packet)
|
|
|
|
if self.event.ldirty and self.ldirty:
|
|
self.cache["strip_level"] = self._strip_buf
|
|
self.cache["bus_level"] = self._bus_buf
|
|
self.subject.notify("ldirty")
|
|
if self.public_packet != self._pp:
|
|
self._public_packet = self._pp
|
|
if self.event.pdirty and self.pdirty:
|
|
self.subject.notify("pdirty")
|
|
elapsed = time.time() - start
|
|
if self.ratelimit - elapsed > 0:
|
|
time.sleep(self.ratelimit - elapsed)
|
|
|
|
def _get_levels(self, packet) -> Iterable:
|
|
"""
|
|
returns both level arrays (strip_levels, bus_levels) BEFORE math conversion
|
|
|
|
strip levels in PREFADER mode.
|
|
"""
|
|
return (
|
|
tuple(val for val in packet.inputlevels),
|
|
tuple(val for val in packet.outputlevels),
|
|
)
|
|
|
|
def apply(self, data: dict):
|
|
"""
|
|
Sets all parameters of a dict
|
|
|
|
minor delay between each recursion
|
|
"""
|
|
|
|
def param(key):
|
|
obj, m2, *rem = key.split("-")
|
|
index = int(m2) if m2.isnumeric() else int(*rem)
|
|
if obj in ("strip", "bus"):
|
|
return getattr(self, obj)[index]
|
|
else:
|
|
raise ValueError(obj)
|
|
|
|
[param(key).apply(datum).then_wait() for key, datum in data.items()]
|
|
|
|
def apply_config(self, name):
|
|
"""applies a config from memory"""
|
|
error_msg = (
|
|
f"No config with name '{name}' is loaded into memory",
|
|
f"Known configs: {list(self.configs.keys())}",
|
|
)
|
|
try:
|
|
self.apply(self.configs[name])
|
|
print(f"Profile '{name}' applied!")
|
|
except KeyError as e:
|
|
print(("\n").join(error_msg))
|
|
|
|
def logout(self):
|
|
self.running = False
|
|
time.sleep(0.2)
|
|
[sock.close() for sock in self.socks]
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_traceback):
|
|
self.logout()
|