black reformatter

now using black reformatter
This commit is contained in:
onyx-and-iris 2022-03-26 23:00:09 +00:00
parent a94b2bb265
commit 5d878a5df0
11 changed files with 804 additions and 379 deletions

View File

@ -1,3 +1,3 @@
from .vbancmd import connect
__ALL__ = ['connect']
__ALL__ = ["connect"]

View File

@ -4,8 +4,10 @@ from .channel import Channel
from . import kinds
from .meta import bus_mode_prop, bus_bool_prop
class OutputBus(Channel):
""" Base class for output buses. """
"""Base class for output buses."""
@classmethod
def make(cls, is_physical, remote, index, *args, **kwargs):
"""
@ -14,37 +16,41 @@ class OutputBus(Channel):
"""
BusModeMixin = _make_bus_mode_mixin(cls)
OutputBus = PhysicalOutputBus if is_physical else VirtualOutputBus
OB_cls = type(f'Bus{remote.kind.name}', (OutputBus,), {
'levels': BusLevel(remote, index),
'mode': BusModeMixin(remote, index),
})
OB_cls = type(
f"Bus{remote.kind.name}",
(OutputBus,),
{
"levels": BusLevel(remote, index),
"mode": BusModeMixin(remote, index),
},
)
return OB_cls(remote, index, *args, **kwargs)
@property
def identifier(self):
return f'Bus[{self.index}]'
return f"Bus[{self.index}]"
mute = bus_bool_prop('mute')
mute = bus_bool_prop("mute")
mono = bus_bool_prop('mono')
mono = bus_bool_prop("mono")
eq = bus_bool_prop('eq.On')
eq = bus_bool_prop("eq.On")
eq_ab = bus_bool_prop('eq.ab')
eq_ab = bus_bool_prop("eq.ab")
@property
def label(self) -> str:
val = self.getter('label')
val = self.getter("label")
if val is None:
val = self.public_packet.buslabels[self.index]
self._remote.cache[f'{self.identifier}.label'] = [val, False]
self._remote.cache[f"{self.identifier}.label"] = [val, False]
return val
@label.setter
def label(self, val: str):
if not isinstance(val, str):
raise VMCMDErrors('label is a string parameter')
self.setter('label', val)
raise VMCMDErrors("label is a string parameter")
self.setter("label", val)
@property
def gain(self) -> float:
@ -56,16 +62,16 @@ class OutputBus(Channel):
return 0
else:
return ((1 << 16) - 1) - val
val = self.getter('gain')
val = self.getter("gain")
if val is None:
val = round((fget() * 0.01), 1)
self._remote.cache[f'{self.identifier}.gain'] = [val, False]
self._remote.cache[f"{self.identifier}.gain"] = [val, False]
return round(val, 1)
@gain.setter
def gain(self, val: float):
self.setter('gain', val)
self.setter("gain", val)
class PhysicalOutputBus(OutputBus):
@ -101,16 +107,37 @@ class BusLevel(OutputBus):
def all(self) -> tuple:
return self.getter_level()
def _make_bus_level_map(kind):
phys_out, virt_out = kind.outs
return tuple((i, i+8) for i in range(0, (phys_out+virt_out)*8, 8))
return tuple((i, i + 8) for i in range(0, (phys_out + virt_out) * 8, 8))
_bus_maps = {kind.id: _make_bus_level_map(kind) for kind in kinds.all}
def _make_bus_mode_mixin(cls):
""" Creates a mixin of Bus Modes. """
return type('BusModeMixin', (cls,), {
**{f'{mode.lower()}': bus_mode_prop(mode) for mode in
['normal', 'Amix', 'Bmix', 'Repeat', 'Composite', 'TVMix', 'UpMix21',
'UpMix41', 'UpMix61', 'CenterOnly', 'LFEOnly', 'RearOnly']},
})
"""Creates a mixin of Bus Modes."""
return type(
"BusModeMixin",
(cls,),
{
**{
f"{mode.lower()}": bus_mode_prop(mode)
for mode in [
"normal",
"Amix",
"Bmix",
"Repeat",
"Composite",
"TVMix",
"UpMix21",
"UpMix41",
"UpMix61",
"CenterOnly",
"LFEOnly",
"RearOnly",
]
},
},
)

View File

