initial commit

initial commit
This commit is contained in:
onyx-and-iris 2022-02-25 14:37:23 +00:00
parent 7b290162e7
commit 838e3c4999
9 changed files with 611 additions and 0 deletions

15
__main__.py Normal file
View File

@ -0,0 +1,15 @@
import vban_cmd
from time import sleep
def test_set_parameter():
with vban_cmd.connect('potato', ip='ws.local') as vban:
for param in ['A1']:
for i in range(3):
setattr(vban.strip[i], param, True)
print(getattr(vban.strip[i], param))
setattr(vban.strip[i], param, False)
print(getattr(vban.strip[i], param))
if __name__ == '__main__':
test_set_parameter()

3
vban_cmd/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .vban_cmd import connect
__ALL__ = ['connect']

72
vban_cmd/channel.py Normal file
View File

@ -0,0 +1,72 @@
import abc
from .errors import VMCMDErrors
from dataclasses import dataclass
@dataclass
class Modes:
""" Channel Modes """
_mute: hex=0x00000001
_solo: hex=0x00000002
_mono: hex=0x00000004
_mutec: hex=0x00000008
_mixdown: hex=0x00000010
_repeat: hex=0x00000020
_mixdownb: hex=0x00000030
_composite: hex=0x00000040
_upmixtv: hex=0x00000050
_updmix2: hex=0x00000060
_upmix4: hex=0x00000070
_upmix6: hex=0x00000080
_center: hex=0x00000090
_lfe: hex=0x000000A0
_rear: hex=0x000000B0
_mask: hex=0x000000F0
_eq: hex=0x00000100
_cross: hex=0x00000200
_eqb: hex=0x00000800
_busa: hex=0x00001000
_busa1: hex=0x00001000
_busa2: hex=0x00002000
_busa3: hex=0x00004000
_busa4: hex=0x00008000
_busa5: hex=0x00080000
_busb: hex=0x00010000
_busb1: hex=0x00010000
_busb2: hex=0x00020000
_busb3: hex=0x00040000
_pan0: hex=0x00000000
_pancolor: hex=0x00100000
_panmod: hex=0x00200000
_panmask: hex=0x00F00000
_postfx_r: hex=0x01000000
_postfx_d: hex=0x02000000
_postfx1: hex=0x04000000
_postfx2: hex=0x08000000
_sel: hex=0x10000000
_monitor: hex=0x20000000
class Channel(abc.ABC):
""" Base class for InputStrip and OutputBus. """
def __init__(self, remote, index):
self._remote = remote
self.index = index
self._modes = Modes()
def getter(self):
""" Returns an RT data packet. """
return self._remote.get_rt()
def setter(self, param, val):
""" Sends a string request RT packet. """
self._remote.set_rt(f'{self.identifier}', param, val)
@abc.abstractmethod
def identifier(self):
pass

176
vban_cmd/dataclass.py Normal file
View File

