import logging import socket import threading import time from abc import ABCMeta, abstractmethod from pathlib import Path from queue import Queue from typing import Iterable, Union from .error import VBANCMDError from .event import Event from .packet import RequestHeader from .subject import Subject from .util import deep_merge, script from .worker import Producer, Subscriber, Updater logger = logging.getLogger(__name__) class VbanCmd(metaclass=ABCMeta): """Base class responsible for communicating with the VBAN RT Packet Service""" DELAY = 0.001 # fmt: off BPS_OPTS = [ 0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250, 38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600, 1000000, 1500000, 2000000, 3000000, ] # fmt: on def __init__(self, **kwargs): self.logger = logger.getChild(self.__class__.__name__) self.event = Event({k: kwargs.pop(k) for k in ('pdirty', 'ldirty')}) if not kwargs['ip']: kwargs |= self._conn_from_toml() for attr, val in kwargs.items(): setattr(self, attr, val) self.packet_request = RequestHeader( name=self.streamname, bps_index=self.BPS_OPTS.index(self.bps), channel=self.channel, ) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.subject = self.observer = Subject() self.cache = {} self._pdirty = False self._ldirty = False self._script = str() self.stop_event = None self.producer = None @abstractmethod def __str__(self): """Ensure subclasses override str magic method""" pass def _conn_from_toml(self) -> dict: try: import tomllib except ModuleNotFoundError: import tomli as tomllib def get_filepath(): for pn in ( Path.cwd() / 'vban.toml', Path.cwd() / 'configs' / 'vban.toml', Path.home() / '.config' / 'vban-cmd' / 'vban.toml', Path.home() / 'Documents' / 'Voicemeeter' / 'configs' / 'vban.toml', ): if pn.exists(): return pn if not (filepath := get_filepath()): raise VBANCMDError('no ip provided and no vban.toml located.') try: with open(filepath, 'rb') as f: return tomllib.load(f)['connection'] except tomllib.TomlDecodeError as e: raise VBANCMDError(f'Error decoding {filepath}: {e}') from e def __enter__(self): self.login() return self def __exit__(self, exc_type, exc_value, exc_traceback) -> None: self.logout() def login(self) -> None: """Starts the subscriber and updater threads (unless in outbound mode)""" if not self.outbound: self.event.info() self.stop_event = threading.Event() self.stop_event.clear() self.subscriber = Subscriber(self, self.stop_event) self.subscriber.start() queue = Queue() self.updater = Updater(self, queue) self.updater.start() self.producer = Producer(self, queue, self.stop_event) self.producer.start() self.logger.info( "Successfully logged into VBANCMD {kind} with ip='{ip}', port={port}, streamname='{streamname}'".format( **self.__dict__ ) ) def logout(self) -> None: if not self.stopped(): self.logger.debug('events thread shutdown started') self.stop_event.set() if self.producer is not None: for t in (self.producer, self.subscriber): t.join() self.sock.close() self.logger.info(f'{type(self).__name__}: Successfully logged out of {self}') def stopped(self): return self.stop_event is None or self.stop_event.is_set() def _set_rt(self, cmd: str, val: Union[str, float]): """Sends a string request command over a network.""" self.sock.sendto( self.packet_request.header + f'{cmd}={val};'.encode(), (socket.gethostbyname(self.ip), self.port), ) self.packet_request.framecounter = ( int.from_bytes(self.packet_request.framecounter, 'little') + 1 ).to_bytes(4, 'little') self.cache[cmd] = val @script def sendtext(self, script): """Sends a multiple parameter string over a network.""" self.sock.sendto( self.packet_request.header + script.encode(), (socket.gethostbyname(self.ip), self.port), ) self.packet_request.framecounter = ( int.from_bytes(self.packet_request.framecounter, 'little') + 1 ).to_bytes(4, 'little') self.logger.debug(f'sendtext: {script}') time.sleep(self.DELAY) @property def type(self) -> str: """Returns the type of Voicemeeter installation.""" return self.public_packet.voicemeetertype @property def version(self) -> str: """Returns Voicemeeter's version as a string""" return '{0}.{1}.{2}.{3}'.format(*self.public_packet.voicemeeterversion) @property def pdirty(self): """True iff a parameter has changed""" return self._pdirty @property def ldirty(self): """True iff a level value has changed.""" return self._ldirty @property def public_packet(self): return self._public_packet def clear_dirty(self) -> None: while self.pdirty: time.sleep(self.DELAY) def _get_levels(self, packet) -> Iterable: """ returns both level arrays (strip_levels, bus_levels) BEFORE math conversion strip levels in PREFADER mode. """ return ( packet.inputlevels, packet.outputlevels, ) def apply(self, data: dict): """ Sets all parameters of a dict minor delay between each recursion """ def target(key): match key.split('-'): case ['strip' | 'bus' as kls, index] if index.isnumeric(): target = getattr(self, kls) case [ 'vban', 'in' | 'instream' | 'out' | 'outstream' as direction, index, ] if index.isnumeric(): target = getattr( self.vban, f'{direction.removesuffix("stream")}stream' ) case _: ERR_MSG = f"invalid config key '{key}'" self.logger.error(ERR_MSG) raise ValueError(ERR_MSG) return target[int(index)] [target(key).apply(di).then_wait() for key, di in data.items()] def apply_config(self, name): """applies a config from memory""" ERR_MSG = ( f"No config with name '{name}' is loaded into memory", f'Known configs: {list(self.configs.keys())}', ) try: config = self.configs[name] except KeyError as e: self.logger.error(('\n').join(ERR_MSG)) raise VBANCMDError(('\n').join(ERR_MSG)) from e if 'extends' in config: extended = config['extends'] config = { k: v for k, v in deep_merge(self.configs[extended], config) if k not in ('extends') } self.logger.debug( f"profile '{name}' extends '{extended}', profiles merged.." ) self.apply(config) self.logger.info(f"Profile '{name}' applied!")