@ -3,86 +3,101 @@ from .errors import VMCMDErrors
from dataclasses import dataclass
from time import sleep
@dataclass
class Modes:
""" Channel Modes """
_mute: hex=0x00000001
_solo: hex=0x00000002
_mono: hex=0x00000004
_mutec: hex=0x00000008
"""Channel Modes"""
_normal: hex=0x00000000
_amix: hex=0x00000010
_repeat: hex=0x00000020
_bmix: hex=0x00000030
_composite: hex=0x00000040
_tvmix: hex=0x00000050
_upmix21: hex=0x00000060
_upmix41: hex=0x00000070
_upmix61: hex=0x00000080
_centeronly:hex=0x00000090
_lfeonly: hex=0x000000A0
_rearonly: hex=0x000000B0
_mute: hex = 0x00000001
_solo: hex = 0x00000002
_mono: hex = 0x00000004
_mutec: hex = 0x00000008
_mask: hex=0x000000F0
_normal: hex = 0x00000000
_amix: hex = 0x00000010
_repeat: hex = 0x00000020
_bmix: hex = 0x00000030
_composite: hex = 0x00000040
_tvmix: hex = 0x00000050
_upmix21: hex = 0x00000060
_upmix41: hex = 0x00000070
_upmix61: hex = 0x00000080
_centeronly: hex = 0x00000090
_lfeonly: hex = 0x000000A0
_rearonly: hex = 0x000000B0
_eq_on: hex=0x00000100
_cross: hex=0x00000200
_eq_ab: hex=0x00000800
_mask: hex = 0x000000F0
_busa: hex=0x00001000
_busa1: hex=0x00001000
_busa2: hex=0x00002000
_busa3: hex=0x00004000
_busa4: hex=0x00008000
_busa5: hex=0x00080000
_eq_on: hex = 0x00000100
_cross: hex = 0x00000200
_eq_ab: hex = 0x00000800
_busb: hex=0x00010000
_busb1: hex=0x00010000
_busb2: hex=0x00020000
_busb3: hex=0x00040000
_busa: hex = 0x00001000
_busa1: hex = 0x00001000
_busa2: hex = 0x00002000
_busa3: hex = 0x00004000
_busa4: hex = 0x00008000
_busa5: hex = 0x00080000
_pan0: hex=0x00000000
_pancolor: hex=0x00100000
_panmod: hex=0x00200000
_panmask: hex=0x00F00000
_busb: hex = 0x00010000
_busb1: hex = 0x00010000
_busb2: hex = 0x00020000
_busb3: hex = 0x00040000
_postfx_r: hex=0x01000000
_postfx_d: hex=0x02000000
_postfx1: hex=0x04000000
_postfx2: hex=0x08000000
_pan0: hex = 0x00000000
_pancolor: hex = 0x00100000
_panmod: hex = 0x00200000
_panmask: hex = 0x00F00000
_sel: hex=0x10000000
_monitor: hex=0x20000000
_postfx_r: hex = 0x01000000
_postfx_d: hex = 0x02000000
_postfx1: hex = 0x04000000
_postfx2: hex = 0x08000000
_sel: hex = 0x10000000
_monitor: hex = 0x20000000
@property
def modevals(self):
return (val for val in [
self._normal, self._amix, self._repeat, self._bmix, self._composite,
self._tvmix, self._upmix21, self._upmix41, self._upmix61, self._centeronly,
self._lfeonly, self._rearonly,
])
return (
val
for val in [
self._normal,
self._amix,
self._repeat,
self._bmix,
self._composite,
self._tvmix,
self._upmix21,
self._upmix41,
self._upmix61,
self._centeronly,
self._lfeonly,
self._rearonly,
]
)
class Channel(abc.ABC):
""" Base class for InputStrip and OutputBus. """
"""Base class for InputStrip and OutputBus."""
def __init__(self, remote, index):
self._remote = remote
self.index = index
self._modes = Modes()
def getter(self, param):
cmd = f'{self.identifier}.{param}'
cmd = f"{self.identifier}.{param}"
if cmd in self._remote.cache and self._remote.cache[cmd][1]:
for _ in range(2):
if self._remote.pdirty:
val = self._remote.cache.pop(f'{self.identifier}.{param}')[0]
val = self._remote.cache.pop(f"{self.identifier}.{param}")[0]
return val
sleep(0.001)
def setter(self, param, val):
""" Sends a string request RT packet. """
self._remote.set_rt(f'{self.identifier}', param, val)
"""Sends a string request RT packet."""
self._remote.set_rt(f"{self.identifier}", param, val)
@abc.abstractmethod
def identifier(self):
@ -90,12 +105,12 @@ class Channel(abc.ABC):
@property
def public_packet(self):
""" Returns an RT data packet. """
"""Returns an RT data packet."""
return self._remote.public_packet
def apply(self, mapping):
""" Sets all parameters of a dict for the strip. """
"""Sets all parameters of a dict for the strip."""
for key, val in mapping.items():
if not hasattr(self, key):
raise VMCMDErrors(f'Invalid {self.identifier} attribute: {key}')
raise VMCMDErrors(f"Invalid {self.identifier} attribute: {key}")
setattr(self, key, val)

View File

