mirror of
https://github.com/onyx-and-iris/vban-cmd-python.git
synced 2026-04-06 23:53:31 +00:00
make changes to sockets.
replace black+isort with ruff upd examples
This commit is contained in:
@@ -1,3 +1,3 @@
|
||||
from .factory import request_vbancmd_obj as api
|
||||
|
||||
__ALL__ = ["api"]
|
||||
__ALL__ = ['api']
|
||||
|
||||
@@ -7,8 +7,8 @@ from .iremote import IRemote
|
||||
from .meta import bus_mode_prop, channel_bool_prop, channel_label_prop
|
||||
|
||||
BusModes = IntEnum(
|
||||
"BusModes",
|
||||
"normal amix bmix repeat composite tvmix upmix21 upmix41 upmix61 centeronly lfeonly rearonly",
|
||||
'BusModes',
|
||||
'normal amix bmix repeat composite tvmix upmix21 upmix41 upmix61 centeronly lfeonly rearonly',
|
||||
start=0,
|
||||
)
|
||||
|
||||
@@ -26,7 +26,7 @@ class Bus(IRemote):
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"bus[{self.index}]"
|
||||
return f'bus[{self.index}]'
|
||||
|
||||
@property
|
||||
def gain(self) -> float:
|
||||
@@ -36,19 +36,19 @@ class Bus(IRemote):
|
||||
return val * 0.01
|
||||
return (((1 << 16) - 1) - val) * -0.01
|
||||
|
||||
val = self.getter("gain")
|
||||
val = self.getter('gain')
|
||||
return round(val if val else fget(), 1)
|
||||
|
||||
@gain.setter
|
||||
def gain(self, val: float):
|
||||
self.setter("gain", val)
|
||||
self.setter('gain', val)
|
||||
|
||||
def fadeto(self, target: float, time_: int):
|
||||
self.setter("FadeTo", f"({target}, {time_})")
|
||||
self.setter('FadeTo', f'({target}, {time_})')
|
||||
time.sleep(self._remote.DELAY)
|
||||
|
||||
def fadeby(self, change: float, time_: int):
|
||||
self.setter("FadeBy", f"({change}, {time_})")
|
||||
self.setter('FadeBy', f'({change}, {time_})')
|
||||
time.sleep(self._remote.DELAY)
|
||||
|
||||
|
||||
@@ -56,22 +56,22 @@ class BusEQ(IRemote):
|
||||
@classmethod
|
||||
def make(cls, remote, index):
|
||||
BUSEQ_cls = type(
|
||||
f"BusEQ{remote.kind}",
|
||||
f'BusEQ{remote.kind}',
|
||||
(cls,),
|
||||
{
|
||||
**{param: channel_bool_prop(param) for param in ["on", "ab"]},
|
||||
**{param: channel_bool_prop(param) for param in ['on', 'ab']},
|
||||
},
|
||||
)
|
||||
return BUSEQ_cls(remote, index)
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"bus[{self.index}].eq"
|
||||
return f'bus[{self.index}].eq'
|
||||
|
||||
|
||||
class PhysicalBus(Bus):
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self.index}"
|
||||
return f'{type(self).__name__}{self.index}'
|
||||
|
||||
@property
|
||||
def device(self) -> str:
|
||||
@@ -84,7 +84,7 @@ class PhysicalBus(Bus):
|
||||
|
||||
class VirtualBus(Bus):
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self.index}"
|
||||
return f'{type(self).__name__}{self.index}'
|
||||
|
||||
|
||||
class BusLevel(IRemote):
|
||||
@@ -105,7 +105,7 @@ class BusLevel(IRemote):
|
||||
if not self._remote.stopped() and self._remote.event.ldirty:
|
||||
return tuple(
|
||||
fget(i)
|
||||
for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]]
|
||||
for i in self._remote.cache['bus_level'][self.range[0] : self.range[-1]]
|
||||
)
|
||||
return tuple(
|
||||
fget(i)
|
||||
@@ -116,7 +116,7 @@ class BusLevel(IRemote):
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"bus[{self.index}]"
|
||||
return f'bus[{self.index}]'
|
||||
|
||||
@property
|
||||
def all(self) -> tuple:
|
||||
@@ -138,26 +138,26 @@ def _make_bus_mode_mixin():
|
||||
"""Creates a mixin of Bus Modes."""
|
||||
|
||||
modestates = {
|
||||
"normal": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
|
||||
"amix": [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
|
||||
"repeat": [0, 2, 2, 0, 0, 2, 2, 0, 0, 2, 2],
|
||||
"bmix": [1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3],
|
||||
"composite": [0, 0, 0, 4, 4, 4, 4, 0, 0, 0, 0],
|
||||
"tvmix": [1, 0, 1, 4, 5, 4, 5, 0, 1, 0, 1],
|
||||
"upmix21": [0, 2, 2, 4, 4, 6, 6, 0, 0, 2, 2],
|
||||
"upmix41": [1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3],
|
||||
"upmix61": [0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8],
|
||||
"centeronly": [1, 0, 1, 0, 1, 0, 1, 8, 9, 8, 9],
|
||||
"lfeonly": [0, 2, 2, 0, 0, 2, 2, 8, 8, 10, 10],
|
||||
"rearonly": [1, 2, 3, 0, 1, 2, 3, 8, 9, 10, 11],
|
||||
'normal': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
|
||||
'amix': [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
|
||||
'repeat': [0, 2, 2, 0, 0, 2, 2, 0, 0, 2, 2],
|
||||
'bmix': [1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3],
|
||||
'composite': [0, 0, 0, 4, 4, 4, 4, 0, 0, 0, 0],
|
||||
'tvmix': [1, 0, 1, 4, 5, 4, 5, 0, 1, 0, 1],
|
||||
'upmix21': [0, 2, 2, 4, 4, 6, 6, 0, 0, 2, 2],
|
||||
'upmix41': [1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3],
|
||||
'upmix61': [0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8],
|
||||
'centeronly': [1, 0, 1, 0, 1, 0, 1, 8, 9, 8, 9],
|
||||
'lfeonly': [0, 2, 2, 0, 0, 2, 2, 8, 8, 10, 10],
|
||||
'rearonly': [1, 2, 3, 0, 1, 2, 3, 8, 9, 10, 11],
|
||||
}
|
||||
|
||||
def identifier(self) -> str:
|
||||
return f"bus[{self.index}].mode"
|
||||
return f'bus[{self.index}].mode'
|
||||
|
||||
def get(self):
|
||||
states = [
|
||||
(int.from_bytes(self.public_packet.busstate[self.index], "little") & val)
|
||||
(int.from_bytes(self.public_packet.busstate[self.index], 'little') & val)
|
||||
>> 4
|
||||
for val in self._modes.modevals
|
||||
]
|
||||
@@ -166,13 +166,13 @@ def _make_bus_mode_mixin():
|
||||
return k
|
||||
|
||||
return type(
|
||||
"BusModeMixin",
|
||||
'BusModeMixin',
|
||||
(IRemote,),
|
||||
{
|
||||
"identifier": property(identifier),
|
||||
"modestates": modestates,
|
||||
'identifier': property(identifier),
|
||||
'modestates': modestates,
|
||||
**{mode.name: bus_mode_prop(mode.name) for mode in BusModes},
|
||||
"get": get,
|
||||
'get': get,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -186,14 +186,14 @@ def bus_factory(phys_bus, remote, i) -> Union[PhysicalBus, VirtualBus]:
|
||||
BUS_cls = PhysicalBus if phys_bus else VirtualBus
|
||||
BUSMODEMIXIN_cls = _make_bus_mode_mixin()
|
||||
return type(
|
||||
f"{BUS_cls.__name__}{remote.kind}",
|
||||
f'{BUS_cls.__name__}{remote.kind}',
|
||||
(BUS_cls,),
|
||||
{
|
||||
"eq": BusEQ.make(remote, i),
|
||||
"levels": BusLevel(remote, i),
|
||||
"mode": BUSMODEMIXIN_cls(remote, i),
|
||||
**{param: channel_bool_prop(param) for param in ["mute", "mono"]},
|
||||
"label": channel_label_prop(),
|
||||
'eq': BusEQ.make(remote, i),
|
||||
'levels': BusLevel(remote, i),
|
||||
'mode': BUSMODEMIXIN_cls(remote, i),
|
||||
**{param: channel_bool_prop(param) for param in ['mute', 'mono']},
|
||||
'label': channel_label_prop(),
|
||||
},
|
||||
)(remote, i)
|
||||
|
||||
|
||||
@@ -17,30 +17,30 @@ class Command(IRemote):
|
||||
Returns a Command class of a kind.
|
||||
"""
|
||||
CMD_cls = type(
|
||||
f"Command{remote.kind}",
|
||||
f'Command{remote.kind}',
|
||||
(cls,),
|
||||
{
|
||||
**{
|
||||
param: action_fn(param) for param in ["show", "shutdown", "restart"]
|
||||
param: action_fn(param) for param in ['show', 'shutdown', 'restart']
|
||||
},
|
||||
"hide": action_fn("show", val=0),
|
||||
'hide': action_fn('show', val=0),
|
||||
},
|
||||
)
|
||||
return CMD_cls(remote)
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return "command"
|
||||
return 'command'
|
||||
|
||||
def set_showvbanchat(self, val: bool):
|
||||
self.setter("DialogShow.VBANCHAT", 1 if val else 0)
|
||||
self.setter('DialogShow.VBANCHAT', 1 if val else 0)
|
||||
|
||||
showvbanchat = property(fset=set_showvbanchat)
|
||||
|
||||
def set_lock(self, val: bool):
|
||||
self.setter("lock", 1 if val else 0)
|
||||
self.setter('lock', 1 if val else 0)
|
||||
|
||||
lock = property(fset=set_lock)
|
||||
|
||||
def reset(self):
|
||||
self._remote.apply_config("reset")
|
||||
self._remote.apply_config('reset')
|
||||
|
||||
@@ -20,73 +20,73 @@ class TOMLStrBuilder:
|
||||
def __init__(self, kind):
|
||||
self.kind = kind
|
||||
self.higher = itertools.chain(
|
||||
[f"strip-{i}" for i in range(kind.num_strip)],
|
||||
[f"bus-{i}" for i in range(kind.num_bus)],
|
||||
[f'strip-{i}' for i in range(kind.num_strip)],
|
||||
[f'bus-{i}' for i in range(kind.num_bus)],
|
||||
)
|
||||
|
||||
def init_config(self, profile=None):
|
||||
self.virt_strip_params = (
|
||||
[
|
||||
"mute = false",
|
||||
"mono = false",
|
||||
"solo = false",
|
||||
"gain = 0.0",
|
||||
'mute = false',
|
||||
'mono = false',
|
||||
'solo = false',
|
||||
'gain = 0.0',
|
||||
]
|
||||
+ [f"A{i} = false" for i in range(1, self.kind.phys_out + 1)]
|
||||
+ [f"B{i} = false" for i in range(1, self.kind.virt_out + 1)]
|
||||
+ [f'A{i} = false' for i in range(1, self.kind.phys_out + 1)]
|
||||
+ [f'B{i} = false' for i in range(1, self.kind.virt_out + 1)]
|
||||
)
|
||||
self.phys_strip_params = self.virt_strip_params + [
|
||||
"comp.knob = 0.0",
|
||||
"gate.knob = 0.0",
|
||||
"denoiser.knob = 0.0",
|
||||
"eq.on = false",
|
||||
'comp.knob = 0.0',
|
||||
'gate.knob = 0.0',
|
||||
'denoiser.knob = 0.0',
|
||||
'eq.on = false',
|
||||
]
|
||||
self.bus_float = ["gain = 0.0"]
|
||||
self.bus_float = ['gain = 0.0']
|
||||
self.bus_params = [
|
||||
"mono = false",
|
||||
"eq.on = false",
|
||||
"mute = false",
|
||||
"gain = 0.0",
|
||||
'mono = false',
|
||||
'eq.on = false',
|
||||
'mute = false',
|
||||
'gain = 0.0',
|
||||
]
|
||||
|
||||
if profile == "reset":
|
||||
if profile == 'reset':
|
||||
self.reset_config()
|
||||
|
||||
def reset_config(self):
|
||||
self.phys_strip_params = list(
|
||||
map(lambda x: x.replace("B1 = false", "B1 = true"), self.phys_strip_params)
|
||||
map(lambda x: x.replace('B1 = false', 'B1 = true'), self.phys_strip_params)
|
||||
)
|
||||
self.virt_strip_params = list(
|
||||
map(lambda x: x.replace("A1 = false", "A1 = true"), self.virt_strip_params)
|
||||
map(lambda x: x.replace('A1 = false', 'A1 = true'), self.virt_strip_params)
|
||||
)
|
||||
|
||||
def build(self, profile="reset"):
|
||||
def build(self, profile='reset'):
|
||||
self.init_config(profile)
|
||||
toml_str = str()
|
||||
for eachclass in self.higher:
|
||||
toml_str += f"[{eachclass}]\n"
|
||||
toml_str += f'[{eachclass}]\n'
|
||||
toml_str = self.join(eachclass, toml_str)
|
||||
return toml_str
|
||||
|
||||
def join(self, eachclass, toml_str):
|
||||
kls, index = eachclass.split("-")
|
||||
kls, index = eachclass.split('-')
|
||||
match kls:
|
||||
case "strip":
|
||||
toml_str += ("\n").join(
|
||||
case 'strip':
|
||||
toml_str += ('\n').join(
|
||||
self.phys_strip_params
|
||||
if int(index) < self.kind.phys_in
|
||||
else self.virt_strip_params
|
||||
)
|
||||
case "bus":
|
||||
toml_str += ("\n").join(self.bus_params)
|
||||
case 'bus':
|
||||
toml_str += ('\n').join(self.bus_params)
|
||||
case _:
|
||||
pass
|
||||
return toml_str + "\n"
|
||||
return toml_str + '\n'
|
||||
|
||||
|
||||
class TOMLDataExtractor:
|
||||
def __init__(self, file):
|
||||
with open(file, "rb") as f:
|
||||
with open(file, 'rb') as f:
|
||||
self._data = tomllib.load(f)
|
||||
|
||||
@property
|
||||
@@ -104,10 +104,10 @@ def dataextraction_factory(file):
|
||||
|
||||
this opens the possibility for other parsers to be added
|
||||
"""
|
||||
if file.suffix == ".toml":
|
||||
if file.suffix == '.toml':
|
||||
extractor = TOMLDataExtractor
|
||||
else:
|
||||
raise ValueError("Cannot extract data from {}".format(file))
|
||||
raise ValueError('Cannot extract data from {}'.format(file))
|
||||
return extractor(file)
|
||||
|
||||
|
||||
@@ -141,25 +141,25 @@ class Loader(metaclass=SingletonType):
|
||||
def defaults(self, kind):
|
||||
self.builder = TOMLStrBuilder(kind)
|
||||
toml_str = self.builder.build()
|
||||
self.register("reset", tomllib.loads(toml_str))
|
||||
self.register('reset', tomllib.loads(toml_str))
|
||||
|
||||
def parse(self, identifier, data):
|
||||
if identifier in self._configs:
|
||||
self.logger.info(
|
||||
f"config file with name {identifier} already in memory, skipping.."
|
||||
f'config file with name {identifier} already in memory, skipping..'
|
||||
)
|
||||
return
|
||||
try:
|
||||
self.parser = dataextraction_factory(data)
|
||||
except tomllib.TOMLDecodeError as e:
|
||||
ERR_MSG = (str(e), f"When attempting to load {identifier}.toml")
|
||||
self.logger.error(f"{type(e).__name__}: {' '.join(ERR_MSG)}")
|
||||
ERR_MSG = (str(e), f'When attempting to load {identifier}.toml')
|
||||
self.logger.error(f'{type(e).__name__}: {" ".join(ERR_MSG)}')
|
||||
return
|
||||
return True
|
||||
|
||||
def register(self, identifier, data=None):
|
||||
self._configs[identifier] = data if data else self.parser.data
|
||||
self.logger.info(f"config {self.name}/{identifier} loaded into memory")
|
||||
self.logger.info(f'config {self.name}/{identifier} loaded into memory')
|
||||
|
||||
def deregister(self):
|
||||
self._configs.clear()
|
||||
@@ -182,18 +182,18 @@ def loader(kind):
|
||||
|
||||
returns configs loaded into memory
|
||||
"""
|
||||
logger_loader = logger.getChild("loader")
|
||||
logger_loader = logger.getChild('loader')
|
||||
loader = Loader(kind)
|
||||
|
||||
for path in (
|
||||
Path.cwd() / "configs" / kind.name,
|
||||
Path.home() / ".config" / "vban-cmd" / kind.name,
|
||||
Path.home() / "Documents" / "Voicemeeter" / "configs" / kind.name,
|
||||
Path.cwd() / 'configs' / kind.name,
|
||||
Path.home() / '.config' / 'vban-cmd' / kind.name,
|
||||
Path.home() / 'Documents' / 'Voicemeeter' / 'configs' / kind.name,
|
||||
):
|
||||
if path.is_dir():
|
||||
logger_loader.info(f"Checking [{path}] for TOML config files:")
|
||||
for file in path.glob("*.toml"):
|
||||
identifier = file.with_suffix("").stem
|
||||
logger_loader.info(f'Checking [{path}] for TOML config files:')
|
||||
for file in path.glob('*.toml'):
|
||||
identifier = file.with_suffix('').stem
|
||||
if loader.parse(identifier, file):
|
||||
loader.register(identifier)
|
||||
return loader.configs
|
||||
@@ -208,5 +208,5 @@ def request_config(kind_id: str):
|
||||
try:
|
||||
configs = loader(kindmap(kind_id))
|
||||
except KeyError:
|
||||
raise VBANCMDError(f"Unknown Voicemeeter kind {kind_id}")
|
||||
raise VBANCMDError(f'Unknown Voicemeeter kind {kind_id}')
|
||||
return configs
|
||||
|
||||
@@ -12,30 +12,30 @@ class Event:
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
|
||||
def info(self, msg=None):
|
||||
info = (f"{msg} events",) if msg else ()
|
||||
info = (f'{msg} events',) if msg else ()
|
||||
if self.any():
|
||||
info += (f"now listening for {', '.join(self.get())} events",)
|
||||
info += (f'now listening for {", ".join(self.get())} events',)
|
||||
else:
|
||||
info += (f"not listening for any events",)
|
||||
self.logger.info(", ".join(info))
|
||||
info += ('not listening for any events',)
|
||||
self.logger.info(', '.join(info))
|
||||
|
||||
@property
|
||||
def pdirty(self) -> bool:
|
||||
return self.subs["pdirty"]
|
||||
return self.subs['pdirty']
|
||||
|
||||
@pdirty.setter
|
||||
def pdirty(self, val: bool):
|
||||
self.subs["pdirty"] = val
|
||||
self.info(f"pdirty {'added to' if val else 'removed from'}")
|
||||
self.subs['pdirty'] = val
|
||||
self.info(f'pdirty {"added to" if val else "removed from"}')
|
||||
|
||||
@property
|
||||
def ldirty(self) -> bool:
|
||||
return self.subs["ldirty"]
|
||||
return self.subs['ldirty']
|
||||
|
||||
@ldirty.setter
|
||||
def ldirty(self, val: bool):
|
||||
self.subs["ldirty"] = val
|
||||
self.info(f"ldirty {'added to' if val else 'removed from'}")
|
||||
self.subs['ldirty'] = val
|
||||
self.info(f'ldirty {"added to" if val else "removed from"}')
|
||||
|
||||
def get(self) -> list:
|
||||
return [k for k, v in self.subs.items() if v]
|
||||
|
||||
@@ -26,24 +26,24 @@ class FactoryBuilder:
|
||||
"""
|
||||
|
||||
BuilderProgress = IntEnum(
|
||||
"BuilderProgress", "strip bus command macrobutton vban", start=0
|
||||
'BuilderProgress', 'strip bus command macrobutton vban', start=0
|
||||
)
|
||||
|
||||
def __init__(self, factory, kind: KindMapClass):
|
||||
self._factory = factory
|
||||
self.kind = kind
|
||||
self._info = (
|
||||
f"Finished building strips for {self._factory}",
|
||||
f"Finished building buses for {self._factory}",
|
||||
f"Finished building commands for {self._factory}",
|
||||
f"Finished building macrobuttons for {self._factory}",
|
||||
f"Finished building vban in/out streams for {self._factory}",
|
||||
f'Finished building strips for {self._factory}',
|
||||
f'Finished building buses for {self._factory}',
|
||||
f'Finished building commands for {self._factory}',
|
||||
f'Finished building macrobuttons for {self._factory}',
|
||||
f'Finished building vban in/out streams for {self._factory}',
|
||||
)
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
|
||||
def _pinfo(self, name: str) -> None:
|
||||
"""prints progress status for each step"""
|
||||
name = name.split("_")[1]
|
||||
name = name.split('_')[1]
|
||||
self.logger.info(self._info[int(getattr(self.BuilderProgress, name))])
|
||||
|
||||
def make_strip(self):
|
||||
@@ -78,20 +78,20 @@ class FactoryBase(VbanCmd):
|
||||
|
||||
def __init__(self, kind_id: str, **kwargs):
|
||||
defaultkwargs = {
|
||||
"ip": None,
|
||||
"port": 6980,
|
||||
"streamname": "Command1",
|
||||
"bps": 0,
|
||||
"channel": 0,
|
||||
"ratelimit": 0.01,
|
||||
"timeout": 5,
|
||||
"outbound": False,
|
||||
"sync": False,
|
||||
"pdirty": False,
|
||||
"ldirty": False,
|
||||
'ip': None,
|
||||
'port': 6980,
|
||||
'streamname': 'Command1',
|
||||
'bps': 0,
|
||||
'channel': 0,
|
||||
'ratelimit': 0.01,
|
||||
'timeout': 5,
|
||||
'outbound': False,
|
||||
'sync': False,
|
||||
'pdirty': False,
|
||||
'ldirty': False,
|
||||
}
|
||||
if "subs" in kwargs:
|
||||
defaultkwargs |= kwargs.pop("subs") # for backwards compatibility
|
||||
if 'subs' in kwargs:
|
||||
defaultkwargs |= kwargs.pop('subs') # for backwards compatibility
|
||||
kwargs = defaultkwargs | kwargs
|
||||
self.kind = kindmap(kind_id)
|
||||
super().__init__(**kwargs)
|
||||
@@ -106,7 +106,7 @@ class FactoryBase(VbanCmd):
|
||||
self._configs = None
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Voicemeeter {self.kind}"
|
||||
return f'Voicemeeter {self.kind}'
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
@@ -198,15 +198,15 @@ def vbancmd_factory(kind_id: str, **kwargs) -> VbanCmd:
|
||||
Returns a VbanCmd class of a kind
|
||||
"""
|
||||
match kind_id:
|
||||
case "basic":
|
||||
case 'basic':
|
||||
_factory = BasicFactory
|
||||
case "banana":
|
||||
case 'banana':
|
||||
_factory = BananaFactory
|
||||
case "potato":
|
||||
case 'potato':
|
||||
_factory = PotatoFactory
|
||||
case _:
|
||||
raise ValueError(f"Unknown Voicemeeter kind '{kind_id}'")
|
||||
return type(f"VbanCmd{kind_id.capitalize()}", (_factory,), {})(kind_id, **kwargs)
|
||||
return type(f'VbanCmd{kind_id.capitalize()}', (_factory,), {})(kind_id, **kwargs)
|
||||
|
||||
|
||||
def request_vbancmd_obj(kind_id: str, **kwargs) -> VbanCmd:
|
||||
@@ -215,12 +215,12 @@ def request_vbancmd_obj(kind_id: str, **kwargs) -> VbanCmd:
|
||||
|
||||
Returns a reference to a VbanCmd class of a kind
|
||||
"""
|
||||
logger_entry = logger.getChild("factory.request_vbancmd_obj")
|
||||
logger_entry = logger.getChild('factory.request_vbancmd_obj')
|
||||
|
||||
VBANCMD_obj = None
|
||||
try:
|
||||
VBANCMD_obj = vbancmd_factory(kind_id, **kwargs)
|
||||
except (ValueError, TypeError) as e:
|
||||
logger_entry.exception(f"{type(e).__name__}: {e}")
|
||||
logger_entry.exception(f'{type(e).__name__}: {e}')
|
||||
raise VBANCMDError(str(e)) from e
|
||||
return VBANCMD_obj
|
||||
|
||||
@@ -93,7 +93,7 @@ class IRemote(metaclass=ABCMeta):
|
||||
|
||||
def getter(self, param):
|
||||
cmd = self._cmd(param)
|
||||
self.logger.debug(f"getter: {cmd}")
|
||||
self.logger.debug(f'getter: {cmd}')
|
||||
if cmd in self._remote.cache:
|
||||
return self._remote.cache.pop(cmd)
|
||||
if self._remote.sync:
|
||||
@@ -101,14 +101,14 @@ class IRemote(metaclass=ABCMeta):
|
||||
|
||||
def setter(self, param, val):
|
||||
"""Sends a string request RT packet."""
|
||||
self.logger.debug(f"setter: {self._cmd(param)}={val}")
|
||||
self.logger.debug(f'setter: {self._cmd(param)}={val}')
|
||||
self._remote._set_rt(self._cmd(param), val)
|
||||
|
||||
def _cmd(self, param):
|
||||
cmd = (self.identifier,)
|
||||
if param:
|
||||
cmd += (f".{param}",)
|
||||
return "".join(cmd)
|
||||
cmd += (f'.{param}',)
|
||||
return ''.join(cmd)
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
@@ -124,10 +124,10 @@ class IRemote(metaclass=ABCMeta):
|
||||
"""Sets all parameters of a dict for the channel."""
|
||||
|
||||
def fget(attr, val):
|
||||
if attr == "mode":
|
||||
return (f"mode.{val}", 1)
|
||||
elif attr == "knob":
|
||||
return ("", val)
|
||||
if attr == 'mode':
|
||||
return (f'mode.{val}', 1)
|
||||
elif attr == 'knob':
|
||||
return ('', val)
|
||||
return (attr, val)
|
||||
|
||||
for attr, val in data.items():
|
||||
@@ -138,7 +138,7 @@ class IRemote(metaclass=ABCMeta):
|
||||
val = 1 if val else 0
|
||||
|
||||
self._remote.cache[self._cmd(attr)] = val
|
||||
self._remote._script += f"{self._cmd(attr)}={val};"
|
||||
self._remote._script += f'{self._cmd(attr)}={val};'
|
||||
else:
|
||||
target = getattr(self, attr)
|
||||
target.apply(val)
|
||||
|
||||
@@ -91,14 +91,14 @@ class PotatoMap(KindMapClass):
|
||||
|
||||
def kind_factory(kind_id):
|
||||
match kind_id:
|
||||
case "basic":
|
||||
case 'basic':
|
||||
_kind_map = BasicMap
|
||||
case "banana":
|
||||
case 'banana':
|
||||
_kind_map = BananaMap
|
||||
case "potato":
|
||||
case 'potato':
|
||||
_kind_map = PotatoMap
|
||||
case _:
|
||||
raise ValueError(f"Unknown Voicemeeter kind {kind_id}")
|
||||
raise ValueError(f'Unknown Voicemeeter kind {kind_id}')
|
||||
return _kind_map(name=kind_id)
|
||||
|
||||
|
||||
|
||||
@@ -5,32 +5,32 @@ class MacroButton(IRemote):
|
||||
"""A placeholder class in case this interface is being used interchangeably with the Remote API"""
|
||||
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self._remote.kind}{self.index}"
|
||||
return f'{type(self).__name__}{self._remote.kind}{self.index}'
|
||||
|
||||
@property
|
||||
def identifier(self):
|
||||
return f"command.button[{self.index}]"
|
||||
return f'command.button[{self.index}]'
|
||||
|
||||
@property
|
||||
def state(self) -> bool:
|
||||
self.logger.warning("button.state commands are not supported over VBAN")
|
||||
self.logger.warning('button.state commands are not supported over VBAN')
|
||||
|
||||
@state.setter
|
||||
def state(self, _):
|
||||
self.logger.warning("button.state commands are not supported over VBAN")
|
||||
self.logger.warning('button.state commands are not supported over VBAN')
|
||||
|
||||
@property
|
||||
def stateonly(self) -> bool:
|
||||
self.logger.warning("button.stateonly commands are not supported over VBAN")
|
||||
self.logger.warning('button.stateonly commands are not supported over VBAN')
|
||||
|
||||
@stateonly.setter
|
||||
def stateonly(self, v):
|
||||
self.logger.warning("button.stateonly commands are not supported over VBAN")
|
||||
self.logger.warning('button.stateonly commands are not supported over VBAN')
|
||||
|
||||
@property
|
||||
def trigger(self) -> bool:
|
||||
self.logger.warning("button.trigger commands are not supported over VBAN")
|
||||
self.logger.warning('button.trigger commands are not supported over VBAN')
|
||||
|
||||
@trigger.setter
|
||||
def trigger(self, _):
|
||||
self.logger.warning("button.trigger commands are not supported over VBAN")
|
||||
self.logger.warning('button.trigger commands are not supported over VBAN')
|
||||
|
||||
@@ -9,16 +9,16 @@ def channel_bool_prop(param):
|
||||
@partial(cache_bool, param=param)
|
||||
def fget(self):
|
||||
cmd = self._cmd(param)
|
||||
self.logger.debug(f"getter: {cmd}")
|
||||
self.logger.debug(f'getter: {cmd}')
|
||||
return (
|
||||
not int.from_bytes(
|
||||
getattr(
|
||||
self.public_packet,
|
||||
f"{'strip' if 'strip' in type(self).__name__.lower() else 'bus'}state",
|
||||
f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}state',
|
||||
)[self.index],
|
||||
"little",
|
||||
'little',
|
||||
)
|
||||
& getattr(self._modes, f"_{param.lower()}")
|
||||
& getattr(self._modes, f'_{param.lower()}')
|
||||
== 0
|
||||
)
|
||||
|
||||
@@ -31,15 +31,15 @@ def channel_bool_prop(param):
|
||||
def channel_label_prop():
|
||||
"""meta function for channel label parameters"""
|
||||
|
||||
@partial(cache_string, param="label")
|
||||
@partial(cache_string, param='label')
|
||||
def fget(self) -> str:
|
||||
return getattr(
|
||||
self.public_packet,
|
||||
f"{'strip' if 'strip' in type(self).__name__.lower() else 'bus'}labels",
|
||||
f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}labels',
|
||||
)[self.index]
|
||||
|
||||
def fset(self, val: str):
|
||||
self.setter("label", str(val))
|
||||
self.setter('label', str(val))
|
||||
|
||||
return property(fget, fset)
|
||||
|
||||
@@ -50,10 +50,10 @@ def strip_output_prop(param):
|
||||
@partial(cache_bool, param=param)
|
||||
def fget(self):
|
||||
cmd = self._cmd(param)
|
||||
self.logger.debug(f"getter: {cmd}")
|
||||
self.logger.debug(f'getter: {cmd}')
|
||||
return (
|
||||
not int.from_bytes(self.public_packet.stripstate[self.index], "little")
|
||||
& getattr(self._modes, f"_bus{param.lower()}")
|
||||
not int.from_bytes(self.public_packet.stripstate[self.index], 'little')
|
||||
& getattr(self._modes, f'_bus{param.lower()}')
|
||||
== 0
|
||||
)
|
||||
|
||||
@@ -69,9 +69,9 @@ def bus_mode_prop(param):
|
||||
@partial(cache_bool, param=param)
|
||||
def fget(self):
|
||||
cmd = self._cmd(param)
|
||||
self.logger.debug(f"getter: {cmd}")
|
||||
self.logger.debug(f'getter: {cmd}')
|
||||
return [
|
||||
(int.from_bytes(self.public_packet.busstate[self.index], "little") & val)
|
||||
(int.from_bytes(self.public_packet.busstate[self.index], 'little') & val)
|
||||
>> 4
|
||||
for val in self._modes.modevals
|
||||
] == self.modestates[param]
|
||||
|
||||
@@ -43,7 +43,7 @@ class VbanRtPacket:
|
||||
|
||||
def _generate_levels(self, levelarray) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(levelarray[i : i + 2], "little")
|
||||
int.from_bytes(levelarray[i : i + 2], 'little')
|
||||
for i in range(0, len(levelarray), 2)
|
||||
)
|
||||
|
||||
@@ -79,13 +79,13 @@ class VbanRtPacket:
|
||||
tuple(not val for val in comp(strip_cache, self.strip_levels)),
|
||||
tuple(not val for val in comp(bus_cache, self.bus_levels)),
|
||||
)
|
||||
return any(any(l) for l in (self._strip_comp, self._bus_comp))
|
||||
return any(any(li) for li in (self._strip_comp, self._bus_comp))
|
||||
|
||||
@property
|
||||
def voicemeetertype(self) -> str:
|
||||
"""returns voicemeeter type as a string"""
|
||||
type_ = ("basic", "banana", "potato")
|
||||
return type_[int.from_bytes(self._voicemeeterType, "little") - 1]
|
||||
type_ = ('basic', 'banana', 'potato')
|
||||
return type_[int.from_bytes(self._voicemeeterType, 'little') - 1]
|
||||
|
||||
@property
|
||||
def voicemeeterversion(self) -> tuple:
|
||||
@@ -93,7 +93,7 @@ class VbanRtPacket:
|
||||
return tuple(
|
||||
reversed(
|
||||
tuple(
|
||||
int.from_bytes(self._voicemeeterVersion[i : i + 1], "little")
|
||||
int.from_bytes(self._voicemeeterVersion[i : i + 1], 'little')
|
||||
for i in range(4)
|
||||
)
|
||||
)
|
||||
@@ -102,7 +102,7 @@ class VbanRtPacket:
|
||||
@property
|
||||
def samplerate(self) -> int:
|
||||
"""returns samplerate as an int"""
|
||||
return int.from_bytes(self._samplerate, "little")
|
||||
return int.from_bytes(self._samplerate, 'little')
|
||||
|
||||
@property
|
||||
def inputlevels(self) -> tuple:
|
||||
@@ -132,56 +132,56 @@ class VbanRtPacket:
|
||||
@property
|
||||
def stripgainlayer1(self) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(self._stripGaindB100Layer1[i : i + 2], "little")
|
||||
int.from_bytes(self._stripGaindB100Layer1[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def stripgainlayer2(self) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(self._stripGaindB100Layer2[i : i + 2], "little")
|
||||
int.from_bytes(self._stripGaindB100Layer2[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def stripgainlayer3(self) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(self._stripGaindB100Layer3[i : i + 2], "little")
|
||||
int.from_bytes(self._stripGaindB100Layer3[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def stripgainlayer4(self) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(self._stripGaindB100Layer4[i : i + 2], "little")
|
||||
int.from_bytes(self._stripGaindB100Layer4[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def stripgainlayer5(self) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(self._stripGaindB100Layer5[i : i + 2], "little")
|
||||
int.from_bytes(self._stripGaindB100Layer5[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def stripgainlayer6(self) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(self._stripGaindB100Layer6[i : i + 2], "little")
|
||||
int.from_bytes(self._stripGaindB100Layer6[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def stripgainlayer7(self) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(self._stripGaindB100Layer7[i : i + 2], "little")
|
||||
int.from_bytes(self._stripGaindB100Layer7[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def stripgainlayer8(self) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(self._stripGaindB100Layer8[i : i + 2], "little")
|
||||
int.from_bytes(self._stripGaindB100Layer8[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@@ -189,7 +189,7 @@ class VbanRtPacket:
|
||||
def busgain(self) -> tuple:
|
||||
"""returns tuple of bus gains"""
|
||||
return tuple(
|
||||
int.from_bytes(self._busGaindB100[i : i + 2], "little")
|
||||
int.from_bytes(self._busGaindB100[i : i + 2], 'little')
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@@ -197,7 +197,7 @@ class VbanRtPacket:
|
||||
def striplabels(self) -> tuple:
|
||||
"""returns tuple of strip labels"""
|
||||
return tuple(
|
||||
self._stripLabelUTF8c60[i : i + 60].decode().split("\x00")[0]
|
||||
self._stripLabelUTF8c60[i : i + 60].decode().split('\x00')[0]
|
||||
for i in range(0, 480, 60)
|
||||
)
|
||||
|
||||
@@ -205,7 +205,7 @@ class VbanRtPacket:
|
||||
def buslabels(self) -> tuple:
|
||||
"""returns tuple of bus labels"""
|
||||
return tuple(
|
||||
self._busLabelUTF8c60[i : i + 60].decode().split("\x00")[0]
|
||||
self._busLabelUTF8c60[i : i + 60].decode().split('\x00')[0]
|
||||
for i in range(0, 480, 60)
|
||||
)
|
||||
|
||||
@@ -214,15 +214,15 @@ class VbanRtPacket:
|
||||
class SubscribeHeader:
|
||||
"""Represents the header an RT Packet Service subscription packet"""
|
||||
|
||||
name = "Register RTP"
|
||||
name = 'Register RTP'
|
||||
timeout = 15
|
||||
vban: bytes = "VBAN".encode()
|
||||
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
|
||||
format_nbs: bytes = (0).to_bytes(1, "little")
|
||||
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
|
||||
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
|
||||
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
|
||||
framecounter: bytes = (0).to_bytes(4, "little")
|
||||
vban: bytes = 'VBAN'.encode()
|
||||
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, 'little')
|
||||
format_nbs: bytes = (0).to_bytes(1, 'little')
|
||||
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, 'little')
|
||||
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, 'little') # timeout
|
||||
streamname: bytes = name.encode('ascii') + bytes(16 - len(name))
|
||||
framecounter: bytes = (0).to_bytes(4, 'little')
|
||||
|
||||
@property
|
||||
def header(self):
|
||||
@@ -233,9 +233,9 @@ class SubscribeHeader:
|
||||
header += self.format_bit
|
||||
header += self.streamname
|
||||
header += self.framecounter
|
||||
assert (
|
||||
len(header) == HEADER_SIZE + 4
|
||||
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
|
||||
assert len(header) == HEADER_SIZE + 4, (
|
||||
f'expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE + 4} bytes total)'
|
||||
)
|
||||
return header
|
||||
|
||||
|
||||
@@ -243,13 +243,13 @@ class SubscribeHeader:
|
||||
class VbanRtPacketHeader:
|
||||
"""Represents the header of a VBAN RT response packet"""
|
||||
|
||||
name = "Voicemeeter-RTP"
|
||||
vban: bytes = "VBAN".encode()
|
||||
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
|
||||
format_nbs: bytes = (0).to_bytes(1, "little")
|
||||
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little")
|
||||
format_bit: bytes = (0).to_bytes(1, "little")
|
||||
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
|
||||
name = 'Voicemeeter-RTP'
|
||||
vban: bytes = 'VBAN'.encode()
|
||||
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, 'little')
|
||||
format_nbs: bytes = (0).to_bytes(1, 'little')
|
||||
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, 'little')
|
||||
format_bit: bytes = (0).to_bytes(1, 'little')
|
||||
streamname: bytes = name.encode('ascii') + bytes(16 - len(name))
|
||||
|
||||
@property
|
||||
def header(self):
|
||||
@@ -259,7 +259,7 @@ class VbanRtPacketHeader:
|
||||
header += self.format_nbc
|
||||
header += self.format_bit
|
||||
header += self.streamname
|
||||
assert len(header) == HEADER_SIZE, f"expected header size {HEADER_SIZE} bytes"
|
||||
assert len(header) == HEADER_SIZE, f'expected header size {HEADER_SIZE} bytes'
|
||||
return header
|
||||
|
||||
|
||||
@@ -270,18 +270,18 @@ class RequestHeader:
|
||||
name: str
|
||||
bps_index: int
|
||||
channel: int
|
||||
vban: bytes = "VBAN".encode()
|
||||
nbs: bytes = (0).to_bytes(1, "little")
|
||||
bit: bytes = (0x10).to_bytes(1, "little")
|
||||
framecounter: bytes = (0).to_bytes(4, "little")
|
||||
vban: bytes = 'VBAN'.encode()
|
||||
nbs: bytes = (0).to_bytes(1, 'little')
|
||||
bit: bytes = (0x10).to_bytes(1, 'little')
|
||||
framecounter: bytes = (0).to_bytes(4, 'little')
|
||||
|
||||
@property
|
||||
def sr(self):
|
||||
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
|
||||
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbc(self):
|
||||
return (self.channel).to_bytes(1, "little")
|
||||
return (self.channel).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self):
|
||||
@@ -296,7 +296,7 @@ class RequestHeader:
|
||||
header += self.bit
|
||||
header += self.streamname
|
||||
header += self.framecounter
|
||||
assert (
|
||||
len(header) == HEADER_SIZE + 4
|
||||
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
|
||||
assert len(header) == HEADER_SIZE + 4, (
|
||||
f'expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE + 4} bytes total)'
|
||||
)
|
||||
return header
|
||||
|
||||
@@ -20,7 +20,7 @@ class Strip(IRemote):
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"strip[{self.index}]"
|
||||
return f'strip[{self.index}]'
|
||||
|
||||
@property
|
||||
def limit(self) -> int:
|
||||
@@ -28,25 +28,25 @@ class Strip(IRemote):
|
||||
|
||||
@limit.setter
|
||||
def limit(self, val: int):
|
||||
self.setter("limit", val)
|
||||
self.setter('limit', val)
|
||||
|
||||
@property
|
||||
def gain(self) -> float:
|
||||
val = self.getter("gain")
|
||||
val = self.getter('gain')
|
||||
if val is None:
|
||||
val = self.gainlayer[0].gain
|
||||
return round(val, 1)
|
||||
|
||||
@gain.setter
|
||||
def gain(self, val: float):
|
||||
self.setter("gain", val)
|
||||
self.setter('gain', val)
|
||||
|
||||
def fadeto(self, target: float, time_: int):
|
||||
self.setter("FadeTo", f"({target}, {time_})")
|
||||
self.setter('FadeTo', f'({target}, {time_})')
|
||||
time.sleep(self._remote.DELAY)
|
||||
|
||||
def fadeby(self, change: float, time_: int):
|
||||
self.setter("FadeBy", f"({change}, {time_})")
|
||||
self.setter('FadeBy', f'({change}, {time_})')
|
||||
time.sleep(self._remote.DELAY)
|
||||
|
||||
|
||||
@@ -54,18 +54,18 @@ class PhysicalStrip(Strip):
|
||||
@classmethod
|
||||
def make(cls, remote, index):
|
||||
return type(
|
||||
f"PhysicalStrip{remote.kind}",
|
||||
f'PhysicalStrip{remote.kind}',
|
||||
(cls,),
|
||||
{
|
||||
"comp": StripComp(remote, index),
|
||||
"gate": StripGate(remote, index),
|
||||
"denoiser": StripDenoiser(remote, index),
|
||||
"eq": StripEQ(remote, index),
|
||||
'comp': StripComp(remote, index),
|
||||
'gate': StripGate(remote, index),
|
||||
'denoiser': StripDenoiser(remote, index),
|
||||
'eq': StripEQ(remote, index),
|
||||
},
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self.index}"
|
||||
return f'{type(self).__name__}{self.index}'
|
||||
|
||||
@property
|
||||
def device(self):
|
||||
@@ -79,7 +79,7 @@ class PhysicalStrip(Strip):
|
||||
class StripComp(IRemote):
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"strip[{self.index}].comp"
|
||||
return f'strip[{self.index}].comp'
|
||||
|
||||
@property
|
||||
def knob(self) -> float:
|
||||
@@ -87,7 +87,7 @@ class StripComp(IRemote):
|
||||
|
||||
@knob.setter
|
||||
def knob(self, val: float):
|
||||
self.setter("", val)
|
||||
self.setter('', val)
|
||||
|
||||
@property
|
||||
def gainin(self) -> float:
|
||||
@@ -95,7 +95,7 @@ class StripComp(IRemote):
|
||||
|
||||
@gainin.setter
|
||||
def gainin(self, val: float):
|
||||
self.setter("GainIn", val)
|
||||
self.setter('GainIn', val)
|
||||
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
@@ -103,7 +103,7 @@ class StripComp(IRemote):
|
||||
|
||||
@ratio.setter
|
||||
def ratio(self, val: float):
|
||||
self.setter("Ratio", val)
|
||||
self.setter('Ratio', val)
|
||||
|
||||
@property
|
||||
def threshold(self) -> float:
|
||||
@@ -111,7 +111,7 @@ class StripComp(IRemote):
|
||||
|
||||
@threshold.setter
|
||||
def threshold(self, val: float):
|
||||
self.setter("Threshold", val)
|
||||
self.setter('Threshold', val)
|
||||
|
||||
@property
|
||||
def attack(self) -> float:
|
||||
@@ -119,7 +119,7 @@ class StripComp(IRemote):
|
||||
|
||||
@attack.setter
|
||||
def attack(self, val: float):
|
||||
self.setter("Attack", val)
|
||||
self.setter('Attack', val)
|
||||
|
||||
@property
|
||||
def release(self) -> float:
|
||||
@@ -127,7 +127,7 @@ class StripComp(IRemote):
|
||||
|
||||
@release.setter
|
||||
def release(self, val: float):
|
||||
self.setter("Release", val)
|
||||
self.setter('Release', val)
|
||||
|
||||
@property
|
||||
def knee(self) -> float:
|
||||
@@ -135,7 +135,7 @@ class StripComp(IRemote):
|
||||
|
||||
@knee.setter
|
||||
def knee(self, val: float):
|
||||
self.setter("Knee", val)
|
||||
self.setter('Knee', val)
|
||||
|
||||
@property
|
||||
def gainout(self) -> float:
|
||||
@@ -143,7 +143,7 @@ class StripComp(IRemote):
|
||||
|
||||
@gainout.setter
|
||||
def gainout(self, val: float):
|
||||
self.setter("GainOut", val)
|
||||
self.setter('GainOut', val)
|
||||
|
||||
@property
|
||||
def makeup(self) -> bool:
|
||||
@@ -151,13 +151,13 @@ class StripComp(IRemote):
|
||||
|
||||
@makeup.setter
|
||||
def makeup(self, val: bool):
|
||||
self.setter("makeup", 1 if val else 0)
|
||||
self.setter('makeup', 1 if val else 0)
|
||||
|
||||
|
||||
class StripGate(IRemote):
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"strip[{self.index}].gate"
|
||||
return f'strip[{self.index}].gate'
|
||||
|
||||
@property
|
||||
def knob(self) -> float:
|
||||
@@ -165,7 +165,7 @@ class StripGate(IRemote):
|
||||
|
||||
@knob.setter
|
||||
def knob(self, val: float):
|
||||
self.setter("", val)
|
||||
self.setter('', val)
|
||||
|
||||
@property
|
||||
def threshold(self) -> float:
|
||||
@@ -173,7 +173,7 @@ class StripGate(IRemote):
|
||||
|
||||
@threshold.setter
|
||||
def threshold(self, val: float):
|
||||
self.setter("Threshold", val)
|
||||
self.setter('Threshold', val)
|
||||
|
||||
@property
|
||||
def damping(self) -> float:
|
||||
@@ -181,7 +181,7 @@ class StripGate(IRemote):
|
||||
|
||||
@damping.setter
|
||||
def damping(self, val: float):
|
||||
self.setter("Damping", val)
|
||||
self.setter('Damping', val)
|
||||
|
||||
@property
|
||||
def bpsidechain(self) -> int:
|
||||
@@ -189,7 +189,7 @@ class StripGate(IRemote):
|
||||
|
||||
@bpsidechain.setter
|
||||
def bpsidechain(self, val: int):
|
||||
self.setter("BPSidechain", val)
|
||||
self.setter('BPSidechain', val)
|
||||
|
||||
@property
|
||||
def attack(self) -> float:
|
||||
@@ -197,7 +197,7 @@ class StripGate(IRemote):
|
||||
|
||||
@attack.setter
|
||||
def attack(self, val: float):
|
||||
self.setter("Attack", val)
|
||||
self.setter('Attack', val)
|
||||
|
||||
@property
|
||||
def hold(self) -> float:
|
||||
@@ -205,7 +205,7 @@ class StripGate(IRemote):
|
||||
|
||||
@hold.setter
|
||||
def hold(self, val: float):
|
||||
self.setter("Hold", val)
|
||||
self.setter('Hold', val)
|
||||
|
||||
@property
|
||||
def release(self) -> float:
|
||||
@@ -213,13 +213,13 @@ class StripGate(IRemote):
|
||||
|
||||
@release.setter
|
||||
def release(self, val: float):
|
||||
self.setter("Release", val)
|
||||
self.setter('Release', val)
|
||||
|
||||
|
||||
class StripDenoiser(IRemote):
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"strip[{self.index}].denoiser"
|
||||
return f'strip[{self.index}].denoiser'
|
||||
|
||||
@property
|
||||
def knob(self) -> float:
|
||||
@@ -227,13 +227,13 @@ class StripDenoiser(IRemote):
|
||||
|
||||
@knob.setter
|
||||
def knob(self, val: float):
|
||||
self.setter("", val)
|
||||
self.setter('', val)
|
||||
|
||||
|
||||
class StripEQ(IRemote):
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"strip[{self.index}].eq"
|
||||
return f'strip[{self.index}].eq'
|
||||
|
||||
@property
|
||||
def on(self):
|
||||
@@ -241,7 +241,7 @@ class StripEQ(IRemote):
|
||||
|
||||
@on.setter
|
||||
def on(self, val: bool):
|
||||
self.setter("on", 1 if val else 0)
|
||||
self.setter('on', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def ab(self):
|
||||
@@ -249,14 +249,14 @@ class StripEQ(IRemote):
|
||||
|
||||
@ab.setter
|
||||
def ab(self, val: bool):
|
||||
self.setter("ab", 1 if val else 0)
|
||||
self.setter('ab', 1 if val else 0)
|
||||
|
||||
|
||||
class VirtualStrip(Strip):
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self.index}"
|
||||
return f'{type(self).__name__}{self.index}'
|
||||
|
||||
mc = channel_bool_prop("mc")
|
||||
mc = channel_bool_prop('mc')
|
||||
|
||||
mono = mc
|
||||
|
||||
@@ -266,13 +266,13 @@ class VirtualStrip(Strip):
|
||||
|
||||
@k.setter
|
||||
def k(self, val: int):
|
||||
self.setter("karaoke", val)
|
||||
self.setter('karaoke', val)
|
||||
|
||||
def appgain(self, name: str, gain: float):
|
||||
self.setter("AppGain", f'("{name}", {gain})')
|
||||
self.setter('AppGain', f'("{name}", {gain})')
|
||||
|
||||
def appmute(self, name: str, mute: bool = None):
|
||||
self.setter("AppMute", f'("{name}", {1 if mute else 0})')
|
||||
self.setter('AppMute', f'("{name}", {1 if mute else 0})')
|
||||
|
||||
|
||||
class StripLevel(IRemote):
|
||||
@@ -299,7 +299,7 @@ class StripLevel(IRemote):
|
||||
if not self._remote.stopped() and self._remote.event.ldirty:
|
||||
return tuple(
|
||||
fget(i)
|
||||
for i in self._remote.cache["strip_level"][
|
||||
for i in self._remote.cache['strip_level'][
|
||||
self.range[0] : self.range[-1]
|
||||
]
|
||||
)
|
||||
@@ -312,7 +312,7 @@ class StripLevel(IRemote):
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"strip[{self.index}]"
|
||||
return f'strip[{self.index}]'
|
||||
|
||||
@property
|
||||
def prefader(self) -> tuple:
|
||||
@@ -345,31 +345,33 @@ class GainLayer(IRemote):
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"strip[{self.index}]"
|
||||
return f'strip[{self.index}]'
|
||||
|
||||
@property
|
||||
def gain(self) -> float:
|
||||
def fget():
|
||||
val = getattr(self.public_packet, f"stripgainlayer{self._i+1}")[self.index]
|
||||
val = getattr(self.public_packet, f'stripgainlayer{self._i + 1}')[
|
||||
self.index
|
||||
]
|
||||
if 0 <= val <= 1200:
|
||||
return val * 0.01
|
||||
return (((1 << 16) - 1) - val) * -0.01
|
||||
|
||||
val = self.getter(f"GainLayer[{self._i}]")
|
||||
val = self.getter(f'GainLayer[{self._i}]')
|
||||
return round(val if val else fget(), 1)
|
||||
|
||||
@gain.setter
|
||||
def gain(self, val: float):
|
||||
self.setter(f"GainLayer[{self._i}]", val)
|
||||
self.setter(f'GainLayer[{self._i}]', val)
|
||||
|
||||
|
||||
def _make_gainlayer_mixin(remote, index):
|
||||
"""Creates a GainLayer mixin"""
|
||||
return type(
|
||||
f"GainlayerMixin",
|
||||
'GainlayerMixin',
|
||||
(),
|
||||
{
|
||||
"gainlayer": tuple(
|
||||
'gainlayer': tuple(
|
||||
GainLayer(remote, index, i) for i in range(remote.kind.num_bus)
|
||||
)
|
||||
},
|
||||
@@ -379,14 +381,14 @@ def _make_gainlayer_mixin(remote, index):
|
||||
def _make_channelout_mixin(kind):
|
||||
"""Creates a channel out property mixin"""
|
||||
return type(
|
||||
f"ChannelOutMixin{kind}",
|
||||
f'ChannelOutMixin{kind}',
|
||||
(),
|
||||
{
|
||||
**{
|
||||
f"A{i}": strip_output_prop(f"A{i}") for i in range(1, kind.phys_out + 1)
|
||||
f'A{i}': strip_output_prop(f'A{i}') for i in range(1, kind.phys_out + 1)
|
||||
},
|
||||
**{
|
||||
f"B{i}": strip_output_prop(f"B{i}") for i in range(1, kind.virt_out + 1)
|
||||
f'B{i}': strip_output_prop(f'B{i}') for i in range(1, kind.virt_out + 1)
|
||||
},
|
||||
},
|
||||
)
|
||||
@@ -410,12 +412,12 @@ def strip_factory(is_phys_strip, remote, i) -> Union[PhysicalStrip, VirtualStrip
|
||||
GAINLAYERMIXIN_cls = _make_gainlayer_mixin(remote, i)
|
||||
|
||||
return type(
|
||||
f"{STRIP_cls.__name__}{remote.kind}",
|
||||
f'{STRIP_cls.__name__}{remote.kind}',
|
||||
(STRIP_cls, CHANNELOUTMIXIN_cls, GAINLAYERMIXIN_cls),
|
||||
{
|
||||
"levels": StripLevel(remote, i),
|
||||
**{param: channel_bool_prop(param) for param in ["mono", "solo", "mute"]},
|
||||
"label": channel_label_prop(),
|
||||
'levels': StripLevel(remote, i),
|
||||
**{param: channel_bool_prop(param) for param in ['mono', 'solo', 'mute']},
|
||||
'label': channel_label_prop(),
|
||||
},
|
||||
)(remote, i)
|
||||
|
||||
|
||||
@@ -20,10 +20,10 @@ class Subject:
|
||||
"""run callbacks on update"""
|
||||
|
||||
for o in self._observers:
|
||||
if hasattr(o, "on_update"):
|
||||
if hasattr(o, 'on_update'):
|
||||
o.on_update(event)
|
||||
else:
|
||||
if o.__name__ == f"on_{event}":
|
||||
if o.__name__ == f'on_{event}':
|
||||
o()
|
||||
|
||||
def add(self, observer):
|
||||
@@ -34,15 +34,15 @@ class Subject:
|
||||
for o in iterator:
|
||||
if o not in self._observers:
|
||||
self._observers.append(o)
|
||||
self.logger.info(f"{o} added to event observers")
|
||||
self.logger.info(f'{o} added to event observers')
|
||||
else:
|
||||
self.logger.error(f"Failed to add {o} to event observers")
|
||||
self.logger.error(f'Failed to add {o} to event observers')
|
||||
except TypeError:
|
||||
if observer not in self._observers:
|
||||
self._observers.append(observer)
|
||||
self.logger.info(f"{observer} added to event observers")
|
||||
self.logger.info(f'{observer} added to event observers')
|
||||
else:
|
||||
self.logger.error(f"Failed to add {observer} to event observers")
|
||||
self.logger.error(f'Failed to add {observer} to event observers')
|
||||
|
||||
register = add
|
||||
|
||||
@@ -54,15 +54,15 @@ class Subject:
|
||||
for o in iterator:
|
||||
try:
|
||||
self._observers.remove(o)
|
||||
self.logger.info(f"{o} removed from event observers")
|
||||
self.logger.info(f'{o} removed from event observers')
|
||||
except ValueError:
|
||||
self.logger.error(f"Failed to remove {o} from event observers")
|
||||
self.logger.error(f'Failed to remove {o} from event observers')
|
||||
except TypeError:
|
||||
try:
|
||||
self._observers.remove(observer)
|
||||
self.logger.info(f"{observer} removed from event observers")
|
||||
self.logger.info(f'{observer} removed from event observers')
|
||||
except ValueError:
|
||||
self.logger.error(f"Failed to remove {observer} from event observers")
|
||||
self.logger.error(f'Failed to remove {observer} from event observers')
|
||||
|
||||
deregister = remove
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from enum import IntEnum
|
||||
from typing import Iterator
|
||||
|
||||
|
||||
@@ -42,15 +41,15 @@ def script(func):
|
||||
def wrapper(*args):
|
||||
remote, script = args
|
||||
if isinstance(script, dict):
|
||||
params = ""
|
||||
params = ''
|
||||
for key, val in script.items():
|
||||
obj, m2, *rem = key.split("-")
|
||||
obj, m2, *rem = key.split('-')
|
||||
index = int(m2) if m2.isnumeric() else int(*rem)
|
||||
params += ";".join(
|
||||
f"{obj}{f'.{m2}stream' if not m2.isnumeric() else ''}[{index}].{k}={int(v) if isinstance(v, bool) else v}"
|
||||
params += ';'.join(
|
||||
f'{obj}{f".{m2}stream" if not m2.isnumeric() else ""}[{index}].{k}={int(v) if isinstance(v, bool) else v}'
|
||||
for k, v in val.items()
|
||||
)
|
||||
params += ";"
|
||||
params += ';'
|
||||
script = params
|
||||
return func(remote, script)
|
||||
|
||||
@@ -83,6 +82,3 @@ def deep_merge(dict1, dict2):
|
||||
yield k, dict1[k]
|
||||
else:
|
||||
yield k, dict2[k]
|
||||
|
||||
|
||||
Socket = IntEnum("Socket", "register request response", start=0)
|
||||
|
||||
@@ -17,7 +17,7 @@ class VbanStream(IRemote):
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"vban.{self.direction}stream[{self.index}]"
|
||||
return f'vban.{self.direction}stream[{self.index}]'
|
||||
|
||||
@property
|
||||
def on(self) -> bool:
|
||||
@@ -25,7 +25,7 @@ class VbanStream(IRemote):
|
||||
|
||||
@on.setter
|
||||
def on(self, val: bool):
|
||||
self.setter("on", 1 if val else 0)
|
||||
self.setter('on', 1 if val else 0)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
@@ -33,7 +33,7 @@ class VbanStream(IRemote):
|
||||
|
||||
@name.setter
|
||||
def name(self, val: str):
|
||||
self.setter("name", val)
|
||||
self.setter('name', val)
|
||||
|
||||
@property
|
||||
def ip(self) -> str:
|
||||
@@ -41,7 +41,7 @@ class VbanStream(IRemote):
|
||||
|
||||
@ip.setter
|
||||
def ip(self, val: str):
|
||||
self.setter("ip", val)
|
||||
self.setter('ip', val)
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
@@ -51,9 +51,9 @@ class VbanStream(IRemote):
|
||||
def port(self, val: int):
|
||||
if not 1024 <= val <= 65535:
|
||||
self.logger.warning(
|
||||
f"port got: {val} but expected a value from 1024 to 65535"
|
||||
f'port got: {val} but expected a value from 1024 to 65535'
|
||||
)
|
||||
self.setter("port", val)
|
||||
self.setter('port', val)
|
||||
|
||||
@property
|
||||
def sr(self) -> int:
|
||||
@@ -63,8 +63,8 @@ class VbanStream(IRemote):
|
||||
def sr(self, val: int):
|
||||
opts = (11025, 16000, 22050, 24000, 32000, 44100, 48000, 64000, 88200, 96000)
|
||||
if val not in opts:
|
||||
self.logger.warning(f"sr got: {val} but expected a value in {opts}")
|
||||
self.setter("sr", val)
|
||||
self.logger.warning(f'sr got: {val} but expected a value in {opts}')
|
||||
self.setter('sr', val)
|
||||
|
||||
@property
|
||||
def channel(self) -> int:
|
||||
@@ -73,8 +73,8 @@ class VbanStream(IRemote):
|
||||
@channel.setter
|
||||
def channel(self, val: int):
|
||||
if not 1 <= val <= 8:
|
||||
self.logger.warning(f"channel got: {val} but expected a value from 1 to 8")
|
||||
self.setter("channel", val)
|
||||
self.logger.warning(f'channel got: {val} but expected a value from 1 to 8')
|
||||
self.setter('channel', val)
|
||||
|
||||
@property
|
||||
def bit(self) -> int:
|
||||
@@ -83,8 +83,8 @@ class VbanStream(IRemote):
|
||||
@bit.setter
|
||||
def bit(self, val: int):
|
||||
if val not in (16, 24):
|
||||
self.logger.warning(f"bit got: {val} but expected value 16 or 24")
|
||||
self.setter("bit", 1 if (val == 16) else 2)
|
||||
self.logger.warning(f'bit got: {val} but expected value 16 or 24')
|
||||
self.setter('bit', 1 if (val == 16) else 2)
|
||||
|
||||
@property
|
||||
def quality(self) -> int:
|
||||
@@ -93,8 +93,8 @@ class VbanStream(IRemote):
|
||||
@quality.setter
|
||||
def quality(self, val: int):
|
||||
if not 0 <= val <= 4:
|
||||
self.logger.warning(f"quality got: {val} but expected a value from 0 to 4")
|
||||
self.setter("quality", val)
|
||||
self.logger.warning(f'quality got: {val} but expected a value from 0 to 4')
|
||||
self.setter('quality', val)
|
||||
|
||||
@property
|
||||
def route(self) -> int:
|
||||
@@ -103,8 +103,8 @@ class VbanStream(IRemote):
|
||||
@route.setter
|
||||
def route(self, val: int):
|
||||
if not 0 <= val <= 8:
|
||||
self.logger.warning(f"route got: {val} but expected a value from 0 to 8")
|
||||
self.setter("route", val)
|
||||
self.logger.warning(f'route got: {val} but expected a value from 0 to 8')
|
||||
self.setter('route', val)
|
||||
|
||||
|
||||
class VbanInstream(VbanStream):
|
||||
@@ -115,11 +115,11 @@ class VbanInstream(VbanStream):
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self._remote.kind}{self.index}"
|
||||
return f'{type(self).__name__}{self._remote.kind}{self.index}'
|
||||
|
||||
@property
|
||||
def direction(self) -> str:
|
||||
return "in"
|
||||
return 'in'
|
||||
|
||||
@property
|
||||
def sr(self) -> int:
|
||||
@@ -154,11 +154,11 @@ class VbanOutstream(VbanStream):
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self._remote.kind}{self.index}"
|
||||
return f'{type(self).__name__}{self._remote.kind}{self.index}'
|
||||
|
||||
@property
|
||||
def direction(self) -> str:
|
||||
return "out"
|
||||
return 'out'
|
||||
|
||||
|
||||
class VbanAudioOutstream(VbanOutstream):
|
||||
@@ -174,22 +174,22 @@ def _make_stream_pair(remote, kind):
|
||||
|
||||
def _make_cls(i, direction):
|
||||
match direction:
|
||||
case "in":
|
||||
case 'in':
|
||||
if i < num_instream:
|
||||
return VbanAudioInstream(remote, i)
|
||||
elif i < num_instream + num_midi:
|
||||
return VbanMidiInstream(remote, i)
|
||||
else:
|
||||
return VbanTextInstream(remote, i)
|
||||
case "out":
|
||||
case 'out':
|
||||
if i < num_outstream:
|
||||
return VbanAudioOutstream(remote, i)
|
||||
else:
|
||||
return VbanMidiOutstream(remote, i)
|
||||
|
||||
return (
|
||||
tuple(_make_cls(i, "in") for i in range(num_instream + num_midi + num_text)),
|
||||
tuple(_make_cls(i, "out") for i in range(num_outstream + num_midi)),
|
||||
tuple(_make_cls(i, 'in') for i in range(num_instream + num_midi + num_text)),
|
||||
tuple(_make_cls(i, 'out') for i in range(num_outstream + num_midi)),
|
||||
)
|
||||
|
||||
|
||||
@@ -212,7 +212,7 @@ class Vban:
|
||||
"""if VBAN disabled there can be no communication with it"""
|
||||
|
||||
def disable(self):
|
||||
self.remote._set_rt("vban.Enable", 0)
|
||||
self.remote._set_rt('vban.Enable', 0)
|
||||
|
||||
|
||||
def vban_factory(remote) -> Vban:
|
||||
@@ -222,7 +222,7 @@ def vban_factory(remote) -> Vban:
|
||||
Returns a class that represents the VBAN module.
|
||||
"""
|
||||
VBAN_cls = Vban
|
||||
return type(f"{VBAN_cls.__name__}", (VBAN_cls,), {})(remote)
|
||||
return type(f'{VBAN_cls.__name__}', (VBAN_cls,), {})(remote)
|
||||
|
||||
|
||||
def request_vban_obj(remote) -> Vban:
|
||||
|
||||
@@ -11,7 +11,7 @@ from .error import VBANCMDError
|
||||
from .event import Event
|
||||
from .packet import RequestHeader
|
||||
from .subject import Subject
|
||||
from .util import Socket, deep_merge, script
|
||||
from .util import deep_merge, script
|
||||
from .worker import Producer, Subscriber, Updater
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -31,8 +31,8 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
self.event = Event({k: kwargs.pop(k) for k in ("pdirty", "ldirty")})
|
||||
if not kwargs["ip"]:
|
||||
self.event = Event({k: kwargs.pop(k) for k in ('pdirty', 'ldirty')})
|
||||
if not kwargs['ip']:
|
||||
kwargs |= self._conn_from_toml()
|
||||
for attr, val in kwargs.items():
|
||||
setattr(self, attr, val)
|
||||
@@ -42,9 +42,7 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
bps_index=self.BPS_OPTS.index(self.bps),
|
||||
channel=self.channel,
|
||||
)
|
||||
self.socks = tuple(
|
||||
socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for _ in Socket
|
||||
)
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.subject = self.observer = Subject()
|
||||
self.cache = {}
|
||||
self._pdirty = False
|
||||
@@ -65,29 +63,30 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
import tomli as tomllib
|
||||
|
||||
def get_filepath():
|
||||
filepaths = [
|
||||
Path.cwd() / "vban.toml",
|
||||
Path.cwd() / "configs" / "vban.toml",
|
||||
Path.home() / ".config" / "vban-cmd" / "vban.toml",
|
||||
Path.home() / "Documents" / "Voicemeeter" / "configs" / "vban.toml",
|
||||
]
|
||||
for filepath in filepaths:
|
||||
if filepath.exists():
|
||||
return filepath
|
||||
for pn in (
|
||||
Path.cwd() / 'vban.toml',
|
||||
Path.cwd() / 'configs' / 'vban.toml',
|
||||
Path.home() / '.config' / 'vban-cmd' / 'vban.toml',
|
||||
Path.home() / 'Documents' / 'Voicemeeter' / 'configs' / 'vban.toml',
|
||||
):
|
||||
if pn.exists():
|
||||
return pn
|
||||
|
||||
if filepath := get_filepath():
|
||||
with open(filepath, "rb") as f:
|
||||
conn = tomllib.load(f)
|
||||
assert (
|
||||
"connection" in conn and "ip" in conn["connection"]
|
||||
), "expected [connection][ip] in vban config"
|
||||
return conn["connection"]
|
||||
raise VBANCMDError("no ip provided and no vban.toml located.")
|
||||
if not (filepath := get_filepath()):
|
||||
raise VBANCMDError('no ip provided and no vban.toml located.')
|
||||
try:
|
||||
with open(filepath, 'rb') as f:
|
||||
return tomllib.load(f)['connection']
|
||||
except tomllib.TomlDecodeError as e:
|
||||
raise VBANCMDError(f'Error decoding {filepath}: {e}') from e
|
||||
|
||||
def __enter__(self):
|
||||
self.login()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
|
||||
self.logout()
|
||||
|
||||
def login(self) -> None:
|
||||
"""Starts the subscriber and updater threads (unless in outbound mode)"""
|
||||
if not self.outbound:
|
||||
@@ -110,31 +109,41 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
)
|
||||
)
|
||||
|
||||
def logout(self) -> None:
|
||||
if not self.stopped():
|
||||
self.logger.debug('events thread shutdown started')
|
||||
self.stop_event.set()
|
||||
if self.producer is not None:
|
||||
for t in (self.producer, self.subscriber):
|
||||
t.join()
|
||||
self.sock.close()
|
||||
self.logger.info(f'{type(self).__name__}: Successfully logged out of {self}')
|
||||
|
||||
def stopped(self):
|
||||
return self.stop_event is None or self.stop_event.is_set()
|
||||
|
||||
def _set_rt(self, cmd: str, val: Union[str, float]):
|
||||
"""Sends a string request command over a network."""
|
||||
self.socks[Socket.request].sendto(
|
||||
self.packet_request.header + f"{cmd}={val};".encode(),
|
||||
self.sock.sendto(
|
||||
self.packet_request.header + f'{cmd}={val};'.encode(),
|
||||
(socket.gethostbyname(self.ip), self.port),
|
||||
)
|
||||
self.packet_request.framecounter = (
|
||||
int.from_bytes(self.packet_request.framecounter, "little") + 1
|
||||
).to_bytes(4, "little")
|
||||
int.from_bytes(self.packet_request.framecounter, 'little') + 1
|
||||
).to_bytes(4, 'little')
|
||||
self.cache[cmd] = val
|
||||
|
||||
@script
|
||||
def sendtext(self, script):
|
||||
"""Sends a multiple parameter string over a network."""
|
||||
self.socks[Socket.request].sendto(
|
||||
self.sock.sendto(
|
||||
self.packet_request.header + script.encode(),
|
||||
(socket.gethostbyname(self.ip), self.port),
|
||||
)
|
||||
self.packet_request.framecounter = (
|
||||
int.from_bytes(self.packet_request.framecounter, "little") + 1
|
||||
).to_bytes(4, "little")
|
||||
self.logger.debug(f"sendtext: {script}")
|
||||
int.from_bytes(self.packet_request.framecounter, 'little') + 1
|
||||
).to_bytes(4, 'little')
|
||||
self.logger.debug(f'sendtext: {script}')
|
||||
time.sleep(self.DELAY)
|
||||
|
||||
@property
|
||||
@@ -145,7 +154,7 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
@property
|
||||
def version(self) -> str:
|
||||
"""Returns Voicemeeter's version as a string"""
|
||||
return "{0}.{1}.{2}.{3}".format(*self.public_packet.voicemeeterversion)
|
||||
return '{0}.{1}.{2}.{3}'.format(*self.public_packet.voicemeeterversion)
|
||||
|
||||
@property
|
||||
def pdirty(self):
|
||||
@@ -184,16 +193,16 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
"""
|
||||
|
||||
def target(key):
|
||||
match key.split("-"):
|
||||
case ["strip" | "bus" as kls, index] if index.isnumeric():
|
||||
match key.split('-'):
|
||||
case ['strip' | 'bus' as kls, index] if index.isnumeric():
|
||||
target = getattr(self, kls)
|
||||
case [
|
||||
"vban",
|
||||
"in" | "instream" | "out" | "outstream" as direction,
|
||||
'vban',
|
||||
'in' | 'instream' | 'out' | 'outstream' as direction,
|
||||
index,
|
||||
] if index.isnumeric():
|
||||
target = getattr(
|
||||
self.vban, f"{direction.removesuffix('stream')}stream"
|
||||
self.vban, f'{direction.removesuffix("stream")}stream'
|
||||
)
|
||||
case _:
|
||||
ERR_MSG = f"invalid config key '{key}'"
|
||||
@@ -207,36 +216,23 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
"""applies a config from memory"""
|
||||
ERR_MSG = (
|
||||
f"No config with name '{name}' is loaded into memory",
|
||||
f"Known configs: {list(self.configs.keys())}",
|
||||
f'Known configs: {list(self.configs.keys())}',
|
||||
)
|
||||
try:
|
||||
config = self.configs[name]
|
||||
except KeyError as e:
|
||||
self.logger.error(("\n").join(ERR_MSG))
|
||||
raise VBANCMDError(("\n").join(ERR_MSG)) from e
|
||||
self.logger.error(('\n').join(ERR_MSG))
|
||||
raise VBANCMDError(('\n').join(ERR_MSG)) from e
|
||||
|
||||
if "extends" in config:
|
||||
extended = config["extends"]
|
||||
if 'extends' in config:
|
||||
extended = config['extends']
|
||||
config = {
|
||||
k: v
|
||||
for k, v in deep_merge(self.configs[extended], config)
|
||||
if k not in ("extends")
|
||||
if k not in ('extends')
|
||||
}
|
||||
self.logger.debug(
|
||||
f"profile '{name}' extends '{extended}', profiles merged.."
|
||||
)
|
||||
self.apply(config)
|
||||
self.logger.info(f"Profile '{name}' applied!")
|
||||
|
||||
def logout(self) -> None:
|
||||
if not self.stopped():
|
||||
self.logger.debug("events thread shutdown started")
|
||||
self.stop_event.set()
|
||||
if self.producer is not None:
|
||||
for t in (self.producer, self.subscriber):
|
||||
t.join()
|
||||
[sock.close() for sock in self.socks]
|
||||
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")
|
||||
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
|
||||
self.logout()
|
||||
|
||||
@@ -6,7 +6,6 @@ from typing import Optional
|
||||
|
||||
from .error import VBANCMDConnectionError
|
||||
from .packet import HEADER_SIZE, SubscribeHeader, VbanRtPacket, VbanRtPacketHeader
|
||||
from .util import Socket
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -15,7 +14,7 @@ class Subscriber(threading.Thread):
|
||||
"""fire a subscription packet every 10 seconds"""
|
||||
|
||||
def __init__(self, remote, stop_event):
|
||||
super().__init__(name="subscriber", daemon=False)
|
||||
super().__init__(name='subscriber', daemon=False)
|
||||
self._remote = remote
|
||||
self.stop_event = stop_event
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
@@ -24,20 +23,20 @@ class Subscriber(threading.Thread):
|
||||
def run(self):
|
||||
while not self.stopped():
|
||||
try:
|
||||
self._remote.socks[Socket.register].sendto(
|
||||
self._remote.sock.sendto(
|
||||
self.packet.header,
|
||||
(socket.gethostbyname(self._remote.ip), self._remote.port),
|
||||
)
|
||||
self.packet.framecounter = (
|
||||
int.from_bytes(self.packet.framecounter, "little") + 1
|
||||
).to_bytes(4, "little")
|
||||
int.from_bytes(self.packet.framecounter, 'little') + 1
|
||||
).to_bytes(4, 'little')
|
||||
self.wait_until_stopped(10)
|
||||
except socket.gaierror as e:
|
||||
self.logger.exception(f"{type(e).__name__}: {e}")
|
||||
self.logger.exception(f'{type(e).__name__}: {e}')
|
||||
raise VBANCMDConnectionError(
|
||||
f"unable to resolve hostname {self._remote.ip}"
|
||||
f'unable to resolve hostname {self._remote.ip}'
|
||||
) from e
|
||||
self.logger.debug(f"terminating {self.name} thread")
|
||||
self.logger.debug(f'terminating {self.name} thread')
|
||||
|
||||
def stopped(self):
|
||||
return self.stop_event.is_set()
|
||||
@@ -54,20 +53,17 @@ class Producer(threading.Thread):
|
||||
"""Continously send job queue to the Updater thread at a rate of self._remote.ratelimit."""
|
||||
|
||||
def __init__(self, remote, queue, stop_event):
|
||||
super().__init__(name="producer", daemon=False)
|
||||
super().__init__(name='producer', daemon=False)
|
||||
self._remote = remote
|
||||
self.queue = queue
|
||||
self.stop_event = stop_event
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
self.packet_expected = VbanRtPacketHeader()
|
||||
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
|
||||
self._remote.socks[Socket.response].bind(
|
||||
(socket.gethostbyname(socket.gethostname()), self._remote.port)
|
||||
)
|
||||
self._remote.sock.settimeout(self._remote.timeout)
|
||||
self._remote._public_packet = self._get_rt()
|
||||
(
|
||||
self._remote.cache["strip_level"],
|
||||
self._remote.cache["bus_level"],
|
||||
self._remote.cache['strip_level'],
|
||||
self._remote.cache['bus_level'],
|
||||
) = self._remote._get_levels(self._remote.public_packet)
|
||||
|
||||
def _get_rt(self) -> VbanRtPacket:
|
||||
@@ -83,7 +79,7 @@ class Producer(threading.Thread):
|
||||
|
||||
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
|
||||
try:
|
||||
data, _ = self._remote.socks[Socket.response].recvfrom(2048)
|
||||
data, _ = self._remote.sock.recvfrom(2048)
|
||||
# do we have packet data?
|
||||
if len(data) > HEADER_SIZE:
|
||||
# is the packet of type VBAN RT response?
|
||||
@@ -114,9 +110,9 @@ class Producer(threading.Thread):
|
||||
_busLabelUTF8c60=data[932:1412],
|
||||
)
|
||||
except TimeoutError as e:
|
||||
self.logger.exception(f"{type(e).__name__}: {e}")
|
||||
self.logger.exception(f'{type(e).__name__}: {e}')
|
||||
raise VBANCMDConnectionError(
|
||||
f"timeout waiting for RtPacket from {self._remote.ip}"
|
||||
f'timeout waiting for RtPacket from {self._remote.ip}'
|
||||
) from e
|
||||
|
||||
def stopped(self):
|
||||
@@ -127,7 +123,7 @@ class Producer(threading.Thread):
|
||||
_pp = self._get_rt()
|
||||
pdirty = _pp.pdirty(self._remote.public_packet)
|
||||
ldirty = _pp.ldirty(
|
||||
self._remote.cache["strip_level"], self._remote.cache["bus_level"]
|
||||
self._remote.cache['strip_level'], self._remote.cache['bus_level']
|
||||
)
|
||||
|
||||
if pdirty or ldirty:
|
||||
@@ -136,11 +132,11 @@ class Producer(threading.Thread):
|
||||
self._remote._ldirty = ldirty
|
||||
|
||||
if self._remote.event.pdirty:
|
||||
self.queue.put("pdirty")
|
||||
self.queue.put('pdirty')
|
||||
if self._remote.event.ldirty:
|
||||
self.queue.put("ldirty")
|
||||
self.queue.put('ldirty')
|
||||
time.sleep(self._remote.ratelimit)
|
||||
self.logger.debug(f"terminating {self.name} thread")
|
||||
self.logger.debug(f'terminating {self.name} thread')
|
||||
self.queue.put(None)
|
||||
|
||||
|
||||
@@ -152,7 +148,7 @@ class Updater(threading.Thread):
|
||||
"""
|
||||
|
||||
def __init__(self, remote, queue):
|
||||
super().__init__(name="updater", daemon=True)
|
||||
super().__init__(name='updater', daemon=True)
|
||||
self._remote = remote
|
||||
self.queue = queue
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
@@ -166,19 +162,19 @@ class Updater(threading.Thread):
|
||||
Generate _strip_comp, _bus_comp and update level cache if ldirty.
|
||||
"""
|
||||
while event := self.queue.get():
|
||||
if event == "pdirty" and self._remote.pdirty:
|
||||
if event == 'pdirty' and self._remote.pdirty:
|
||||
self._remote.subject.notify(event)
|
||||
elif event == "ldirty" and self._remote.ldirty:
|
||||
elif event == 'ldirty' and self._remote.ldirty:
|
||||
self._remote._strip_comp, self._remote._bus_comp = (
|
||||
self._remote._public_packet._strip_comp,
|
||||
self._remote._public_packet._bus_comp,
|
||||
)
|
||||
(
|
||||
self._remote.cache["strip_level"],
|
||||
self._remote.cache["bus_level"],
|
||||
self._remote.cache['strip_level'],
|
||||
self._remote.cache['bus_level'],
|
||||
) = (
|
||||
self._remote._public_packet.inputlevels,
|
||||
self._remote._public_packet.outputlevels,
|
||||
)
|
||||
self._remote.subject.notify(event)
|
||||
self.logger.debug(f"terminating {self.name} thread")
|
||||
self.logger.debug(f'terminating {self.name} thread')
|
||||
|
||||
Reference in New Issue
Block a user