major version bump due to dependency change.

now packaged with poetry.
added to pypi.

major version bump due to dependency change.

interface reworked to match the remote-api interface.

readme updated with changes to installation

pre-commit hook temporarily removed
This commit is contained in:
onyx-and-iris
2022-06-16 16:10:06 +01:00
parent 933d182f60
commit b9db01c8f4
35 changed files with 1800 additions and 1251 deletions

3
vban_cmd/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .factory import request_vbancmd_obj as api
__ALL__ = ["api"]

269
vban_cmd/base.py Normal file
View File

@@ -0,0 +1,269 @@
import socket
import time
from abc import ABCMeta, abstractmethod
from enum import IntEnum
from threading import Thread
from typing import NoReturn, Optional, Union
from .packet import (
HEADER_SIZE,
RegisterRTHeader,
TextRequestHeader,
VBAN_VMRT_Packet_Data,
VBAN_VMRT_Packet_Header,
)
from .subject import Subject
from .util import script
Socket = IntEnum("Socket", "register request response", start=0)
class VbanCmd(metaclass=ABCMeta):
"""Base class responsible for communicating over VBAN RT Service"""
DELAY = 0.001
# fmt: off
BPS_OPTS = [
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
1000000, 1500000, 2000000, 3000000,
]
# fmt: on
def __init__(self, **kwargs):
for attr, val in kwargs.items():
setattr(self, attr, val)
self.text_header = TextRequestHeader(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
)
self.register_header = RegisterRTHeader()
self.expected_packet = VBAN_VMRT_Packet_Header()
self.socks = tuple(
socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for _, _ in enumerate(Socket)
)
self.running = True
self.subject = Subject()
self.cache = {}
@abstractmethod
def __str__(self):
"""Ensure subclasses override str magic method"""
pass
def __enter__(self):
self.login()
return self
def login(self):
"""Start listening for RT Packets"""
self.socks[Socket.response.value].bind(
(socket.gethostbyname(socket.gethostname()), self.port)
)
worker = Thread(target=self._send_register_rt, daemon=True)
worker.start()
self._public_packet = self._get_rt()
worker2 = Thread(target=self._updates, daemon=True)
worker2.start()
def _send_register_rt(self):
"""Fires a subscription packet every 10 seconds"""
while self.running:
self.socks[Socket.register.value].sendto(
self.register_header.header,
(socket.gethostbyname(self.ip), self.port),
)
count = int.from_bytes(self.register_header.framecounter, "little") + 1
self.register_header.framecounter = count.to_bytes(4, "little")
time.sleep(10)
def _fetch_rt_packet(self) -> Optional[VBAN_VMRT_Packet_Data]:
"""Returns a valid RT Data Packet or None"""
data, _ = self.socks[Socket.response.value].recvfrom(2048)
# check for packet data
if len(data) > HEADER_SIZE:
# check if packet is of type VBAN
if self.expected_packet.header == data[: HEADER_SIZE - 4]:
# check if packet is of type vmrt_data
if int.from_bytes(data[4:5]) == int(0x60):
return VBAN_VMRT_Packet_Data(
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
_inputLeveldB100=data[44:112],
_outputLeveldB100=data[112:240],
_TransportBit=data[240:244],
_stripState=data[244:276],
_busState=data[276:308],
_stripGaindB100Layer1=data[308:324],
_stripGaindB100Layer2=data[324:340],
_stripGaindB100Layer3=data[340:356],
_stripGaindB100Layer4=data[356:372],
_stripGaindB100Layer5=data[372:388],
_stripGaindB100Layer6=data[388:404],
_stripGaindB100Layer7=data[404:420],
_stripGaindB100Layer8=data[420:436],
_busGaindB100=data[436:452],
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
def _get_rt(self) -> VBAN_VMRT_Packet_Data:
"""Attempt to fetch data packet until a valid one found"""
def fget():
data = False
while not data:
data = self._fetch_rt_packet()
time.sleep(self.DELAY)
return data
return fget()
def _set_rt(
self,
id_: str,
param: Optional[str] = None,
val: Optional[Union[int, float]] = None,
):
"""Sends a string request command over a network."""
cmd = id_ if not param else f"{id_}.{param}={val}"
self.socks[Socket.request.value].sendto(
self.text_header.header + cmd.encode(),
(socket.gethostbyname(self.ip), self.port),
)
count = int.from_bytes(self.text_header.framecounter, "little") + 1
self.text_header.framecounter = count.to_bytes(4, "little")
if param:
self.cache[f"{id_}.{param}"] = val
@script
def sendtext(self, cmd):
"""Sends a multiple parameter string over a network."""
self._set_rt(cmd)
time.sleep(self.DELAY)
@property
def type(self) -> str:
"""Returns the type of Voicemeeter installation."""
return self.public_packet.voicemeetertype
@property
def version(self) -> str:
"""Returns Voicemeeter's version as a tuple"""
v1, v2, v3, v4 = self.public_packet.voicemeeterversion
return f"{v1}.{v2}.{v3}.{v4}"
@property
def pdirty(self):
"""True iff a parameter has changed"""
return self._pdirty
@property
def ldirty(self):
"""True iff a level value has changed."""
return self._ldirty
@property
def public_packet(self):
return self._public_packet
def clear_dirty(self):
while self.pdirty:
pass
def _updates(self) -> NoReturn:
while self.running:
private_packet = self._get_rt()
strip_comp, bus_comp = (
tuple(
not a == b
for a, b in zip(
private_packet.inputlevels, self.public_packet.inputlevels
)
),
tuple(
not a == b
for a, b in zip(
private_packet.outputlevels, self.public_packet.outputlevels
)
),
)
if self._public_packet != private_packet:
self._public_packet = private_packet
if private_packet.pdirty(self.public_packet):
self.subject.notify("pdirty")
if any(any(list_) for list_ in (strip_comp, bus_comp)):
self.subject.notify(
"ldirty",
(
self.public_packet.inputlevels,
strip_comp,
self.public_packet.outputlevels,
bus_comp,
),
)
time.sleep(self.ratelimit)
@property
def strip_levels(self):
"""Returns the full strip level array for a kind, PREFADER mode, before math conversion"""
return tuple(
list(filter(lambda x: x != ((1 << 16) - 1), self.public_packet.inputlevels))
)
@property
def bus_levels(self):
"""Returns the full bus level array for a kind, before math conversion"""
return tuple(
list(
filter(lambda x: x != ((1 << 16) - 1), self.public_packet.outputlevels)
)
)
def apply(self, data: dict):
"""
Sets all parameters of a dict
minor delay between each recursion
"""
def param(key):
obj, m2, *rem = key.split("-")
index = int(m2) if m2.isnumeric() else int(*rem)
if obj in ("strip", "bus"):
return getattr(self, obj)[index]
else:
raise ValueError(obj)
[param(key).apply(datum).then_wait() for key, datum in data.items()]
def apply_config(self, name):
"""applies a config from memory"""
error_msg = (
f"No config with name '{name}' is loaded into memory",
f"Known configs: {list(self.configs.keys())}",
)
try:
self.apply(self.configs[name])
except KeyError as e:
print(("\n").join(error_msg))
print(f"Profile '{name}' applied!")
def logout(self):
self.running = False
time.sleep(0.2)
[sock.close() for sock in self.socks]
def __exit__(self, exc_type, exc_value, exc_traceback):
self.logout()

151
vban_cmd/bus.py Normal file
View File

@@ -0,0 +1,151 @@
from abc import abstractmethod
from typing import Union
from .iremote import IRemote
from .meta import bus_mode_prop, channel_bool_prop, channel_label_prop
class Bus(IRemote):
"""
Implements the common interface
Defines concrete implementation for bus
"""
@abstractmethod
def __str__(self):
pass
@property
def identifier(self) -> str:
return f"Bus[{self.index}]"
@property
def gain(self) -> float:
def fget():
val = self.public_packet.busgain[self.index]
if val < 10000:
return -val
elif val == ((1 << 16) - 1):
return 0
else:
return ((1 << 16) - 1) - val
val = self.getter("gain")
if val is None:
val = fget() * 0.01
return round(val, 1)
@gain.setter
def gain(self, val: float):
self.setter("gain", val)
class PhysicalBus(Bus):
def __str__(self):
return f"{type(self).__name__}{self.index}"
@property
def device(self) -> str:
return
@property
def sr(self) -> int:
return
class VirtualBus(Bus):
def __str__(self):
return f"{type(self).__name__}{self.index}"
class BusLevel(IRemote):
def __init__(self, remote, index):
super().__init__(remote, index)
self.level_map = tuple(
(i, i + 8)
for i in range(0, (remote.kind.phys_out + remote.kind.virt_out) * 8, 8)
)
def getter(self):
"""Returns a tuple of level values for the channel."""
range_ = self.level_map[self.index]
return tuple(
round(-i * 0.01, 1) for i in self._remote.bus_levels[range_[0] : range_[-1]]
)
@property
def identifier(self) -> str:
return f"Bus[{self.index}]"
@property
def all(self) -> tuple:
return self.getter()
@property
def updated(self) -> tuple:
return self._remote._bus_comp
def _make_bus_mode_mixin():
"""Creates a mixin of Bus Modes."""
def identifier(self) -> str:
return f"Bus[{self.index}].mode"
return type(
"BusModeMixin",
(IRemote,),
{
"identifier": property(identifier),
**{
mode: bus_mode_prop(mode)
for mode in [
"normal",
"amix",
"bmix",
"repeat",
"composite",
"tvmix",
"upmix21",
"upmix41",
"upmix61",
"centeronly",
"lfeonly",
"rearonly",
]
},
},
)
def bus_factory(phys_bus, remote, i) -> Union[PhysicalBus, VirtualBus]:
"""
Factory method for buses
Returns a physical or virtual bus subclass
"""
BUS_cls = PhysicalBus if phys_bus else VirtualBus
BUSMODEMIXIN_cls = _make_bus_mode_mixin()
return type(
f"{BUS_cls.__name__}{remote.kind}",
(BUS_cls,),
{
"levels": BusLevel(remote, i),
"mode": BUSMODEMIXIN_cls(remote, i),
**{param: channel_bool_prop(param) for param in ["mute", "mono"]},
"eq": channel_bool_prop("eq.On"),
"eq_ab": channel_bool_prop("eq.ab"),
"label": channel_label_prop(),
},
)(remote, i)
def request_bus_obj(phys_bus, remote, i) -> Bus:
"""
Bus entry point. Wraps factory method.
Returns a reference to a bus subclass of a kind
"""
return bus_factory(phys_bus, remote, i)

49
vban_cmd/command.py Normal file
View File

@@ -0,0 +1,49 @@
from .error import VMCMDErrors
from .iremote import IRemote
from .meta import action_prop
class Command(IRemote):
"""
Implements the common interface
Defines concrete implementation for command
"""
@classmethod
def make(cls, remote):
"""
Factory function for command class.
Returns a Command class of a kind.
"""
CMD_cls = type(
f"Command{remote.kind}",
(cls,),
{
**{
param: action_prop(param)
for param in ["show", "shutdown", "restart"]
},
"hide": action_prop("show", val=0),
},
)
return CMD_cls(remote)
@property
def identifier(self) -> str:
return "Command"
def set_showvbanchat(self, val: bool):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors("showvbanchat is a boolean parameter")
self.setter("DialogShow.VBANCHAT", 1 if val else 0)
showvbanchat = property(fset=set_showvbanchat)
def set_lock(self, val: bool):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors("lock is a boolean parameter")
self.setter("lock", 1 if val else 0)
lock = property(fset=set_lock)

191
vban_cmd/config.py Normal file
View File

@@ -0,0 +1,191 @@
import itertools
from pathlib import Path
import tomllib
from .kinds import request_kind_map as kindmap
class TOMLStrBuilder:
"""builds a config profile, as a string, for the toml parser"""
def __init__(self, kind):
self.kind = kind
self.phys_in, self.virt_in = kind.ins
self.phys_out, self.virt_out = kind.outs
self.higher = itertools.chain(
[f"strip-{i}" for i in range(kind.num_strip)],
[f"bus-{i}" for i in range(kind.num_bus)],
)
def init_config(self, profile=None):
self.virt_strip_params = (
[
"mute = false",
"mono = false",
"solo = false",
"gain = 0.0",
]
+ [f"A{i} = false" for i in range(1, self.phys_out + 1)]
+ [f"B{i} = false" for i in range(1, self.virt_out + 1)]
)
self.phys_strip_params = self.virt_strip_params + [
"comp = 0.0",
"gate = 0.0",
]
self.bus_bool = ["mono = false", "eq = false", "mute = false"]
if profile == "reset":
self.reset_config()
def reset_config(self):
self.phys_strip_params = list(
map(lambda x: x.replace("B1 = false", "B1 = true"), self.phys_strip_params)
)
self.virt_strip_params = list(
map(lambda x: x.replace("A1 = false", "A1 = true"), self.virt_strip_params)
)
def build(self, profile="reset"):
self.init_config(profile)
toml_str = str()
for eachclass in self.higher:
toml_str += f"[{eachclass}]\n"
toml_str = self.join(eachclass, toml_str)
return toml_str
def join(self, eachclass, toml_str):
kls, index = eachclass.split("-")
match kls:
case "strip":
toml_str += ("\n").join(
self.phys_strip_params
if int(index) < self.phys_in
else self.virt_strip_params
)
case "bus":
toml_str += ("\n").join(self.bus_bool)
case _:
pass
return toml_str + "\n"
class TOMLDataExtractor:
def __init__(self, file):
self._data = dict()
with open(file, "rb") as f:
self._data = tomllib.load(f)
@property
def data(self):
return self._data
@data.setter
def data(self, value):
self._data = value
def dataextraction_factory(file):
"""
factory function for parser
this opens the possibility for other parsers to be added
"""
if file.suffix == ".toml":
extractor = TOMLDataExtractor
else:
raise ValueError("Cannot extract data from {}".format(file))
return extractor(file)
class SingletonType(type):
"""ensure only a single instance of Loader object"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(SingletonType, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class Loader(metaclass=SingletonType):
"""
invokes the parser
checks if config already in memory
loads data into memory if not found
"""
def __init__(self, kind):
self._kind = kind
self._configs = dict()
self.defaults(kind)
self.parser = None
def defaults(self, kind):
self.builder = TOMLStrBuilder(kind)
toml_str = self.builder.build()
self.register("reset", tomllib.loads(toml_str))
def parse(self, identifier, data):
if identifier in self._configs:
print(f"config file with name {identifier} already in memory, skipping..")
return False
self.parser = dataextraction_factory(data)
return True
def register(self, identifier, data=None):
self._configs[identifier] = data if data else self.parser.data
print(f"config {self.name}/{identifier} loaded into memory")
def deregister(self):
self._configs.clear()
self.defaults(self._kind)
@property
def configs(self):
return self._configs
@property
def name(self):
return self._kind.name
def loader(kind):
"""
traverses defined paths for config files
directs the loader
returns configs loaded into memory
"""
loader = Loader(kind)
for path in (
Path.cwd() / "configs" / kind.name,
Path(__file__).parent / "configs" / kind.name,
Path.home() / "Documents/Voicemeeter" / "configs" / kind.name,
):
if path.is_dir():
print(f"Checking [{path}] for TOML config files:")
for file in path.glob("*.toml"):
identifier = file.with_suffix("").stem
if loader.parse(identifier, file):
loader.register(identifier)
return loader.configs
def request_config(kind_id: str):
"""
config entry point.
Returns all configs loaded into memory for a kind
"""
try:
configs = loader(kindmap(kind_id))
except KeyError as e:
print(f"Unknown Voicemeeter kind '{kind_id}'")
return configs

4
vban_cmd/error.py Normal file
View File

@@ -0,0 +1,4 @@
class VMCMDErrors(Exception):
"""general errors"""
pass

190
vban_cmd/factory.py Normal file
View File

@@ -0,0 +1,190 @@
from abc import abstractmethod
from enum import IntEnum
from functools import cached_property
from typing import Iterable, NoReturn, Self
from .base import VbanCmd
from .bus import request_bus_obj as bus
from .command import Command
from .config import request_config as configs
from .kinds import KindMapClass
from .kinds import request_kind_map as kindmap
from .strip import request_strip_obj as strip
class FactoryBuilder:
"""
Builder class for factories.
Separates construction from representation.
"""
BuilderProgress = IntEnum("BuilderProgress", "strip bus command", start=0)
def __init__(self, factory, kind: KindMapClass):
self._factory = factory
self.kind = kind
self._info = (
f"Finished building strips for {self._factory}",
f"Finished building buses for {self._factory}",
f"Finished building commands for {self._factory}",
)
def _pinfo(self, name: str) -> NoReturn:
"""prints progress status for each step"""
name = name.split("_")[1]
print(self._info[int(getattr(self.BuilderProgress, name))])
def make_strip(self) -> Self:
self._factory.strip = tuple(
strip(self.kind.phys_in < i, self._factory, i)
for i in range(self.kind.num_strip)
)
return self
def make_bus(self) -> Self:
self._factory.bus = tuple(
bus(self.kind.phys_out < i, self._factory, i)
for i in range(self.kind.num_bus)
)
return self
def make_command(self) -> Self:
self._factory.command = Command.make(self._factory)
return self
class FactoryBase(VbanCmd):
"""Base class for factories, subclasses VbanCmd."""
def __init__(self, kind_id: str, **kwargs):
defaultkwargs = {
"ip": None,
"port": 6980,
"streamname": "Command1",
"bps": 0,
"channel": 0,
"ratelimit": 0,
"sync": False,
}
kwargs = defaultkwargs | kwargs
self.kind = kindmap(kind_id)
super().__init__(**kwargs)
self.builder = FactoryBuilder(self, self.kind)
self._steps = (
self.builder.make_strip,
self.builder.make_bus,
self.builder.make_command,
)
self._configs = None
def __str__(self) -> str:
return f"Voicemeeter {self.kind}"
@property
@abstractmethod
def steps(self):
pass
@cached_property
def configs(self):
self._configs = configs(self.kind.name)
return self._configs
class BasicFactory(FactoryBase):
"""
Represents a Basic VbanCmd subclass
Responsible for directing the builder class
"""
def __new__(cls, *args, **kwargs):
if cls is BasicFactory:
raise TypeError(f"'{cls.__name__}' does not support direct instantiation")
return object.__new__(cls)
def __init__(self, kind_id, **kwargs):
super().__init__(kind_id, **kwargs)
[step()._pinfo(step.__name__) for step in self.steps]
@property
def steps(self) -> Iterable:
"""steps required to build the interface for a kind"""
return self._steps
class BananaFactory(FactoryBase):
"""
Represents a Banana VbanCmd subclass
Responsible for directing the builder class
"""
def __new__(cls, *args, **kwargs):
if cls is BananaFactory:
raise TypeError(f"'{cls.__name__}' does not support direct instantiation")
return object.__new__(cls)
def __init__(self, kind_id, **kwargs):
super().__init__(kind_id, **kwargs)
[step()._pinfo(step.__name__) for step in self.steps]
@property
def steps(self) -> Iterable:
"""steps required to build the interface for a kind"""
return self._steps
class PotatoFactory(FactoryBase):
"""
Represents a Potato VbanCmd subclass
Responsible for directing the builder class
"""
def __new__(cls, *args, **kwargs):
if cls is PotatoFactory:
raise TypeError(f"'{cls.__name__}' does not support direct instantiation")
return object.__new__(cls)
def __init__(self, kind_id: str, **kwargs):
super().__init__(kind_id, **kwargs)
[step()._pinfo(step.__name__) for step in self.steps]
@property
def steps(self) -> Iterable:
"""steps required to build the interface for a kind"""
return self._steps
def vbancmd_factory(kind_id: str, **kwargs) -> VbanCmd:
"""
Factory method, invokes a factory creation class of a kind
Returns a VbanCmd class of a kind
"""
match kind_id:
case "basic":
_factory = BasicFactory
case "banana":
_factory = BananaFactory
case "potato":
_factory = PotatoFactory
case _:
raise ValueError(f"Unknown Voicemeeter kind '{kind_id}'")
return type(f"VbanCmd{kind_id.capitalize()}", (_factory,), {})(kind_id, **kwargs)
def request_vbancmd_obj(kind_id: str, **kwargs) -> VbanCmd:
"""
Interface entry point. Wraps factory method and handles errors
Returns a reference to a VbanCmd class of a kind
"""
VBANCMD_obj = None
try:
VBANCMD_obj = vbancmd_factory(kind_id, **kwargs)
except (ValueError, TypeError) as e:
raise SystemExit(e)
return VBANCMD_obj

120
vban_cmd/iremote.py Normal file
View File

@@ -0,0 +1,120 @@
import time
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
@dataclass
class Modes:
"""Channel Modes"""
_mute: hex = 0x00000001
_solo: hex = 0x00000002
_mono: hex = 0x00000004
_mc: hex = 0x00000008
_amix: hex = 0x00000010
_repeat: hex = 0x00000020
_bmix: hex = 0x00000030
_composite: hex = 0x00000040
_tvmix: hex = 0x00000050
_upmix21: hex = 0x00000060
_upmix41: hex = 0x00000070
_upmix61: hex = 0x00000080
_centeronly: hex = 0x00000090
_lfeonly: hex = 0x000000A0
_rearonly: hex = 0x000000B0
_mask: hex = 0x000000F0
_eq_on: hex = 0x00000100
_cross: hex = 0x00000200
_eq_ab: hex = 0x00000800
_busa: hex = 0x00001000
_busa1: hex = 0x00001000
_busa2: hex = 0x00002000
_busa3: hex = 0x00004000
_busa4: hex = 0x00008000
_busa5: hex = 0x00080000
_busb: hex = 0x00010000
_busb1: hex = 0x00010000
_busb2: hex = 0x00020000
_busb3: hex = 0x00040000
_pan0: hex = 0x00000000
_pancolor: hex = 0x00100000
_panmod: hex = 0x00200000
_panmask: hex = 0x00F00000
_postfx_r: hex = 0x01000000
_postfx_d: hex = 0x02000000
_postfx1: hex = 0x04000000
_postfx2: hex = 0x08000000
_sel: hex = 0x10000000
_monitor: hex = 0x20000000
@property
def modevals(self):
return (
val
for val in [
self._amix,
self._repeat,
self._bmix,
self._composite,
self._tvmix,
self._upmix21,
self._upmix41,
self._upmix61,
self._centeronly,
self._lfeonly,
self._rearonly,
]
)
class IRemote(metaclass=ABCMeta):
"""
Common interface between base class and extended (higher) classes
Provides some default implementation
"""
def __init__(self, remote, index=None):
self._remote = remote
self.index = index
self._modes = Modes()
def getter(self, param):
cmd = f"{self.identifier}.{param}"
if cmd in self._remote.cache:
return self._remote.cache.pop(cmd)
def setter(self, param, val):
"""Sends a string request RT packet."""
self._remote._set_rt(f"{self.identifier}", param, val)
@abstractmethod
def identifier(self):
pass
@property
def public_packet(self):
"""Returns an RT data packet."""
return self._remote.public_packet
def apply(self, data):
"""Sets all parameters of a dict for the channel."""
script = ""
for attr, val in data.items():
if hasattr(self, attr):
self._remote.cache[f"{self.identifier}[{self.index}].{attr}"] = val
script += f"{self.identifier}[{self.index}].{attr}={val};"
self._remote.sendtext(script)
return self
def then_wait(self):
time.sleep(self._remote.DELAY)

104
vban_cmd/kinds.py Normal file
View File

@@ -0,0 +1,104 @@
from dataclasses import dataclass
from enum import Enum, unique
@unique
class KindId(Enum):
BASIC = 1
BANANA = 2
POTATO = 3
class SingletonType(type):
"""ensure only a single instance of a kind map object"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(SingletonType, cls).__call__(*args, **kwargs)
return cls._instances[cls]
@dataclass
class KindMapClass(metaclass=SingletonType):
name: str
ins: tuple
outs: tuple
vban: tuple
@property
def phys_in(self):
return self.ins[0]
@property
def virt_in(self):
return self.ins[-1]
@property
def phys_out(self):
return self.outs[0]
@property
def virt_out(self):
return self.outs[-1]
@property
def num_strip(self):
return sum(self.ins)
@property
def num_bus(self):
return sum(self.outs)
def __str__(self) -> str:
return self.name.capitalize()
@dataclass
class BasicMap(KindMapClass):
name: str
ins: tuple = (2, 1)
outs: tuple = (1, 1)
vban: tuple = (4, 4)
@dataclass
class BananaMap(KindMapClass):
name: str
ins: tuple = (3, 2)
outs: tuple = (3, 2)
vban: tuple = (8, 8)
@dataclass
class PotatoMap(KindMapClass):
name: str
ins: tuple = (5, 3)
outs: tuple = (5, 3)
vban: tuple = (8, 8)
def kind_factory(kind_id):
match kind_id:
case "basic":
_kind_map = BasicMap
case "banana":
_kind_map = BananaMap
case "potato":
_kind_map = PotatoMap
case _:
raise ValueError(f"Unknown Voicemeeter kind {kind_id}")
return _kind_map(name=kind_id)
def request_kind_map(kind_id):
KIND_obj = None
try:
KIND_obj = kind_factory(kind_id)
except ValueError as e:
print(e)
return KIND_obj
kinds_all = list(request_kind_map(kind_id.name.lower()) for kind_id in KindId)

109
vban_cmd/meta.py Normal file
View File

@@ -0,0 +1,109 @@
from functools import partial
from .error import VMCMDErrors
from .util import cache_bool, cache_string
def channel_bool_prop(param):
"""meta function for channel boolean parameters"""
@partial(cache_bool, param=param)
def fget(self):
return (
not int.from_bytes(
getattr(
self.public_packet,
f"{'strip' if 'strip' in type(self).__name__.lower() else 'bus'}state",
)[self.index],
"little",
)
& getattr(self._modes, f'_{param.replace(".", "_").lower()}')
== 0
)
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f"{param} is a boolean parameter")
self.setter(param, 1 if val else 0)
return property(fget, fset)
def channel_label_prop():
"""meta function for channel label parameters"""
@partial(cache_string, param="label")
def fget(self) -> str:
return getattr(
self.public_packet,
f"{'strip' if 'strip' in type(self).__name__.lower() else 'bus'}labels",
)[self.index]
def fset(self, val: str):
if not isinstance(val, str):
raise VMCMDErrors("label is a string parameter")
self.setter("label", val)
return property(fget, fset)
def strip_output_prop(param):
"""meta function for strip output parameters. (A1-A5, B1-B3)"""
@partial(cache_bool, param=param)
def fget(self):
return (
not int.from_bytes(self.public_packet.stripstate[self.index], "little")
& getattr(self._modes, f"_bus{param.lower()}")
== 0
)
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f"{param} is a boolean parameter")
self.setter(param, 1 if val else 0)
return property(fget, fset)
def bus_mode_prop(param):
"""meta function for bus mode parameters"""
@partial(cache_bool, param=param)
def fget(self):
modelist = {
"amix": (1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1),
"repeat": (0, 2, 2, 0, 0, 2, 2, 0, 0, 2, 2),
"bmix": (1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3),
"composite": (0, 0, 0, 4, 4, 4, 4, 0, 0, 0, 0),
"tvmix": (1, 0, 1, 4, 5, 4, 5, 0, 1, 0, 1),
"upmix21": (0, 2, 2, 4, 4, 6, 6, 0, 0, 2, 2),
"upmix41": (1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3),
"upmix61": (0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8),
"centeronly": (1, 0, 1, 0, 1, 0, 1, 8, 9, 8, 9),
"lfeonly": (0, 2, 2, 0, 0, 2, 2, 8, 8, 10, 10),
"rearonly": (1, 2, 3, 0, 1, 2, 3, 8, 9, 10, 11),
}
vals = (
int.from_bytes(self.public_packet.busstate[self.index], "little") & val
for val in self._modes.modevals
)
if param == "normal":
return not any(vals)
return tuple(round(val / 16) for val in vals) == modelist[param]
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f"{param} is a boolean parameter")
self.setter(param, 1 if val else 0)
return property(fget, fset)
def action_prop(param, val=1):
"""A param that performs an action"""
def fdo(self):
self.setter(param, val)
return fdo