@ -2,14 +2,16 @@ import abc
from .errors import VMCMDErrors
from .meta import action_prop
class ICommand(abc.ABC):
""" Command Base Class """
"""Command Base Class"""
def __init__(self, remote):
self._remote = remote
def setter(self, param, val):
""" Sends a string request RT packet. """
self._remote.set_rt(f'{self.identifier}', param, val)
"""Sends a string request RT packet."""
self._remote.set_rt(f"{self.identifier}", param, val)
@abc.abstractmethod
def identifier(self):
@ -17,7 +19,8 @@ class ICommand(abc.ABC):
class Command(ICommand):
""" Command Concrete Class """
"""Command Concrete Class"""
@classmethod
def make(cls, remote):
"""
@ -25,25 +28,33 @@ class Command(ICommand):
Returns a Command class of a kind.
"""
CMD_cls = type(f'Command{remote.kind.name}', (cls,), {
**{param: action_prop(param)
for param in ['show', 'shutdown', 'restart']},
'hide': action_prop('show', val=0),
})
CMD_cls = type(
f"Command{remote.kind.name}",
(cls,),
{
**{
param: action_prop(param)
for param in ["show", "shutdown", "restart"]
},
"hide": action_prop("show", val=0),
},
)
return CMD_cls(remote)
@property
def identifier(self) -> str:
return 'Command'
return "Command"
def set_showvbanchat(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('showvbanchat is a boolean parameter')
self.setter('DialogShow.VBANCHAT', 1 if val else 0)
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors("showvbanchat is a boolean parameter")
self.setter("DialogShow.VBANCHAT", 1 if val else 0)
showvbanchat = property(fset=set_showvbanchat)
def set_lock(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('lock is a boolean parameter')
self.setter('lock', 1 if val else 0)
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors("lock is a boolean parameter")
self.setter("lock", 1 if val else 0)
lock = property(fset=set_lock)

View File

@ -1,13 +1,15 @@
from dataclasses import dataclass
VBAN_SERVICE_RTPACKETREGISTER=32
VBAN_SERVICE_RTPACKET=33
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
MAX_PACKET_SIZE = 1436
HEADER_SIZE = (4+1+1+1+1+16+4)
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4
@dataclass
class VBAN_VMRT_Packet_Data:
""" RT Packet Data """
"""RT Packet Data"""
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
@ -15,7 +17,7 @@ class VBAN_VMRT_Packet_Data:
_optionBits: bytes
_samplerate: bytes
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
@ -32,165 +34,244 @@ class VBAN_VMRT_Packet_Data:
_busLabelUTF8c60: bytes
def isdirty(self, other):
""" defines the dirty flag """
if \
self._stripState == other._stripState and \
self._busState == other._busState and \
self._stripLabelUTF8c60 == other._stripLabelUTF8c60 and \
self._busLabelUTF8c60 == other._busLabelUTF8c60 and \
self._stripGaindB100Layer1 == other._stripGaindB100Layer1 and \
self._busGaindB100 == other._busGaindB100:
"""defines the dirty flag"""
if (
self._stripState == other._stripState
and self._busState == other._busState
and self._stripLabelUTF8c60 == other._stripLabelUTF8c60
and self._busLabelUTF8c60 == other._busLabelUTF8c60
and self._stripGaindB100Layer1 == other._stripGaindB100Layer1
and self._busGaindB100 == other._busGaindB100
):
return False
return True
@property
def voicemeetertype(self) -> str:
""" returns voicemeeter type as a string """
type_ = ('basic', 'banana', 'potato')
return type_[int.from_bytes(self._voicemeeterType, 'little')-1]
"""returns voicemeeter type as a string"""
type_ = ("basic", "banana", "potato")
return type_[int.from_bytes(self._voicemeeterType, "little") - 1]
@property
def voicemeeterversion(self) -> tuple:
""" returns voicemeeter version as a string """
return tuple(reversed(tuple(int.from_bytes(self._voicemeeterVersion[i:i+1], 'little') for i in range(4))))
"""returns voicemeeter version as a string"""
return tuple(
reversed(
tuple(
int.from_bytes(self._voicemeeterVersion[i : i + 1], "little")
for i in range(4)
)
)
)
@property
def samplerate(self) -> int:
""" returns samplerate as an int """
return int.from_bytes(self._samplerate, 'little')
"""returns samplerate as an int"""
return int.from_bytes(self._samplerate, "little")
@property
def inputlevels(self) -> tuple:
""" returns the entire level array across all inputs """
return tuple(((1 << 16) - 1) - int.from_bytes(self._inputLeveldB100[i:i+2], 'little') for i in range(0, 68, 2))
"""returns the entire level array across all inputs"""
return tuple(
((1 << 16) - 1) - int.from_bytes(self._inputLeveldB100[i : i + 2], "little")
for i in range(0, 68, 2)
)
@property
def outputlevels(self) -> tuple:
""" returns the entire level array across all outputs """
return tuple(((1 << 16) - 1) - int.from_bytes(self._outputLeveldB100[i:i+2], 'little') for i in range(0, 128, 2))
"""returns the entire level array across all outputs"""
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._outputLeveldB100[i : i + 2], "little")
for i in range(0, 128, 2)
)
@property
def stripstate(self) -> tuple:
""" returns tuple of strip states accessable through bit modes """
return tuple(self._stripState[i:i+4] for i in range(0, 32, 4))
"""returns tuple of strip states accessable through bit modes"""
return tuple(self._stripState[i : i + 4] for i in range(0, 32, 4))
@property
def busstate(self) -> tuple:
""" returns tuple of bus states accessable through bit modes """
return tuple(self._busState[i:i+4] for i in range(0, 32, 4))
"""returns tuple of bus states accessable through bit modes"""
return tuple(self._busState[i : i + 4] for i in range(0, 32, 4))
"""
these functions return an array of gainlayers[i] across all strips
ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...]
"""
"""
@property
def stripgainlayer1(self) -> tuple:
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer1[i:i+2], 'little') for i in range(0, 16, 2))
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer1[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer2(self) -> tuple:
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer2[i:i+2], 'little') for i in range(0, 16, 2))
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer2[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer3(self) -> tuple:
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer3[i:i+2], 'little') for i in range(0, 16, 2))
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer3[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer4(self) -> tuple:
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer4[i:i+2], 'little') for i in range(0, 16, 2))
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer4[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer5(self) -> tuple:
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer5[i:i+2], 'little') for i in range(0, 16, 2))
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer5[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer6(self) -> tuple:
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer6[i:i+2], 'little') for i in range(0, 16, 2))
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer6[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer7(self) -> tuple:
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer7[i:i+2], 'little') for i in range(0, 16, 2))
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer7[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer8(self) -> tuple:
return tuple(((1 << 16) - 1) - int.from_bytes(self._stripGaindB100Layer8[i:i+2], 'little') for i in range(0, 16, 2))
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer8[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def busgain(self) -> tuple:
""" returns tuple of bus gains """
return tuple(((1 << 16) - 1) - int.from_bytes(self._busGaindB100[i:i+2], 'little') for i in range(0, 16, 2))
"""returns tuple of bus gains"""
return tuple(
((1 << 16) - 1) - int.from_bytes(self._busGaindB100[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def striplabels(self) -> tuple:
""" returns tuple of strip labels """
return tuple(self._stripLabelUTF8c60[i:i+60].decode('ascii').split('\x00')[0] for i in range(0, 480, 60))
"""returns tuple of strip labels"""
return tuple(
self._stripLabelUTF8c60[i : i + 60].decode("ascii")
for i in range(0, 480, 60)
)
@property
def buslabels(self) -> tuple:
""" returns tuple of bus labels """
return tuple(self._busLabelUTF8c60[i:i+60].decode('ascii').split('\x00')[0] for i in range(0, 480, 60))
"""returns tuple of bus labels"""
return tuple(
self._busLabelUTF8c60[i : i + 60].decode("ascii") for i in range(0, 480, 60)
)
@dataclass
class VBAN_VMRT_Packet_Header:
""" RT PACKET header (expected from Voicemeeter server) """
name='Voicemeeter-RTP'
vban: bytes='VBAN'.encode()
format_sr: bytes=(0x60).to_bytes(1, 'little')
format_nbs: bytes=(0).to_bytes(1, 'little')
format_nbc: bytes=(VBAN_SERVICE_RTPACKET).to_bytes(1, 'little')
format_bit: bytes=(0).to_bytes(1, 'little')
streamname: bytes=name.encode('ascii') + bytes(16-len(name))
"""RT PACKET header (expected from Voicemeeter server)"""
name = "Voicemeeter-RTP"
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little")
format_bit: bytes = (0).to_bytes(1, "little")
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
@property
def header(self):
header = self.vban
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
assert len(header) == HEADER_SIZE-4, f'Header expected {HEADER_SIZE-4} bytes'
assert len(header) == HEADER_SIZE - 4, f"Header expected {HEADER_SIZE-4} bytes"
return header
@dataclass
class RegisterRTHeader:
""" REGISTER RT PACKET header """
name='Register RTP'
timeout=15
vban: bytes='VBAN'.encode()
format_sr: bytes=(0x60).to_bytes(1, 'little')
format_nbs: bytes=(0).to_bytes(1, 'little')
format_nbc: bytes=(VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, 'little')
format_bit: bytes=(timeout & 0x000000FF).to_bytes(1, 'little') # timeout
streamname: bytes=name.encode('ascii') + bytes(16-len(name))
framecounter: bytes=(0).to_bytes(4, 'little')
"""REGISTER RT PACKET header"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes'
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
return header
@dataclass
class TextRequestHeader:
""" VBAN-TEXT request header """
"""VBAN-TEXT request header"""
name: str
bps_index: int
channel: int
vban: bytes='VBAN'.encode()
nbs: bytes=(0).to_bytes(1, 'little')
bit: bytes=(0x10).to_bytes(1, 'little')
framecounter: bytes=(0).to_bytes(4, 'little')
vban: bytes = "VBAN".encode()
nbs: bytes = (0).to_bytes(1, "little")
bit: bytes = (0x10).to_bytes(1, "little")
framecounter: bytes = (0).to_bytes(4, "little")
@property
def sr(self):
return (0x40 + self.bps_index).to_bytes(1, 'little')
return (0x40 + self.bps_index).to_bytes(1, "little")
@property
def nbc(self):
return (self.channel).to_bytes(1, 'little')
return (self.channel).to_bytes(1, "little")
@property
def streamname(self):
return self.name.encode() + bytes(16-len(self.name))
return self.name.encode() + bytes(16 - len(self.name))
@property
def header(self):
header = self.vban
header = self.vban
header += self.sr
header += self.nbs
header += self.nbc
header += self.bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f'Header expected {HEADER_SIZE} bytes'
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
return header