@ -0,0 +1,176 @@
from dataclasses import dataclass
VBAN_SERVICE_RTPACKETREGISTER=32
VBAN_SERVICE_RTPACKET=33
MAX_PACKET_SIZE = 1436
HEADER_SIZE = (4+1+1+1+1+16+4)
@dataclass
class VBAN_VMRT_Packet_Data:
""" RT Packet Data """
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
_stripGaindB100Layer1: bytes
_stripGaindB100Layer2: bytes
_stripGaindB100Layer3: bytes
_stripGaindB100Layer4: bytes
_stripGaindB100Layer5: bytes
_stripGaindB100Layer6: bytes
_stripGaindB100Layer7: bytes
_stripGaindB100Layer8: bytes
_busGaindB100: bytes
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
@property
def voicemeetertype(self) -> str:
""" returns voicemeeter type as a string """
type_ = ('basic', 'banana', 'potato')
return type_[int.from_bytes(self._voicemeeterType, 'little')-1]
@property
def voicemeeterversion(self) -> tuple:
""" returns voicemeeter version as a string """
return tuple(reversed(tuple(int.from_bytes(self._voicemeeterVersion[i:i+1], 'little') for i in range(0, 4))))
@property
def samplerate(self) -> int:
""" returns samplerate as an int """
return int.from_bytes(self._samplerate, 'little')
@property
def inputlevels(self) -> tuple:
""" returns the entire level array across all inputs """
return tuple(int.from_bytes(self._inputLeveldB100[i:i+2], 'little') for i in range(0, 68, 2))
@property
def outputlevels(self) -> tuple:
""" returns the entire level array across all outputs """
return tuple(int.from_bytes(self._outputLeveldB100[i:i+2], 'little') for i in range(0, 128, 2))
@property
def stripstate(self) -> tuple:
""" returns tuple of strip states accessable through bit modes """
return tuple(self._stripState[i:i+4] for i in range(0, 32, 4))
@property
def busstate(self) -> tuple:
""" returns tuple of bus states accessable through bit modes """
return tuple(self._busState[i:i+4] for i in range(0, 32, 4))
"""
these functions return an array of gainlayers[i] across all strips
ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...]
"""
@property
def stripgainlayer1(self) -> tuple:
return tuple(int.from_bytes(self._stripGaindB100Layer1[i:i+2], 'little') for i in range(0, 16, 2))
@property
def stripgainlayer2(self) -> tuple:
return tuple(int.from_bytes(self._stripGaindB100Layer2[i:i+2], 'little') for i in range(0, 16, 2))
@property
def stripgainlayer3(self) -> tuple:
return tuple(int.from_bytes(self._stripGaindB100Layer3[i:i+2], 'little') for i in range(0, 16, 2))
@property
def stripgainlayer4(self) -> tuple:
return tuple(int.from_bytes(self._stripGaindB100Layer4[i:i+2], 'little') for i in range(0, 16, 2))
@property
def stripgainlayer5(self) -> tuple:
return tuple(int.from_bytes(self._stripGaindB100Layer5[i:i+2], 'little') for i in range(0, 16, 2))
@property
def stripgainlayer6(self) -> tuple:
return tuple(int.from_bytes(self._stripGaindB100Layer6[i:i+2], 'little') for i in range(0, 16, 2))
@property
def stripgainlayer7(self) -> tuple:
return tuple(int.from_bytes(self._stripGaindB100Layer7[i:i+2], 'little') for i in range(0, 16, 2))
@property
def stripgainlayer8(self) -> tuple:
return tuple(int.from_bytes(self._stripGaindB100Layer8[i:i+2], 'little') for i in range(0, 16, 2))
@property
def busgain(self) -> tuple:
""" returns tuple of bus gains """
return tuple(int.from_bytes(self._busGaindB100[i:i+2], 'little') for i in range(0, 16, 2))
@property
def striplabels(self) -> tuple:
""" returns tuple of strip labels """
return tuple(self._stripLabelUTF8c60[i:i+60].decode().strip('\x00') for i in range(0, 480, 60))
@property
def buslabels(self) -> tuple:
""" returns tuple of bus labels """
return tuple(self._busLabelUTF8c60[i:i+60].decode().strip('\x00') for i in range(0, 480, 60))
@dataclass
class VBAN_VMRT_Packet_Header:
""" RT PACKET header (expected from Voicemeeter server) """
name='Voicemeeter-RTP'
timeout=15
vban: bytes='VBAN'.encode()
format_sr: bytes=(0x60).to_bytes(1, 'little')
format_nbs: bytes=(0).to_bytes(1, 'little')
format_nbc: bytes=(VBAN_SERVICE_RTPACKET).to_bytes(1, 'little')
format_bit: bytes=(0).to_bytes(1, 'little')
streamname: bytes=name.encode('ascii') + bytes(16-len(name))
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
assert len(header) == HEADER_SIZE-4, f'Header expected {HEADER_SIZE-4} bytes'
return header
@dataclass
class RegisterRTHeader:
""" REGISTER RT PACKET header """
name='Register RTP'
timeout=15
vban: bytes='VBAN'.encode()
format_sr: bytes=(0x60).to_bytes(1, 'little')
format_nbs: bytes=(0).to_bytes(1, 'little')
format_nbc: bytes=(VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, 'little')
format_bit: bytes=(timeout & 0x000000FF).to_bytes(1, 'little') # timeout
streamname: bytes=name.encode('ascii') + bytes(16-len(name))
framecounter: bytes=(0).to_bytes(4, 'little')
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes'
return header
@dataclass
class TextRequestHeader:
""" VBAN-TEXT request header """
name='Command1'
vban: bytes='VBAN'.encode()
sr: bytes=(0x52).to_bytes(1, 'little')
nbs: bytes=(0).to_bytes(1, 'little')
nbc: bytes=(0).to_bytes(1, 'little')
bit: bytes=(0x10).to_bytes(1, 'little')
streamname: bytes=name.encode() + bytes(16-len(name))
framecounter: bytes=(0).to_bytes(4, 'little')
@property
def header(self):
header = self.vban
header += self.sr
header += self.nbs
header += self.nbc
header += self.bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes'
return header

