24 Commits

Author SHA1 Message Date
cbcca14481 rename until_stopped() to wait_until_stopped() 2023-08-05 13:36:36 +01:00
f584d53835 patch bump 2023-08-05 13:34:56 +01:00
72d182a488 use Threading.Event object to terminate threads
until_stopped() added to Subscriber thread
2023-08-04 23:13:58 +01:00
ee32f92914 add missing constants
add docstrings that describes data breakdown

move SubscribeHeader above  VbanRtPacketHeader

expand assert failure string
2023-08-04 23:06:51 +01:00
3b65035e50 add double click event for slider 2023-08-04 21:14:33 +01:00
c8b4bde49d patch bump 2023-08-04 16:33:48 +01:00
47e9203b1e use walrus 2023-08-04 16:21:57 +01:00
d48e7ecd79 Correct type annotations None type. 2023-08-02 17:19:08 +01:00
7e09a0d321 VBANCMDConnectionError now subclasses VBANCMDError 2023-08-02 15:45:25 +01:00
d41ee1a12a remove redundant __str__ overrides 2023-07-26 11:32:20 +01:00
1e499cd99d patch bump 2023-07-25 16:23:02 +01:00
9bf52b5c11 num_strip_levels, num_bus_levels added to KindMaps 2023-07-25 16:22:47 +01:00
77ba347e99 fix bus.eq.on example in readme 2023-07-15 08:17:18 +01:00
94fa33cebf md fix 2023-07-13 08:58:06 +01:00
ef105d878b fix logging example 2023-07-13 08:52:42 +01:00
956f759e73 add Logging section to README. 2023-07-13 08:50:24 +01:00
dab519be9f implement midi, text vban streams
kindmaps updated

factory tests updated.

closes #2
2023-07-12 10:24:03 +01:00
a4b91bf5c6 deep_merge implemented
recursively merges dicts in profiles

patch bump
2023-07-12 04:52:50 +01:00
2a98707bf8 Adds ability to extend one config with another
apply_config() checks for 'extends' in TOML config

2.3.0 section added to CHANGELOG

three example extender.toml configs added

minor version bump
2023-07-11 20:27:52 +01:00
8e30c57020 minor version bump 2023-07-08 17:25:53 +01:00
04e18b304b log params on successful connection
raise VBANCMDError if invalid config key in apply_config()
2023-07-08 17:25:38 +01:00
4de384c66c repr method added to factory base 2023-07-08 07:59:51 +01:00
2c8659a4e5 apply extended to support button, vban 2023-07-08 07:59:35 +01:00
41e427e46b button and vban classes added
button is a placeholder class, though.
2023-07-08 07:34:30 +01:00
20 changed files with 619 additions and 134 deletions

View File

@@ -11,6 +11,29 @@ Before any major/minor/patch bump all unit tests will be run to verify they pass
- [x]
## [2.3.2] - 2023-07-12
### Added
- vban.{instream,outstream} tuples now contain classes that represent MIDI and TEXT streams.
### Fixed
- apply_config() now performs a deep merge when extending a config with another.
## [2.3.0] - 2023-07-11
### Added
- user configs may now extend other user configs. check `config extends` section in README.
## [2.2.0] - 2023-07-08
### Added
- button, vban classes implemented
- \__repr\__() method added to base class
## [2.1.2] - 2023-07-05
### Added

View File