View File

@ -7,22 +7,30 @@ from .errors import VMCMDErrors
Represents a major version of Voicemeeter and describes
its strip layout.
"""
VMKind = namedtuple('VMKind', ['id', 'name', 'ins', 'outs', 'executable', 'vban'])
VMKind = namedtuple("VMKind", ["id", "name", "ins", "outs", "executable", "vban"])
bits = 64 if sys.maxsize > 2**32 else 32
os = platform.system()
_kind_map = {
'basic': VMKind('basic', 'Basic', (2,1), (1,1), 'voicemeeter.exe', (4, 4)),
'banana': VMKind('banana', 'Banana', (3,2), (3,2), 'voicemeeterpro.exe', (8, 8)),
'potato': VMKind('potato', 'Potato', (5,3), (5,3),
f'voicemeeter8{"x64" if bits == 64 else ""}.exe', (8, 8))
"basic": VMKind("basic", "Basic", (2, 1), (1, 1), "voicemeeter.exe", (4, 4)),
"banana": VMKind("banana", "Banana", (3, 2), (3, 2), "voicemeeterpro.exe", (8, 8)),
"potato": VMKind(
"potato",
"Potato",
(5, 3),
(5, 3),
f'voicemeeter8{"x64" if bits == 64 else ""}.exe',
(8, 8),
),
}
def get(kind_id):
try:
return _kind_map[kind_id]
except KeyError:
raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}')
raise VMCMDErrors(f"Invalid Voicemeeter kind: {kind_id}")
all = list(_kind_map.values())

View File

@ -1,84 +1,274 @@
from .errors import VMCMDErrors
from time import sleep
def strip_bool_prop(param):
""" A strip bool prop. """
def fget(self):
val = self.getter(param)
if val is None:
val = not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & getattr(self._modes, f'_{param}') == 0
self._remote.cache[f'{self.identifier}.{param}'] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors(f'{param} is a boolean parameter')
self.setter(param, 1 if val else 0)
return property(fget, fset)
"""A strip bool prop."""
def bus_bool_prop(param):
""" A bus bool prop. """
def fget(self):
val = self.getter(param)
if val is None:
val = not int.from_bytes(self.public_packet.busstate[self.index], 'little') & getattr(self._modes, f'_{param.replace(".", "_").lower()}') == 0
self._remote.cache[f'{self.identifier}.{param}'] = [val, False]
val = (
not int.from_bytes(self.public_packet.stripstate[self.index], "little")
& getattr(self._modes, f"_{param}")
== 0
)
self._remote.cache[f"{self.identifier}.{param}"] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors(f'{param} is a boolean parameter')
self.setter(param, 1 if val else 0)
return property(fget, fset)
def strip_output_prop(param):
""" A strip output prop. """
def fget(self):
val = self.getter(param)
if val is None:
val = not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & getattr(self._modes, f'_bus{param.lower()}') == 0
self._remote.cache[f'{self.identifier}.{param}'] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f'{param} is a boolean parameter')
raise VMCMDErrors(f"{param} is a boolean parameter")
self.setter(param, 1 if val else 0)
return property(fget, fset)
def bus_bool_prop(param):
"""A bus bool prop."""
def fget(self):
val = self.getter(param)
if val is None:
val = (
not int.from_bytes(self.public_packet.busstate[self.index], "little")
& getattr(self._modes, f'_{param.replace(".", "_").lower()}')
== 0
)
self._remote.cache[f"{self.identifier}.{param}"] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f"{param} is a boolean parameter")
self.setter(param, 1 if val else 0)
return property(fget, fset)
def strip_output_prop(param):
"""A strip output prop."""
def fget(self):
val = self.getter(param)
if val is None:
val = (
not int.from_bytes(self.public_packet.stripstate[self.index], "little")
& getattr(self._modes, f"_bus{param.lower()}")
== 0
)
self._remote.cache[f"{self.identifier}.{param}"] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f"{param} is a boolean parameter")
self.setter(param, 1 if val else 0)
return property(fget, fset)
def bus_mode_prop(param):
""" A bus mode prop. """
"""A bus mode prop."""
def fget(self):
data = self.public_packet
modes = {
'normal': (False, False, False, False, False, False, False, False, False, False, False, False),
'amix': (False, True, False, True, False, True, False, True, False, True, False, True),
'repeat': (False, False, True, True, False, False, True, True, False, False, True, True),
'bmix': (False, True, True, True, False, True, True, True, False, True, True, True),
'composite': (False, False, False, False, True, True, True, True, False, False, False, False),
'tvmix': (False, True, False, True, True, True, True, True, False, True, False, True),
'upmix21': (False, False, True, True, True, True, True, True, False, False, True, True),
'upmix41': (False, True, True, True, True, True, True, True, False, True, True, True),
'upmix61': (False, False, False, False, False, False, False, False, True, True, True, True),
'centeronly': (False, True, False, True, False, True, False, True, True, True, True, True),
'lfeonly': (False, False, True, True, False, False, True, True, True, True, True, True),
'rearonly': (False, True, True, True, False, True, True, True, True, True, True, True),
"normal": (
False,
False,
False,
False,
False,
False,
False,
False,
False,
False,
False,
False,
),
"amix": (
False,
True,
False,
True,
False,
True,
False,
True,
False,
True,
False,
True,
),
"repeat": (
False,
False,
True,
True,
False,
False,
True,
True,
False,
False,
True,
True,
),
"bmix": (
False,
True,
True,
True,
False,
True,
True,
True,
False,
True,
True,
True,
),
"composite": (
False,
False,
False,
False,
True,
True,
True,
True,
False,
False,
False,
False,
),
"tvmix": (
False,
True,
False,
True,
True,
True,
True,
True,
False,
True,
False,
True,
),
"upmix21": (
False,
False,
True,
True,
True,
True,
True,
True,
False,
False,
True,
True,
),
"upmix41": (
False,
True,
True,
True,
True,
True,
True,
True,
False,
True,
True,
True,
),
"upmix61": (
False,
False,
False,
False,
False,
False,
False,
False,
True,
True,
True,
True,
),
"centeronly": (
False,
True,
False,
True,
False,
True,
False,
True,
True,
True,
True,
True,
),
"lfeonly": (
False,
False,
True,
True,
False,
False,
True,
True,
True,
True,
True,
True,
),
"rearonly": (
False,
True,
True,
True,
False,
True,
True,
True,
True,
True,
True,
True,
),
}
vals = tuple(not int.from_bytes(data.busstate[self.index], 'little') & val == 0 for val in self._modes.modevals)
val = self.getter(f'mode.{param}')
vals = tuple(
not int.from_bytes(data.busstate[self.index], "little") & val == 0
for val in self._modes.modevals
)
val = self.getter(f"mode.{param}")
if val is None:
val = vals == modes[param.lower()]
self._remote.cache[f'{self.identifier}.mode.{param}'] = [val, False]
self._remote.cache[f"{self.identifier}.mode.{param}"] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f'mode.{param} is a boolean parameter')
self.setter(f'mode.{param}', 1 if val else 0)
raise VMCMDErrors(f"mode.{param} is a boolean parameter")
self.setter(f"mode.{param}", 1 if val else 0)
return property(fget, fset)
def action_prop(param, val=1):
""" A param that performs an action """
"""A param that performs an action"""
def fdo(self):
self.setter(param, val)
return fdo

View File

@ -5,73 +5,76 @@ from pathlib import Path
profiles = {}
def _make_blank_profile(kind):
phys_in, virt_in = kind.ins
phys_out, virt_out = kind.outs
all_input_strip_config = {
'gain': 0.0,
'solo': False,
'mute': False,
'mono': False,
**{f'A{i}': False for i in range(1, phys_out+1)},
**{f'B{i}': False for i in range(1, virt_out+1)},
"gain": 0.0,
"solo": False,
"mute": False,
"mono": False,
**{f"A{i}": False for i in range(1, phys_out + 1)},
**{f"B{i}": False for i in range(1, virt_out + 1)},
}
phys_input_strip_config={
'comp': 0.0,
'gate': 0.0,
phys_input_strip_config = {
"comp": 0.0,
"gate": 0.0,
}
output_bus_config = {
'gain': 0.0,
'eq': False,
'mute': False,
'mono': False,
"gain": 0.0,
"eq": False,
"mute": False,
"mono": False,
}
all_ = {f'strip-{i}': all_input_strip_config for i in range(phys_in+virt_in)}
phys = {f'strip-{i}': phys_input_strip_config for i in range(phys_in)}
all_ = {f"strip-{i}": all_input_strip_config for i in range(phys_in + virt_in)}
phys = {f"strip-{i}": phys_input_strip_config for i in range(phys_in)}
abc = all_
for i in phys.keys():
abc[i] = all_[i] | phys[i]
return {
**abc,
**{f'bus-{i}': output_bus_config for i in range(phys_out+virt_out)}
**{f"bus-{i}": output_bus_config for i in range(phys_out + virt_out)},
}
def _make_base_profile(kind):
phys_in, virt_in = kind.ins
blank = _make_blank_profile(kind)
overrides = {
**{f'strip-{i}': dict(B1=True) for i in range(phys_in)},
**{f'strip-{i}': dict(A1=True) for i in range(phys_in, phys_in+virt_in)}
**{f"strip-{i}": dict(B1=True) for i in range(phys_in)},
**{f"strip-{i}": dict(A1=True) for i in range(phys_in, phys_in + virt_in)},
}
base = blank
for i in overrides.keys():
base[i] = blank[i] | overrides[i]
return base
for kind in kinds.all:
profiles[kind.id] = {
'blank': _make_blank_profile(kind),
'base': _make_base_profile(kind)
"blank": _make_blank_profile(kind),
"base": _make_base_profile(kind),
}
# Load profiles from config files in profiles/<kind_id>/<profile>.toml
for kind in kinds.all:
for kind in kinds.all:
profiles_paths = [
Path(project_path()) / 'profiles' / kind.id,
Path.cwd() / 'profiles' / kind.id,
Path.home() / 'Documents/Voicemeeter' / 'profiles' / kind.id,
Path(project_path()) / "profiles" / kind.id,
Path.cwd() / "profiles" / kind.id,
Path.home() / "Documents/Voicemeeter" / "profiles" / kind.id,
]
for path in profiles_paths:
if path.is_dir():
filenames = list(path.glob('*.toml'))
filenames = list(path.glob("*.toml"))
configs = {}
for filename in filenames:
name = filename.with_suffix('').stem
name = filename.with_suffix("").stem
try:
configs[name] = toml.load(filename)
except toml.TomlDecodeError:
print(f'Invalid TOML profile: {kind.id}/{filename.stem}')
print(f"Invalid TOML profile: {kind.id}/{filename.stem}")
for name, cfg in configs.items():
print(f'Loaded profile {kind.id}/{name}')
print(f"Loaded profile {kind.id}/{name}")
profiles[kind.id][name] = cfg

View File

@ -4,8 +4,10 @@ from .channel import Channel
from . import kinds
from .meta import strip_output_prop, strip_bool_prop
class InputStrip(Channel):
""" Base class for input strips. """
"""Base class for input strips."""
@classmethod
def make(cls, is_physical, remote, index, **kwargs):
"""
@ -15,20 +17,24 @@ class InputStrip(Channel):
PhysStrip, VirtStrip = _strip_pairs[remote.kind.id]
InputStrip = PhysStrip if is_physical else VirtStrip
GainLayerMixin = _make_gainlayer_mixin(remote, index)
IS_cls = type(f'Strip{remote.kind.name}', (InputStrip, GainLayerMixin), {
'levels': StripLevel(remote, index),
})
IS_cls = type(
f"Strip{remote.kind.name}",
(InputStrip, GainLayerMixin),
{
"levels": StripLevel(remote, index),
},
)
return IS_cls(remote, index, **kwargs)
@property
def identifier(self):
return f'Strip[{self.index}]'
return f"Strip[{self.index}]"
mono = strip_bool_prop('mono')
mono = strip_bool_prop("mono")
solo = strip_bool_prop('solo')
solo = strip_bool_prop("solo")
mute = strip_bool_prop('mute')
mute = strip_bool_prop("mute")
@property
def limit(self) -> int:
@ -36,35 +42,35 @@ class InputStrip(Channel):
@limit.setter
def limit(self, val: int):
if val not in range(-40,13):
raise VMCMDErrors('Expected value from -40 to 12')
self.setter('limit', val)
if val not in range(-40, 13):
raise VMCMDErrors("Expected value from -40 to 12")
self.setter("limit", val)
@property
def label(self) -> str:
val = self.getter('label')
val = self.getter("label")
if val is None:
val = self.public_packet.striplabels[self.index]
self._remote.cache[f'{self.identifier}.label'] = [val, False]
self._remote.cache[f"{self.identifier}.label"] = [val, False]
return val
@label.setter
def label(self, val: str):
if not isinstance(val, str):
raise VMCMDErrors('label is a string parameter')
self.setter('label', val)
raise VMCMDErrors("label is a string parameter")
self.setter("label", val)
@property
def gain(self) -> float:
val = self.getter('GainLayer[0]')
val = self.getter("GainLayer[0]")
if val is None:
val = self.gainlayer[0].gain
self._remote.cache[f'{self.identifier}.GainLayer[0]'] = [val, False]
self._remote.cache[f"{self.identifier}.GainLayer[0]"] = [val, False]
return round(val, 1)
@gain.setter
def gain(self, val: float):
self.setter('gain', val)
self.setter("gain", val)
class PhysicalInputStrip(InputStrip):
@ -74,7 +80,7 @@ class PhysicalInputStrip(InputStrip):
@comp.setter
def comp(self, val: float):
self.setter('Comp', val)
self.setter("Comp", val)
@property
def gate(self) -> float:
@ -82,8 +88,8 @@ class PhysicalInputStrip(InputStrip):
@gate.setter
def gate(self, val: float):
self.setter('gate', val)
self.setter("gate", val)
@property
def device(self):
return
@ -100,9 +106,10 @@ class VirtualInputStrip(InputStrip):
@mc.setter
def mc(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('mc is a boolean parameter')
self.setter('mc', 1 if val else 0)
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors("mc is a boolean parameter")
self.setter("mc", 1 if val else 0)
mono = mc
@property
@ -112,8 +119,8 @@ class VirtualInputStrip(InputStrip):
@k.setter
def k(self, val: int):
if val not in range(5):
raise VMCMDErrors('Expected value from 0 to 4')
self.setter('karaoke', val)
raise VMCMDErrors("Expected value from 0 to 4")
self.setter("karaoke", val)
class StripLevel(InputStrip):
@ -152,54 +159,73 @@ class GainLayer(InputStrip):
@property
def gain(self) -> float:
def fget():
val = getattr(self.public_packet, f'stripgainlayer{self._i+1}')[self.index]
val = getattr(self.public_packet, f"stripgainlayer{self._i+1}")[self.index]
if val < 10000:
return -val
elif val == ((1 << 16) - 1):
return 0
else:
return ((1 << 16) - 1) - val
val = self.getter(f'GainLayer[{self._i}]')
val = self.getter(f"GainLayer[{self._i}]")
if val is None:
val = round((fget() * 0.01), 1)
self._remote.cache[f'{self.identifier}.GainLayer[{self._i}]'] = [val, False]
self._remote.cache[f"{self.identifier}.GainLayer[{self._i}]"] = [val, False]
return val
return round(val, 1)
@gain.setter
def gain(self, val: float):
self.setter(f'GainLayer[{self._i}]', val)
self.setter(f"GainLayer[{self._i}]", val)
def _make_gainlayer_mixin(remote, index):
""" Creates a GainLayer mixin """
return type(f'GainlayerMixin', (), {
'gainlayer': tuple(GainLayer(remote, index, i) for i in range(8))
})
"""Creates a GainLayer mixin"""
return type(
f"GainlayerMixin",
(),
{"gainlayer": tuple(GainLayer(remote, index, i) for i in range(8))},
)
def _make_strip_mixin(kind):
""" Creates a mixin with the kind's strip layout set as class variables. """
"""Creates a mixin with the kind's strip layout set as class variables."""
num_A, num_B = kind.outs
return type(f'StripMixin{kind.name}', (), {
**{f'A{i}': strip_output_prop(f'A{i}') for i in range(1, num_A+1)},
**{f'B{i}': strip_output_prop(f'B{i}') for i in range(1, num_B+1)}
})
return type(
f"StripMixin{kind.name}",
(),
{
**{f"A{i}": strip_output_prop(f"A{i}") for i in range(1, num_A + 1)},
**{f"B{i}": strip_output_prop(f"B{i}") for i in range(1, num_B + 1)},
},
)
_strip_mixins = {kind.id: _make_strip_mixin(kind) for kind in kinds.all}
def _make_strip_pair(kind):
""" Creates a PhysicalInputStrip and a VirtualInputStrip of a kind. """
"""Creates a PhysicalInputStrip and a VirtualInputStrip of a kind."""
StripMixin = _strip_mixins[kind.id]
PhysStrip = type(f'PhysicalInputStrip{kind.name}', (PhysicalInputStrip, StripMixin), {})
VirtStrip = type(f'VirtualInputStrip{kind.name}', (VirtualInputStrip, StripMixin), {})
PhysStrip = type(
f"PhysicalInputStrip{kind.name}", (PhysicalInputStrip, StripMixin), {}
)
VirtStrip = type(
f"VirtualInputStrip{kind.name}", (VirtualInputStrip, StripMixin), {}
)
return (PhysStrip, VirtStrip)
_strip_pairs = {kind.id: _make_strip_pair(kind) for kind in kinds.all}
def _make_strip_level_map(kind):
phys_in, virt_in = kind.ins
phys_map = tuple((i, i+2) for i in range(0, phys_in*2, 2))
virt_map = tuple((i, i+8) for i in range(phys_in*2, phys_in*2+virt_in*8, 8))
return phys_map+virt_map
phys_map = tuple((i, i + 2) for i in range(0, phys_in * 2, 2))
virt_map = tuple(
(i, i + 8) for i in range(phys_in * 2, phys_in * 2 + virt_in * 8, 8)
)
return phys_map + virt_map
_strip_maps = {kind.id: _make_strip_level_map(kind) for kind in kinds.all}
_strip_maps = {kind.id: _make_strip_level_map(kind) for kind in kinds.all}

View File

@ -2,14 +2,18 @@ from pathlib import Path
PROJECT_DIR = str(Path(__file__).parents[1])
def project_path():
return PROJECT_DIR
def cache(func):
""" check if recently cached was an updated value """
"""check if recently cached was an updated value"""
def wrapper(*args, **kwargs):
# setup cache check
res = func(*args, **kwargs)
# update cache
return res
return wrapper
return wrapper

View File

@ -13,44 +13,76 @@ from .dataclass import (
VBAN_VMRT_Packet_Data,
VBAN_VMRT_Packet_Header,
RegisterRTHeader,
TextRequestHeader
TextRequestHeader,
)
from .strip import InputStrip
from .bus import OutputBus
from .command import Command
class VbanCmd(abc.ABC):
def __init__(self, **kwargs):
self._ip = kwargs['ip']
self._port = kwargs['port']
self._streamname = kwargs['streamname']
self._bps = kwargs['bps']
self._channel = kwargs['channel']
self._delay = kwargs['delay']
self._ratelimiter = kwargs['ratelimiter']
self._bps_opts = \
[0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800,921600,
1000000, 1500000, 2000000, 3000000]
self._ip = kwargs["ip"]
self._port = kwargs["port"]
self._streamname = kwargs["streamname"]
self._bps = kwargs["bps"]
self._channel = kwargs["channel"]
self._delay = kwargs["delay"]
self._ratelimiter = kwargs["ratelimiter"]
self._bps_opts = [
0,
110,
150,
300,
600,
1200,
2400,
4800,
9600,
14400,
19200,
31250,
38400,
57600,
115200,
128000,
230400,
250000,
256000,
460800,
921600,
1000000,
1500000,
2000000,
3000000,
]
if self._channel not in range(256):
raise VMCMDErrors('Channel must be in range 0 to 255')
raise VMCMDErrors("Channel must be in range 0 to 255")
self._text_header = TextRequestHeader(
name=self._streamname,
name=self._streamname,
bps_index=self._bps_opts.index(self._bps),
channel=self._channel
)
channel=self._channel,
)
self._register_rt_header = RegisterRTHeader()
self.expected_packet = VBAN_VMRT_Packet_Header()
self._rt_register_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._rt_packet_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sendrequest_string_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sendrequest_string_socket = socket.socket(
socket.AF_INET, socket.SOCK_DGRAM
)
is_readable = []
is_writable = [self._rt_register_socket, self._rt_packet_socket, self._sendrequest_string_socket]
is_writable = [
self._rt_register_socket,
self._rt_packet_socket,
self._sendrequest_string_socket,
]
is_error = []
self.ready_to_read, self.ready_to_write, in_error = select.select(is_readable, is_writable, is_error, 60)
self.ready_to_read, self.ready_to_write, in_error = select.select(
is_readable, is_writable, is_error, 60
)
self._public_packet = None
self.running = True
self._pdirty = False
@ -69,7 +101,9 @@ class VbanCmd(abc.ABC):
Register to RT service
Keep public packet updated.
"""
self._rt_packet_socket.bind((socket.gethostbyname(socket.gethostname()), self._port))
self._rt_packet_socket.bind(
(socket.gethostbyname(socket.gethostname()), self._port)
)
worker = Thread(target=self._send_register_rt, daemon=True)
worker.start()
self._public_packet = self._get_rt()
@ -77,7 +111,7 @@ class VbanCmd(abc.ABC):
worker2.start()
def _send_register_rt(self):
"""
"""
Continuously register to the RT Packet Service
This function to be run in its own thread.
@ -85,20 +119,23 @@ class VbanCmd(abc.ABC):
while self.running:
if self._rt_register_socket in self.ready_to_write:
self._rt_register_socket.sendto(
self._register_rt_header.header, (socket.gethostbyname(self._ip), self._port)
)
count = int.from_bytes(self._register_rt_header.framecounter, 'little') + 1
self._register_rt_header.framecounter = count.to_bytes(4, 'little')
self._register_rt_header.header,
(socket.gethostbyname(self._ip), self._port),
)
count = (
int.from_bytes(self._register_rt_header.framecounter, "little") + 1
)
self._register_rt_header.framecounter = count.to_bytes(4, "little")
sleep(10)
def _fetch_rt_packet(self) -> Optional[VBAN_VMRT_Packet_Data]:
""" Returns a valid RT Data Packet or None """
"""Returns a valid RT Data Packet or None"""
if self._rt_packet_socket in self.ready_to_write:
data, _ = self._rt_packet_socket.recvfrom(1024*1024*2)
data, _ = self._rt_packet_socket.recvfrom(1024 * 1024 * 2)
# check for packet data
if len(data) > HEADER_SIZE:
# check if packet is of type rt service
if self.expected_packet.header == data[:HEADER_SIZE-4]:
if self.expected_packet.header == data[: HEADER_SIZE - 4]:
return VBAN_VMRT_Packet_Data(
_voicemeeterType=data[28:29],
_reserved=data[29:30],
@ -126,18 +163,19 @@ class VbanCmd(abc.ABC):
@property
def pdirty(self):
""" True iff a parameter has changed """
"""True iff a parameter has changed"""
return self._pdirty
@property
def public_packet(self):
return self._public_packet
@public_packet.setter
def public_packet(self, val):
self._public_packet = val
def _keepupdated(self) -> NoReturn:
"""
"""
Continously update public packet in background.
Set parameter dirty flag.
@ -153,62 +191,73 @@ class VbanCmd(abc.ABC):
self.public_packet = private_packet
def _get_rt(self) -> VBAN_VMRT_Packet_Data:
""" Attempt to fetch data packet until a valid one found """
"""Attempt to fetch data packet until a valid one found"""
def fget():
data = False
while not data:
data = self._fetch_rt_packet()
return data
return fget()
def set_rt(self, id_: str, param: Optional[str]=None, val: Optional[Union[int, float]]=None):
""" Sends a string request command over a network. """
cmd = id_ if not param and val else f'{id_}.{param}={val}'
def set_rt(
self,
id_: str,
param: Optional[str] = None,
val: Optional[Union[int, float]] = None,
):
"""Sends a string request command over a network."""
cmd = id_ if not param and val else f"{id_}.{param}={val}"
if self._sendrequest_string_socket in self.ready_to_write:
self._sendrequest_string_socket.sendto(
self._text_header.header + cmd.encode(), (socket.gethostbyname(self._ip), self._port)
)
count = int.from_bytes(self._text_header.framecounter, 'little') + 1
self._text_header.framecounter = count.to_bytes(4, 'little')
self.cache[f'{id_}.{param}'] = [val, True]
self._text_header.header + cmd.encode(),
(socket.gethostbyname(self._ip), self._port),
)
count = int.from_bytes(self._text_header.framecounter, "little") + 1
self._text_header.framecounter = count.to_bytes(4, "little")
self.cache[f"{id_}.{param}"] = [val, True]
sleep(self._ratelimiter)
def sendtext(self, cmd):
""" Sends a multiple parameter string over a network. """
"""Sends a multiple parameter string over a network."""
self.set_rt(cmd)
sleep(self._delay)
@property
def type(self):
""" Returns the type of Voicemeeter installation. """
"""Returns the type of Voicemeeter installation."""
return self.public_packet.voicemeetertype
@property
def version(self):
""" Returns Voicemeeter's version as a tuple """
"""Returns Voicemeeter's version as a tuple"""
return self.public_packet.voicemeeterversion
def show(self) -> NoReturn:
""" Shows Voicemeeter if it's hidden. """
self.set_rt('Command', 'Show', 1)
"""Shows Voicemeeter if it's hidden."""
self.command.show()
def hide(self) -> NoReturn:
""" Hides Voicemeeter if it's shown. """
self.set_rt('Command', 'Show', 0)
"""Hides Voicemeeter if it's shown."""
self.command.hide()
def shutdown(self) -> NoReturn:
""" Closes Voicemeeter. """
self.set_rt('Command', 'Shutdown', 1)
"""Closes Voicemeeter."""
self.command.shutdown()
def restart(self) -> NoReturn:
""" Restarts Voicemeeter's audio engine. """
self.set_rt('Command', 'Restart', 1)
"""Restarts Voicemeeter's audio engine."""
self.command.restart()
def apply(self, mapping: dict):
""" Sets all parameters of a di """
"""Sets all parameters of a di"""
for key, submapping in mapping.items():
obj, index = key.split('-')
obj, index = key.split("-")
if obj in ('strip'):
if obj in ("strip"):
target = self.strip[int(index)]
elif obj in ('bus'):
elif obj in ("bus"):
target = self.bus[int(index)]
else:
raise ValueError(obj)
@ -217,9 +266,9 @@ class VbanCmd(abc.ABC):
def apply_profile(self, name: str):
try:
profile = self.profiles[name]
if 'extends' in profile:
base = self.profiles[profile['extends']]
del profile['extends']
if "extends" in profile:
base = self.profiles[profile["extends"]]
del profile["extends"]
for key in profile.keys():
if key in base:
base[key] |= profile[key]
@ -228,13 +277,13 @@ class VbanCmd(abc.ABC):
profile = base
self.apply(profile)
except KeyError:
raise VMCMDErrors(f'Unknown profile: {self.kind.id}/{name}')
raise VMCMDErrors(f"Unknown profile: {self.kind.id}/{name}")
def reset(self) -> NoReturn:
self.apply_profile('base')
self.apply_profile("base")
def logout(self):
""" sets thread flag, closes sockets """
"""sets thread flag, closes sockets"""
self.running = False
sleep(0.2)
self._rt_register_socket.close()
@ -252,38 +301,49 @@ def _make_remote(kind: NamedTuple) -> VbanCmd:
The returned class will subclass VbanCmd.
"""
def init(self, **kwargs):
defaultkwargs = {
'ip': None, 'port': 6990, 'streamname': 'Command1', 'bps': 0,
'channel': 0, 'delay': 0.001, 'ratelimiter': 0.018
}
"ip": None,
"port": 6990,
"streamname": "Command1",
"bps": 0,
"channel": 0,
"delay": 0.001,
"ratelimiter": 0.018,
}
kwargs = defaultkwargs | kwargs
VbanCmd.__init__(self, **kwargs)
self.kind = kind
self.phys_in, self.virt_in = kind.ins
self.phys_out, self.virt_out = kind.outs
self.strip = \
tuple(InputStrip.make((i < self.phys_in), self, i)
for i in range(self.phys_in + self.virt_in))
self.bus = \
tuple(OutputBus.make((i < self.phys_out), self, i)
for i in range(self.phys_out + self.virt_out))
self.strip = tuple(
InputStrip.make((i < self.phys_in), self, i)
for i in range(self.phys_in + self.virt_in)
)
self.bus = tuple(
OutputBus.make((i < self.phys_out), self, i)
for i in range(self.phys_out + self.virt_out)
)
self.command = Command.make(self)
def get_profiles(self):
return profiles.profiles[kind.id]
return type(f'VbanCmd{kind.name}', (VbanCmd,), {
'__init__': init,
'profiles': property(get_profiles)
})
return type(
f"VbanCmd{kind.name}",
(VbanCmd,),
{"__init__": init, "profiles": property(get_profiles)},
)
_remotes = {kind.id: _make_remote(kind) for kind in kinds.all}
def connect(kind_id: str, **kwargs):
""" Connect to Voicemeeter and sets its strip layout. """
"""Connect to Voicemeeter and sets its strip layout."""
try:
VBANCMD_cls = _remotes[kind_id]
return VBANCMD_cls(**kwargs)
except KeyError as err:
raise VMCMDErrors(f'Invalid Voicemeeter kind: {kind_id}')
raise VMCMDErrors(f"Invalid Voicemeeter kind: {kind_id}")