2
vban_cmd/errors.py Normal file
View File

@ -0,0 +1,2 @@
class VMCMDErrors(Exception):
pass

28
vban_cmd/kinds.py Normal file
View File

@ -0,0 +1,28 @@
import sys
import platform
from collections import namedtuple
from .errors import VMCMDErrors
"""
Represents a major version of Voicemeeter and describes
its strip layout.
"""
VMKind = namedtuple('VMKind', ['id', 'name', 'outs', 'ins', 'executable', 'vban'])
bits = 64 if sys.maxsize > 2**32 else 32
os = platform.system()
_kind_map = {
'basic': VMKind('basic', 'Basic', (2,1), (1,1), 'voicemeeter.exe', (4, 4)),
'banana': VMKind('banana', 'Banana', (3,2), (3,2), 'voicemeeterpro.exe', (8, 8)),
'potato': VMKind('potato', 'Potato', (5,3), (5,3),
f'voicemeeter8{"x64" if bits == 64 else ""}.exe', (8, 8))
}
def get(kind_id):
try:
return _kind_map[kind_id]
except KeyError:
raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}')
all = list(_kind_map.values())

12
vban_cmd/meta.py Normal file
View File

@ -0,0 +1,12 @@
from .errors import VMCMDErrors
def strip_output_prop(param):
""" A channel prop. """
def fget(self):
data = self.getter()
return not int.from_bytes(data.stripstate[self.index], 'little') & getattr(self._modes, f'_bus{param.lower()}') == 0
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f'{param} is a boolean parameter')
self.setter(param, 1 if val else 0)
return property(fget, fset)

153
vban_cmd/strip.py Normal file
View File

