diff --git a/__main__.py b/__main__.py new file mode 100644 index 0000000..e8db578 --- /dev/null +++ b/__main__.py @@ -0,0 +1,15 @@ +import vban_cmd +from time import sleep + +def test_set_parameter(): + with vban_cmd.connect('potato', ip='ws.local') as vban: + for param in ['A1']: + for i in range(3): + setattr(vban.strip[i], param, True) + print(getattr(vban.strip[i], param)) + setattr(vban.strip[i], param, False) + print(getattr(vban.strip[i], param)) + + +if __name__ == '__main__': + test_set_parameter() diff --git a/vban_cmd/__init__.py b/vban_cmd/__init__.py new file mode 100644 index 0000000..5211377 --- /dev/null +++ b/vban_cmd/__init__.py @@ -0,0 +1,3 @@ +from .vban_cmd import connect + +__ALL__ = ['connect'] diff --git a/vban_cmd/channel.py b/vban_cmd/channel.py new file mode 100644 index 0000000..64d79a5 --- /dev/null +++ b/vban_cmd/channel.py @@ -0,0 +1,72 @@ +import abc +from .errors import VMCMDErrors +from dataclasses import dataclass + +@dataclass +class Modes: + """ Channel Modes """ + _mute: hex=0x00000001 + _solo: hex=0x00000002 + _mono: hex=0x00000004 + _mutec: hex=0x00000008 + + _mixdown: hex=0x00000010 + _repeat: hex=0x00000020 + _mixdownb: hex=0x00000030 + _composite: hex=0x00000040 + _upmixtv: hex=0x00000050 + _updmix2: hex=0x00000060 + _upmix4: hex=0x00000070 + _upmix6: hex=0x00000080 + _center: hex=0x00000090 + _lfe: hex=0x000000A0 + _rear: hex=0x000000B0 + + _mask: hex=0x000000F0 + + _eq: hex=0x00000100 + _cross: hex=0x00000200 + _eqb: hex=0x00000800 + + _busa: hex=0x00001000 + _busa1: hex=0x00001000 + _busa2: hex=0x00002000 + _busa3: hex=0x00004000 + _busa4: hex=0x00008000 + _busa5: hex=0x00080000 + + _busb: hex=0x00010000 + _busb1: hex=0x00010000 + _busb2: hex=0x00020000 + _busb3: hex=0x00040000 + + _pan0: hex=0x00000000 + _pancolor: hex=0x00100000 + _panmod: hex=0x00200000 + _panmask: hex=0x00F00000 + + _postfx_r: hex=0x01000000 + _postfx_d: hex=0x02000000 + _postfx1: hex=0x04000000 + _postfx2: hex=0x08000000 + + _sel: hex=0x10000000 + _monitor: hex=0x20000000 + +class Channel(abc.ABC): + """ Base class for InputStrip and OutputBus. """ + def __init__(self, remote, index): + self._remote = remote + self.index = index + self._modes = Modes() + + def getter(self): + """ Returns an RT data packet. """ + return self._remote.get_rt() + def setter(self, param, val): + """ Sends a string request RT packet. """ + self._remote.set_rt(f'{self.identifier}', param, val) + + @abc.abstractmethod + def identifier(self): + pass diff --git a/vban_cmd/dataclass.py b/vban_cmd/dataclass.py new file mode 100644 index 0000000..81833c5 --- /dev/null +++ b/vban_cmd/dataclass.py @@ -0,0 +1,176 @@ +from dataclasses import dataclass + +VBAN_SERVICE_RTPACKETREGISTER=32 +VBAN_SERVICE_RTPACKET=33 +MAX_PACKET_SIZE = 1436 +HEADER_SIZE = (4+1+1+1+1+16+4) + +@dataclass +class VBAN_VMRT_Packet_Data: + """ RT Packet Data """ + _voicemeeterType: bytes + _reserved: bytes + _buffersize: bytes + _voicemeeterVersion: bytes + _optionBits: bytes + _samplerate: bytes + _inputLeveldB100: bytes + _outputLeveldB100: bytes + _TransportBit: bytes + _stripState: bytes + _busState: bytes + _stripGaindB100Layer1: bytes + _stripGaindB100Layer2: bytes + _stripGaindB100Layer3: bytes + _stripGaindB100Layer4: bytes + _stripGaindB100Layer5: bytes + _stripGaindB100Layer6: bytes + _stripGaindB100Layer7: bytes + _stripGaindB100Layer8: bytes + _busGaindB100: bytes + _stripLabelUTF8c60: bytes + _busLabelUTF8c60: bytes + + @property + def voicemeetertype(self) -> str: + """ returns voicemeeter type as a string """ + type_ = ('basic', 'banana', 'potato') + return type_[int.from_bytes(self._voicemeeterType, 'little')-1] + @property + def voicemeeterversion(self) -> tuple: + """ returns voicemeeter version as a string """ + return tuple(reversed(tuple(int.from_bytes(self._voicemeeterVersion[i:i+1], 'little') for i in range(0, 4)))) + @property + def samplerate(self) -> int: + """ returns samplerate as an int """ + return int.from_bytes(self._samplerate, 'little') + @property + def inputlevels(self) -> tuple: + """ returns the entire level array across all inputs """ + return tuple(int.from_bytes(self._inputLeveldB100[i:i+2], 'little') for i in range(0, 68, 2)) + @property + def outputlevels(self) -> tuple: + """ returns the entire level array across all outputs """ + return tuple(int.from_bytes(self._outputLeveldB100[i:i+2], 'little') for i in range(0, 128, 2)) + @property + def stripstate(self) -> tuple: + """ returns tuple of strip states accessable through bit modes """ + return tuple(self._stripState[i:i+4] for i in range(0, 32, 4)) + @property + def busstate(self) -> tuple: + """ returns tuple of bus states accessable through bit modes """ + return tuple(self._busState[i:i+4] for i in range(0, 32, 4)) + + """ + these functions return an array of gainlayers[i] across all strips + ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...] + """ + @property + def stripgainlayer1(self) -> tuple: + return tuple(int.from_bytes(self._stripGaindB100Layer1[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer2(self) -> tuple: + return tuple(int.from_bytes(self._stripGaindB100Layer2[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer3(self) -> tuple: + return tuple(int.from_bytes(self._stripGaindB100Layer3[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer4(self) -> tuple: + return tuple(int.from_bytes(self._stripGaindB100Layer4[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer5(self) -> tuple: + return tuple(int.from_bytes(self._stripGaindB100Layer5[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer6(self) -> tuple: + return tuple(int.from_bytes(self._stripGaindB100Layer6[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer7(self) -> tuple: + return tuple(int.from_bytes(self._stripGaindB100Layer7[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def stripgainlayer8(self) -> tuple: + return tuple(int.from_bytes(self._stripGaindB100Layer8[i:i+2], 'little') for i in range(0, 16, 2)) + + @property + def busgain(self) -> tuple: + """ returns tuple of bus gains """ + return tuple(int.from_bytes(self._busGaindB100[i:i+2], 'little') for i in range(0, 16, 2)) + @property + def striplabels(self) -> tuple: + """ returns tuple of strip labels """ + return tuple(self._stripLabelUTF8c60[i:i+60].decode().strip('\x00') for i in range(0, 480, 60)) + @property + def buslabels(self) -> tuple: + """ returns tuple of bus labels """ + return tuple(self._busLabelUTF8c60[i:i+60].decode().strip('\x00') for i in range(0, 480, 60)) + +@dataclass +class VBAN_VMRT_Packet_Header: + """ RT PACKET header (expected from Voicemeeter server) """ + name='Voicemeeter-RTP' + timeout=15 + vban: bytes='VBAN'.encode() + format_sr: bytes=(0x60).to_bytes(1, 'little') + format_nbs: bytes=(0).to_bytes(1, 'little') + format_nbc: bytes=(VBAN_SERVICE_RTPACKET).to_bytes(1, 'little') + format_bit: bytes=(0).to_bytes(1, 'little') + streamname: bytes=name.encode('ascii') + bytes(16-len(name)) + + @property + def header(self): + header = self.vban + header += self.format_sr + header += self.format_nbs + header += self.format_nbc + header += self.format_bit + header += self.streamname + assert len(header) == HEADER_SIZE-4, f'Header expected {HEADER_SIZE-4} bytes' + return header + +@dataclass +class RegisterRTHeader: + """ REGISTER RT PACKET header """ + name='Register RTP' + timeout=15 + vban: bytes='VBAN'.encode() + format_sr: bytes=(0x60).to_bytes(1, 'little') + format_nbs: bytes=(0).to_bytes(1, 'little') + format_nbc: bytes=(VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, 'little') + format_bit: bytes=(timeout & 0x000000FF).to_bytes(1, 'little') # timeout + streamname: bytes=name.encode('ascii') + bytes(16-len(name)) + framecounter: bytes=(0).to_bytes(4, 'little') + + @property + def header(self): + header = self.vban + header += self.format_sr + header += self.format_nbs + header += self.format_nbc + header += self.format_bit + header += self.streamname + header += self.framecounter + assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes' + return header + +@dataclass +class TextRequestHeader: + """ VBAN-TEXT request header """ + name='Command1' + vban: bytes='VBAN'.encode() + sr: bytes=(0x52).to_bytes(1, 'little') + nbs: bytes=(0).to_bytes(1, 'little') + nbc: bytes=(0).to_bytes(1, 'little') + bit: bytes=(0x10).to_bytes(1, 'little') + streamname: bytes=name.encode() + bytes(16-len(name)) + framecounter: bytes=(0).to_bytes(4, 'little') + + @property + def header(self): + header = self.vban + header += self.sr + header += self.nbs + header += self.nbc + header += self.bit + header += self.streamname + header += self.framecounter + assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes' + return header diff --git a/vban_cmd/errors.py b/vban_cmd/errors.py new file mode 100644 index 0000000..43e9b48 --- /dev/null +++ b/vban_cmd/errors.py @@ -0,0 +1,2 @@ +class VMCMDErrors(Exception): + pass diff --git a/vban_cmd/kinds.py b/vban_cmd/kinds.py new file mode 100644 index 0000000..172d753 --- /dev/null +++ b/vban_cmd/kinds.py @@ -0,0 +1,28 @@ +import sys +import platform +from collections import namedtuple +from .errors import VMCMDErrors + +""" +Represents a major version of Voicemeeter and describes +its strip layout. +""" +VMKind = namedtuple('VMKind', ['id', 'name', 'outs', 'ins', 'executable', 'vban']) + +bits = 64 if sys.maxsize > 2**32 else 32 +os = platform.system() + +_kind_map = { + 'basic': VMKind('basic', 'Basic', (2,1), (1,1), 'voicemeeter.exe', (4, 4)), + 'banana': VMKind('banana', 'Banana', (3,2), (3,2), 'voicemeeterpro.exe', (8, 8)), + 'potato': VMKind('potato', 'Potato', (5,3), (5,3), + f'voicemeeter8{"x64" if bits == 64 else ""}.exe', (8, 8)) +} + +def get(kind_id): + try: + return _kind_map[kind_id] + except KeyError: + raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}') + +all = list(_kind_map.values()) diff --git a/vban_cmd/meta.py b/vban_cmd/meta.py new file mode 100644 index 0000000..93c184e --- /dev/null +++ b/vban_cmd/meta.py @@ -0,0 +1,12 @@ +from .errors import VMCMDErrors + +def strip_output_prop(param): + """ A channel prop. """ + def fget(self): + data = self.getter() + return not int.from_bytes(data.stripstate[self.index], 'little') & getattr(self._modes, f'_bus{param.lower()}') == 0 + def fset(self, val): + if not isinstance(val, bool) and val not in (0, 1): + raise VMCMDErrors(f'{param} is a boolean parameter') + self.setter(param, 1 if val else 0) + return property(fget, fset) \ No newline at end of file diff --git a/vban_cmd/strip.py b/vban_cmd/strip.py new file mode 100644 index 0000000..558cb34 --- /dev/null +++ b/vban_cmd/strip.py @@ -0,0 +1,153 @@ +from .errors import VMCMDErrors +from . import channel +from .channel import Channel +from . import kinds +from .meta import strip_output_prop + +class InputStrip(Channel): + """ Base class for input strips. """ + @classmethod + def make(cls, is_physical, remote, index, **kwargs): + """ + Factory function for input strips. + Returns a physical/virtual strip of a kind. + """ + PhysStrip, VirtStrip = _strip_pairs[remote.kind.id] + InputStrip = PhysStrip if is_physical else VirtStrip + IS_cls = type(f'Strip{remote.kind.name}', (InputStrip,), { + }) + return IS_cls(remote, index, **kwargs) + + @property + def identifier(self): + return f'Strip[{self.index}]' + + @property + def mono(self) -> bool: + data = self.getter() + return not int.from_bytes(data.stripstate[self.index], 'little') & self._modes._mono == 0 + + @mono.setter + def mono(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('mono is a boolean parameter') + self.setter('mono', 1 if val else 0) + + @property + def solo(self) -> bool: + data = self.getter() + return not int.from_bytes(data.stripstate[self.index], 'little') & self._modes._solo == 0 + + @solo.setter + def solo(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('solo is a boolean parameter') + self.setter('solo', 1 if val else 0) + + @property + def mute(self) -> bool: + data = self.getter() + return not int.from_bytes(data.stripstate[self.index], 'little') & self._modes._mute == 0 + + @mute.setter + def mute(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('mute is a boolean parameter') + self.setter('mute', 1 if val else 0) + + @property + def limit(self) -> int: + data = self.getter() + return + + @limit.setter + def limit(self, val: int): + if val not in range(-40,13): + raise VMCMDErrors('Expected value from -40 to 12') + self.setter('limit', val) + + @property + def label(self) -> str: + data = self.getter() + return data.striplabels[self.index] + + @label.setter + def label(self, val: str): + if not isinstance(val, str): + raise VMCMDErrors('label is a string parameter') + self.setter('label', val) + + +class PhysicalInputStrip(InputStrip): + @property + def comp(self) -> float: + data = self.getter() + return + + @comp.setter + def comp(self, val: float): + self.setter('Comp', val) + + @property + def gate(self) -> float: + data = self.getter() + return + + @gate.setter + def gate(self, val: float): + self.setter('gate', val) + + @property + def device(self): + data = self.getter() + return + + @property + def sr(self): + data = self.getter() + return + + +class VirtualInputStrip(InputStrip): + @property + def mc(self) -> bool: + data = self.getter() + return + + @mc.setter + def mc(self, val: bool): + if not isinstance(val, bool) and val not in (0,1): + raise VMCMDErrors('mc is a boolean parameter') + self.setter('mc', 1 if val else 0) + mono = mc + + @property + def k(self) -> int: + data = self.getter() + return + + @k.setter + def k(self, val: int): + if val not in range(5): + raise VMCMDErrors('Expected value from 0 to 4') + self.setter('karaoke', val) + + +def _make_strip_mixin(kind): + """ Creates a mixin with the kind's strip layout set as class variables. """ + num_A, num_B = kind.outs + return type(f'StripMixin{kind.name}', (), { + **{f'A{i}': strip_output_prop(f'A{i}') for i in range(1, num_A+1)}, + **{f'B{i}': strip_output_prop(f'B{i}') for i in range(1, num_B+1)} + }) + +_strip_mixins = {kind.id: _make_strip_mixin(kind) for kind in kinds.all} + +def _make_strip_pair(kind): + """ Creates a PhysicalInputStrip and a VirtualInputStrip of a kind. """ + StripMixin = _strip_mixins[kind.id] + PhysStrip = type(f'PhysicalInputStrip{kind.name}', (PhysicalInputStrip, StripMixin), {}) + VirtStrip = type(f'VirtualInputStrip{kind.name}', (VirtualInputStrip, StripMixin), {}) + return (PhysStrip, VirtStrip) + +_strip_pairs = {kind.id: _make_strip_pair(kind) for kind in kinds.all} diff --git a/vban_cmd/vban_cmd.py b/vban_cmd/vban_cmd.py new file mode 100644 index 0000000..0edddb7 --- /dev/null +++ b/vban_cmd/vban_cmd.py @@ -0,0 +1,150 @@ +import abc +import select +import socket +from time import sleep +import sys +from threading import Thread +from typing import NamedTuple + +from .errors import VMCMDErrors +from . import kinds +from .dataclass import ( + HEADER_SIZE, + MAX_PACKET_SIZE, + VBAN_VMRT_Packet_Data, + VBAN_VMRT_Packet_Header, + RegisterRTHeader, + TextRequestHeader +) +from .strip import InputStrip + +class VbanCmd(abc.ABC): + def __init__(self, *args, **kwargs): + self._ip = kwargs['ip'] + self._port = kwargs['port'] + self._streamname = kwargs['streamname'] + self._bps = kwargs['bps'] + self._channel = kwargs['channel'] + self._delay = kwargs['delay'] + self._bps_opts = \ + [0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400,19200, 31250, + 38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800,921600, + 1000000, 1500000, 2000000, 3000000] + + self._text_header = TextRequestHeader() + self._register_rt_header = RegisterRTHeader() + self.expected_packet = VBAN_VMRT_Packet_Header() + + self._rt_register_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._rt_packet_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._sendrequest_string_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + is_readable = [] + is_writable = [self._rt_register_socket, self._rt_packet_socket, self._sendrequest_string_socket] + is_error = [] + self.ready_to_read, self.ready_to_write, in_error = select.select(is_readable, is_writable, is_error, 60) + + + def __enter__(self): + self._rt_packet_socket.bind((socket.gethostbyname(socket.gethostname()), self._port)) + worker = Thread(target=self._send_register_rt, daemon=True) + worker.start() + return self + + def _send_register_rt(self): + if self._rt_register_socket in self.ready_to_write: + while True: + self._rt_register_socket.sendto( + self._register_rt_header.header + bytes(1), (socket.gethostbyname(self._ip), self._port) + ) + count = int.from_bytes(self._register_rt_header.framecounter, 'little') + 1 + self._register_rt_header.framecounter = count.to_bytes(4, 'little') + sleep(10) + + def _fetch_rt_packet(self): + data, _ = self._rt_packet_socket.recvfrom(1024*2) + # check for packet data + if len(data) > HEADER_SIZE: + # check if packet is of type rt service + if self.expected_packet.header == data[:HEADER_SIZE-4]: + return VBAN_VMRT_Packet_Data( + _voicemeeterType=data[28:29], + _reserved=data[29:30], + _buffersize=data[30:32], + _voicemeeterVersion=data[32:36], + _optionBits=data[36:40], + _samplerate=data[40:44], + _inputLeveldB100=data[44:112], + _outputLeveldB100=data[112:240], + _TransportBit=data[240:244], + _stripState=data[244:276], + _busState=data[276:308], + _stripGaindB100Layer1=data[308:324], + _stripGaindB100Layer2=data[324:340], + _stripGaindB100Layer3=data[340:356], + _stripGaindB100Layer4=data[356:372], + _stripGaindB100Layer5=data[372:388], + _stripGaindB100Layer6=data[388:404], + _stripGaindB100Layer7=data[404:420], + _stripGaindB100Layer8=data[420:436], + _busGaindB100=data[436:452], + _stripLabelUTF8c60=data[452:932], + _busLabelUTF8c60=data[932:1412], + ) + return False + + def get_rt(self): + data = False + while not data: + data = self._fetch_rt_packet() + return data + + def set_rt(self, id_, param, val): + cmd = f'{id_}.{param}={val}' + if self._sendrequest_string_socket in self.ready_to_write: + print(f'sending {cmd} to {socket.gethostbyname(self._ip)}:{self._port}') + self._sendrequest_string_socket.sendto( + self._text_header.header + cmd.encode(), (socket.gethostbyname(self._ip), self._port) + ) + count = int.from_bytes(self._text_header.framecounter, 'little') + 1 + self._text_header.framecounter = count.to_bytes(4, 'little') + sleep(self._delay) + + def __exit__(self, exc_type, exc_value, exc_traceback): + self._rt_packet_socket.close() + sys.exit() + + +def _make_remote(kind: NamedTuple) -> VbanCmd: + """ + Creates a new remote class and sets its number of inputs + and outputs for a VM kind. + + The returned class will subclass VbanCmd. + """ + def init(self, *args, **kwargs): + defaultkwargs = { + 'ip': None, 'port': 6990, 'streamname': 'Command1', 'bps': 0, + 'channel': 0, 'delay': 0.03, + } + kwargs = defaultkwargs | kwargs + VbanCmd.__init__(self, *args, **kwargs) + self.kind = kind + self.phys_in, self.virt_in = kind.ins + self.phys_out, self.virt_out = kind.outs + self.strip = \ + tuple(InputStrip.make((i < self.phys_in), self, i) + for i in range(self.phys_in + self.virt_in)) + + return type(f'VbanCmd{kind.name}', (VbanCmd,), { + '__init__': init, + }) + +_remotes = {kind.id: _make_remote(kind) for kind in kinds.all} + +def connect(kind_id: str, *args, **kwargs): + try: + VBANCMD_cls = _remotes[kind_id] + return VBANCMD_cls(**kwargs) + except KeyError as err: + raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}')