Compare commits

..

32 Commits
v2.9.0 ... dev

Author SHA1 Message Date
ba85373f94 move steps abstract method into VbanCmd. 2026-03-16 23:16:01 +00:00
22cc980fc2
Merge pull request #7 from onyx-and-iris/dependabot/pip/virtualenv-20.36.1
Bump virtualenv from 20.29.0 to 20.36.1
2026-03-15 16:20:26 +00:00
dependabot[bot]
2986d451ad
Bump virtualenv from 20.29.0 to 20.36.1
Bumps [virtualenv](https://github.com/pypa/virtualenv) from 20.29.0 to 20.36.1.
- [Release notes](https://github.com/pypa/virtualenv/releases)
- [Changelog](https://github.com/pypa/virtualenv/blob/main/docs/changelog.rst)
- [Commits](https://github.com/pypa/virtualenv/compare/20.29.0...20.36.1)

---
updated-dependencies:
- dependency-name: virtualenv
  dependency-version: 20.36.1
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-15 16:19:39 +00:00
96f0008654
Merge pull request #6 from onyx-and-iris/dependabot/pip/filelock-3.20.3
Bump filelock from 3.16.1 to 3.20.3
2026-03-15 16:18:24 +00:00
dependabot[bot]
6ddead68d0
Bump filelock from 3.16.1 to 3.20.3
Bumps [filelock](https://github.com/tox-dev/py-filelock) from 3.16.1 to 3.20.3.
- [Release notes](https://github.com/tox-dev/py-filelock/releases)
- [Changelog](https://github.com/tox-dev/filelock/blob/main/docs/changelog.rst)
- [Commits](https://github.com/tox-dev/py-filelock/compare/3.16.1...3.20.3)

---
updated-dependencies:
- dependency-name: filelock
  dependency-version: 3.20.3
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-15 16:17:48 +00:00
2794b14cf1 resolves the hostname once and use it throughout the package
this is more efficient and fails faster on error.

patch bump
2026-03-11 04:04:53 +00:00
c46ca8a8c8 patch bump 2026-03-09 05:21:57 +00:00
f8b56b4a30 swap out implementation of {IRemote}.apply(). It now uses individual requests instead of script requests. 2026-03-09 05:21:32 +00:00
09259269d7 rename ratelimit decorator to script_ratelimit for clarity
script_ratelimit now defaults to None.

add note in README
2026-03-09 05:20:25 +00:00
242401e294 add VBANCMDPacketError to exception hierarchy
raise them when we fail to validate incoming packets.

add VbanRTRequestHeader post_init to validate the provided bps value.

VbanRTSubscribeHeader and VbanRTRequestHeader properties now return int type. They are then directly packed into the header.
2026-03-09 05:19:41 +00:00
98ec9b715f fixes bug receiving inconsistent level updates
patch bump
2026-03-07 15:44:32 +00:00
5f7b62a0e0 add missing cached_property 2026-03-07 14:29:32 +00:00
d1bcbfed6f minor bump 2026-03-07 14:24:44 +00:00
ab80bbf226 use host kwarg/env var in examples 2026-03-07 14:23:29 +00:00
ad58852a77 improve efficiency with cached properties and struct.unpack 2026-03-07 14:22:25 +00:00
5363584940 improve to_bytes efficiency with struct.pack 2026-03-07 14:20:31 +00:00
9f43ee18d3 add more enums so we can remove some of the constants
rename some of the packet classes

patch bump
2026-03-07 00:03:46 +00:00
3cde874a3c remove unnecessary assignment 2026-03-03 20:03:09 +00:00
3d01321be3 separate ping from pong
this separates concerns and allows the pong_timeout to strictly handle timeouts for pongs.

patch bump
2026-03-03 19:47:15 +00:00
2dd52a7258 move ping timeout logic into decorator
patch bump
2026-03-03 18:21:14 +00:00
28cbef5ef6 patch bump 2026-03-03 15:48:23 +00:00
5b3b35fca3 flag value 2026-03-03 15:48:09 +00:00
7b3149a1e1 patch bump 2026-03-03 15:38:11 +00:00
230d9f0eb3 upd test config 2026-03-03 15:37:38 +00:00
c9a505df0a convert Modes class to a Flag Enum type and rename it to ChannelModes
move it into vban_cmd.packet
2026-03-03 15:36:56 +00:00
3e3bec6d50 remove the sleep(), the @ratelimit decorator already handles this. 2026-03-03 15:31:31 +00:00
55b3125e10 fixes regression in apply()
patch bump
2026-03-02 23:52:06 +00:00
7b3340042c upd reference to ip 2026-03-02 23:26:38 +00:00
6ea0859180 patch bump 2026-03-02 23:25:03 +00:00
81ed963bea fix references to remote.ip 2026-03-02 23:24:09 +00:00
0b99b6a67f update references to ip kwarg 2026-03-02 23:21:57 +00:00
86d0aa91c3 add a ratelimit decorator to {VbanCmd}.sendtext()
ip kwarg renamed to host.
2026-03-02 23:20:45 +00:00
19 changed files with 630 additions and 591 deletions

View File

@ -41,14 +41,14 @@ Load VBAN connection info from toml config. A valid `vban.toml` might look like
```toml
[connection]
ip = "gamepc.local"
host = "localhost"
port = 6980
streamname = "Command1"
```
It should be placed in \<user home directory\> / "Documents" / "Voicemeeter" / "configs"
Alternatively you may pass `ip`, `port`, `streamname` as keyword arguments.
Alternatively you may pass `host`, `port`, `streamname` as keyword arguments.
#### `__main__.py`
@ -85,7 +85,7 @@ def main():
KIND_ID = 'banana'
with vban_cmd.api(
KIND_ID, ip='gamepc.local', port=6980, streamname='Command1'
KIND_ID, host='localhost', port=6980, streamname='Command1'
) as vban:
do = ManyThings(vban)
do.things()
@ -474,7 +474,7 @@ example:
import vban_cmd
opts = {
'ip': '<ip address>',
'host': '<ip address>',
'streamname': 'Command1',
'port': 6980,
}
@ -541,14 +541,15 @@ print(vban.event.get())
You may pass the following optional keyword arguments:
- `ip`: str='localhost', ip or hostname of remote machine
- `host`: str='localhost', ip or hostname of remote machine
- `port`: int=6980, vban udp port of remote machine.
- `streamname`: str='Command1', name of the stream to connect to.
- `bps`: int=256000, bps rate of the stream.
- `channel`: int=0, channel on which to send the UDP requests.
- `pdirty`: boolean=False, parameter updates
- `ldirty`: boolean=False, level updates
- `timeout`: int=5, amount of time (seconds) to wait for an incoming RT data packet (parameter states).
- `script_ratelimit`: float | None=None, ratelimit for vban.sendtext() specifically.
- `timeout`: int=5, timeout for socket operations.
- `disable_rt_listeners`: boolean=False, set `True` if you don't wish to receive RT packets.
- You can still send Matrix string requests ending with `?` and receive a response.
@ -591,7 +592,7 @@ import vban_cmd
logging.basicConfig(level=logging.DEBUG)
opts = {'ip': 'ip.local', 'port': 6980, 'streamname': 'Command1'}
opts = {'host': 'localhost', 'port': 6980, 'streamname': 'Command1'}
with vban_cmd.api('banana', **opts) as vban:
...
```

View File

@ -103,7 +103,7 @@ class App(tk.Tk):
def main():
KIND_ID = 'banana'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

View File

@ -94,7 +94,7 @@ class Observer:
def main():
KIND_ID = 'potato'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

View File

@ -25,7 +25,7 @@ class App:
def main():
KIND_ID = 'banana'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

42
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]]
name = "cachetools"
@ -55,7 +55,7 @@ description = "Backport of PEP 654 (exception groups)"
optional = false
python-versions = ">=3.7"
groups = ["dev"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"},
{file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"},
@ -66,21 +66,16 @@ test = ["pytest (>=6)"]
[[package]]
name = "filelock"
version = "3.16.1"
version = "3.20.3"
description = "A platform independent file lock."
optional = false
python-versions = ">=3.8"
python-versions = ">=3.10"
groups = ["dev"]
files = [
{file = "filelock-3.16.1-py3-none-any.whl", hash = "sha256:2082e5703d51fbf98ea75855d9d5527e33d8ff23099bec374a134febee6946b0"},
{file = "filelock-3.16.1.tar.gz", hash = "sha256:c249fbfcd5db47e5e2d6d62198e565475ee65e4831e2561c8e313fa7eb961435"},
{file = "filelock-3.20.3-py3-none-any.whl", hash = "sha256:4b0dda527ee31078689fc205ec4f1c1bf7d56cf88b6dc9426c4f230e46c2dce1"},
{file = "filelock-3.20.3.tar.gz", hash = "sha256:18c57ee915c7ec61cff0ecf7f0f869936c7c30191bb0cf406f1341778d0834e1"},
]
[package.extras]
docs = ["furo (>=2024.8.6)", "sphinx (>=8.0.2)", "sphinx-autodoc-typehints (>=2.4.1)"]
testing = ["covdefaults (>=2.3)", "coverage (>=7.6.1)", "diff-cover (>=9.2)", "pytest (>=8.3.3)", "pytest-asyncio (>=0.24)", "pytest-cov (>=5)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.26.4)"]
typing = ["typing-extensions (>=4.12.2)"]
[[package]]
name = "iniconfig"
version = "2.0.0"
@ -243,7 +238,7 @@ description = "A lil' TOML parser"
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"},
{file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"},
@ -309,37 +304,38 @@ test = ["devpi-process (>=1.0.2)", "pytest (>=8.3.3)", "pytest-mock (>=3.14)"]
[[package]]
name = "typing-extensions"
version = "4.12.2"
description = "Backported and Experimental Type Hints for Python 3.8+"
version = "4.15.0"
description = "Backported and Experimental Type Hints for Python 3.9+"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["dev"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
{file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
{file = "typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548"},
{file = "typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466"},
]
[[package]]
name = "virtualenv"
version = "20.29.0"
version = "20.36.1"
description = "Virtual Python Environment builder"
optional = false
python-versions = ">=3.8"
groups = ["dev"]
files = [
{file = "virtualenv-20.29.0-py3-none-any.whl", hash = "sha256:c12311863497992dc4b8644f8ea82d3b35bb7ef8ee82e6630d76d0197c39baf9"},
{file = "virtualenv-20.29.0.tar.gz", hash = "sha256:6345e1ff19d4b1296954cee076baaf58ff2a12a84a338c62b02eda39f20aa982"},
{file = "virtualenv-20.36.1-py3-none-any.whl", hash = "sha256:575a8d6b124ef88f6f51d56d656132389f961062a9177016a50e4f507bbcc19f"},
{file = "virtualenv-20.36.1.tar.gz", hash = "sha256:8befb5c81842c641f8ee658481e42641c68b5eab3521d8e092d18320902466ba"},
]
[package.dependencies]
distlib = ">=0.3.7,<1"
filelock = ">=3.12.2,<4"
filelock = {version = ">=3.20.1,<4", markers = "python_version >= \"3.10\""}
platformdirs = ">=3.9.1,<5"
typing-extensions = {version = ">=4.13.2", markers = "python_version < \"3.11\""}
[package.extras]
docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.2,!=7.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"]
test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8)", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10)"]
test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8) ; platform_python_implementation == \"PyPy\" or platform_python_implementation == \"GraalVM\" or platform_python_implementation == \"CPython\" and sys_platform == \"win32\" and python_version >= \"3.13\"", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10) ; platform_python_implementation == \"CPython\""]
[[package]]
name = "virtualenv-pyenv"

View File

@ -1,6 +1,6 @@
[project]
name = "vban-cmd"
version = "2.9.0"
version = "2.10.3"
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
license = { text = "MIT" }

View File

@ -11,7 +11,7 @@ from vban_cmd.kinds import request_kind_map as kindmap
KIND_ID = os.environ.get('KIND', 'potato')
opts = {
'ip': os.getenv('VBANCMD_IP', 'localhost'),
'host': os.getenv('VBANCMD_HOST', 'localhost'),
'streamname': os.getenv('VBANCMD_STREAMNAME', 'Command1'),
'port': int(os.getenv('VBANCMD_PORT', 6980)),
}

View File

@ -1,6 +1,18 @@
from .packet.enums import ServiceTypes, SubProtocols
class VBANCMDError(Exception):
"""Base VBANCMD Exception class."""
class VBANCMDConnectionError(VBANCMDError):
"""Exception raised when connection/timeout errors occur"""
class VBANCMDPacketError(VBANCMDError):
"""Exception raised when packet parsing errors occur"""
def __init__(self, message: str, protocol: SubProtocols, type_: ServiceTypes):
super().__init__(message)
self.protocol = protocol
self.type = type_

View File

@ -1,4 +1,3 @@
import abc
import logging
from enum import IntEnum
from functools import cached_property
@ -84,18 +83,20 @@ class FactoryBase(VbanCmd):
def __init__(self, kind_id: str, **kwargs):
defaultkwargs = {
'ip': 'localhost',
'host': 'localhost',
'port': 6980,
'streamname': 'Command1',
'bps': 256000,
'channel': 0,
'ratelimit': 0.01,
'timeout': 5,
'script_ratelimit': None, # if None or 0, no rate limit applied to script commands
'timeout': 5, # timeout on socket operations, in seconds
'disable_rt_listeners': False,
'sync': False,
'pdirty': False,
'ldirty': False,
}
if 'ip' in kwargs:
defaultkwargs['host'] = kwargs.pop('ip') # for backwards compatibility
if 'subs' in kwargs:
defaultkwargs |= kwargs.pop('subs') # for backwards compatibility
kwargs = defaultkwargs | kwargs
@ -120,11 +121,6 @@ class FactoryBase(VbanCmd):
+ f"({self.kind}, ip='{self.ip}', port={self.port}, streamname='{self.streamname}')"
)
@property
@abc.abstractmethod
def steps(self):
pass
@cached_property
def configs(self):
self._configs = configs(self.kind.name)

View File

@ -1,83 +1,9 @@
import abc
import logging
import time
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class Modes:
"""Channel Modes"""
_mute: hex = 0x00000001
_solo: hex = 0x00000002
_mono: hex = 0x00000004
_mc: hex = 0x00000008
_amix: hex = 0x00000010
_repeat: hex = 0x00000020
_bmix: hex = 0x00000030
_composite: hex = 0x00000040
_tvmix: hex = 0x00000050
_upmix21: hex = 0x00000060
_upmix41: hex = 0x00000070
_upmix61: hex = 0x00000080
_centeronly: hex = 0x00000090
_lfeonly: hex = 0x000000A0
_rearonly: hex = 0x000000B0
_mask: hex = 0x000000F0
_on: hex = 0x00000100 # eq.on
_cross: hex = 0x00000200
_ab: hex = 0x00000800 # eq.ab
_busa: hex = 0x00001000
_busa1: hex = 0x00001000
_busa2: hex = 0x00002000
_busa3: hex = 0x00004000
_busa4: hex = 0x00008000
_busa5: hex = 0x00080000
_busb: hex = 0x00010000
_busb1: hex = 0x00010000
_busb2: hex = 0x00020000
_busb3: hex = 0x00040000
_pan0: hex = 0x00000000
_pancolor: hex = 0x00100000
_panmod: hex = 0x00200000
_panmask: hex = 0x00F00000
_postfx_r: hex = 0x01000000
_postfx_d: hex = 0x02000000
_postfx1: hex = 0x04000000
_postfx2: hex = 0x08000000
_sel: hex = 0x10000000
_monitor: hex = 0x20000000
@property
def modevals(self):
return (
val
for val in [
self._amix,
self._repeat,
self._bmix,
self._composite,
self._tvmix,
self._upmix21,
self._upmix41,
self._upmix61,
self._centeronly,
self._lfeonly,
self._rearonly,
]
)
class IRemote(abc.ABC):
"""
Common interface between base class and extended (higher) classes
@ -89,7 +15,6 @@ class IRemote(abc.ABC):
self._remote = remote
self.index = index
self.logger = logger.getChild(self.__class__.__name__)
self._modes = Modes()
def getter(self, param):
cmd = self._cmd(param)
@ -125,27 +50,16 @@ class IRemote(abc.ABC):
def fget(attr, val):
if attr == 'mode':
return (f'mode.{val}', 1)
elif attr == 'knob':
return ('', val)
return (attr, val)
return (getattr(self, attr), val, 1)
return (self, attr, val)
for attr, val in data.items():
if not isinstance(val, dict):
if attr in dir(self): # avoid calling getattr (with hasattr)
attr, val = fget(attr, val)
if isinstance(val, bool):
val = 1 if val else 0
self._remote.cache[self._cmd(attr)] = val
self._remote._script += f'{self._cmd(attr)}={val};'
target, attr, val = fget(attr, val)
setattr(target, attr, val)
else:
self.logger.error(f'invalid attribute {attr} for {self}')
else:
target = getattr(self, attr)
target.apply(val)
self._remote.sendtext(self._remote._script)
return self
def then_wait(self):
self._remote._script = str()
time.sleep(self._remote.DELAY)

View File

@ -1,6 +1,7 @@
from functools import partial
from .enums import NBS, BusModes
from .packet.enums import ChannelModes
from .util import cache_bool, cache_float, cache_int, cache_string
@ -27,7 +28,7 @@ def channel_bool_prop(param):
elif param.lower() == 'mc':
return channel_state.mc
else:
return channel_state.get_mode(getattr(self._modes, f'_{param.lower()}'))
return channel_state.get_mode(getattr(ChannelModes, param.upper()).value)
def fset(self, val):
self.setter(param, 1 if val else 0)
@ -55,7 +56,9 @@ def channel_int_prop(param):
bit_9 = (channel_state._state >> 9) & 1
return (bit_9 << 1) | bit_2
else:
return channel_state.get_mode_int(getattr(self._modes, f'_{param.lower()}'))
return channel_state.get_mode_int(
getattr(ChannelModes, param.upper()).value
)
def fset(self, val):
self.setter(param, val)
@ -89,7 +92,7 @@ def strip_output_prop(param):
strip_state = self.public_packets[NBS.zero].states.strip[self.index]
return strip_state.get_mode(getattr(self._modes, f'_bus{param.lower()}'))
return strip_state.get_mode(getattr(ChannelModes, f'BUS{param.upper()}').value)
def fset(self, val):
self.setter(param, 1 if val else 0)

83
vban_cmd/packet/enums.py Normal file
View File

@ -0,0 +1,83 @@
from enum import Flag
class SubProtocols(Flag):
"""Sub Protocols - Bit flags that can be combined"""
AUDIO = 0x00
SERIAL = 0x20
TEXT = 0x40
SERVICE = 0x60
MASK = 0xE0
class ServiceTypes(Flag):
"""Service Types - Bit flags that can be combined"""
PING = 0
PONG = 0
CHATUTF8 = 1
RTPACKETREGISTER = 32
RTPACKET = 33
REQUESTREPLY = 0x02 # A Matrix reply
FNCT_REPLY = 0x80 # An RTPacket reply
class StreamTypes(Flag):
"""Stream Types - Bit flags that can be combined"""
ASCII = 0x00
UTF8 = 0x10
WCHAR = 0x20
class ChannelModes(Flag):
"""Channel Modes - Bit flags that can be combined"""
MUTE = 0x00000001
SOLO = 0x00000002
MONO = 0x00000004
MC = 0x00000008
AMIX = 0x00000010
REPEAT = 0x00000020
BMIX = 0x00000030
COMPOSITE = 0x00000040
TVMIX = 0x00000050
UPMIX21 = 0x00000060
UPMIX41 = 0x00000070
UPMIX61 = 0x00000080
CENTERONLY = 0x00000090
LFEONLY = 0x000000A0
REARONLY = 0x000000B0
MASK = 0x000000F0
ON = 0x00000100 # eq.on
CROSS = 0x00000200
AB = 0x00000800 # eq.ab
BUSA = 0x00001000
BUSA1 = 0x00001000
BUSA2 = 0x00002000
BUSA3 = 0x00004000
BUSA4 = 0x00008000
BUSA5 = 0x00080000
BUSB = 0x00010000
BUSB1 = 0x00010000
BUSB2 = 0x00020000
BUSB3 = 0x00040000
PAN0 = 0x00000000
PANCOLOR = 0x00100000
PANMOD = 0x00200000
PANMASK = 0x00F00000
POSTFX_R = 0x01000000
POSTFX_D = 0x02000000
POSTFX1 = 0x04000000
POSTFX2 = 0x08000000
SEL = 0x10000000
MONITOR = 0x20000000

View File

@ -1,29 +1,124 @@
import struct
from dataclasses import dataclass
from vban_cmd.enums import NBS
from vban_cmd.error import VBANCMDPacketError
from vban_cmd.kinds import KindMapClass
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_PING = 0
VBAN_SERVICE_PONG = 0 # PONG uses same service type as PING
VBAN_SERVICE_MASK = 0xE0
VBAN_PROTOCOL_MASK = 0xE0
VBAN_SERVICE_REQUESTREPLY = 0x02
VBAN_SERVICE_FNCT_REPLY = 0x02
from .enums import ServiceTypes, StreamTypes, SubProtocols
PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
STREAMNAME_MAX_LENGTH = 16
# fmt: off
BPS_OPTS = [
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
1000000, 1500000, 2000000, 3000000,
]
# fmt: on
@dataclass
class VbanPacket:
"""Represents the header of an incoming VBAN data packet"""
class VbanPingHeader:
"""Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PING.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
return struct.pack(
'<4s4B16sI',
header.vban,
header.format_sr,
header.format_nbs,
header.format_nbc,
header.format_bit,
header.streamname,
header.framecounter,
)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PONG.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != ServiceTypes.PONG.value:
raise VBANCMDPacketError(
f'Not a PONG response packet: {parsed["format_nbc"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
)
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != ServiceTypes.PONG.value:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRTPacket:
"""Represents the header of an incoming RTPacket"""
nbs: NBS
_kind: KindMapClass
@ -53,10 +148,10 @@ class VbanPacket:
@dataclass
class VbanSubscribeHeader:
"""Represents the header of a subscription packet"""
class VbanRTSubscribeHeader:
"""Represents the header of an RT subscription packet"""
nbs: NBS = NBS.zero
_nbs: NBS = NBS.zero
name: str = 'Register-RTP'
timeout: int = 15
@ -65,38 +160,106 @@ class VbanSubscribeHeader:
return b'VBAN'
@property
def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
def sr(self) -> int:
return SubProtocols.SERVICE.value
@property
def format_nbs(self) -> bytes:
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
def nbs(self) -> int:
return self._nbs.value & 0xFF
@property
def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
def nbc(self) -> int:
return ServiceTypes.RTPACKETREGISTER.value
@property
def format_bit(self) -> bytes:
return (self.timeout & 0xFF).to_bytes(1, 'little')
def bit(self) -> int:
return self.timeout & 0xFF
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
header = cls(_nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
return struct.pack(
'<4s4B16sI',
header.vban,
header.sr,
header.nbs,
header.nbc,
header.bit,
header.streamname,
framecounter,
)
@dataclass
class VbanRTRequestHeader:
"""Represents the header of an RT request packet"""
name: str
bps: int
channel: int
framecounter: int = 0
def __post_init__(self):
if self.bps not in BPS_OPTS:
raise ValueError(
f'Invalid bps value: {self.bps}. Must be one of {BPS_OPTS}'
)
self.bps_index = BPS_OPTS.index(self.bps)
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> int:
return self.bps_index | SubProtocols.TEXT.value
@property
def nbs(self) -> int:
return 0
@property
def nbc(self) -> int:
return self.channel
@property
def bit(self) -> int:
return StreamTypes.UTF8.value
@property
def streamname(self) -> bytes:
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def to_bytes(cls, name: str, bps: int, channel: int, framecounter: int) -> bytes:
header = cls(name=name, bps=bps, channel=channel, framecounter=framecounter)
return struct.pack(
'<4s4B16sI',
header.vban,
header.sr,
header.nbs,
header.nbc,
header.bit,
header.streamname,
header.framecounter,
)
@classmethod
def encode_with_payload(
cls, name: str, bps: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps, channel, framecounter) + payload.encode()
def _parse_vban_service_header(data: bytes) -> dict:
@ -113,9 +276,12 @@ def _parse_vban_service_header(data: bytes) -> dict:
format_bit = data[7]
# Verify this is a service protocol packet
protocol = format_sr & VBAN_PROTOCOL_MASK
if protocol != VBAN_PROTOCOL_SERVICE:
raise ValueError(f'Not a service protocol packet: {protocol:02x}')
protocol = format_sr & SubProtocols.MASK.value
if protocol != SubProtocols.SERVICE.value:
raise VBANCMDPacketError(
f'Invalid protocol in service header: {protocol:02x}',
protocol=SubProtocols(protocol),
)
# Extract stream name and frame counter
name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore')
@ -132,13 +298,13 @@ def _parse_vban_service_header(data: bytes) -> dict:
@dataclass
class VbanResponseHeader:
"""Represents the header of a response packet"""
class VbanRTResponseHeader:
"""Represents the header of an RT response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_nbc: int = ServiceTypes.RTPACKET.value
format_bit: int = 0
framecounter: int = 0
@ -148,7 +314,9 @@ class VbanResponseHeader:
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def from_bytes(cls, data: bytes):
@ -156,9 +324,11 @@ class VbanResponseHeader:
parsed = _parse_vban_service_header(data)
# Validate this is an RTPacket response
if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET:
raise ValueError(
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}'
if parsed['format_nbc'] != ServiceTypes.RTPACKET.value:
raise VBANCMDPacketError(
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
)
return cls(**parsed)
@ -169,9 +339,9 @@ class VbanMatrixResponseHeader:
"""Represents the header of a matrix response packet"""
name: str = 'Request Reply'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = VBAN_SERVICE_FNCT_REPLY
format_nbc: int = VBAN_SERVICE_REQUESTREPLY
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = ServiceTypes.FNCT_REPLY.value
format_nbc: int = ServiceTypes.REQUESTREPLY.value
format_bit: int = 0
framecounter: int = 0
@ -181,16 +351,29 @@ class VbanMatrixResponseHeader:
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a matrix response packet from bytes."""
parsed = _parse_vban_service_header(data)
# Validate this is a service reply packet
if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY:
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
# Validate this is a service reply packet (dual encoding scheme)
if parsed['format_nbs'] != ServiceTypes.FNCT_REPLY.value:
raise VBANCMDPacketError(
f'Not a service reply packet: {parsed["format_nbs"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbs']),
)
if parsed['format_nbc'] != ServiceTypes.REQUESTREPLY.value:
raise VBANCMDPacketError(
f'Not a request reply packet: {parsed["format_nbc"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
)
return cls(**parsed)
@ -209,148 +392,3 @@ class VbanMatrixResponseHeader:
header = cls.from_bytes(data)
payload = cls.extract_payload(data)
return header, payload
@dataclass
class VbanPingHeader:
"""Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PING
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr.to_bytes(1, 'little'))
data.extend(header.format_nbs.to_bytes(1, 'little'))
data.extend(header.format_nbc.to_bytes(1, 'little'))
data.extend(header.format_bit.to_bytes(1, 'little'))
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PONG
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRequestHeader:
"""Represents the header of a request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode()[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()

View File

@ -1,11 +1,14 @@
import struct
from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp
from .headers import VbanPacket
from .enums import ChannelModes
from .headers import VbanRTPacket
class Levels(NamedTuple):
@ -20,6 +23,13 @@ class ChannelState:
# Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little')
@classmethod
def from_int(cls, state_int: int):
"""Create ChannelState directly from integer for efficiency"""
instance = cls.__new__(cls)
instance._state = state_int
return instance
def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode"""
return (self._state & mode_value) != 0
@ -31,57 +41,57 @@ class ChannelState:
# Common boolean modes
@property
def mute(self) -> bool:
return (self._state & 0x00000001) != 0
return (self._state & ChannelModes.MUTE.value) != 0
@property
def solo(self) -> bool:
return (self._state & 0x00000002) != 0
return (self._state & ChannelModes.SOLO.value) != 0
@property
def mono(self) -> bool:
return (self._state & 0x00000004) != 0
return (self._state & ChannelModes.MONO.value) != 0
@property
def mc(self) -> bool:
return (self._state & 0x00000008) != 0
return (self._state & ChannelModes.MC.value) != 0
# EQ modes
@property
def eq_on(self) -> bool:
return (self._state & 0x00000100) != 0
return (self._state & ChannelModes.ON.value) != 0
@property
def eq_ab(self) -> bool:
return (self._state & 0x00000800) != 0
return (self._state & ChannelModes.AB.value) != 0
# Bus assignments (strip to bus routing)
@property
def busa1(self) -> bool:
return (self._state & 0x00001000) != 0
return (self._state & ChannelModes.BUSA1.value) != 0
@property
def busa2(self) -> bool:
return (self._state & 0x00002000) != 0
return (self._state & ChannelModes.BUSA2.value) != 0
@property
def busa3(self) -> bool:
return (self._state & 0x00004000) != 0
return (self._state & ChannelModes.BUSA3.value) != 0
@property
def busa4(self) -> bool:
return (self._state & 0x00008000) != 0
return (self._state & ChannelModes.BUSA4.value) != 0
@property
def busb1(self) -> bool:
return (self._state & 0x00010000) != 0
return (self._state & ChannelModes.BUSB1.value) != 0
@property
def busb2(self) -> bool:
return (self._state & 0x00020000) != 0
return (self._state & ChannelModes.BUSB2.value) != 0
@property
def busb3(self) -> bool:
return (self._state & 0x00040000) != 0
return (self._state & ChannelModes.BUSB3.value) != 0
class States(NamedTuple):
@ -95,8 +105,8 @@ class Labels(NamedTuple):
@dataclass
class VbanPacketNBS0(VbanPacket):
"""Represents the body of a VBAN data packet with ident:0"""
class VbanRTPacketNBS0(VbanRTPacket):
"""Represents the body of a VBAN RTPacket with ident:0"""
_inputLeveldB100: bytes
_outputLeveldB100: bytes
@ -146,32 +156,17 @@ class VbanPacketNBS0(VbanPacket):
def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed"""
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return (
self._stripState != other._stripState
or self._busState != other._busState
or self_gains != other_gains
or self._stripGaindB100Layer1 != other._stripGaindB100Layer1
or self._stripGaindB100Layer2 != other._stripGaindB100Layer2
or self._stripGaindB100Layer3 != other._stripGaindB100Layer3
or self._stripGaindB100Layer4 != other._stripGaindB100Layer4
or self._stripGaindB100Layer5 != other._stripGaindB100Layer5
or self._stripGaindB100Layer6 != other._stripGaindB100Layer6
or self._stripGaindB100Layer7 != other._stripGaindB100Layer7
or self._stripGaindB100Layer8 != other._stripGaindB100Layer8
or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60
@ -185,77 +180,54 @@ class VbanPacketNBS0(VbanPacket):
)
return any(self._strip_comp) or any(self._bus_comp)
@property
@cached_property
def strip_levels(self) -> tuple[float, ...]:
"""Returns strip levels in dB"""
return tuple(
round(
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._inputLeveldB100), 2)
)[: self._kind.num_strip_levels]
strip_raw = struct.unpack('<34h', self._inputLeveldB100)
return tuple(round(val * 0.01, 1) for val in strip_raw)[
: self._kind.num_strip_levels
]
@property
@cached_property
def bus_levels(self) -> tuple[float, ...]:
"""Returns bus levels in dB"""
return tuple(
round(
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._outputLeveldB100), 2)
)[: self._kind.num_bus_levels]
bus_raw = struct.unpack('<64h', self._outputLeveldB100)
return tuple(round(val * 0.01, 1) for val in bus_raw)[
: self._kind.num_bus_levels
]
@property
def levels(self) -> Levels:
"""Returns strip and bus levels as a namedtuple"""
return Levels(strip=self.strip_levels, bus=self.bus_levels)
@property
@cached_property
def states(self) -> States:
"""returns States object with processed strip and bus channel states"""
strip_states = struct.unpack('<8I', self._stripState)
bus_states = struct.unpack('<8I', self._busState)
return States(
strip=tuple(
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
strip=tuple(ChannelState.from_int(state) for state in strip_states),
bus=tuple(ChannelState.from_int(state) for state in bus_states),
)
@property
@cached_property
def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples"""
return tuple(
tuple(
round(
int.from_bytes(
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
'little',
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
layer_data = []
for layer in range(1, 9):
layer_bytes = getattr(self, f'_stripGaindB100Layer{layer}')
layer_raw = struct.unpack('<8h', layer_bytes)
layer_data.append(tuple(round(val * 0.01, 2) for val in layer_raw))
return tuple(layer_data)
@property
@cached_property
def busgain(self) -> tuple:
"""returns tuple of bus gains"""
return tuple(
round(
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
bus_gain_raw = struct.unpack('<8h', self._busGaindB100)
return tuple(round(val * 0.01, 2) for val in bus_gain_raw)
@property
@cached_property
def labels(self) -> Labels:
"""returns Labels namedtuple of strip and bus labels"""

View File

@ -1,11 +1,12 @@
import struct
from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from .headers import VbanPacket
from .headers import VbanRTPacket
VMPARAMSTRIP_SIZE = 174
@ -193,11 +194,15 @@ class VbanVMParamStrip:
_Pitch_formant_high=data[172:174],
)
@property
@cached_property
def mode(self) -> int:
return int.from_bytes(self._mode, 'little')
@property
@cached_property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@cached_property
def audibility(self) -> Audibility:
return Audibility(
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
@ -206,7 +211,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
)
@property
@cached_property
def positions(self) -> Positions:
return Positions(
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
@ -217,7 +222,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
)
@property
@cached_property
def eqgains(self) -> EqGains:
return EqGains(
*[
@ -230,7 +235,7 @@ class VbanVMParamStrip:
]
)
@property
@cached_property
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
return tuple(
ParametricEQSettings(
@ -243,7 +248,7 @@ class VbanVMParamStrip:
for i in range(6)
)
@property
@cached_property
def sends(self) -> Sends:
return Sends(
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
@ -252,11 +257,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
)
@property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@property
@cached_property
def compressor(self) -> CompressorSettings:
return CompressorSettings(
gain_in=round(
@ -276,7 +277,7 @@ class VbanVMParamStrip:
),
)
@property
@cached_property
def gate(self) -> GateSettings:
return GateSettings(
threshold_in=round(
@ -295,7 +296,7 @@ class VbanVMParamStrip:
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
)
@property
@cached_property
def denoiser(self) -> DenoiserSettings:
return DenoiserSettings(
threshold=round(
@ -303,7 +304,7 @@ class VbanVMParamStrip:
)
)
@property
@cached_property
def pitch(self) -> PitchSettings:
return PitchSettings(
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
@ -327,8 +328,8 @@ class VbanVMParamStrip:
@dataclass
class VbanPacketNBS1(VbanPacket):
"""Represents the body of a VBAN data packet with ident:1"""
class VbanRTPacketNBS1(VbanRTPacket):
"""Represents the body of a VBAN RTPacket with ident:1"""
strips: tuple[VbanVMParamStrip, ...]

View File

@ -1,3 +1,4 @@
import struct
from dataclasses import dataclass
from enum import Enum
@ -65,30 +66,31 @@ class VbanPing0Payload:
"""Convert payload to bytes"""
payload = cls()
data = bytearray()
data.extend(payload.bit_type.to_bytes(4, 'little'))
data.extend(payload.bit_feature.to_bytes(4, 'little'))
data.extend(payload.bit_feature_ex.to_bytes(4, 'little'))
data.extend(payload.preferred_rate.to_bytes(4, 'little'))
data.extend(payload.min_rate.to_bytes(4, 'little'))
data.extend(payload.max_rate.to_bytes(4, 'little'))
data.extend(payload.color_rgb.to_bytes(4, 'little'))
data.extend(payload.version)
data.extend(payload.gps_position)
data.extend(payload.user_position)
data.extend(payload.lang_code)
data.extend(payload.reserved)
data.extend(payload.reserved_ex)
data.extend(payload.distant_ip)
data.extend(payload.distant_port.to_bytes(2, 'little'))
data.extend(payload.distant_reserved.to_bytes(2, 'little'))
data.extend(payload.device_name)
data.extend(payload.manufacturer_name)
data.extend(payload.application_name)
data.extend(payload.host_name)
data.extend(payload.user_name)
data.extend(payload.user_comment)
return bytes(data)
return struct.pack(
'<7I4s8s8s8s8s64s32s2H64s64s64s64s128s128s',
payload.bit_type,
payload.bit_feature,
payload.bit_feature_ex,
payload.preferred_rate,
payload.min_rate,
payload.max_rate,
payload.color_rgb,
payload.version,
payload.gps_position,
payload.user_position,
payload.lang_code,
payload.reserved,
payload.reserved_ex,
payload.distant_ip,
payload.distant_port,
payload.distant_reserved,
payload.device_name,
payload.manufacturer_name,
payload.application_name,
payload.host_name,
payload.user_name,
payload.user_comment,
)
@classmethod
def create_packet(cls, framecounter: int) -> bytes:

View File

@ -1,5 +1,62 @@
import socket
import time
from typing import Iterator
from .error import VBANCMDConnectionError
def script_ratelimit(func):
"""script_ratelimit decorator for {VbanCmd}.sendtext, to prevent flooding the network with script requests."""
def wrapper(*args, **kwargs):
self, *rem = args
if self.script_ratelimit:
now = time.time()
elapsed = now - self._last_script_request_time
if elapsed < self.script_ratelimit:
time.sleep(self.script_ratelimit - elapsed)
self._last_script_request_time = time.time()
return func(*args, **kwargs)
return wrapper
def pong_timeout(func):
"""pong_timeout decorator for {VbanCmd}._handle_pong, to handle timeout logic and socket management."""
def wrapper(self, timeout: float = None):
if timeout is None:
timeout = min(self.timeout, 3.0)
original_timeout = self.sock.gettimeout()
self.sock.settimeout(0.5)
try:
start_time = time.time()
response_count = 0
while time.time() - start_time < timeout:
try:
response_count += 1
if func(self):
return
except socket.timeout:
continue
self.logger.debug(
f'PING timeout after {timeout}s, received {response_count} non-PONG packets'
)
raise VBANCMDConnectionError(
f'PING timeout: No response from {self.host}:{self.port} after {timeout}s'
)
finally:
self.sock.settimeout(original_timeout)
return wrapper
def cache_bool(func, param):
"""Check cache for a bool prop"""
@ -67,16 +124,11 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
"""
Generator function, accepts two tuples of dB values.
Evaluates equality of each member in both tuples.
Only ignores changes when levels are very quiet (below -72 dB).
Returns True when levels are equal (no change), False when different.
"""
for a, b in zip(t0, t1):
# If both values are very quiet (below -72dB), ignore small changes
if a <= -72.0 and b <= -72.0:
yield a == b # Both quiet, check if they're equal
else:
yield a != b # At least one has significant level, detect changes
yield a == b
def deep_merge(dict1, dict2):

View File

@ -5,7 +5,7 @@ import threading
import time
from pathlib import Path
from queue import Queue
from typing import Union
from typing import Mapping, Union
from .enums import NBS
from .error import VBANCMDConnectionError, VBANCMDError
@ -13,11 +13,11 @@ from .event import Event
from .packet.headers import (
VbanMatrixResponseHeader,
VbanPongHeader,
VbanRequestHeader,
VbanRTRequestHeader,
)
from .packet.ping0 import VbanPing0Payload, VbanServerType
from .subject import Subject
from .util import bump_framecounter, deep_merge
from .util import bump_framecounter, deep_merge, pong_timeout, script_ratelimit
from .worker import Producer, Subscriber, Updater
logger = logging.getLogger(__name__)
@ -27,22 +27,22 @@ class VbanCmd(abc.ABC):
"""Abstract Base Class for Voicemeeter VBAN Command Interfaces"""
DELAY = 0.001
# fmt: off
BPS_OPTS = [
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
1000000, 1500000, 2000000, 3000000,
]
# fmt: on
def __init__(self, **kwargs):
self.logger = logger.getChild(self.__class__.__name__)
self.event = Event({k: kwargs.pop(k) for k in ('pdirty', 'ldirty')})
if not kwargs['ip']:
if not kwargs['host']:
kwargs |= self._conn_from_toml()
for attr, val in kwargs.items():
setattr(self, attr, val)
try:
self._host_ip = socket.gethostbyname(self.host)
except socket.gaierror as e:
raise VBANCMDConnectionError(
f'Unable to resolve hostname {self.host}'
) from e
self._framecounter = 0
self._framecounter_lock = threading.Lock()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -52,14 +52,14 @@ class VbanCmd(abc.ABC):
self.cache = {}
self._pdirty = False
self._ldirty = False
self._script = str()
self.stop_event = None
self.producer = None
self._last_script_request_time = 0
@property
@abc.abstractmethod
def __str__(self):
"""Ensure subclasses override str magic method"""
pass
def steps(self):
"""Steps required to build the interface for this Voicemeeter kind"""
def _conn_from_toml(self) -> dict:
try:
@ -97,6 +97,7 @@ class VbanCmd(abc.ABC):
If the server is detected as Matrix, RT listeners will be disabled for compatibility.
"""
self._ping()
self._handle_pong()
if not self.disable_rt_listeners:
self.event.info()
@ -113,7 +114,7 @@ class VbanCmd(abc.ABC):
self.producer.start()
self.logger.info(
"Successfully logged into VBANCMD {kind} with ip='{ip}', port={port}, streamname='{streamname}'".format(
"Successfully logged into VBANCMD {kind} with host='{host}', port={port}, streamname='{streamname}'".format(
**self.__dict__
)
)
@ -138,43 +139,32 @@ class VbanCmd(abc.ABC):
self._framecounter = bump_framecounter(self._framecounter)
return current
def _ping(self, timeout: float = None) -> None:
"""Send a PING packet and wait for PONG response to verify connectivity."""
if timeout is None:
timeout = min(self.timeout, 3.0)
ping_packet = VbanPing0Payload.create_packet(self._get_next_framecounter())
original_timeout = self.sock.gettimeout()
self.sock.settimeout(0.5)
def _ping(self):
"""Initiates the PING/PONG handshake with the VBAN server."""
try:
self.sock.sendto(ping_packet, (socket.gethostbyname(self.ip), self.port))
self.logger.debug(f'PING sent to {self.ip}:{self.port}')
self.sock.sendto(
VbanPing0Payload.create_packet(self._get_next_framecounter()),
(self._host_ip, self.port),
)
self.logger.debug(f'PING sent to {self.host}:{self.port}')
start_time = time.time()
response_count = 0
while time.time() - start_time < timeout:
try:
except Exception as e:
raise VBANCMDConnectionError(f'PING failed: {e}') from e
@pong_timeout
def _handle_pong(self) -> bool:
"""Handles incoming packets during the PING/PONG handshake, looking for a valid PONG response to confirm connectivity and detect server type.
Returns True if a valid PONG is received, False otherwise."""
data, addr = self.sock.recvfrom(2048)
response_count += 1
self.logger.debug(
f'Received packet #{response_count} from {addr}: {len(data)} bytes'
)
self.logger.debug(
f'Response header: {data[: min(32, len(data))].hex()}'
)
if VbanPongHeader.is_pong_response(data):
self.logger.debug(
f'PONG received from {addr}, connectivity confirmed'
)
self.logger.debug(f'PONG received from {addr}, connectivity confirmed')
server_type = VbanPing0Payload.detect_server_type(data)
self._handle_server_type(server_type)
return # Exit after successful PONG response
return True
else:
if len(data) >= 8:
if data[:4] == b'VBAN':
@ -186,22 +176,7 @@ class VbanCmd(abc.ABC):
else:
self.logger.debug('Non-VBAN packet received')
except socket.timeout:
continue
self.logger.debug(
f'PING timeout after {timeout}s, received {response_count} non-PONG packets'
)
raise VBANCMDConnectionError(
f'PING timeout: No response from {self.ip}:{self.port} after {timeout}s'
)
except socket.gaierror as e:
raise VBANCMDConnectionError(f'Unable to resolve hostname {self.ip}') from e
except Exception as e:
raise VBANCMDConnectionError(f'PING failed: {e}') from e
finally:
self.sock.settimeout(original_timeout)
return False
def _handle_server_type(self, server_type: VbanServerType) -> None:
"""Handle the detected server type by adjusting settings accordingly."""
@ -223,14 +198,14 @@ class VbanCmd(abc.ABC):
def _send_request(self, payload: str) -> None:
"""Sends a request packet over the network and bumps the framecounter."""
self.sock.sendto(
VbanRequestHeader.encode_with_payload(
VbanRTRequestHeader.encode_with_payload(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
bps=self.bps,
channel=self.channel,
framecounter=self._get_next_framecounter(),
payload=payload,
),
(socket.gethostbyname(self.ip), self.port),
(self._host_ip, self.port),
)
def _set_rt(self, cmd: str, val: Union[str, float]):
@ -238,6 +213,7 @@ class VbanCmd(abc.ABC):
self._send_request(f'{cmd}={val};')
self.cache[cmd] = val
@script_ratelimit
def sendtext(self, script) -> str | None:
"""Sends a multiple parameter string over a network."""
self._send_request(script)
@ -246,17 +222,14 @@ class VbanCmd(abc.ABC):
if self.disable_rt_listeners and script.endswith(('?', '?;')):
try:
data, _ = self.sock.recvfrom(2048)
payload = VbanMatrixResponseHeader.extract_payload(data)
return VbanMatrixResponseHeader.extract_payload(data)
except ValueError as e:
self.logger.warning(f'Error extracting matrix response: {e}')
except TimeoutError as e:
self.logger.exception(f'Timeout waiting for matrix response: {e}')
raise VBANCMDConnectionError(
f'Timeout waiting for response from {self.ip}:{self.port}'
f'Timeout waiting for response from {self.host}:{self.port}'
) from e
return payload
time.sleep(self.DELAY)
@property
def type(self) -> str:
@ -288,12 +261,8 @@ class VbanCmd(abc.ABC):
while self.pdirty:
time.sleep(self.DELAY)
def apply(self, data: dict):
"""
Sets all parameters of a dict
minor delay between each recursion
"""
def apply(self, data: Mapping):
"""Set all parameters of a dict"""
def target(key):
match key.split('-'):
@ -313,7 +282,8 @@ class VbanCmd(abc.ABC):
raise ValueError(ERR_MSG)
return target[int(index)]
[target(key).apply(di).then_wait() for key, di in data.items()]
for key, di in data.items():
target(key).apply(di)
def apply_config(self, name):
"""applies a config from memory"""

View File

@ -3,15 +3,16 @@ import threading
import time
from .enums import NBS
from .error import VBANCMDConnectionError
from .error import VBANCMDConnectionError, VBANCMDPacketError
from .packet.enums import SubProtocols
from .packet.headers import (
HEADER_SIZE,
VbanPacket,
VbanResponseHeader,
VbanSubscribeHeader,
VbanRTPacket,
VbanRTResponseHeader,
VbanRTSubscribeHeader,
)
from .packet.nbs0 import VbanPacketNBS0
from .packet.nbs1 import VbanPacketNBS1
from .packet.nbs0 import VbanRTPacketNBS0
from .packet.nbs1 import VbanRTPacketNBS1
logger = logging.getLogger(__name__)
@ -27,19 +28,13 @@ class Subscriber(threading.Thread):
def run(self):
while not self.stopped():
try:
for nbs in NBS:
sub_packet = VbanSubscribeHeader().to_bytes(
sub_packet = VbanRTSubscribeHeader().to_bytes(
nbs, self._remote._get_next_framecounter()
)
self._remote.sock.sendto(
sub_packet, (self._remote.ip, self._remote.port)
sub_packet, (self._remote._host_ip, self._remote.port)
)
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
f'timeout sending subscription to {self._remote.ip}:{self._remote.port}'
) from e
self.wait_until_stopped(10)
self.logger.debug(f'terminating {self.name} thread')
@ -72,7 +67,7 @@ class Producer(threading.Thread):
self._remote.cache['bus_level'],
) = self._remote.public_packets[NBS.zero].levels
def _get_rt(self) -> VbanPacket:
def _get_rt(self) -> VbanRTPacket:
"""Attempt to fetch data packet until a valid one found"""
while True:
try:
@ -82,23 +77,28 @@ class Producer(threading.Thread):
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
f'timeout waiting for response from {self._remote.ip}:{self._remote.port}'
f'timeout waiting for response from {self._remote.host}:{self._remote.port}'
) from e
try:
header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
except ValueError as e:
header = VbanRTResponseHeader.from_bytes(data[:HEADER_SIZE])
except VBANCMDPacketError as e:
match e.protocol:
case SubProtocols.SERVICE:
# Silently ignore periodic SERVICE packets unrelated to vban-cmd
pass
case _:
self.logger.debug(f'Error parsing response packet: {e}')
continue
match header.format_nbs:
case NBS.zero:
return VbanPacketNBS0.from_bytes(
return VbanRTPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data
)
case NBS.one:
return VbanPacketNBS1.from_bytes(
return VbanRTPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data
)
@ -128,7 +128,6 @@ class Producer(threading.Thread):
self.queue.put('pdirty')
if self._remote.event.ldirty:
self.queue.put('ldirty')
# time.sleep(self._remote.ratelimit)
self.logger.debug(f'terminating {self.name} thread')
self.queue.put(None)