mirror of
https://github.com/onyx-and-iris/voicemeeter-api-python.git
synced 2025-01-18 17:10:47 +00:00
346 lines
12 KiB
Python
346 lines
12 KiB
Python
import ctypes as ct
|
|
import logging
|
|
import threading
|
|
import time
|
|
from abc import abstractmethod
|
|
from queue import Queue
|
|
from typing import Iterable, Optional, Union
|
|
|
|
from .cbindings import CBindings
|
|
from .error import CAPIError, VMError
|
|
from .event import Event
|
|
from .inst import BITS
|
|
from .kinds import KindId
|
|
from .misc import Midi, VmGui
|
|
from .subject import Subject
|
|
from .updater import Producer, Updater
|
|
from .util import deep_merge, grouper, polling, script, timeout
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Remote(CBindings):
|
|
"""Base class responsible for wrapping the C Remote API"""
|
|
|
|
DELAY = 0.001
|
|
|
|
def __init__(self, **kwargs):
|
|
self.strip_mode = 0
|
|
self.cache = {}
|
|
self.midi = Midi()
|
|
self.subject = self.observer = Subject()
|
|
self.event = Event(
|
|
{k: kwargs.pop(k) for k in ("pdirty", "mdirty", "midi", "ldirty")}
|
|
)
|
|
self.gui = VmGui()
|
|
self.stop_event = None
|
|
self.logger = logger.getChild(self.__class__.__name__)
|
|
|
|
for attr, val in kwargs.items():
|
|
setattr(self, attr, val)
|
|
|
|
if self.bits not in (32, 64):
|
|
self.logger.warning(
|
|
f"kwarg bits got {self.bits}, expected either 32 or 64, defaulting to 64"
|
|
)
|
|
self.bits = 64
|
|
|
|
def __enter__(self):
|
|
"""setup procedures"""
|
|
self.login()
|
|
if self.event.any():
|
|
self.init_thread()
|
|
return self
|
|
|
|
@abstractmethod
|
|
def __str__(self):
|
|
"""Ensure subclasses override str magic method"""
|
|
pass
|
|
|
|
def init_thread(self):
|
|
"""Starts updates thread."""
|
|
self.event.info()
|
|
|
|
self.logger.debug("initiating events thread")
|
|
self.stop_event = threading.Event()
|
|
self.stop_event.clear()
|
|
queue = Queue()
|
|
self.updater = Updater(self, queue)
|
|
self.updater.start()
|
|
self.producer = Producer(self, queue, self.stop_event)
|
|
self.producer.start()
|
|
|
|
def stopped(self):
|
|
return self.stop_event is None or self.stop_event.is_set()
|
|
|
|
@timeout
|
|
def login(self) -> None:
|
|
"""Login to the API, initialize dirty parameters"""
|
|
self.gui.launched = self.call(self.bind_login, ok=(0, 1)) == 0
|
|
if not self.gui.launched:
|
|
self.logger.info(
|
|
"Voicemeeter engine running but GUI not launched. Launching the GUI now."
|
|
)
|
|
self.run_voicemeeter(self.kind.name)
|
|
|
|
def run_voicemeeter(self, kind_id: str) -> None:
|
|
if kind_id not in (kind.name.lower() for kind in KindId):
|
|
raise VMError(f"Unexpected Voicemeeter type: '{kind_id}'")
|
|
value = KindId[kind_id.upper()].value
|
|
if BITS == 64 and self.bits == 64:
|
|
value += 3
|
|
self.call(self.bind_run_voicemeeter, value)
|
|
|
|
@property
|
|
def type(self) -> str:
|
|
"""Returns the type of Voicemeeter installation (basic, banana, potato)."""
|
|
type_ = ct.c_long()
|
|
self.call(self.bind_get_voicemeeter_type, ct.byref(type_))
|
|
return KindId(type_.value).name.lower()
|
|
|
|
@property
|
|
def version(self) -> str:
|
|
"""Returns Voicemeeter's version as a string"""
|
|
ver = ct.c_long()
|
|
self.call(self.bind_get_voicemeeter_version, ct.byref(ver))
|
|
return "{}.{}.{}.{}".format(
|
|
(ver.value & 0xFF000000) >> 24,
|
|
(ver.value & 0x00FF0000) >> 16,
|
|
(ver.value & 0x0000FF00) >> 8,
|
|
ver.value & 0x000000FF,
|
|
)
|
|
|
|
@property
|
|
def pdirty(self) -> bool:
|
|
"""True iff UI parameters have been updated."""
|
|
return self.call(self.bind_is_parameters_dirty, ok=(0, 1)) == 1
|
|
|
|
@property
|
|
def mdirty(self) -> bool:
|
|
"""True iff MB parameters have been updated."""
|
|
try:
|
|
return self.call(self.bind_macro_button_is_dirty, ok=(0, 1)) == 1
|
|
except AttributeError as e:
|
|
self.logger.exception(f"{type(e).__name__}: {e}")
|
|
raise CAPIError("VBVMR_MacroButton_IsDirty", -9) from e
|
|
|
|
@property
|
|
def ldirty(self) -> bool:
|
|
"""True iff levels have been updated."""
|
|
self._strip_buf, self._bus_buf = self._get_levels()
|
|
return not (
|
|
self.cache.get("strip_level") == self._strip_buf
|
|
and self.cache.get("bus_level") == self._bus_buf
|
|
)
|
|
|
|
def clear_dirty(self) -> None:
|
|
try:
|
|
while self.pdirty or self.mdirty:
|
|
pass
|
|
except CAPIError as e:
|
|
if not (e.fn_name == "VBVMR_MacroButton_IsDirty" and e.code == -9):
|
|
raise
|
|
self.logger.error(f"{e} clearing pdirty only.")
|
|
while self.pdirty:
|
|
pass
|
|
|
|
@polling
|
|
def get(self, param: str, is_string: Optional[bool] = False) -> Union[str, float]:
|
|
"""Gets a string or float parameter"""
|
|
if is_string:
|
|
buf = ct.create_unicode_buffer(512)
|
|
self.call(self.bind_get_parameter_string_w, param.encode(), ct.byref(buf))
|
|
else:
|
|
buf = ct.c_float()
|
|
self.call(self.bind_get_parameter_float, param.encode(), ct.byref(buf))
|
|
return buf.value
|
|
|
|
def set(self, param: str, val: Union[str, float]) -> None:
|
|
"""Sets a string or float parameter. Caches value"""
|
|
if isinstance(val, str):
|
|
if len(val) >= 512:
|
|
raise VMError("String is too long")
|
|
self.call(
|
|
self.bind_set_parameter_string_w, param.encode(), ct.c_wchar_p(val)
|
|
)
|
|
else:
|
|
self.call(
|
|
self.bind_set_parameter_float, param.encode(), ct.c_float(float(val))
|
|
)
|
|
self.cache[param] = val
|
|
|
|
@polling
|
|
def get_buttonstatus(self, id_: int, mode: int) -> int:
|
|
"""Gets a macrobutton parameter"""
|
|
c_state = ct.c_float()
|
|
try:
|
|
self.call(
|
|
self.bind_macro_button_get_status,
|
|
ct.c_long(id_),
|
|
ct.byref(c_state),
|
|
ct.c_long(mode),
|
|
)
|
|
except AttributeError as e:
|
|
self.logger.exception(f"{type(e).__name__}: {e}")
|
|
raise CAPIError("VBVMR_MacroButton_GetStatus", -9) from e
|
|
return int(c_state.value)
|
|
|
|
def set_buttonstatus(self, id_: int, val: int, mode: int) -> None:
|
|
"""Sets a macrobutton parameter. Caches value"""
|
|
c_state = ct.c_float(float(val))
|
|
try:
|
|
self.call(
|
|
self.bind_macro_button_set_status,
|
|
ct.c_long(id_),
|
|
c_state,
|
|
ct.c_long(mode),
|
|
)
|
|
except AttributeError as e:
|
|
self.logger.exception(f"{type(e).__name__}: {e}")
|
|
raise CAPIError("VBVMR_MacroButton_SetStatus", -9) from e
|
|
self.cache[f"mb_{id_}_{mode}"] = int(c_state.value)
|
|
|
|
def get_num_devices(self, direction: str = None) -> int:
|
|
"""Retrieves number of physical devices connected"""
|
|
if direction not in ("in", "out"):
|
|
raise VMError("Expected a direction: in or out")
|
|
func = getattr(self, f"bind_{direction}put_get_device_number")
|
|
res = self.call(func, ok_exp=lambda r: r >= 0)
|
|
return res
|
|
|
|
def get_device_description(self, index: int, direction: str = None) -> tuple:
|
|
"""Returns a tuple of device parameters"""
|
|
if direction not in ("in", "out"):
|
|
raise VMError("Expected a direction: in or out")
|
|
type_ = ct.c_long()
|
|
name = ct.create_unicode_buffer(256)
|
|
hwid = ct.create_unicode_buffer(256)
|
|
func = getattr(self, f"bind_{direction}put_get_device_desc_w")
|
|
self.call(
|
|
func,
|
|
ct.c_long(index),
|
|
ct.byref(type_),
|
|
ct.byref(name),
|
|
ct.byref(hwid),
|
|
)
|
|
return (name.value, type_.value, hwid.value)
|
|
|
|
def get_level(self, type_: int, index: int) -> float:
|
|
"""Retrieves a single level value"""
|
|
val = ct.c_float()
|
|
self.call(
|
|
self.bind_get_level, ct.c_long(type_), ct.c_long(index), ct.byref(val)
|
|
)
|
|
return val.value
|
|
|
|
def _get_levels(self) -> Iterable:
|
|
"""
|
|
returns both level arrays (strip_levels, bus_levels) BEFORE math conversion
|
|
"""
|
|
return (
|
|
tuple(
|
|
self.get_level(self.strip_mode, i)
|
|
for i in range(self.kind.num_strip_levels)
|
|
),
|
|
tuple(self.get_level(3, i) for i in range(self.kind.num_bus_levels)),
|
|
)
|
|
|
|
def get_midi_message(self):
|
|
n = ct.c_long(1024)
|
|
buf = ct.create_string_buffer(1024)
|
|
res = self.call(
|
|
self.bind_get_midi_message,
|
|
ct.byref(buf),
|
|
n,
|
|
ok=(-5, -6), # no data received from midi device
|
|
ok_exp=lambda r: r >= 0,
|
|
)
|
|
if res > 0:
|
|
vals = tuple(
|
|
grouper(3, (int.from_bytes(buf[i], "little") for i in range(res)))
|
|
)
|
|
for msg in vals:
|
|
ch, pitch, vel = msg
|
|
if not self.midi._channel or self.midi._channel != ch:
|
|
self.midi._channel = ch
|
|
self.midi._most_recent = pitch
|
|
self.midi._set(pitch, vel)
|
|
return True
|
|
|
|
@script
|
|
def sendtext(self, script: str):
|
|
"""Sets many parameters from a script"""
|
|
if len(script) > 48000:
|
|
raise ValueError("Script too large, max size 48kB")
|
|
self.call(self.bind_set_parameters, script.encode())
|
|
time.sleep(self.DELAY * 5)
|
|
|
|
def apply(self, data: dict):
|
|
"""
|
|
Sets all parameters of a dict
|
|
|
|
minor delay between each recursion
|
|
"""
|
|
|
|
def target(key):
|
|
match key.split("-"):
|
|
case ["strip" | "bus" | "button" as kls, index] if index.isnumeric():
|
|
target = getattr(self, kls)
|
|
case [
|
|
"vban",
|
|
"in" | "instream" | "out" | "outstream" as direction,
|
|
index,
|
|
] if index.isnumeric():
|
|
target = getattr(
|
|
self.vban, f"{direction.removesuffix('stream')}stream"
|
|
)
|
|
case _:
|
|
ERR_MSG = f"invalid config key '{key}'"
|
|
self.logger.error(ERR_MSG)
|
|
raise ValueError(ERR_MSG)
|
|
return target[int(index)]
|
|
|
|
[target(key).apply(di).then_wait() for key, di in data.items()]
|
|
|
|
def apply_config(self, name):
|
|
"""applies a config from memory"""
|
|
ERR_MSG = (
|
|
f"No config with name '{name}' is loaded into memory",
|
|
f"Known configs: {list(self.configs.keys())}",
|
|
)
|
|
try:
|
|
config = self.configs[name]
|
|
except KeyError as e:
|
|
self.logger.error(("\n").join(ERR_MSG))
|
|
raise VMError(("\n").join(ERR_MSG)) from e
|
|
|
|
if "extends" in config:
|
|
extended = config["extends"]
|
|
config = {
|
|
k: v
|
|
for k, v in deep_merge(self.configs[extended], config)
|
|
if k not in ("extends")
|
|
}
|
|
self.logger.debug(
|
|
f"profile '{name}' extends '{extended}', profiles merged.."
|
|
)
|
|
self.apply(config)
|
|
self.logger.info(f"Profile '{name}' applied!")
|
|
|
|
def end_thread(self):
|
|
if not self.stopped():
|
|
self.logger.debug("events thread shutdown started")
|
|
self.stop_event.set()
|
|
self.producer.join() # wait for producer thread to complete cycle
|
|
|
|
def logout(self) -> None:
|
|
"""Logout of the API"""
|
|
time.sleep(0.1)
|
|
self.call(self.bind_logout)
|
|
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
|
|
"""teardown procedures"""
|
|
self.end_thread()
|
|
self.logout()
|