@ -0,0 +1,153 @@
from .errors import VMCMDErrors
from . import channel
from .channel import Channel
from . import kinds
from .meta import strip_output_prop
class InputStrip(Channel):
""" Base class for input strips. """
@classmethod
def make(cls, is_physical, remote, index, **kwargs):
"""
Factory function for input strips.
Returns a physical/virtual strip of a kind.
"""
PhysStrip, VirtStrip = _strip_pairs[remote.kind.id]
InputStrip = PhysStrip if is_physical else VirtStrip
IS_cls = type(f'Strip{remote.kind.name}', (InputStrip,), {
})
return IS_cls(remote, index, **kwargs)
@property
def identifier(self):
return f'Strip[{self.index}]'
@property
def mono(self) -> bool:
data = self.getter()
return not int.from_bytes(data.stripstate[self.index], 'little') & self._modes._mono == 0
@mono.setter
def mono(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('mono is a boolean parameter')
self.setter('mono', 1 if val else 0)
@property
def solo(self) -> bool:
data = self.getter()
return not int.from_bytes(data.stripstate[self.index], 'little') & self._modes._solo == 0
@solo.setter
def solo(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('solo is a boolean parameter')
self.setter('solo', 1 if val else 0)
@property
def mute(self) -> bool:
data = self.getter()
return not int.from_bytes(data.stripstate[self.index], 'little') & self._modes._mute == 0
@mute.setter
def mute(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('mute is a boolean parameter')
self.setter('mute', 1 if val else 0)
@property
def limit(self) -> int:
data = self.getter()
return
@limit.setter
def limit(self, val: int):
if val not in range(-40,13):
raise VMCMDErrors('Expected value from -40 to 12')
self.setter('limit', val)
@property
def label(self) -> str:
data = self.getter()
return data.striplabels[self.index]
@label.setter
def label(self, val: str):
if not isinstance(val, str):
raise VMCMDErrors('label is a string parameter')
self.setter('label', val)
class PhysicalInputStrip(InputStrip):
@property
def comp(self) -> float:
data = self.getter()
return
@comp.setter
def comp(self, val: float):
self.setter('Comp', val)
@property
def gate(self) -> float:
data = self.getter()
return
@gate.setter
def gate(self, val: float):
self.setter('gate', val)
@property
def device(self):
data = self.getter()
return
@property
def sr(self):
data = self.getter()
return
class VirtualInputStrip(InputStrip):
@property
def mc(self) -> bool:
data = self.getter()
return
@mc.setter
def mc(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('mc is a boolean parameter')
self.setter('mc', 1 if val else 0)
mono = mc
@property
def k(self) -> int:
data = self.getter()
return
@k.setter
def k(self, val: int):
if val not in range(5):
raise VMCMDErrors('Expected value from 0 to 4')
self.setter('karaoke', val)
def _make_strip_mixin(kind):
""" Creates a mixin with the kind's strip layout set as class variables. """
num_A, num_B = kind.outs
return type(f'StripMixin{kind.name}', (), {
**{f'A{i}': strip_output_prop(f'A{i}') for i in range(1, num_A+1)},
**{f'B{i}': strip_output_prop(f'B{i}') for i in range(1, num_B+1)}
})
_strip_mixins = {kind.id: _make_strip_mixin(kind) for kind in kinds.all}
def _make_strip_pair(kind):
""" Creates a PhysicalInputStrip and a VirtualInputStrip of a kind. """
StripMixin = _strip_mixins[kind.id]
PhysStrip = type(f'PhysicalInputStrip{kind.name}', (PhysicalInputStrip, StripMixin), {})
VirtStrip = type(f'VirtualInputStrip{kind.name}', (VirtualInputStrip, StripMixin), {})
return (PhysStrip, VirtStrip)
_strip_pairs = {kind.id: _make_strip_pair(kind) for kind in kinds.all}

150
vban_cmd/vban_cmd.py Normal file
View File

@ -0,0 +1,150 @@
import abc
import select
import socket
from time import sleep
import sys
from threading import Thread
from typing import NamedTuple
from .errors import VMCMDErrors
from . import kinds
from .dataclass import (
HEADER_SIZE,
MAX_PACKET_SIZE,
VBAN_VMRT_Packet_Data,
VBAN_VMRT_Packet_Header,
RegisterRTHeader,
TextRequestHeader
)
from .strip import InputStrip
class VbanCmd(abc.ABC):
def __init__(self, *args, **kwargs):
self._ip = kwargs['ip']
self._port = kwargs['port']
self._streamname = kwargs['streamname']
self._bps = kwargs['bps']
self._channel = kwargs['channel']
self._delay = kwargs['delay']
self._bps_opts = \
[0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400,19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800,921600,
1000000, 1500000, 2000000, 3000000]
self._text_header = TextRequestHeader()
self._register_rt_header = RegisterRTHeader()
self.expected_packet = VBAN_VMRT_Packet_Header()
self._rt_register_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._rt_packet_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sendrequest_string_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
is_readable = []
is_writable = [self._rt_register_socket, self._rt_packet_socket, self._sendrequest_string_socket]
is_error = []
self.ready_to_read, self.ready_to_write, in_error = select.select(is_readable, is_writable, is_error, 60)
def __enter__(self):
self._rt_packet_socket.bind((socket.gethostbyname(socket.gethostname()), self._port))
worker = Thread(target=self._send_register_rt, daemon=True)
worker.start()
return self
def _send_register_rt(self):
if self._rt_register_socket in self.ready_to_write:
while True:
self._rt_register_socket.sendto(
self._register_rt_header.header + bytes(1), (socket.gethostbyname(self._ip), self._port)
)
count = int.from_bytes(self._register_rt_header.framecounter, 'little') + 1
self._register_rt_header.framecounter = count.to_bytes(4, 'little')
sleep(10)
def _fetch_rt_packet(self):
data, _ = self._rt_packet_socket.recvfrom(1024*2)
# check for packet data
if len(data) > HEADER_SIZE:
# check if packet is of type rt service
if self.expected_packet.header == data[:HEADER_SIZE-4]:
return VBAN_VMRT_Packet_Data(
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
_inputLeveldB100=data[44:112],
_outputLeveldB100=data[112:240],
_TransportBit=data[240:244],
_stripState=data[244:276],
_busState=data[276:308],
_stripGaindB100Layer1=data[308:324],
_stripGaindB100Layer2=data[324:340],
_stripGaindB100Layer3=data[340:356],
_stripGaindB100Layer4=data[356:372],
_stripGaindB100Layer5=data[372:388],
_stripGaindB100Layer6=data[388:404],
_stripGaindB100Layer7=data[404:420],
_stripGaindB100Layer8=data[420:436],
_busGaindB100=data[436:452],
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
return False
def get_rt(self):
data = False
while not data:
data = self._fetch_rt_packet()
return data
def set_rt(self, id_, param, val):
cmd = f'{id_}.{param}={val}'
if self._sendrequest_string_socket in self.ready_to_write:
print(f'sending {cmd} to {socket.gethostbyname(self._ip)}:{self._port}')
self._sendrequest_string_socket.sendto(
self._text_header.header + cmd.encode(), (socket.gethostbyname(self._ip), self._port)
)
count = int.from_bytes(self._text_header.framecounter, 'little') + 1
self._text_header.framecounter = count.to_bytes(4, 'little')
sleep(self._delay)
def __exit__(self, exc_type, exc_value, exc_traceback):
self._rt_packet_socket.close()
sys.exit()
def _make_remote(kind: NamedTuple) -> VbanCmd:
"""
Creates a new remote class and sets its number of inputs
and outputs for a VM kind.
The returned class will subclass VbanCmd.
"""
def init(self, *args, **kwargs):
defaultkwargs = {
'ip': None, 'port': 6990, 'streamname': 'Command1', 'bps': 0,
'channel': 0, 'delay': 0.03,
}
kwargs = defaultkwargs | kwargs
VbanCmd.__init__(self, *args, **kwargs)
self.kind = kind
self.phys_in, self.virt_in = kind.ins
self.phys_out, self.virt_out = kind.outs
self.strip = \
tuple(InputStrip.make((i < self.phys_in), self, i)
for i in range(self.phys_in + self.virt_in))
return type(f'VbanCmd{kind.name}', (VbanCmd,), {
'__init__': init,
})
_remotes = {kind.id: _make_remote(kind) for kind in kinds.all}
def connect(kind_id: str, *args, **kwargs):
try:
VBANCMD_cls = _remotes[kind_id]
return VBANCMD_cls(**kwargs)
except KeyError as err:
raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}')