284
vban_cmd/packet.py Normal file
View File

@@ -0,0 +1,284 @@
from dataclasses import dataclass
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4
@dataclass
class VBAN_VMRT_Packet_Data:
"""Represents the structure of a VMRT data packet"""
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
_stripGaindB100Layer1: bytes
_stripGaindB100Layer2: bytes
_stripGaindB100Layer3: bytes
_stripGaindB100Layer4: bytes
_stripGaindB100Layer5: bytes
_stripGaindB100Layer6: bytes
_stripGaindB100Layer7: bytes
_stripGaindB100Layer8: bytes
_busGaindB100: bytes
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
def pdirty(self, other):
"""True iff any defined parameter has changed"""
return not (
self._stripState == other._stripState
and self._busState == other._busState
and self._stripLabelUTF8c60 == other._stripLabelUTF8c60
and self._busLabelUTF8c60 == other._busLabelUTF8c60
and self._stripGaindB100Layer1 == other._stripGaindB100Layer1
and self._stripGaindB100Layer2 == other._stripGaindB100Layer2
and self._stripGaindB100Layer3 == other._stripGaindB100Layer3
and self._stripGaindB100Layer4 == other._stripGaindB100Layer4
and self._stripGaindB100Layer5 == other._stripGaindB100Layer5
and self._stripGaindB100Layer6 == other._stripGaindB100Layer6
and self._stripGaindB100Layer7 == other._stripGaindB100Layer7
and self._stripGaindB100Layer8 == other._stripGaindB100Layer8
and self._busGaindB100 == other._busGaindB100
)
@property
def voicemeetertype(self) -> str:
"""returns voicemeeter type as a string"""
type_ = ("basic", "banana", "potato")
return type_[int.from_bytes(self._voicemeeterType, "little") - 1]
@property
def voicemeeterversion(self) -> tuple:
"""returns voicemeeter version as a tuple"""
return tuple(
reversed(
tuple(
int.from_bytes(self._voicemeeterVersion[i : i + 1], "little")
for i in range(4)
)
)
)
@property
def samplerate(self) -> int:
"""returns samplerate as an int"""
return int.from_bytes(self._samplerate, "little")
@property
def inputlevels(self) -> tuple:
"""returns the entire level array across all inputs"""
return tuple(
((1 << 16) - 1) - int.from_bytes(self._inputLeveldB100[i : i + 2], "little")
for i in range(0, 68, 2)
)
@property
def outputlevels(self) -> tuple:
"""returns the entire level array across all outputs"""
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._outputLeveldB100[i : i + 2], "little")
for i in range(0, 128, 2)
)
@property
def stripstate(self) -> tuple:
"""returns tuple of strip states accessable through bit modes"""
return tuple(self._stripState[i : i + 4] for i in range(0, 32, 4))
@property
def busstate(self) -> tuple:
"""returns tuple of bus states accessable through bit modes"""
return tuple(self._busState[i : i + 4] for i in range(0, 32, 4))
"""
these functions return an array of gainlayers[i] across all strips
ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...]
"""
@property
def stripgainlayer1(self) -> tuple:
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer1[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer2(self) -> tuple:
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer2[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer3(self) -> tuple:
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer3[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer4(self) -> tuple:
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer4[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer5(self) -> tuple:
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer5[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer6(self) -> tuple:
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer6[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer7(self) -> tuple:
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer7[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def stripgainlayer8(self) -> tuple:
return tuple(
((1 << 16) - 1)
- int.from_bytes(self._stripGaindB100Layer8[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def busgain(self) -> tuple:
"""returns tuple of bus gains"""
return tuple(
((1 << 16) - 1) - int.from_bytes(self._busGaindB100[i : i + 2], "little")
for i in range(0, 16, 2)
)
@property
def striplabels(self) -> tuple:
"""returns tuple of strip labels"""
return tuple(
self._stripLabelUTF8c60[i : i + 60].decode("ascii").split("\x00")[0]
for i in range(0, 480, 60)
)
@property
def buslabels(self) -> tuple:
"""returns tuple of bus labels"""
return tuple(
self._busLabelUTF8c60[i : i + 60].decode("ascii").split("\x00")[0]
for i in range(0, 480, 60)
)
@dataclass
class VBAN_VMRT_Packet_Header:
"""Represents a RESPONSE RT PACKET header"""
name = "Voicemeeter-RTP"
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little")
format_bit: bytes = (0).to_bytes(1, "little")
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
assert len(header) == HEADER_SIZE - 4, f"Header expected {HEADER_SIZE-4} bytes"
return header
@dataclass
class TextRequestHeader:
"""Represents a REQUEST RT PACKET header"""
name: str
bps_index: int
channel: int
vban: bytes = "VBAN".encode()
nbs: bytes = (0).to_bytes(1, "little")
bit: bytes = (0x10).to_bytes(1, "little")
framecounter: bytes = (0).to_bytes(4, "little")
@property
def sr(self):
return (0x40 + self.bps_index).to_bytes(1, "little")
@property
def nbc(self):
return (self.channel).to_bytes(1, "little")
@property
def streamname(self):
return self.name.encode() + bytes(16 - len(self.name))
@property
def header(self):
header = self.vban
header += self.sr
header += self.nbs
header += self.nbc
header += self.bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
return header
@dataclass
class RegisterRTHeader:
"""Represents a REGISTER RT PACKET header"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
return header

225
vban_cmd/strip.py Normal file
View File

@@ -0,0 +1,225 @@
from abc import abstractmethod
from typing import Union
from .iremote import IRemote
from .kinds import kinds_all
from .meta import channel_bool_prop, channel_label_prop, strip_output_prop
class Strip(IRemote):
"""
Implements the common interface
Defines concrete implementation for strip
"""
@abstractmethod
def __str__(self):
pass
@property
def identifier(self) -> str:
return f"Strip[{self.index}]"
@property
def limit(self) -> int:
return
@limit.setter
def limit(self, val: int):
self.setter("limit", val)
@property
def gain(self) -> float:
val = self.getter("gain")
if val is None:
val = self.gainlayer[0].gain
return round(val, 1)
@gain.setter
def gain(self, val: float):
self.setter("gain", val)
class PhysicalStrip(Strip):
def __str__(self):
return f"{type(self).__name__}{self.index}"
@property
def comp(self) -> float:
return
@comp.setter
def comp(self, val: float):
self.setter("Comp", val)
@property
def gate(self) -> float:
return
@gate.setter
def gate(self, val: float):
self.setter("gate", val)
@property
def device(self):
return
@property
def sr(self):
return
class VirtualStrip(Strip):
def __str__(self):
return f"{type(self).__name__}{self.index}"
mc = channel_bool_prop("mc")
mono = mc
@property
def k(self) -> int:
return
@k.setter
def k(self, val: int):
self.setter("karaoke", val)
class StripLevel(IRemote):
def __init__(self, remote, index):
super().__init__(remote, index)
phys_map = tuple((i, i + 2) for i in range(0, remote.kind.phys_in * 2, 2))
virt_map = tuple(
(i, i + 8)
for i in range(
remote.kind.phys_in * 2,
remote.kind.phys_in * 2 + remote.kind.virt_in * 8,
8,
)
)
self.level_map = phys_map + virt_map
def getter_prefader(self):
range_ = self.level_map[self.index]
return tuple(
round(-i * 0.01, 1)
for i in self._remote.strip_levels[range_[0] : range_[-1]]
)
@property
def identifier(self) -> str:
return f"Strip[{self.index}]"
@property
def prefader(self) -> tuple:
return self.getter_prefader()
@property
def postfader(self) -> tuple:
return
@property
def postmute(self) -> tuple:
return
@property
def updated(self) -> tuple:
return self._remote._strip_comp
class GainLayer(IRemote):
def __init__(self, remote, index, i):
super().__init__(remote, index)
self._i = i
@property
def identifier(self) -> str:
return f"Strip[{self.index}]"
@property
def gain(self) -> float:
def fget():
val = getattr(self.public_packet, f"stripgainlayer{self._i+1}")[self.index]
if val < 10000:
return -val
elif val == ((1 << 16) - 1):
return 0
else:
return ((1 << 16) - 1) - val
val = self.getter(f"GainLayer[{self._i}]")
if val is None:
val = fget() * 0.01
return round(val, 1)
@gain.setter
def gain(self, val: float):
self.setter(f"GainLayer[{self._i}]", val)
def _make_gainlayer_mixin(remote, index):
"""Creates a GainLayer mixin"""
return type(
f"GainlayerMixin",
(),
{
"gainlayer": tuple(
GainLayer(remote, index, i) for i in range(remote.kind.num_bus)
)
},
)
def _make_channelout_mixin(kind):
"""Creates a channel out property mixin"""
return type(
f"ChannelOutMixin{kind}",
(),
{
**{
f"A{i}": strip_output_prop(f"A{i}") for i in range(1, kind.phys_out + 1)
},
**{
f"B{i}": strip_output_prop(f"B{i}") for i in range(1, kind.virt_out + 1)
},
},
)
_make_channelout_mixins = {
kind.name: _make_channelout_mixin(kind) for kind in kinds_all
}
def strip_factory(is_phys_strip, remote, i) -> Union[PhysicalStrip, VirtualStrip]:
"""
Factory method for strips
Mixes in required classes
Returns a physical or virtual strip subclass
"""
STRIP_cls = PhysicalStrip if is_phys_strip else VirtualStrip
CHANNELOUTMIXIN_cls = _make_channelout_mixins[remote.kind.name]
GAINLAYERMIXIN_cls = _make_gainlayer_mixin(remote, i)
return type(
f"{STRIP_cls.__name__}{remote.kind}",
(STRIP_cls, CHANNELOUTMIXIN_cls, GAINLAYERMIXIN_cls),
{
"levels": StripLevel(remote, i),
**{param: channel_bool_prop(param) for param in ["mono", "solo", "mute"]},
"label": channel_label_prop(),
},
)(remote, i)
def request_strip_obj(is_phys_strip, remote, i) -> Strip:
"""
Strip entry point. Wraps factory method.
Returns a reference to a strip subclass of a kind
"""
return strip_factory(is_phys_strip, remote, i)

39
vban_cmd/subject.py Normal file
View File

@@ -0,0 +1,39 @@
class Subject:
"""Adds support for observers"""
def __init__(self):
"""list of current observers"""
self._observers = list()
@property
def observers(self) -> list:
"""returns the current observers"""
return self._observers
def notify(self, modifier=None, data=None):
"""run callbacks on update"""
[o.on_update(modifier, data) for o in self._observers]
def add(self, observer):
"""adds an observer to _observers"""
if observer not in self._observers:
self._observers.append(observer)
else:
print(f"Failed to add: {observer}")
def remove(self, observer):
"""removes an observer from _observers"""
try:
self._observers.remove(observer)
except ValueError:
print(f"Failed to remove: {observer}")
def clear(self):
"""clears the _observers list"""
self._observers.clear()

51
vban_cmd/util.py Normal file
View File

@@ -0,0 +1,51 @@
def cache_bool(func, param):
"""Check cache for a bool prop"""
def wrapper(*args, **kwargs):
self, *rem = args
cmd = f"{self.identifier}.{param}"
if cmd in self._remote.cache:
return self._remote.cache.pop(cmd) == 1
return func(*args, **kwargs)
return wrapper
def cache_string(func, param):
"""Check cache for a string prop"""
def wrapper(*args, **kwargs):
self, *rem = args
cmd = f"{self.identifier}.{param}"
if cmd in self._remote.cache:
return self._remote.cache.pop(cmd)
return func(*args, **kwargs)
return wrapper
def depth(d):
if isinstance(d, dict):
return 1 + (max(map(depth, d.values())) if d else 0)
return 0
def script(func):
"""Convert dictionary to script"""
def wrapper(*args):
remote, script = args
if isinstance(script, dict):
params = ""
for key, val in script.items():
obj, m2, *rem = key.split("-")
index = int(m2) if m2.isnumeric() else int(*rem)
params += ";".join(
f"{obj}{f'.{m2}stream' if not m2.isnumeric() else ''}[{index}].{k}={int(v) if isinstance(v, bool) else v}"
for k, v in val.items()
)
params += ";"
script = params
return func(remote, script)
return wrapper