@@ -8,7 +8,7 @@
# VBAN CMD
This python interface allows you to get and set Voicemeeter parameter values over a network.
This python interface allows you to transmit Voicemeeter parameters over a network.
It may be used standalone or to extend the [Voicemeeter Remote Python API](https://github.com/onyx-and-iris/voicemeeter-api-python)
@@ -44,7 +44,7 @@ port = 6980
streamname = "Command1"
```
It should be placed next to your `__main__.py` file.
It should be placed in \<user home directory\> / "Documents" / "Voicemeeter" / "configs"
Alternatively you may pass `ip`, `port`, `streamname` as keyword arguments.
@@ -251,7 +251,6 @@ The following properties are available.
example:
```python
vban.bus[4].eq = true
print(vban.bus[0].label)
```
@@ -262,6 +261,10 @@ The following properties are available.
- `on`: boolean
- `ab`: boolean
```python
vban.bus[4].eq.on = true
```
##### Modes
The following properties are available.
@@ -359,8 +362,8 @@ vban.apply(
Or for each class you may do:
```python
vban.strip[0].apply(mute: true, gain: 3.2, A1: true)
vban.bus[0].apply(A1: true)
vban.strip[0].apply({"mute": True, "gain": 3.2, "A1": True})
vban.vban.outstream[0].apply({"on": True, "name": "streamname", "bit": 24})
```
## Config Files
@@ -369,7 +372,7 @@ vban.bus[0].apply(A1: true)
You may load config files in TOML format.
Three example configs have been included with the package. Remember to save
current settings before loading a user config. To set one you may do:
current settings before loading a user config. To load one you may do:
```python
import vban_cmd
@@ -379,6 +382,27 @@ with vban_cmd.api('banana') as vban:
will load a config file at configs/banana/example.toml for Voicemeeter Banana.
Your configs may be located in one of the following paths:
- \<current working directory\> / "configs" / kind_id
- \<user home directory\> / ".config" / "vban-cmd" / kind_id
- \<user home directory\> / "Documents" / "Voicemeeter" / "configs" / kind_id
If a config with the same name is located in multiple locations, only the first one found is loaded into memory, in the above order.
#### `config extends`
You may also load a config that extends another config with overrides or additional parameters.
You just need to define a key `extends` in the config TOML, that names the config to be extended.
Three example 'extender' configs are included with the repo. You may load them with:
```python
import voicemeeterlib
with voicemeeterlib.api('banana') as vm:
vm.apply_config('extender')
```
## Events
Level updates are considered high volume, by default they are NOT listened for. Use `subs` keyword arg to initialize event updates.
@@ -485,12 +509,27 @@ Returns a `VbanRtPacket`. Designed to be used internally by the interface but av
States not guaranteed to be current (requires use of dirty parameters to confirm).
### `Errors`
## Errors
- `errors.VBANCMDError`: Exception raised when general errors occur.
- `errors.VBANCMDConnectionError`: Exception raised when connection/timeout errors occur.
### `Tests`
## Logging
It's possible to see the messages sent by the interface's setters and getters, may be useful for debugging.
example:
```python
import vban_cmd
logging.basicConfig(level=logging.DEBUG)
opts = {"ip": "ip.local", "port": 6980, "streamname": "Command1"}
with vban_cmd.api('banana', **opts) as vban:
...
```
## Tests
First make sure you installed the [development dependencies](https://github.com/onyx-and-iris/vban-cmd-python#installation)

View File

@@ -0,0 +1,12 @@
extends = "example"
[strip-0]
label = "strip0_extended"
A1 = false
gain = 0.0
[bus-0]
label = "bus0_extended"
mute = false
[vban-in-3]
name = "vban_extended"

View File

@@ -0,0 +1,12 @@
extends = "example"
[strip-0]
label = "strip0_extended"
A1 = false
gain = 0.0
[bus-0]
label = "bus0_extended"
mute = false
[vban-in-3]
name = "vban_extended"

View File

@@ -0,0 +1,12 @@
extends = "example"
[strip-0]
label = "strip0_extended"
A1 = false
gain = 0.0
[bus-0]
label = "bus0_extended"
mute = false
[vban-in-3]
name = "vban_extended"

View File

@@ -8,6 +8,8 @@ from tkinter import ttk
class App(tk.Tk):
INDEX = 3
def __init__(self, vban):
super().__init__()
self.vban = vban
@@ -15,8 +17,8 @@ class App(tk.Tk):
self.vban.observer.add(self.on_ldirty)
# create widget variables
self.button_var = tk.BooleanVar(value=vban.strip[3].mute)
self.slider_var = tk.DoubleVar(value=vban.strip[3].gain)
self.button_var = tk.BooleanVar(value=vban.strip[self.INDEX].mute)
self.slider_var = tk.DoubleVar(value=vban.strip[self.INDEX].gain)
self.meter_var = tk.DoubleVar(value=self._get_level())
self.gainlabel_var = tk.StringVar(value=self.slider_var.get())
@@ -24,11 +26,12 @@ class App(tk.Tk):
self.style = ttk.Style()
self.style.theme_use("clam")
self.style.configure(
"Mute.TButton", foreground="#cd5c5c" if vban.strip[3].mute else "#5a5a5a"
"Mute.TButton",
foreground="#cd5c5c" if vban.strip[self.INDEX].mute else "#5a5a5a",
)
# create labelframe and grid it onto the mainframe
self.labelframe = tk.LabelFrame(text=self.vban.strip[3].label)
self.labelframe = tk.LabelFrame(text=self.vban.strip[self.INDEX].label)
self.labelframe.grid(padx=1)
# create slider and grid it onto the labelframe
@@ -44,6 +47,7 @@ class App(tk.Tk):
column=0,
row=0,
)
slider.bind("<Double-Button-1>", self.on_button_double_click)
# create level meter and grid it onto the labelframe
level_meter = ttk.Progressbar(
@@ -72,18 +76,23 @@ class App(tk.Tk):
def on_slider_move(self, *args):
val = round(self.slider_var.get(), 1)
self.vban.strip[3].gain = val
self.vban.strip[self.INDEX].gain = val
self.gainlabel_var.set(val)
def on_button_press(self):
self.button_var.set(not self.button_var.get())
self.vban.strip[3].mute = self.button_var.get()
self.vban.strip[self.INDEX].mute = self.button_var.get()
self.style.configure(
"Mute.TButton", foreground="#cd5c5c" if self.button_var.get() else "#5a5a5a"
)
def on_button_double_click(self, e):
self.slider_var.set(0)
self.gainlabel_var.set(0)
self.vban.strip[self.INDEX].gain = 0
def _get_level(self):
val = max(self.vban.strip[3].levels.prefader)
val = max(self.vban.strip[self.INDEX].levels.prefader)
return 0 if self.button_var.get() else 72 + val - 12 + self.slider_var.get()
def on_ldirty(self):

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "vban-cmd"
version = "2.1.2"
version = "2.4.3"
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = ["onyx-and-iris <code@onyxandiris.online>"]
license = "MIT"

View File

@@ -14,9 +14,13 @@ class TestRemoteFactories:
assert hasattr(vban, "strip")
assert hasattr(vban, "bus")
assert hasattr(vban, "command")
assert hasattr(vban, "button")
assert hasattr(vban, "vban")
assert len(vban.strip) == 3
assert len(vban.bus) == 2
assert len(vban.button) == 80
assert len(vban.vban.instream) == 6 and len(vban.vban.outstream) == 5
@pytest.mark.skipif(
data.name != "banana",
@@ -26,9 +30,13 @@ class TestRemoteFactories:
assert hasattr(vban, "strip")
assert hasattr(vban, "bus")
assert hasattr(vban, "command")
assert hasattr(vban, "button")
assert hasattr(vban, "vban")
assert len(vban.strip) == 5
assert len(vban.bus) == 5
assert len(vban.button) == 80
assert len(vban.vban.instream) == 10 and len(vban.vban.outstream) == 9
@pytest.mark.skipif(
data.name != "potato",
@@ -38,6 +46,10 @@ class TestRemoteFactories:
assert hasattr(vban, "strip")
assert hasattr(vban, "bus")
assert hasattr(vban, "command")
assert hasattr(vban, "button")
assert hasattr(vban, "vban")
assert len(vban.strip) == 8
assert len(vban.bus) == 8
assert len(vban.button) == 80
assert len(vban.vban.instream) == 10 and len(vban.vban.outstream) == 9

View File

@@ -102,7 +102,7 @@ class BusLevel(IRemote):
def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty:
if not self._remote.stopped() and self._remote.event.ldirty:
return tuple(
fget(i)
for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]]

View File

@@ -1,6 +1,6 @@
class VBANCMDError(Exception):
"""Exception raised when general errors occur"""
"""Base VBANCMD Exception class. Raised when general errors occur"""
class VBANCMDConnectionError(Exception):
class VBANCMDConnectionError(VBANCMDError):
"""Exception raised when connection/timeout errors occur"""

View File

@@ -2,7 +2,7 @@ import logging
from abc import abstractmethod
from enum import IntEnum
from functools import cached_property
from typing import Iterable, NoReturn
from typing import Iterable
from .bus import request_bus_obj as bus
from .command import Command
@@ -10,7 +10,9 @@ from .config import request_config as configs
from .error import VBANCMDError
from .kinds import KindMapClass
from .kinds import request_kind_map as kindmap
from .macrobutton import MacroButton
from .strip import request_strip_obj as strip
from .vban import request_vban_obj as vban
from .vbancmd import VbanCmd
logger = logging.getLogger(__name__)
@@ -23,7 +25,9 @@ class FactoryBuilder:
Separates construction from representation.
"""
BuilderProgress = IntEnum("BuilderProgress", "strip bus command", start=0)
BuilderProgress = IntEnum(
"BuilderProgress", "strip bus command macrobutton vban", start=0
)
def __init__(self, factory, kind: KindMapClass):
self._factory = factory
@@ -32,10 +36,12 @@ class FactoryBuilder:
f"Finished building strips for {self._factory}",
f"Finished building buses for {self._factory}",
f"Finished building commands for {self._factory}",
f"Finished building macrobuttons for {self._factory}",
f"Finished building vban in/out streams for {self._factory}",
)
self.logger = logger.getChild(self.__class__.__name__)
def _pinfo(self, name: str) -> NoReturn:
def _pinfo(self, name: str) -> None:
"""prints progress status for each step"""
name = name.split("_")[1]
self.logger.info(self._info[int(getattr(self.BuilderProgress, name))])
@@ -58,6 +64,14 @@ class FactoryBuilder:
self._factory.command = Command.make(self._factory)
return self
def make_macrobutton(self):
self._factory.button = tuple(MacroButton(self._factory, i) for i in range(80))
return self
def make_vban(self):
self._factory.vban = vban(self._factory)
return self
class FactoryBase(VbanCmd):
"""Base class for factories, subclasses VbanCmd."""
@@ -86,12 +100,20 @@ class FactoryBase(VbanCmd):
self.builder.make_strip,
self.builder.make_bus,
self.builder.make_command,
self.builder.make_macrobutton,
self.builder.make_vban,
)
self._configs = None
def __str__(self) -> str:
return f"Voicemeeter {self.kind}"
def __repr__(self):
return (
type(self).__name__
+ f"({self.kind}, ip='{self.ip}', port={self.port}, streamname='{self.streamname}')"
)
@property
@abstractmethod
def steps(self):

View File

@@ -110,6 +110,7 @@ class IRemote(metaclass=ABCMeta):
cmd += (f".{param}",)
return "".join(cmd)
@property
@abstractmethod
def identifier(self):
pass

View File

@@ -53,6 +53,14 @@ class KindMapClass(metaclass=SingletonType):
def num_bus(self):
return sum(self.outs)
@property
def num_strip_levels(self) -> int:
return 2 * self.phys_in + 8 * self.virt_in
@property
def num_bus_levels(self) -> int:
return 8 * (self.phys_out + self.virt_out)
def __str__(self) -> str:
return self.name.capitalize()
@@ -62,7 +70,7 @@ class BasicMap(KindMapClass):
name: str
ins: tuple = (2, 1)
outs: tuple = (1, 1)
vban: tuple = (4, 4)
vban: tuple = (4, 4, 1, 1)
@dataclass
@@ -70,7 +78,7 @@ class BananaMap(KindMapClass):
name: str
ins: tuple = (3, 2)
outs: tuple = (3, 2)
vban: tuple = (8, 8)
vban: tuple = (8, 8, 1, 1)
@dataclass
@@ -78,7 +86,7 @@ class PotatoMap(KindMapClass):
name: str
ins: tuple = (5, 3)
outs: tuple = (5, 3)
vban: tuple = (8, 8)
vban: tuple = (8, 8, 1, 1)
def kind_factory(kind_id):

36
vban_cmd/macrobutton.py Normal file
View File

@@ -0,0 +1,36 @@
from .iremote import IRemote
class MacroButton(IRemote):
"""A placeholder class in case this interface is being used interchangeably with the Remote API"""
def __str__(self):
return f"{type(self).__name__}{self._remote.kind}{self.index}"
@property
def identifier(self):
return f"command.button[{self.index}]"
@property
def state(self) -> bool:
self.logger.warning("button.state commands are not supported over VBAN")
@state.setter
def state(self, _):
self.logger.warning("button.state commands are not supported over VBAN")
@property
def stateonly(self) -> bool:
self.logger.warning("button.stateonly commands are not supported over VBAN")
@stateonly.setter
def stateonly(self, v):
self.logger.warning("button.stateonly commands are not supported over VBAN")
@property
def trigger(self) -> bool:
self.logger.warning("button.trigger commands are not supported over VBAN")
@trigger.setter
def trigger(self, _):
self.logger.warning("button.trigger commands are not supported over VBAN")

View File

@@ -3,10 +3,14 @@ from dataclasses import dataclass
from .kinds import KindMapClass
from .util import comp
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass
@@ -14,28 +18,28 @@ class VbanRtPacket:
"""Represents the body of a VBAN RT data packet"""
_kind: KindMapClass
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
_stripGaindB100Layer1: bytes
_stripGaindB100Layer2: bytes
_stripGaindB100Layer3: bytes
_stripGaindB100Layer4: bytes
_stripGaindB100Layer5: bytes
_stripGaindB100Layer6: bytes
_stripGaindB100Layer7: bytes
_stripGaindB100Layer8: bytes
_busGaindB100: bytes
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
_voicemeeterType: bytes # data[28:29]
_reserved: bytes # data[29:30]
_buffersize: bytes # data[30:32]
_voicemeeterVersion: bytes # data[32:36]
_optionBits: bytes # data[36:40]
_samplerate: bytes # data[40:44]
_inputLeveldB100: bytes # data[44:112]
_outputLeveldB100: bytes # data[112:240]
_TransportBit: bytes # data[240:244]
_stripState: bytes # data[244:276]
_busState: bytes # data[276:308]
_stripGaindB100Layer1: bytes # data[308:324]
_stripGaindB100Layer2: bytes # data[324:340]
_stripGaindB100Layer3: bytes # data[340:356]
_stripGaindB100Layer4: bytes # data[356:372]
_stripGaindB100Layer5: bytes # data[372:388]
_stripGaindB100Layer6: bytes # data[388:404]
_stripGaindB100Layer7: bytes # data[404:420]
_stripGaindB100Layer8: bytes # data[420:436]
_busGaindB100: bytes # data[436:452]
_stripLabelUTF8c60: bytes # data[452:932]
_busLabelUTF8c60: bytes # data[932:1412]
def _generate_levels(self, levelarray) -> tuple:
return tuple(
@@ -103,12 +107,12 @@ class VbanRtPacket:
@property
def inputlevels(self) -> tuple:
"""returns the entire level array across all inputs for a kind"""
return self.strip_levels[0 : (2 * self._kind.phys_in + 8 * self._kind.virt_in)]
return self.strip_levels[0 : self._kind.num_strip_levels]
@property
def outputlevels(self) -> tuple:
"""returns the entire level array across all outputs for a kind"""
return self.bus_levels[0 : 8 * self._kind.num_bus]
return self.bus_levels[0 : self._kind.num_bus_levels]
@property
def stripstate(self) -> tuple:
@@ -206,13 +210,42 @@ class VbanRtPacket:
)
@dataclass
class SubscribeHeader:
"""Represents the header an RT Packet Service subscription packet"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert (
len(header) == HEADER_SIZE + 4
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
return header
@dataclass
class VbanRtPacketHeader:
"""Represents the header of VBAN RT data packet"""
"""Represents the header of a VBAN RT response packet"""
name = "Voicemeeter-RTP"
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little")
format_bit: bytes = (0).to_bytes(1, "little")
@@ -226,13 +259,13 @@ class VbanRtPacketHeader:
header += self.format_nbc
header += self.format_bit
header += self.streamname
assert len(header) == HEADER_SIZE - 4, f"Header expected {HEADER_SIZE-4} bytes"
assert len(header) == HEADER_SIZE, f"expected header size {HEADER_SIZE} bytes"
return header
@dataclass
class RequestHeader:
"""Represents a REQUEST RT PACKET header"""
"""Represents the header of an REQUEST RT PACKET"""
name: str
bps_index: int
@@ -244,7 +277,7 @@ class RequestHeader:
@property
def sr(self):
return (0x40 + self.bps_index).to_bytes(1, "little")
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
@property
def nbc(self):
@@ -263,32 +296,7 @@ class RequestHeader:
header += self.bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
return header
@dataclass
class SubscribeHeader:
"""Represents a packet used to subscribe to the RT Packet Service"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
assert (
len(header) == HEADER_SIZE + 4
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
return header

View File

@@ -296,7 +296,7 @@ class StripLevel(IRemote):
def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty:
if not self._remote.stopped() and self._remote.event.ldirty:
return tuple(
fget(i)
for i in self._remote.cache["strip_level"][

View File

@@ -73,4 +73,18 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
yield True
def deep_merge(dict1, dict2):
"""Generator function for deep merging two dicts"""
for k in set(dict1) | set(dict2):
if k in dict1 and k in dict2:
if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
yield k, dict(deep_merge(dict1[k], dict2[k]))
else:
yield k, dict2[k]
elif k in dict1:
yield k, dict1[k]
else:
yield k, dict2[k]
Socket = IntEnum("Socket", "register request response", start=0)

242
vban_cmd/vban.py Normal file
View File

@@ -0,0 +1,242 @@
from abc import abstractmethod
from .iremote import IRemote
from .kinds import kinds_all
class VbanStream(IRemote):
"""
Implements the common interface
Defines concrete implementation for vban stream
"""
@abstractmethod
def __str__(self):
pass
@property
def identifier(self) -> str:
return f"vban.{self.direction}stream[{self.index}]"
@property
def on(self) -> bool:
return
@on.setter
def on(self, val: bool):
self.setter("on", 1 if val else 0)
@property
def name(self) -> str:
return
@name.setter
def name(self, val: str):
self.setter("name", val)
@property
def ip(self) -> str:
return
@ip.setter
def ip(self, val: str):
self.setter("ip", val)
@property
def port(self) -> int:
return
@port.setter
def port(self, val: int):
if not 1024 <= val <= 65535:
self.logger.warning(
f"port got: {val} but expected a value from 1024 to 65535"
)
self.setter("port", val)
@property
def sr(self) -> int:
return
@sr.setter
def sr(self, val: int):
opts = (11025, 16000, 22050, 24000, 32000, 44100, 48000, 64000, 88200, 96000)
if val not in opts:
self.logger.warning(f"sr got: {val} but expected a value in {opts}")
self.setter("sr", val)
@property
def channel(self) -> int:
return
@channel.setter
def channel(self, val: int):
if not 1 <= val <= 8:
self.logger.warning(f"channel got: {val} but expected a value from 1 to 8")
self.setter("channel", val)
@property
def bit(self) -> int:
return
@bit.setter
def bit(self, val: int):
if val not in (16, 24):
self.logger.warning(f"bit got: {val} but expected value 16 or 24")
self.setter("bit", 1 if (val == 16) else 2)
@property
def quality(self) -> int:
return
@quality.setter
def quality(self, val: int):
if not 0 <= val <= 4:
self.logger.warning(f"quality got: {val} but expected a value from 0 to 4")
self.setter("quality", val)
@property
def route(self) -> int:
return
@route.setter
def route(self, val: int):
if not 0 <= val <= 8:
self.logger.warning(f"route got: {val} but expected a value from 0 to 8")
self.setter("route", val)
class VbanInstream(VbanStream):
"""
class representing a vban instream
subclasses VbanStream
"""
def __str__(self):
return f"{type(self).__name__}{self._remote.kind}{self.index}"
@property
def direction(self) -> str:
return "in"
@property
def sr(self) -> int:
return
@property
def channel(self) -> int:
return
@property
def bit(self) -> int:
return
class VbanAudioInstream(VbanInstream):
"""Represents a VBAN Audio Instream"""
class VbanMidiInstream(VbanInstream):
"""Represents a VBAN Midi Instream"""
class VbanTextInstream(VbanInstream):
"""Represents a VBAN Text Instream"""
class VbanOutstream(VbanStream):
"""
class representing a vban outstream
Subclasses VbanStream
"""
def __str__(self):
return f"{type(self).__name__}{self._remote.kind}{self.index}"
@property
def direction(self) -> str:
return "out"
class VbanAudioOutstream(VbanOutstream):
"""Represents a VBAN Audio Outstream"""
class VbanMidiOutstream(VbanOutstream):
"""Represents a VBAN Midi Outstream"""
def _make_stream_pair(remote, kind):
num_instream, num_outstream, num_midi, num_text = kind.vban
def _generate_streams(i, dir):
"""generator function for creating instream/outstream tuples"""
if dir == "in":
if i < num_instream:
yield VbanAudioInstream
elif i < num_instream + num_midi:
yield VbanMidiInstream
else:
yield VbanTextInstream
else:
if i < num_outstream:
yield VbanAudioOutstream
else:
yield VbanMidiOutstream
return (
tuple(
cls(remote, i)
for i in range(num_instream + num_midi + num_text)
for cls in _generate_streams(i, "in")
),
tuple(
cls(remote, i)
for i in range(num_outstream + num_midi)
for cls in _generate_streams(i, "out")
),
)
def _make_stream_pairs(remote):
return {kind.name: _make_stream_pair(remote, kind) for kind in kinds_all}
class Vban:
"""
class representing the vban module
Contains two tuples, one for each stream type
"""
def __init__(self, remote):
self.remote = remote
self.instream, self.outstream = _make_stream_pairs(remote)[remote.kind.name]
def enable(self):
"""if VBAN disabled there can be no communication with it"""
def disable(self):
self.remote._set_rt("vban.Enable", 0)
def vban_factory(remote) -> Vban:
"""
Factory method for vban
Returns a class that represents the VBAN module.
"""
VBAN_cls = Vban
return type(f"{VBAN_cls.__name__}", (VBAN_cls,), {})(remote)
def request_vban_obj(remote) -> Vban:
"""
Vban entry point.
Returns a reference to a Vban class of a kind
"""
return vban_factory(remote)

View File

@@ -1,16 +1,17 @@
import logging
import socket
import threading
import time
from abc import ABCMeta, abstractmethod
from pathlib import Path
from queue import Queue
from typing import Iterable, Optional, Union
from typing import Iterable, Union
from .error import VBANCMDError
from .event import Event
from .packet import RequestHeader
from .subject import Subject
from .util import Socket, script
from .util import Socket, deep_merge, script
from .worker import Producer, Subscriber, Updater
logger = logging.getLogger(__name__)
@@ -64,8 +65,9 @@ class VbanCmd(metaclass=ABCMeta):
def get_filepath():
filepaths = [
Path.cwd() / "vban.toml",
Path.cwd() / "configs" / "vban.toml",
Path.home() / ".config" / "vban-cmd" / "vban.toml",
Path.home() / "Documents" / "Voicemeeter" / "vban.toml",
Path.home() / "Documents" / "Voicemeeter" / "configs" / "vban.toml",
]
for filepath in filepaths:
if filepath.exists():
@@ -75,32 +77,39 @@ class VbanCmd(metaclass=ABCMeta):
with open(filepath, "rb") as f:
conn = tomllib.load(f)
assert (
"ip" in conn["connection"]
), "please provide ip, by kwarg or config"
"connection" in conn and "ip" in conn["connection"]
), "expected [connection][ip] in vban config"
return conn["connection"]
else:
raise VBANCMDError("no ip provided and no vban.toml located.")
def __enter__(self):
self.login()
return self
def login(self):
def login(self) -> None:
"""Starts the subscriber and updater threads (unless in outbound mode)"""
if not self.outbound:
self.running = True
self.event.info()
self.subscriber = Subscriber(self)
self.stop_event = threading.Event()
self.stop_event.clear()
self.subscriber = Subscriber(self, self.stop_event)
self.subscriber.start()
queue = Queue()
self.updater = Updater(self, queue)
self.updater.start()
self.producer = Producer(self, queue)
self.producer = Producer(self, queue, self.stop_event)
self.producer.start()
self.logger.info(f"{type(self).__name__}: Successfully logged into {self}")
self.logger.info(
"Successfully logged into VBANCMD {kind} with ip='{ip}', port={port}, streamname='{streamname}'".format(
**self.__dict__
)
)
def stopped(self):
return self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network."""
@@ -123,7 +132,7 @@ class VbanCmd(metaclass=ABCMeta):
self.packet_request.framecounter = (
int.from_bytes(self.packet_request.framecounter, "little") + 1
).to_bytes(4, "little")
self.logger.debug(f"sendtext: [{self.ip}:{self.port}] {script}")
self.logger.debug(f"sendtext: {script}")
time.sleep(self.DELAY)
@property
@@ -150,7 +159,7 @@ class VbanCmd(metaclass=ABCMeta):
def public_packet(self):
return self._public_packet
def clear_dirty(self):
def clear_dirty(self) -> None:
while self.pdirty:
time.sleep(self.DELAY)
@@ -175,30 +184,46 @@ class VbanCmd(metaclass=ABCMeta):
def param(key):
obj, m2, *rem = key.split("-")
index = int(m2) if m2.isnumeric() else int(*rem)
if obj in ("strip", "bus"):
if obj in ("strip", "bus", "button"):
return getattr(self, obj)[index]
else:
elif obj == "vban":
return getattr(getattr(self, obj), f"{m2}stream")[index]
raise ValueError(obj)
[param(key).apply(datum).then_wait() for key, datum in data.items()]
def apply_config(self, name):
"""applies a config from memory"""
error_msg = (
ERR_MSG = (
f"No config with name '{name}' is loaded into memory",
f"Known configs: {list(self.configs.keys())}",
)
try:
self.apply(self.configs[name])
self.logger.info(f"Profile '{name}' applied!")
except KeyError:
self.logger.error(("\n").join(error_msg))
config = self.configs[name]
except KeyError as e:
self.logger.error(("\n").join(ERR_MSG))
raise VBANCMDError(("\n").join(ERR_MSG)) from e
def logout(self):
self.running = False
time.sleep(0.2)
if "extends" in config:
extended = config["extends"]
config = {
k: v
for k, v in deep_merge(self.configs[extended], config)
if k not in ("extends")
}
self.logger.debug(
f"profile '{name}' extends '{extended}', profiles merged.."
)
self.apply(config)
self.logger.info(f"Profile '{name}' applied!")
def logout(self) -> None:
if not self.stopped():
self.logger.debug("events thread shutdown started")
self.stop_event.set()
self.subscriber.join() # wait for subscriber thread to complete cycle
[sock.close() for sock in self.socks]
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")
def __exit__(self, exc_type, exc_value, exc_traceback):
def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
self.logout()

View File

@@ -14,14 +14,15 @@ logger = logging.getLogger(__name__)
class Subscriber(threading.Thread):
"""fire a subscription packet every 10 seconds"""
def __init__(self, remote):
super().__init__(name="subscriber", daemon=True)
def __init__(self, remote, stop_event):
super().__init__(name="subscriber", daemon=False)
self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__)
self.packet = SubscribeHeader()
def run(self):
while self._remote.running:
while not self.stopped():
try:
self._remote.socks[Socket.register].sendto(
self.packet.header,
@@ -30,23 +31,39 @@ class Subscriber(threading.Thread):
self.packet.framecounter = (
int.from_bytes(self.packet.framecounter, "little") + 1
).to_bytes(4, "little")
time.sleep(10)
self.wait_until_stopped(10)
except socket.gaierror as e:
self.logger.exception(f"{type(e).__name__}: {e}")
raise VBANCMDConnectionError(
f"unable to resolve hostname {self._remote.ip}"
) from e
self.logger.debug(f"terminating {self.name} thread")
def stopped(self):
return self.stop_event.is_set()
def wait_until_stopped(self, timeout, period=0.2):
must_end = time.time() + timeout
while time.time() < must_end:
if self.stopped():
break
time.sleep(period)
class Producer(threading.Thread):
"""Continously send job queue to the Updater thread at a rate of self._remote.ratelimit."""
def __init__(self, remote, queue):
super().__init__(name="producer", daemon=True)
def __init__(self, remote, queue, stop_event):
super().__init__(name="producer", daemon=False)
self._remote = remote
self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__)
self.packet_expected = VbanRtPacketHeader()
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._public_packet = self._get_rt()
(
self._remote.cache["strip_level"],
@@ -60,7 +77,6 @@ class Producer(threading.Thread):
data = None
while not data:
data = self._fetch_rt_packet()
time.sleep(self._remote.DELAY)
return data
return fget()
@@ -68,10 +84,10 @@ class Producer(threading.Thread):
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
try:
data, _ = self._remote.socks[Socket.response].recvfrom(2048)
# check for packet data
# do we have packet data?
if len(data) > HEADER_SIZE:
# check if packet is of type rt packet response
if self.packet_expected.header == data[: HEADER_SIZE - 4]:
# is the packet of type VBAN RT response?
if self.packet_expected.header == data[:HEADER_SIZE]:
return VbanRtPacket(
_kind=self._remote.kind,
_voicemeeterType=data[28:29],
@@ -103,8 +119,11 @@ class Producer(threading.Thread):
f"timeout waiting for RtPacket from {self._remote.ip}"
) from e
def stopped(self):
return self.stop_event.is_set()
def run(self):
while self._remote.running:
while not self.stopped():
_pp = self._get_rt()
pdirty = _pp.pdirty(self._remote.public_packet)
ldirty = _pp.ldirty(
@@ -137,13 +156,8 @@ class Updater(threading.Thread):
self._remote = remote
self.queue = queue
self.logger = logger.getChild(self.__class__.__name__)
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
p_in, v_in = self._remote.kind.ins
self._remote._strip_comp = [False] * (2 * p_in + 8 * v_in)
self._remote._bus_comp = [False] * (self._remote.kind.num_bus * 8)
self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels)
self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels)
def run(self):
"""
@@ -151,12 +165,7 @@ class Updater(threading.Thread):
Generate _strip_comp, _bus_comp and update level cache if ldirty.
"""
while True:
event = self.queue.get()
if event is None:
self.logger.debug(f"terminating {self.name} thread")
break
while event := self.queue.get():
if event == "pdirty" and self._remote.pdirty:
self._remote.subject.notify(event)
elif event == "ldirty" and self._remote.ldirty:
@@ -172,3 +181,4 @@ class Updater(threading.Thread):
self._remote._public_packet.outputlevels,
)
self._remote.subject.notify(event)
self.logger.debug(f"terminating {self.name} thread")