diff --git a/README.md b/README.md index 95ce231..a409094 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ Check examples directory for a more meaningful example. ### Example 1 ```python -import vban_cmd +import vbancmd class ManyThings: def __init__(self, vban): @@ -75,7 +75,7 @@ class ManyThings: def main(): - with vban_cmd.connect(kind_id, ip=ip) as vban: + with vbancmd.connect(kind_id, ip=ip) as vban: do = ManyThings(vban) do.things() do.other_things() @@ -118,7 +118,7 @@ Sends a TEXT command, for example: vban.sendtext('Strip[0].Mute=1;Strip[3].A3=0;Bus[2].Mute=0;Bus[3].Eq.On=1') ``` #### `vban.show()` -Shows Voicemeeter if it's hide. No effect otherwise. +Shows Voicemeeter if it's hidden. No effect otherwise. #### `vban.hide()` Hides Voicemeeter if it's shown. No effect otherwise. #### `vban.shutdown()` diff --git a/setup.py b/setup.py index 4350ba9..abd957b 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,10 @@ from setuptools import setup setup( - name='vban_cmd', + name='vbancmd', version='0.0.1', description='VBAN CMD Python API', - packages=['vban_cmd'], + packages=['vbancmd'], install_requires=[ ], extras_require={ diff --git a/tests/__init__.py b/tests/__init__.py index cc88407..b199ff2 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,9 +1,8 @@ -import vban_cmd -from vban_cmd import kinds -from vban_cmd.channel import Modes +import vbancmd +from vbancmd import kinds +from vbancmd.channel import Modes import socket from threading import Thread -from time import sleep _kind = 'potato' opts = { @@ -14,7 +13,7 @@ opts = { 'channel': 3 } -vbanrs = {kind.id: vban_cmd.connect(_kind, **opts) for kind in kinds.all} +vbanrs = {kind.id: vbancmd.connect(_kind, **opts) for kind in kinds.all} tests = vbanrs[_kind] def setup_package(): diff --git a/vbancmd/__init__.py b/vbancmd/__init__.py new file mode 100644 index 0000000..4a776d9 --- /dev/null +++ b/vbancmd/__init__.py @@ -0,0 +1,3 @@ +from .vbancmd import connect + +__ALL__ = ['connect'] diff --git a/vbancmd/bus.py b/vbancmd/bus.py new file mode 100644 index 0000000..31fa02d --- /dev/null +++ b/vbancmd/bus.py @@ -0,0 +1,128 @@ +from .errors import VMCMDErrors +from . import channel +from .channel import Channel +from . import kinds + +class OutputBus(Channel): + """ Base class for output buses. """ + @classmethod + def make(cls, is_physical, remote, index, *args, **kwargs): + """ + Factory function for output busses. + Returns a physical/virtual bus of a kind. + """ + OutputBus = PhysicalOutputBus if is_physical else VirtualOutputBus + OB_cls = type(f'Bus{remote.kind.name}', (OutputBus,), { + 'levels': BusLevel(remote, index), + }) + return OB_cls(remote, index, *args, **kwargs) + + @property + def identifier(self): + return f'Bus[{self.index}]' + + @property + def mute(self) -> bool: + return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._mute == 0 + + @mute.setter + def mute(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('mute is a boolean parameter') + self.setter('mute', 1 if val else 0) + + @property + def mono(self) -> bool: + return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._mono == 0 + + @mono.setter + def mono(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('mono is a boolean parameter') + self.setter('mono', 1 if val else 0) + + @property + def eq(self) -> bool: + return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._eq == 0 + + @eq.setter + def eq(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise ('eq is a boolean parameter') + self.setter('eq.On', 1 if val else 0) + + @property + def eq_ab(self) -> bool: + return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._eqb == 0 + + @eq_ab.setter + def eq_ab(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('eq_ab is a boolean parameter') + self.setter('eq.ab', 1 if val else 0) + + @property + def label(self) -> str: + return self.public_packet.buslabels[self.index] + + @label.setter + def label(self, val: str): + if not isinstance(val, str): + raise VMCMDErrors('label is a string parameter') + self.setter('Label', val) + + @property + def gain(self) -> float: + def fget(): + val = self.public_packet.busgain[self.index] + if val < 10000: + return -val + elif val == ((1 << 16) - 1): + return 0 + else: + return ((1 << 16) - 1) - val + return round((fget() * 0.01), 1) + + @gain.setter + def gain(self, val: float): + self.setter('gain', val) + + +class PhysicalOutputBus(OutputBus): + @property + def device(self) -> str: + return + + @property + def sr(self) -> int: + return + + +class VirtualOutputBus(OutputBus): + pass + + +class BusLevel(OutputBus): + def __init__(self, remote, index): + super().__init__(remote, index) + self.level_map = _bus_maps[remote.kind.id] + + def getter_level(self, mode=None): + def fget(i, data): + val = data.outputlevels[i] + return -val * 0.01 + + range_ = self.level_map[self.index] + data = self.public_packet + levels = tuple(round(fget(i, data), 1) for i in range(*range_)) + return levels + + @property + def all(self) -> tuple: + return self.getter_level() + +def _make_bus_level_map(kind): + phys_out, virt_out = kind.outs + return tuple((i, i+8) for i in range(0, (phys_out+virt_out)*8, 8)) + +_bus_maps = {kind.id: _make_bus_level_map(kind) for kind in kinds.all} diff --git a/vbancmd/channel.py b/vbancmd/channel.py new file mode 100644 index 0000000..9e1937c --- /dev/null +++ b/vbancmd/channel.py @@ -0,0 +1,75 @@ +import abc +from .errors import VMCMDErrors +from dataclasses import dataclass + +@dataclass +class Modes: + """ Channel Modes """ + _mute: hex=0x00000001 + _solo: hex=0x00000002 + _mono: hex=0x00000004 + _mutec: hex=0x00000008 + + _mixdown: hex=0x00000010 + _repeat: hex=0x00000020 + _mixdownb: hex=0x00000030 + _composite: hex=0x00000040 + _upmixtv: hex=0x00000050 + _updmix2: hex=0x00000060 + _upmix4: hex=0x00000070 + _upmix6: hex=0x00000080 + _center: hex=0x00000090 + _lfe: hex=0x000000A0 + _rear: hex=0x000000B0 + + _mask: hex=0x000000F0 + + _eq: hex=0x00000100 + _cross: hex=0x00000200 + _eqb: hex=0x00000800 + + _busa: hex=0x00001000 + _busa1: hex=0x00001000 + _busa2: hex=0x00002000 + _busa3: hex=0x00004000 + _busa4: hex=0x00008000 + _busa5: hex=0x00080000 + + _busb: hex=0x00010000 + _busb1: hex=0x00010000 + _busb2: hex=0x00020000 + _busb3: hex=0x00040000 + + _pan0: hex=0x00000000 + _pancolor: hex=0x00100000 + _panmod: hex=0x00200000 + _panmask: hex=0x00F00000 + + _postfx_r: hex=0x01000000 + _postfx_d: hex=0x02000000 + _postfx1: hex=0x04000000 + _postfx2: hex=0x08000000 + + _sel: hex=0x10000000 + _monitor: hex=0x20000000 + + +class Channel(abc.ABC): + """ Base class for InputStrip and OutputBus. """ + def __init__(self, remote, index): + self._remote = remote + self.index = index + self._modes = Modes() + + def setter(self, param, val): + """ Sends a string request RT packet. """ + self._remote.set_rt(f'{self.identifier}', param, val) + + @abc.abstractmethod + def identifier(self): + pass + + @property + def public_packet(self): + """ Returns an RT data packet. """ + return self._remote.public_packet diff --git a/vbancmd/dataclass.py b/vbancmd/dataclass.py new file mode 100644 index 0000000..af126c6 --- /dev/null +++ b/vbancmd/dataclass.py @@ -0,0 +1,184 @@ +from dataclasses import dataclass + +VBAN_SERVICE_RTPACKETREGISTER=32 +VBAN_SERVICE_RTPACKET=33 +MAX_PACKET_SIZE = 1436 +HEADER_SIZE = (4+1+1+1+1+16+4) + +@dataclass +class VBAN_VMRT_Packet_Data: + """ RT Packet Data """ + _voicemeeterType: bytes + _reserved: bytes + _buffersize: bytes + _voicemeeterVersion: bytes + _optionBits: bytes + _samplerate: bytes + _inputLeveldB100: bytes + _outputLeveldB100: bytes + _TransportBit: bytes + _stripState: bytes + _busState: bytes + _stripGaindB100Layer1: bytes + _stripGaindB100Layer2: bytes + _stripGaindB100Layer3: bytes + _stripGaindB100Layer4: bytes + _stripGaindB100Layer5: bytes + _stripGaindB100Layer6: bytes + _stripGaindB100Layer7: bytes + _stripGaindB100Layer8: bytes + _busGaindB100: bytes + _stripLabelUTF8c60: bytes + _busLabelUTF8c60: bytes + + @property + def voicemeetertype(self) -> str: + """ returns voicemeeter type as a string """ + type_ = ('basic', 'banana', 'potato') + return type_[int.from_bytes(self._voicemeeterType, 'little')-1] + @property + def voicemeeterversion(self) -> tuple: + """ returns voicemeeter version as a string """ + return tuple(reversed(tuple(int.from_bytes(self._voicemeeterVersion[i:i+1], 'little') for i in range(4)))) + @property + def samplerate(self) -> int: + """ returns samplerate as an int """ + return int.from_bytes(self._samplerate, 'little') + @property + def inputlevels(self) -> tuple: + """ returns the entire level array across all inputs """ + return tuple(((1 << 16) - 1) - int.from_bytes(self._inputLeveldB100[i:i+2], 'little') for i in range(0, 68, 2)) + @property + def outputlevels(self) -> tuple: + """ returns the entire level array across all outputs """ + return tuple(((1 << 16) - 1) - int.from_bytes(self._outputLeveldB100[i:i+2], 'little') for i in range(0, 128, 2)) + @property + def stripstate(self) -> tuple: + """ returns tuple of strip states accessable through bit modes """ + return tuple(self._stripState[i:i+4] for i in range(0, 32, 4)) + @property + def busstate(self) -> tuple: + """ returns tuple of bus states accessable through bit modes """ + return tuple(self._busState[i:i+4] for i in range(0, 32, 4)) + + """ + these functions return an array of gainlayers[i] across all strips + ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...] + """ + @property + def stripgainlayer1(self) -> tuple: + return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer1[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer2(self) -> tuple: + return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer2[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer3(self) -> tuple: + return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer3[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer4(self) -> tuple: + return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer4[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer5(self) -> tuple: + return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer5[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer6(self) -> tuple: + return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer6[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer7(self) -> tuple: + return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer7[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer8(self) -> tuple: + return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer8[i:i+2], 'little') for i in range(0, 16, 2)) + + @property + def busgain(self) -> tuple: + """ returns tuple of bus gains """ + return tuple(((1 << 16) - 1) - int.from_bytes(self._busGaindB100[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def striplabels(self) -> tuple: + """ returns tuple of strip labels """ + return tuple(self._stripLabelUTF8c60[i:i+60].decode().strip('\x00') for i in range(0, 480, 60)) + @property + def buslabels(self) -> tuple: + """ returns tuple of bus labels """ + return tuple(self._busLabelUTF8c60[i:i+60].decode().strip('\x00') for i in range(0, 480, 60)) + +@dataclass +class VBAN_VMRT_Packet_Header: + """ RT PACKET header (expected from Voicemeeter server) """ + name='Voicemeeter-RTP' + vban: bytes='VBAN'.encode() + format_sr: bytes=(0x60).to_bytes(1, 'little') + format_nbs: bytes=(0).to_bytes(1, 'little') + format_nbc: bytes=(VBAN_SERVICE_RTPACKET).to_bytes(1, 'little') + format_bit: bytes=(0).to_bytes(1, 'little') + streamname: bytes=name.encode('ascii') + bytes(16-len(name)) + + @property + def header(self): + header = self.vban + header += self.format_sr + header += self.format_nbs + header += self.format_nbc + header += self.format_bit + header += self.streamname + assert len(header) == HEADER_SIZE-4, f'Header expected {HEADER_SIZE-4} bytes' + return header + +@dataclass +class RegisterRTHeader: + """ REGISTER RT PACKET header """ + name='Register RTP' + timeout=15 + vban: bytes='VBAN'.encode() + format_sr: bytes=(0x60).to_bytes(1, 'little') + format_nbs: bytes=(0).to_bytes(1, 'little') + format_nbc: bytes=(VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, 'little') + format_bit: bytes=(timeout & 0x000000FF).to_bytes(1, 'little') # timeout + streamname: bytes=name.encode('ascii') + bytes(16-len(name)) + framecounter: bytes=(0).to_bytes(4, 'little') + + @property + def header(self): + header = self.vban + header += self.format_sr + header += self.format_nbs + header += self.format_nbc + header += self.format_bit + header += self.streamname + header += self.framecounter + assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes' + return header + +@dataclass +class TextRequestHeader: + """ VBAN-TEXT request header """ + name: str + bps_index: int + channel: int + vban: bytes='VBAN'.encode() + nbs: bytes=(0).to_bytes(1, 'little') + bit: bytes=(0x10).to_bytes(1, 'little') + framecounter: bytes=(0).to_bytes(4, 'little') + + @property + def sr(self): + return (0x40 + self.bps_index).to_bytes(1, 'little') + @property + def nbc(self): + return (self.channel).to_bytes(1, 'little') + @property + def streamname(self): + return self.name.encode() + bytes(16-len(self.name)) + + @property + def header(self): + header = self.vban + header += self.sr + header += self.nbs + header += self.nbc + header += self.bit + header += self.streamname + header += self.framecounter + assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes' + return header diff --git a/vbancmd/errors.py b/vbancmd/errors.py new file mode 100644 index 0000000..43e9b48 --- /dev/null +++ b/vbancmd/errors.py @@ -0,0 +1,2 @@ +class VMCMDErrors(Exception): + pass diff --git a/vbancmd/kinds.py b/vbancmd/kinds.py new file mode 100644 index 0000000..172d753 --- /dev/null +++ b/vbancmd/kinds.py @@ -0,0 +1,28 @@ +import sys +import platform +from collections import namedtuple +from .errors import VMCMDErrors + +""" +Represents a major version of Voicemeeter and describes +its strip layout. +""" +VMKind = namedtuple('VMKind', ['id', 'name', 'outs', 'ins', 'executable', 'vban']) + +bits = 64 if sys.maxsize > 2**32 else 32 +os = platform.system() + +_kind_map = { + 'basic': VMKind('basic', 'Basic', (2,1), (1,1), 'voicemeeter.exe', (4, 4)), + 'banana': VMKind('banana', 'Banana', (3,2), (3,2), 'voicemeeterpro.exe', (8, 8)), + 'potato': VMKind('potato', 'Potato', (5,3), (5,3), + f'voicemeeter8{"x64" if bits == 64 else ""}.exe', (8, 8)) +} + +def get(kind_id): + try: + return _kind_map[kind_id] + except KeyError: + raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}') + +all = list(_kind_map.values()) diff --git a/vbancmd/meta.py b/vbancmd/meta.py new file mode 100644 index 0000000..af36ec2 --- /dev/null +++ b/vbancmd/meta.py @@ -0,0 +1,12 @@ +from .errors import VMCMDErrors + +def strip_output_prop(param): + """ A strip output prop. """ + def fget(self): + data = self._remote.public_packet + return not int.from_bytes(data.stripstate[self.index], 'little') & getattr(self._modes, f'_bus{param.lower()}') == 0 + def fset(self, val): + if not isinstance(val, bool) and val not in (0, 1): + raise VMCMDErrors(f'{param} is a boolean parameter') + self.setter(param, 1 if val else 0) + return property(fget, fset) diff --git a/vbancmd/strip.py b/vbancmd/strip.py new file mode 100644 index 0000000..ff4339d --- /dev/null +++ b/vbancmd/strip.py @@ -0,0 +1,216 @@ +from .errors import VMCMDErrors +from . import channel +from .channel import Channel +from . import kinds +from .meta import strip_output_prop + +class InputStrip(Channel): + """ Base class for input strips. """ + @classmethod + def make(cls, is_physical, remote, index, **kwargs): + """ + Factory function for input strips. + Returns a physical/virtual strip of a kind. + """ + PhysStrip, VirtStrip = _strip_pairs[remote.kind.id] + InputStrip = PhysStrip if is_physical else VirtStrip + GainLayerMixin = _make_gainlayer_mixin(remote, index) + IS_cls = type(f'Strip{remote.kind.name}', (InputStrip, GainLayerMixin), { + 'levels': StripLevel(remote, index), + }) + return IS_cls(remote, index, **kwargs) + + @property + def identifier(self): + return f'Strip[{self.index}]' + + @property + def mono(self) -> bool: + return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._mono == 0 + + @mono.setter + def mono(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('mono is a boolean parameter') + self.setter('mono', 1 if val else 0) + + @property + def solo(self) -> bool: + return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._solo == 0 + + @solo.setter + def solo(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('solo is a boolean parameter') + self.setter('solo', 1 if val else 0) + + @property + def mute(self) -> bool: + return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._mute == 0 + + @mute.setter + def mute(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('mute is a boolean parameter') + self.setter('mute', 1 if val else 0) + + @property + def limit(self) -> int: + return + + @limit.setter + def limit(self, val: int): + if val not in range(-40,13): + raise VMCMDErrors('Expected value from -40 to 12') + self.setter('limit', val) + + @property + def label(self) -> str: + return self.public_packet.striplabels[self.index] + + @label.setter + def label(self, val: str): + if not isinstance(val, str): + raise VMCMDErrors('label is a string parameter') + self.setter('label', val) + + @property + def gain(self) -> float: + return self.gainlayer[0].gain + + @gain.setter + def gain(self, val: float): + self.setter('gain', val) + + +class PhysicalInputStrip(InputStrip): + @property + def comp(self) -> float: + return + + @comp.setter + def comp(self, val: float): + self.setter('Comp', val) + + @property + def gate(self) -> float: + return + + @gate.setter + def gate(self, val: float): + self.setter('gate', val) + + @property + def device(self): + return + + @property + def sr(self): + return + + +class VirtualInputStrip(InputStrip): + @property + def mc(self) -> bool: + return + + @mc.setter + def mc(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('mc is a boolean parameter') + self.setter('mc', 1 if val else 0) + mono = mc + + @property + def k(self) -> int: + return + + @k.setter + def k(self, val: int): + if val not in range(5): + raise VMCMDErrors('Expected value from 0 to 4') + self.setter('karaoke', val) + + +class StripLevel(InputStrip): + def __init__(self, remote, index): + super().__init__(remote, index) + self.level_map = _strip_maps[remote.kind.id] + + def getter_level(self, mode=None): + def fget(i, data): + val = data.inputlevels[i] + return -val * 0.01 + + range_ = self.level_map[self.index] + data = self.public_packet + levels = tuple(round(fget(i, data), 1) for i in range(*range_)) + return levels + + @property + def prefader(self) -> tuple: + return self.getter_level() + + @property + def postfader(self) -> tuple: + return + + @property + def postmute(self) -> tuple: + return + + +class GainLayer(InputStrip): + def __init__(self, remote, index, i): + super().__init__(remote, index) + self._i = i + + @property + def gain(self) -> float: + def fget(): + val = getattr(self.public_packet, f'stripgainlayer{self._i+1}')[self.index] + if val < 10000: + return -val + elif val == ((1 << 16) - 1): + return 0 + else: + return ((1 << 16) - 1) - val + return round((fget() * 0.01), 1) + + @gain.setter + def gain(self, val: float): + self.setter(f'GainLayer[{self._i}]', val) + + +def _make_gainlayer_mixin(remote, index): + """ Creates a GainLayer mixin """ + return type(f'GainlayerMixin', (), { + 'gainlayer': tuple(GainLayer(remote, index, i) for i in range(8)) + }) + +def _make_strip_mixin(kind): + """ Creates a mixin with the kind's strip layout set as class variables. """ + num_A, num_B = kind.outs + return type(f'StripMixin{kind.name}', (), { + **{f'A{i}': strip_output_prop(f'A{i}') for i in range(1, num_A+1)}, + **{f'B{i}': strip_output_prop(f'B{i}') for i in range(1, num_B+1)} + }) + +_strip_mixins = {kind.id: _make_strip_mixin(kind) for kind in kinds.all} + +def _make_strip_pair(kind): + """ Creates a PhysicalInputStrip and a VirtualInputStrip of a kind. """ + StripMixin = _strip_mixins[kind.id] + PhysStrip = type(f'PhysicalInputStrip{kind.name}', (PhysicalInputStrip, StripMixin), {}) + VirtStrip = type(f'VirtualInputStrip{kind.name}', (VirtualInputStrip, StripMixin), {}) + return (PhysStrip, VirtStrip) + +_strip_pairs = {kind.id: _make_strip_pair(kind) for kind in kinds.all} + +def _make_strip_level_map(kind): + phys_in, virt_in = kind.ins + phys_map = tuple((i, i+2) for i in range(0, phys_in*2, 2)) + virt_map = tuple((i, i+8) for i in range(phys_in*2, phys_in*2+virt_in*8, 8)) + return phys_map+virt_map + +_strip_maps = {kind.id: _make_strip_level_map(kind) for kind in kinds.all} diff --git a/vbancmd/vbancmd.py b/vbancmd/vbancmd.py new file mode 100644 index 0000000..400465a --- /dev/null +++ b/vbancmd/vbancmd.py @@ -0,0 +1,195 @@ +import abc +import select +import socket +from time import sleep +from threading import Thread +from typing import NamedTuple, NoReturn + +from .errors import VMCMDErrors +from . import kinds +from .dataclass import ( + HEADER_SIZE, + VBAN_VMRT_Packet_Data, + VBAN_VMRT_Packet_Header, + RegisterRTHeader, + TextRequestHeader +) +from .strip import InputStrip +from .bus import OutputBus + +class VbanCmd(abc.ABC): + def __init__(self, **kwargs): + self._ip = kwargs['ip'] + self._port = kwargs['port'] + self._streamname = kwargs['streamname'] + self._bps = kwargs['bps'] + self._channel = kwargs['channel'] + self._delay = kwargs['delay'] + self._bps_opts = \ + [0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250, + 38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800,921600, + 1000000, 1500000, 2000000, 3000000] + + if self._channel not in range(256): + raise VMCMDErrors('Channel must be in range 0 to 255') + self._text_header = TextRequestHeader( + name=self._streamname, + bps_index=self._bps_opts.index(self._bps), + channel=self._channel + ) + self._register_rt_header = RegisterRTHeader() + self.expected_packet = VBAN_VMRT_Packet_Header() + + self._rt_register_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._rt_packet_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._sendrequest_string_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + is_readable = [] + is_writable = [self._rt_register_socket, self._rt_packet_socket, self._sendrequest_string_socket] + is_error = [] + self.ready_to_read, self.ready_to_write, in_error = select.select(is_readable, is_writable, is_error, 60) + self._public_packet = None + + def __enter__(self): + self._rt_packet_socket.bind((socket.gethostbyname(socket.gethostname()), self._port)) + worker = Thread(target=self._send_register_rt, daemon=True) + worker.start() + self._public_packet = self._get_rt() + return self + + def _send_register_rt(self): + if self._rt_register_socket in self.ready_to_write: + while True: + self._rt_register_socket.sendto( + self._register_rt_header.header + bytes(1), (socket.gethostbyname(self._ip), self._port) + ) + count = int.from_bytes(self._register_rt_header.framecounter, 'little') + 1 + self._register_rt_header.framecounter = count.to_bytes(4, 'little') + sleep(10) + + def _fetch_rt_packet(self): + if self._rt_packet_socket in self.ready_to_write: + data, _ = self._rt_packet_socket.recvfrom(1024*1024*2) + # check for packet data + if len(data) > HEADER_SIZE: + # check if packet is of type rt service + if self.expected_packet.header == data[:HEADER_SIZE-4]: + return VBAN_VMRT_Packet_Data( + _voicemeeterType=data[28:29], + _reserved=data[29:30], + _buffersize=data[30:32], + _voicemeeterVersion=data[32:36], + _optionBits=data[36:40], + _samplerate=data[40:44], + _inputLeveldB100=data[44:112], + _outputLeveldB100=data[112:240], + _TransportBit=data[240:244], + _stripState=data[244:276], + _busState=data[276:308], + _stripGaindB100Layer1=data[308:324], + _stripGaindB100Layer2=data[324:340], + _stripGaindB100Layer3=data[340:356], + _stripGaindB100Layer4=data[356:372], + _stripGaindB100Layer5=data[372:388], + _stripGaindB100Layer6=data[388:404], + _stripGaindB100Layer7=data[404:420], + _stripGaindB100Layer8=data[420:436], + _busGaindB100=data[436:452], + _stripLabelUTF8c60=data[452:932], + _busLabelUTF8c60=data[932:1412], + ) + + @property + def public_packet(self): + return self._public_packet or self._get_rt() + @public_packet.setter + def public_packet(self, val): + self._public_packet = val + + def _get_rt(self): + def fget(): + data = False + while not data: + data = self._fetch_rt_packet() + return data + return fget() + + def set_rt(self, id_, param=None, val=None): + cmd = id_ if not param and val else f'{id_}.{param}={val}' + if self._sendrequest_string_socket in self.ready_to_write: + self._sendrequest_string_socket.sendto( + self._text_header.header + cmd.encode(), (socket.gethostbyname(self._ip), self._port) + ) + count = int.from_bytes(self._text_header.framecounter, 'little') + 1 + self._text_header.framecounter = count.to_bytes(4, 'little') + + def sendtext(self, cmd): + self.set_rt(cmd) + sleep(self._delay) + + @property + def type(self): + return self.public_packet.voicemeetertype + + @property + def version(self): + return self.public_packet.voicemeeterversion + + def show(self) -> NoReturn: + """ Shows Voicemeeter if it's hidden. """ + self.set_rt('Command', 'Show', 1) + def hide(self) -> NoReturn: + """ Hides Voicemeeter if it's shown. """ + self.set_rt('Command', 'Show', 0) + def shutdown(self) -> NoReturn: + """ Closes Voicemeeter. """ + self.set_rt('Command', 'Shutdown', 1) + def restart(self) -> NoReturn: + """ Restarts Voicemeeter's audio engine. """ + self.set_rt('Command', 'Restart', 1) + + def close(self): + self._rt_register_socket.close() + self._sendrequest_string_socket.close() + self._rt_packet_socket.close() + + def __exit__(self, exc_type, exc_value, exc_traceback): + self.close() + + +def _make_remote(kind: NamedTuple) -> VbanCmd: + """ + Creates a new remote class and sets its number of inputs + and outputs for a VM kind. + + The returned class will subclass VbanCmd. + """ + def init(self, **kwargs): + defaultkwargs = { + 'ip': None, 'port': 6990, 'streamname': 'Command1', 'bps': 0, + 'channel': 0, 'delay': 0.001 + } + kwargs = defaultkwargs | kwargs + VbanCmd.__init__(self, **kwargs) + self.kind = kind + self.phys_in, self.virt_in = kind.ins + self.phys_out, self.virt_out = kind.outs + self.strip = \ + tuple(InputStrip.make((i < self.phys_in), self, i) + for i in range(self.phys_in + self.virt_in)) + self.bus = \ + tuple(OutputBus.make((i < self.phys_out), self, i) + for i in range(self.phys_out + self.virt_out)) + + return type(f'VbanCmd{kind.name}', (VbanCmd,), { + '__init__': init, + }) + +_remotes = {kind.id: _make_remote(kind) for kind in kinds.all} + +def connect(kind_id: str, **kwargs): + try: + VBANCMD_cls = _remotes[kind_id] + return VBANCMD_cls(**kwargs) + except KeyError as err: + raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}')