mirror of
https://github.com/onyx-and-iris/vban-cmd-python.git
synced 2025-01-18 10:30:48 +00:00
rename directory
rename directory
This commit is contained in:
parent
3cfc6e587b
commit
8f95942fbd
@ -1,141 +0,0 @@
|
||||
import tkinter as tk
|
||||
from tkinter import ttk
|
||||
from functools import partial
|
||||
from typing import NamedTuple
|
||||
|
||||
import vban_cmd
|
||||
from vban_cmd import kinds
|
||||
|
||||
class ExampleAppErrors(Exception):
|
||||
pass
|
||||
|
||||
class App(tk.Tk):
|
||||
""" Topmost Level of App """
|
||||
@classmethod
|
||||
def make(cls, kind: NamedTuple):
|
||||
"""
|
||||
Factory function for App
|
||||
|
||||
Returns an App class of a kind
|
||||
"""
|
||||
APP_cls = type(f'App{kind.name}', (cls,), {
|
||||
'name': kind.name,
|
||||
'ins': kind.ins,
|
||||
'outs': kind.outs,
|
||||
}
|
||||
)
|
||||
return APP_cls
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.title(f'Voicemeeter{self.name} Example Program')
|
||||
self.phys_in, self.virt_in = self.ins
|
||||
self.col = self.phys_in + self.virt_in
|
||||
self.row = 3
|
||||
self.w = {'Basic': 300, 'Banana': 600, 'Potato': 800}
|
||||
self.h = 150
|
||||
self.defaultsizes = {
|
||||
'Basic': f'{self.w[self.name]}x{self.h}',
|
||||
'Banana': f'{self.w[self.name]}x{self.h}',
|
||||
'Potato': f'{self.w[self.name]}x{self.h}',
|
||||
}
|
||||
self.geometry(self.defaultsizes[self.name])
|
||||
|
||||
""" create tkinter variables, generate widgets and configure rows/cols """
|
||||
self.gains = {
|
||||
'strip': [tk.DoubleVar() for i in range(self.phys_in + self.virt_in)],
|
||||
}
|
||||
self.levels = {
|
||||
'strip': [tk.DoubleVar() for i in range(self.phys_in + self.virt_in)],
|
||||
}
|
||||
[self._make_single_channel(i, j) for i, j in enumerate(i for i in range(0, self.col*2, 2))]
|
||||
scales = [widget for widget in self.winfo_children() if isinstance(widget, tk.Scale)]
|
||||
[scale.bind('<Double-Button-1>', partial(self.reset_gain, index=i)) for i, scale in enumerate(scales)]
|
||||
|
||||
""" configure grid """
|
||||
self.col_row_configure()
|
||||
|
||||
""" initiate watchers/updaters """
|
||||
self.refresh_public_packet()
|
||||
[self.watch_levels(i) for i in range(self.col)]
|
||||
|
||||
@property
|
||||
def id_(self):
|
||||
return 'strip'
|
||||
|
||||
def _make_single_channel(self, i, j):
|
||||
"""
|
||||
Creates a label, progressbar, scale, and mute
|
||||
"""
|
||||
ttk.Label(self, text=f'{vban.strip[i].label}').grid(column=j, row=0, columnspan=2)
|
||||
|
||||
ttk.Progressbar(self, maximum=72, orient='vertical', mode='determinate', variable=self.levels[self.id_][i]).grid(column=j, row=1)
|
||||
ttk.Scale(self, from_=12.0, to=-60.0, orient='vertical', variable=self.gains[self.id_][i],
|
||||
command=partial(self.scale_callback, index=i)).grid(column=j+1, row=1)
|
||||
|
||||
ttk.Button(self, text='MUTE',
|
||||
command=partial(self.toggle, 'mute', i), style=f'Mute{i}.TButton').grid(column=j, row=2, columnspan=2, sticky=(tk.W, tk.E))
|
||||
|
||||
def scale_callback(self, *args, index=None):
|
||||
""" callback function for scale widgets """
|
||||
vban.strip[index].gain = self.gains[self.id_][index].get()
|
||||
|
||||
def reset_gain(self, *args, index=None):
|
||||
""" reset gain to 0 when double click mouse """
|
||||
vban.strip[index].gain = 0
|
||||
self.gains[self.id_][index].set(0)
|
||||
|
||||
def toggle(self, param, index):
|
||||
""" toggles a strip parameter """
|
||||
setattr(vban.strip[index], param, not getattr(vban.strip[index], param))
|
||||
|
||||
def col_row_configure(self):
|
||||
[self.columnconfigure(i, weight=1) for i in range(self.col*2)]
|
||||
[child.grid_configure(padx=1, pady=1)
|
||||
for child in self.winfo_children()]
|
||||
|
||||
|
||||
"""
|
||||
The following functions perform background tasks. Importantly the public packet is constantly updated
|
||||
allowing the vban_cmd interface to fetch updated values.
|
||||
"""
|
||||
def refresh_public_packet(self):
|
||||
self.after(1, self.refresh_public_packet_step)
|
||||
|
||||
def refresh_public_packet_step(self):
|
||||
""" updates public packet in the background """
|
||||
vban.public_packet = vban._get_rt()
|
||||
self.after(25, self.refresh_public_packet_step)
|
||||
|
||||
def watch_levels(self, i):
|
||||
self.after(1, self.watch_levels_step, i)
|
||||
|
||||
def watch_levels_step(self, i):
|
||||
val = vban.strip[i].levels.prefader[0] + vban.strip[i].gain
|
||||
self.levels[self.id_][i].set((0 if vban.strip[i].mute else 100 + (val-30)))
|
||||
self.after(20, self.watch_levels_step, i)
|
||||
|
||||
|
||||
_apps = {kind.id: App.make(kind) for kind in kinds.all}
|
||||
|
||||
def connect(kind_id: str) -> App:
|
||||
""" return App of the kind requested """
|
||||
try:
|
||||
APP_cls = _apps[kind_id]
|
||||
return APP_cls()
|
||||
except KeyError:
|
||||
raise ExampleAppErrors(f'Invalid kind: {kind_id}')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
kind_id = 'banana'
|
||||
opts = {
|
||||
# make sure VBAN is configured on remote machine then set IP accordingly
|
||||
'ip': 'ws.local',
|
||||
'streamname': 'Command1',
|
||||
'port': 6990,
|
||||
}
|
||||
|
||||
with vban_cmd.connect(kind_id, **opts) as vban:
|
||||
app = connect(kind_id)
|
||||
app.mainloop()
|
@ -1,3 +0,0 @@
|
||||
from .vban_cmd import connect
|
||||
|
||||
__ALL__ = ['connect']
|
128
vban_cmd/bus.py
128
vban_cmd/bus.py
@ -1,128 +0,0 @@
|
||||
from .errors import VMCMDErrors
|
||||
from . import channel
|
||||
from .channel import Channel
|
||||
from . import kinds
|
||||
|
||||
class OutputBus(Channel):
|
||||
""" Base class for output buses. """
|
||||
@classmethod
|
||||
def make(cls, is_physical, remote, index, *args, **kwargs):
|
||||
"""
|
||||
Factory function for output busses.
|
||||
Returns a physical/virtual bus of a kind.
|
||||
"""
|
||||
OutputBus = PhysicalOutputBus if is_physical else VirtualOutputBus
|
||||
OB_cls = type(f'Bus{remote.kind.name}', (OutputBus,), {
|
||||
'levels': BusLevel(remote, index),
|
||||
})
|
||||
return OB_cls(remote, index, *args, **kwargs)
|
||||
|
||||
@property
|
||||
def identifier(self):
|
||||
return f'Bus[{self.index}]'
|
||||
|
||||
@property
|
||||
def mute(self) -> bool:
|
||||
return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._mute == 0
|
||||
|
||||
@mute.setter
|
||||
def mute(self, val: bool):
|
||||
if not isinstance(val, bool) and val not in (0,1):
|
||||
raise VMCMDErrors('mute is a boolean parameter')
|
||||
self.setter('mute', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def mono(self) -> bool:
|
||||
return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._mono == 0
|
||||
|
||||
@mono.setter
|
||||
def mono(self, val: bool):
|
||||
if not isinstance(val, bool) and val not in (0,1):
|
||||
raise VMCMDErrors('mono is a boolean parameter')
|
||||
self.setter('mono', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def eq(self) -> bool:
|
||||
return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._eq == 0
|
||||
|
||||
@eq.setter
|
||||
def eq(self, val: bool):
|
||||
if not isinstance(val, bool) and val not in (0,1):
|
||||
raise ('eq is a boolean parameter')
|
||||
self.setter('eq.On', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def eq_ab(self) -> bool:
|
||||
return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._eqb == 0
|
||||
|
||||
@eq_ab.setter
|
||||
def eq_ab(self, val: bool):
|
||||
if not isinstance(val, bool) and val not in (0,1):
|
||||
raise VMCMDErrors('eq_ab is a boolean parameter')
|
||||
self.setter('eq.ab', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def label(self) -> str:
|
||||
return self.public_packet.buslabels[self.index]
|
||||
|
||||
@label.setter
|
||||
def label(self, val: str):
|
||||
if not isinstance(val, str):
|
||||
raise VMCMDErrors('label is a string parameter')
|
||||
self.setter('Label', val)
|
||||
|
||||
@property
|
||||
def gain(self) -> float:
|
||||
def fget():
|
||||
val = self.public_packet.busgain[self.index]
|
||||
if val < 10000:
|
||||
return -val
|
||||
elif val == ((1 << 16) - 1):
|
||||
return 0
|
||||
else:
|
||||
return ((1 << 16) - 1) - val
|
||||
return round((fget() * 0.01), 1)
|
||||
|
||||
@gain.setter
|
||||
def gain(self, val: float):
|
||||
self.setter('gain', val)
|
||||
|
||||
|
||||
class PhysicalOutputBus(OutputBus):
|
||||
@property
|
||||
def device(self) -> str:
|
||||
return
|
||||
|
||||
@property
|
||||
def sr(self) -> int:
|
||||
return
|
||||
|
||||
|
||||
class VirtualOutputBus(OutputBus):
|
||||
pass
|
||||
|
||||
|
||||
class BusLevel(OutputBus):
|
||||
def __init__(self, remote, index):
|
||||
super().__init__(remote, index)
|
||||
self.level_map = _bus_maps[remote.kind.id]
|
||||
|
||||
def getter_level(self, mode=None):
|
||||
def fget(i, data):
|
||||
val = data.outputlevels[i]
|
||||
return -val * 0.01
|
||||
|
||||
range_ = self.level_map[self.index]
|
||||
data = self.public_packet
|
||||
levels = tuple(round(fget(i, data), 1) for i in range(*range_))
|
||||
return levels
|
||||
|
||||
@property
|
||||
def all(self) -> tuple:
|
||||
return self.getter_level()
|
||||
|
||||
def _make_bus_level_map(kind):
|
||||
phys_out, virt_out = kind.outs
|
||||
return tuple((i, i+8) for i in range(0, (phys_out+virt_out)*8, 8))
|
||||
|
||||
_bus_maps = {kind.id: _make_bus_level_map(kind) for kind in kinds.all}
|
@ -1,75 +0,0 @@
|
||||
import abc
|
||||
from .errors import VMCMDErrors
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class Modes:
|
||||
""" Channel Modes """
|
||||
_mute: hex=0x00000001
|
||||
_solo: hex=0x00000002
|
||||
_mono: hex=0x00000004
|
||||
_mutec: hex=0x00000008
|
||||
|
||||
_mixdown: hex=0x00000010
|
||||
_repeat: hex=0x00000020
|
||||
_mixdownb: hex=0x00000030
|
||||
_composite: hex=0x00000040
|
||||
_upmixtv: hex=0x00000050
|
||||
_updmix2: hex=0x00000060
|
||||
_upmix4: hex=0x00000070
|
||||
_upmix6: hex=0x00000080
|
||||
_center: hex=0x00000090
|
||||
_lfe: hex=0x000000A0
|
||||
_rear: hex=0x000000B0
|
||||
|
||||
_mask: hex=0x000000F0
|
||||
|
||||
_eq: hex=0x00000100
|
||||
_cross: hex=0x00000200
|
||||
_eqb: hex=0x00000800
|
||||
|
||||
_busa: hex=0x00001000
|
||||
_busa1: hex=0x00001000
|
||||
_busa2: hex=0x00002000
|
||||
_busa3: hex=0x00004000
|
||||
_busa4: hex=0x00008000
|
||||
_busa5: hex=0x00080000
|
||||
|
||||
_busb: hex=0x00010000
|
||||
_busb1: hex=0x00010000
|
||||
_busb2: hex=0x00020000
|
||||
_busb3: hex=0x00040000
|
||||
|
||||
_pan0: hex=0x00000000
|
||||
_pancolor: hex=0x00100000
|
||||
_panmod: hex=0x00200000
|
||||
_panmask: hex=0x00F00000
|
||||
|
||||
_postfx_r: hex=0x01000000
|
||||
_postfx_d: hex=0x02000000
|
||||
_postfx1: hex=0x04000000
|
||||
_postfx2: hex=0x08000000
|
||||
|
||||
_sel: hex=0x10000000
|
||||
_monitor: hex=0x20000000
|
||||
|
||||
|
||||
class Channel(abc.ABC):
|
||||
""" Base class for InputStrip and OutputBus. """
|
||||
def __init__(self, remote, index):
|
||||
self._remote = remote
|
||||
self.index = index
|
||||
self._modes = Modes()
|
||||
|
||||
def setter(self, param, val):
|
||||
""" Sends a string request RT packet. """
|
||||
self._remote.set_rt(f'{self.identifier}', param, val)
|
||||
|
||||
@abc.abstractmethod
|
||||
def identifier(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
def public_packet(self):
|
||||
""" Returns an RT data packet. """
|
||||
return self._remote.public_packet
|
@ -1,184 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
VBAN_SERVICE_RTPACKETREGISTER=32
|
||||
VBAN_SERVICE_RTPACKET=33
|
||||
MAX_PACKET_SIZE = 1436
|
||||
HEADER_SIZE = (4+1+1+1+1+16+4)
|
||||
|
||||
@dataclass
|
||||
class VBAN_VMRT_Packet_Data:
|
||||
""" RT Packet Data """
|
||||
_voicemeeterType: bytes
|
||||
_reserved: bytes
|
||||
_buffersize: bytes
|
||||
_voicemeeterVersion: bytes
|
||||
_optionBits: bytes
|
||||
_samplerate: bytes
|
||||
_inputLeveldB100: bytes
|
||||
_outputLeveldB100: bytes
|
||||
_TransportBit: bytes
|
||||
_stripState: bytes
|
||||
_busState: bytes
|
||||
_stripGaindB100Layer1: bytes
|
||||
_stripGaindB100Layer2: bytes
|
||||
_stripGaindB100Layer3: bytes
|
||||
_stripGaindB100Layer4: bytes
|
||||
_stripGaindB100Layer5: bytes
|
||||
_stripGaindB100Layer6: bytes
|
||||
_stripGaindB100Layer7: bytes
|
||||
_stripGaindB100Layer8: bytes
|
||||
_busGaindB100: bytes
|
||||
_stripLabelUTF8c60: bytes
|
||||
_busLabelUTF8c60: bytes
|
||||
|
||||
@property
|
||||
def voicemeetertype(self) -> str:
|
||||
""" returns voicemeeter type as a string """
|
||||
type_ = ('basic', 'banana', 'potato')
|
||||
return type_[int.from_bytes(self._voicemeeterType, 'little')-1]
|
||||
@property
|
||||
def voicemeeterversion(self) -> tuple:
|
||||
""" returns voicemeeter version as a string """
|
||||
return tuple(reversed(tuple(int.from_bytes(self._voicemeeterVersion[i:i+1], 'little') for i in range(4))))
|
||||
@property
|
||||
def samplerate(self) -> int:
|
||||
""" returns samplerate as an int """
|
||||
return int.from_bytes(self._samplerate, 'little')
|
||||
@property
|
||||
def inputlevels(self) -> tuple:
|
||||
""" returns the entire level array across all inputs """
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._inputLeveldB100[i:i+2], 'little') for i in range(0, 68, 2))
|
||||
@property
|
||||
def outputlevels(self) -> tuple:
|
||||
""" returns the entire level array across all outputs """
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._outputLeveldB100[i:i+2], 'little') for i in range(0, 128, 2))
|
||||
@property
|
||||
def stripstate(self) -> tuple:
|
||||
""" returns tuple of strip states accessable through bit modes """
|
||||
return tuple(self._stripState[i:i+4] for i in range(0, 32, 4))
|
||||
@property
|
||||
def busstate(self) -> tuple:
|
||||
""" returns tuple of bus states accessable through bit modes """
|
||||
return tuple(self._busState[i:i+4] for i in range(0, 32, 4))
|
||||
|
||||
"""
|
||||
these functions return an array of gainlayers[i] across all strips
|
||||
ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...]
|
||||
"""
|
||||
@property
|
||||
def stripgainlayer1(self) -> tuple:
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer1[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
@property
|
||||
def stripgainlayer2(self) -> tuple:
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer2[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
@property
|
||||
def stripgainlayer3(self) -> tuple:
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer3[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
@property
|
||||
def stripgainlayer4(self) -> tuple:
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer4[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
@property
|
||||
def stripgainlayer5(self) -> tuple:
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer5[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
@property
|
||||
def stripgainlayer6(self) -> tuple:
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer6[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
@property
|
||||
def stripgainlayer7(self) -> tuple:
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer7[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
@property
|
||||
def stripgainlayer8(self) -> tuple:
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer8[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
|
||||
@property
|
||||
def busgain(self) -> tuple:
|
||||
""" returns tuple of bus gains """
|
||||
return tuple(((1 << 16) - 1) - int.from_bytes(self._busGaindB100[i:i+2], 'little') for i in range(0, 16, 2))
|
||||
@property
|
||||
def striplabels(self) -> tuple:
|
||||
""" returns tuple of strip labels """
|
||||
return tuple(self._stripLabelUTF8c60[i:i+60].decode().strip('\x00') for i in range(0, 480, 60))
|
||||
@property
|
||||
def buslabels(self) -> tuple:
|
||||
""" returns tuple of bus labels """
|
||||
return tuple(self._busLabelUTF8c60[i:i+60].decode().strip('\x00') for i in range(0, 480, 60))
|
||||
|
||||
@dataclass
|
||||
class VBAN_VMRT_Packet_Header:
|
||||
""" RT PACKET header (expected from Voicemeeter server) """
|
||||
name='Voicemeeter-RTP'
|
||||
vban: bytes='VBAN'.encode()
|
||||
format_sr: bytes=(0x60).to_bytes(1, 'little')
|
||||
format_nbs: bytes=(0).to_bytes(1, 'little')
|
||||
format_nbc: bytes=(VBAN_SERVICE_RTPACKET).to_bytes(1, 'little')
|
||||
format_bit: bytes=(0).to_bytes(1, 'little')
|
||||
streamname: bytes=name.encode('ascii') + bytes(16-len(name))
|
||||
|
||||
@property
|
||||
def header(self):
|
||||
header = self.vban
|
||||
header += self.format_sr
|
||||
header += self.format_nbs
|
||||
header += self.format_nbc
|
||||
header += self.format_bit
|
||||
header += self.streamname
|
||||
assert len(header) == HEADER_SIZE-4, f'Header expected {HEADER_SIZE-4} bytes'
|
||||
return header
|
||||
|
||||
@dataclass
|
||||
class RegisterRTHeader:
|
||||
""" REGISTER RT PACKET header """
|
||||
name='Register RTP'
|
||||
timeout=15
|
||||
vban: bytes='VBAN'.encode()
|
||||
format_sr: bytes=(0x60).to_bytes(1, 'little')
|
||||
format_nbs: bytes=(0).to_bytes(1, 'little')
|
||||
format_nbc: bytes=(VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, 'little')
|
||||
format_bit: bytes=(timeout & 0x000000FF).to_bytes(1, 'little') # timeout
|
||||
streamname: bytes=name.encode('ascii') + bytes(16-len(name))
|
||||
framecounter: bytes=(0).to_bytes(4, 'little')
|
||||
|
||||
@property
|
||||
def header(self):
|
||||
header = self.vban
|
||||
header += self.format_sr
|
||||
header += self.format_nbs
|
||||
header += self.format_nbc
|
||||
header += self.format_bit
|
||||
header += self.streamname
|
||||
header += self.framecounter
|
||||
assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes'
|
||||
return header
|
||||
|
||||
@dataclass
|
||||
class TextRequestHeader:
|
||||
""" VBAN-TEXT request header """
|
||||
name: str
|
||||
bps_index: int
|
||||
channel: int
|
||||
vban: bytes='VBAN'.encode()
|
||||
nbs: bytes=(0).to_bytes(1, 'little')
|
||||
bit: bytes=(0x10).to_bytes(1, 'little')
|
||||
framecounter: bytes=(0).to_bytes(4, 'little')
|
||||
|
||||
@property
|
||||
def sr(self):
|
||||
return (0x40 + self.bps_index).to_bytes(1, 'little')
|
||||
@property
|
||||
def nbc(self):
|
||||
return (self.channel).to_bytes(1, 'little')
|
||||
@property
|
||||
def streamname(self):
|
||||
return self.name.encode() + bytes(16-len(self.name))
|
||||
|
||||
@property
|
||||
def header(self):
|
||||
header = self.vban
|
||||
header += self.sr
|
||||
header += self.nbs
|
||||
header += self.nbc
|
||||
header += self.bit
|
||||
header += self.streamname
|
||||
header += self.framecounter
|
||||
assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes'
|
||||
return header
|
@ -1,2 +0,0 @@
|
||||
class VMCMDErrors(Exception):
|
||||
pass
|
@ -1,28 +0,0 @@
|
||||
import sys
|
||||
import platform
|
||||
from collections import namedtuple
|
||||
from .errors import VMCMDErrors
|
||||
|
||||
"""
|
||||
Represents a major version of Voicemeeter and describes
|
||||
its strip layout.
|
||||
"""
|
||||
VMKind = namedtuple('VMKind', ['id', 'name', 'outs', 'ins', 'executable', 'vban'])
|
||||
|
||||
bits = 64 if sys.maxsize > 2**32 else 32
|
||||
os = platform.system()
|
||||
|
||||
_kind_map = {
|
||||
'basic': VMKind('basic', 'Basic', (2,1), (1,1), 'voicemeeter.exe', (4, 4)),
|
||||
'banana': VMKind('banana', 'Banana', (3,2), (3,2), 'voicemeeterpro.exe', (8, 8)),
|
||||
'potato': VMKind('potato', 'Potato', (5,3), (5,3),
|
||||
f'voicemeeter8{"x64" if bits == 64 else ""}.exe', (8, 8))
|
||||
}
|
||||
|
||||
def get(kind_id):
|
||||
try:
|
||||
return _kind_map[kind_id]
|
||||
except KeyError:
|
||||
raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}')
|
||||
|
||||
all = list(_kind_map.values())
|
@ -1,12 +0,0 @@
|
||||
from .errors import VMCMDErrors
|
||||
|
||||
def strip_output_prop(param):
|
||||
""" A strip output prop. """
|
||||
def fget(self):
|
||||
data = self._remote.public_packet
|
||||
return not int.from_bytes(data.stripstate[self.index], 'little') & getattr(self._modes, f'_bus{param.lower()}') == 0
|
||||
def fset(self, val):
|
||||
if not isinstance(val, bool) and val not in (0, 1):
|
||||
raise VMCMDErrors(f'{param} is a boolean parameter')
|
||||
self.setter(param, 1 if val else 0)
|
||||
return property(fget, fset)
|
@ -1,216 +0,0 @@
|
||||
from .errors import VMCMDErrors
|
||||
from . import channel
|
||||
from .channel import Channel
|
||||
from . import kinds
|
||||
from .meta import strip_output_prop
|
||||
|
||||
class InputStrip(Channel):
|
||||
""" Base class for input strips. """
|
||||
@classmethod
|
||||
def make(cls, is_physical, remote, index, **kwargs):
|
||||
"""
|
||||
Factory function for input strips.
|
||||
Returns a physical/virtual strip of a kind.
|
||||
"""
|
||||
PhysStrip, VirtStrip = _strip_pairs[remote.kind.id]
|
||||
InputStrip = PhysStrip if is_physical else VirtStrip
|
||||
GainLayerMixin = _make_gainlayer_mixin(remote, index)
|
||||
IS_cls = type(f'Strip{remote.kind.name}', (InputStrip, GainLayerMixin), {
|
||||
'levels': StripLevel(remote, index),
|
||||
})
|
||||
return IS_cls(remote, index, **kwargs)
|
||||
|
||||
@property
|
||||
def identifier(self):
|
||||
return f'Strip[{self.index}]'
|
||||
|
||||
@property
|
||||
def mono(self) -> bool:
|
||||
return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._mono == 0
|
||||
|
||||
@mono.setter
|
||||
def mono(self, val: bool):
|
||||
if not isinstance(val, bool) and val not in (0,1):
|
||||
raise VMCMDErrors('mono is a boolean parameter')
|
||||
self.setter('mono', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def solo(self) -> bool:
|
||||
return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._solo == 0
|
||||
|
||||
@solo.setter
|
||||
def solo(self, val: bool):
|
||||
if not isinstance(val, bool) and val not in (0,1):
|
||||
raise VMCMDErrors('solo is a boolean parameter')
|
||||
self.setter('solo', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def mute(self) -> bool:
|
||||
return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._mute == 0
|
||||
|
||||
@mute.setter
|
||||
def mute(self, val: bool):
|
||||
if not isinstance(val, bool) and val not in (0,1):
|
||||
raise VMCMDErrors('mute is a boolean parameter')
|
||||
self.setter('mute', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def limit(self) -> int:
|
||||
return
|
||||
|
||||
@limit.setter
|
||||
def limit(self, val: int):
|
||||
if val not in range(-40,13):
|
||||
raise VMCMDErrors('Expected value from -40 to 12')
|
||||
self.setter('limit', val)
|
||||
|
||||
@property
|
||||
def label(self) -> str:
|
||||
return self.public_packet.striplabels[self.index]
|
||||
|
||||
@label.setter
|
||||
def label(self, val: str):
|
||||
if not isinstance(val, str):
|
||||
raise VMCMDErrors('label is a string parameter')
|
||||
self.setter('label', val)
|
||||
|
||||
@property
|
||||
def gain(self) -> float:
|
||||
return self.gainlayer[0].gain
|
||||
|
||||
@gain.setter
|
||||
def gain(self, val: float):
|
||||
self.setter('gain', val)
|
||||
|
||||
|
||||
class PhysicalInputStrip(InputStrip):
|
||||
@property
|
||||
def comp(self) -> float:
|
||||
return
|
||||
|
||||
@comp.setter
|
||||
def comp(self, val: float):
|
||||
self.setter('Comp', val)
|
||||
|
||||
@property
|
||||
def gate(self) -> float:
|
||||
return
|
||||
|
||||
@gate.setter
|
||||
def gate(self, val: float):
|
||||
self.setter('gate', val)
|
||||
|
||||
@property
|
||||
def device(self):
|
||||
return
|
||||
|
||||
@property
|
||||
def sr(self):
|
||||
return
|
||||
|
||||
|
||||
class VirtualInputStrip(InputStrip):
|
||||
@property
|
||||
def mc(self) -> bool:
|
||||
return
|
||||
|
||||
@mc.setter
|
||||
def mc(self, val: bool):
|
||||
if not isinstance(val, bool) and val not in (0,1):
|
||||
raise VMCMDErrors('mc is a boolean parameter')
|
||||
self.setter('mc', 1 if val else 0)
|
||||
mono = mc
|
||||
|
||||
@property
|
||||
def k(self) -> int:
|
||||
return
|
||||
|
||||
@k.setter
|
||||
def k(self, val: int):
|
||||
if val not in range(5):
|
||||
raise VMCMDErrors('Expected value from 0 to 4')
|
||||
self.setter('karaoke', val)
|
||||
|
||||
|
||||
class StripLevel(InputStrip):
|
||||
def __init__(self, remote, index):
|
||||
super().__init__(remote, index)
|
||||
self.level_map = _strip_maps[remote.kind.id]
|
||||
|
||||
def getter_level(self, mode=None):
|
||||
def fget(i, data):
|
||||
val = data.inputlevels[i]
|
||||
return -val * 0.01
|
||||
|
||||
range_ = self.level_map[self.index]
|
||||
data = self.public_packet
|
||||
levels = tuple(round(fget(i, data), 1) for i in range(*range_))
|
||||
return levels
|
||||
|
||||
@property
|
||||
def prefader(self) -> tuple:
|
||||
return self.getter_level()
|
||||
|
||||
@property
|
||||
def postfader(self) -> tuple:
|
||||
return
|
||||
|
||||
@property
|
||||
def postmute(self) -> tuple:
|
||||
return
|
||||
|
||||
|
||||
class GainLayer(InputStrip):
|
||||
def __init__(self, remote, index, i):
|
||||
super().__init__(remote, index)
|
||||
self._i = i
|
||||
|
||||
@property
|
||||
def gain(self) -> float:
|
||||
def fget():
|
||||
val = getattr(self.public_packet, f'stripgainlayer{self._i+1}')[self.index]
|
||||
if val < 10000:
|
||||
return -val
|
||||
elif val == ((1 << 16) - 1):
|
||||
return 0
|
||||
else:
|
||||
return ((1 << 16) - 1) - val
|
||||
return round((fget() * 0.01), 1)
|
||||
|
||||
@gain.setter
|
||||
def gain(self, val: float):
|
||||
self.setter(f'GainLayer[{self._i}]', val)
|
||||
|
||||
|
||||
def _make_gainlayer_mixin(remote, index):
|
||||
""" Creates a GainLayer mixin """
|
||||
return type(f'GainlayerMixin', (), {
|
||||
'gainlayer': tuple(GainLayer(remote, index, i) for i in range(8))
|
||||
})
|
||||
|
||||
def _make_strip_mixin(kind):
|
||||
""" Creates a mixin with the kind's strip layout set as class variables. """
|
||||
num_A, num_B = kind.outs
|
||||
return type(f'StripMixin{kind.name}', (), {
|
||||
**{f'A{i}': strip_output_prop(f'A{i}') for i in range(1, num_A+1)},
|
||||
**{f'B{i}': strip_output_prop(f'B{i}') for i in range(1, num_B+1)}
|
||||
})
|
||||
|
||||
_strip_mixins = {kind.id: _make_strip_mixin(kind) for kind in kinds.all}
|
||||
|
||||
def _make_strip_pair(kind):
|
||||
""" Creates a PhysicalInputStrip and a VirtualInputStrip of a kind. """
|
||||
StripMixin = _strip_mixins[kind.id]
|
||||
PhysStrip = type(f'PhysicalInputStrip{kind.name}', (PhysicalInputStrip, StripMixin), {})
|
||||
VirtStrip = type(f'VirtualInputStrip{kind.name}', (VirtualInputStrip, StripMixin), {})
|
||||
return (PhysStrip, VirtStrip)
|
||||
|
||||
_strip_pairs = {kind.id: _make_strip_pair(kind) for kind in kinds.all}
|
||||
|
||||
def _make_strip_level_map(kind):
|
||||
phys_in, virt_in = kind.ins
|
||||
phys_map = tuple((i, i+2) for i in range(0, phys_in*2, 2))
|
||||
virt_map = tuple((i, i+8) for i in range(phys_in*2, phys_in*2+virt_in*8, 8))
|
||||
return phys_map+virt_map
|
||||
|
||||
_strip_maps = {kind.id: _make_strip_level_map(kind) for kind in kinds.all}
|
@ -1,195 +0,0 @@
|
||||
import abc
|
||||
import select
|
||||
import socket
|
||||
from time import sleep
|
||||
from threading import Thread
|
||||
from typing import NamedTuple, NoReturn
|
||||
|
||||
from .errors import VMCMDErrors
|
||||
from . import kinds
|
||||
from .dataclass import (
|
||||
HEADER_SIZE,
|
||||
VBAN_VMRT_Packet_Data,
|
||||
VBAN_VMRT_Packet_Header,
|
||||
RegisterRTHeader,
|
||||
TextRequestHeader
|
||||
)
|
||||
from .strip import InputStrip
|
||||
from .bus import OutputBus
|
||||
|
||||
class VbanCmd(abc.ABC):
|
||||
def __init__(self, **kwargs):
|
||||
self._ip = kwargs['ip']
|
||||
self._port = kwargs['port']
|
||||
self._streamname = kwargs['streamname']
|
||||
self._bps = kwargs['bps']
|
||||
self._channel = kwargs['channel']
|
||||
self._delay = kwargs['delay']
|
||||
self._bps_opts = \
|
||||
[0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
|
||||
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800,921600,
|
||||
1000000, 1500000, 2000000, 3000000]
|
||||
|
||||
if self._channel not in range(256):
|
||||
raise VMCMDErrors('Channel must be in range 0 to 255')
|
||||
self._text_header = TextRequestHeader(
|
||||
name=self._streamname,
|
||||
bps_index=self._bps_opts.index(self._bps),
|
||||
channel=self._channel
|
||||
)
|
||||
self._register_rt_header = RegisterRTHeader()
|
||||
self.expected_packet = VBAN_VMRT_Packet_Header()
|
||||
|
||||
self._rt_register_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self._rt_packet_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self._sendrequest_string_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
is_readable = []
|
||||
is_writable = [self._rt_register_socket, self._rt_packet_socket, self._sendrequest_string_socket]
|
||||
is_error = []
|
||||
self.ready_to_read, self.ready_to_write, in_error = select.select(is_readable, is_writable, is_error, 60)
|
||||
self._public_packet = None
|
||||
|
||||
def __enter__(self):
|
||||
self._rt_packet_socket.bind((socket.gethostbyname(socket.gethostname()), self._port))
|
||||
worker = Thread(target=self._send_register_rt, daemon=True)
|
||||
worker.start()
|
||||
self._public_packet = self._get_rt()
|
||||
return self
|
||||
|
||||
def _send_register_rt(self):
|
||||
if self._rt_register_socket in self.ready_to_write:
|
||||
while True:
|
||||
self._rt_register_socket.sendto(
|
||||
self._register_rt_header.header + bytes(1), (socket.gethostbyname(self._ip), self._port)
|
||||
)
|
||||
count = int.from_bytes(self._register_rt_header.framecounter, 'little') + 1
|
||||
self._register_rt_header.framecounter = count.to_bytes(4, 'little')
|
||||
sleep(10)
|
||||
|
||||
def _fetch_rt_packet(self):
|
||||
if self._rt_packet_socket in self.ready_to_write:
|
||||
data, _ = self._rt_packet_socket.recvfrom(1024*1024*2)
|
||||
# check for packet data
|
||||
if len(data) > HEADER_SIZE:
|
||||
# check if packet is of type rt service
|
||||
if self.expected_packet.header == data[:HEADER_SIZE-4]:
|
||||
return VBAN_VMRT_Packet_Data(
|
||||
_voicemeeterType=data[28:29],
|
||||
_reserved=data[29:30],
|
||||
_buffersize=data[30:32],
|
||||
_voicemeeterVersion=data[32:36],
|
||||
_optionBits=data[36:40],
|
||||
_samplerate=data[40:44],
|
||||
_inputLeveldB100=data[44:112],
|
||||
_outputLeveldB100=data[112:240],
|
||||
_TransportBit=data[240:244],
|
||||
_stripState=data[244:276],
|
||||
_busState=data[276:308],
|
||||
_stripGaindB100Layer1=data[308:324],
|
||||
_stripGaindB100Layer2=data[324:340],
|
||||
_stripGaindB100Layer3=data[340:356],
|
||||
_stripGaindB100Layer4=data[356:372],
|
||||
_stripGaindB100Layer5=data[372:388],
|
||||
_stripGaindB100Layer6=data[388:404],
|
||||
_stripGaindB100Layer7=data[404:420],
|
||||
_stripGaindB100Layer8=data[420:436],
|
||||
_busGaindB100=data[436:452],
|
||||
_stripLabelUTF8c60=data[452:932],
|
||||
_busLabelUTF8c60=data[932:1412],
|
||||
)
|
||||
|
||||
@property
|
||||
def public_packet(self):
|
||||
return self._public_packet or self._get_rt()
|
||||
@public_packet.setter
|
||||
def public_packet(self, val):
|
||||
self._public_packet = val
|
||||
|
||||
def _get_rt(self):
|
||||
def fget():
|
||||
data = False
|
||||
while not data:
|
||||
data = self._fetch_rt_packet()
|
||||
return data
|
||||
return fget()
|
||||
|
||||
def set_rt(self, id_, param=None, val=None):
|
||||
cmd = id_ if not param and val else f'{id_}.{param}={val}'
|
||||
if self._sendrequest_string_socket in self.ready_to_write:
|
||||
self._sendrequest_string_socket.sendto(
|
||||
self._text_header.header + cmd.encode(), (socket.gethostbyname(self._ip), self._port)
|
||||
)
|
||||
count = int.from_bytes(self._text_header.framecounter, 'little') + 1
|
||||
self._text_header.framecounter = count.to_bytes(4, 'little')
|
||||
|
||||
def sendtext(self, cmd):
|
||||
self.set_rt(cmd)
|
||||
sleep(self._delay)
|
||||
|
||||
@property
|
||||
def type(self):
|
||||
return self.public_packet.voicemeetertype
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
return self.public_packet.voicemeeterversion
|
||||
|
||||
def show(self) -> NoReturn:
|
||||
""" Shows Voicemeeter if it's hidden. """
|
||||
self.set_rt('Command', 'Show', 1)
|
||||
def hide(self) -> NoReturn:
|
||||
""" Hides Voicemeeter if it's shown. """
|
||||
self.set_rt('Command', 'Show', 0)
|
||||
def shutdown(self) -> NoReturn:
|
||||
""" Closes Voicemeeter. """
|
||||
self.set_rt('Command', 'Shutdown', 1)
|
||||
def restart(self) -> NoReturn:
|
||||
""" Restarts Voicemeeter's audio engine. """
|
||||
self.set_rt('Command', 'Restart', 1)
|
||||
|
||||
def close(self):
|
||||
self._rt_register_socket.close()
|
||||
self._sendrequest_string_socket.close()
|
||||
self._rt_packet_socket.close()
|
||||
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def _make_remote(kind: NamedTuple) -> VbanCmd:
|
||||
"""
|
||||
Creates a new remote class and sets its number of inputs
|
||||
and outputs for a VM kind.
|
||||
|
||||
The returned class will subclass VbanCmd.
|
||||
"""
|
||||
def init(self, **kwargs):
|
||||
defaultkwargs = {
|
||||
'ip': None, 'port': 6990, 'streamname': 'Command1', 'bps': 0,
|
||||
'channel': 0, 'delay': 0.001
|
||||
}
|
||||
kwargs = defaultkwargs | kwargs
|
||||
VbanCmd.__init__(self, **kwargs)
|
||||
self.kind = kind
|
||||
self.phys_in, self.virt_in = kind.ins
|
||||
self.phys_out, self.virt_out = kind.outs
|
||||
self.strip = \
|
||||
tuple(InputStrip.make((i < self.phys_in), self, i)
|
||||
for i in range(self.phys_in + self.virt_in))
|
||||
self.bus = \
|
||||
tuple(OutputBus.make((i < self.phys_out), self, i)
|
||||
for i in range(self.phys_out + self.virt_out))
|
||||
|
||||
return type(f'VbanCmd{kind.name}', (VbanCmd,), {
|
||||
'__init__': init,
|
||||
})
|
||||
|
||||
_remotes = {kind.id: _make_remote(kind) for kind in kinds.all}
|
||||
|
||||
def connect(kind_id: str, **kwargs):
|
||||
try:
|
||||
VBANCMD_cls = _remotes[kind_id]
|
||||
return VBANCMD_cls(**kwargs)
|
||||
except KeyError as err:
|
||||
raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}')
|
Loading…
Reference in New Issue
Block a user