Compare commits

...

42 Commits
v2.7.0 ... dev

Author SHA1 Message Date
ba85373f94 move steps abstract method into VbanCmd. 2026-03-16 23:16:01 +00:00
22cc980fc2
Merge pull request #7 from onyx-and-iris/dependabot/pip/virtualenv-20.36.1
Bump virtualenv from 20.29.0 to 20.36.1
2026-03-15 16:20:26 +00:00
dependabot[bot]
2986d451ad
Bump virtualenv from 20.29.0 to 20.36.1
Bumps [virtualenv](https://github.com/pypa/virtualenv) from 20.29.0 to 20.36.1.
- [Release notes](https://github.com/pypa/virtualenv/releases)
- [Changelog](https://github.com/pypa/virtualenv/blob/main/docs/changelog.rst)
- [Commits](https://github.com/pypa/virtualenv/compare/20.29.0...20.36.1)

---
updated-dependencies:
- dependency-name: virtualenv
  dependency-version: 20.36.1
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-15 16:19:39 +00:00
96f0008654
Merge pull request #6 from onyx-and-iris/dependabot/pip/filelock-3.20.3
Bump filelock from 3.16.1 to 3.20.3
2026-03-15 16:18:24 +00:00
dependabot[bot]
6ddead68d0
Bump filelock from 3.16.1 to 3.20.3
Bumps [filelock](https://github.com/tox-dev/py-filelock) from 3.16.1 to 3.20.3.
- [Release notes](https://github.com/tox-dev/py-filelock/releases)
- [Changelog](https://github.com/tox-dev/filelock/blob/main/docs/changelog.rst)
- [Commits](https://github.com/tox-dev/py-filelock/compare/3.16.1...3.20.3)

---
updated-dependencies:
- dependency-name: filelock
  dependency-version: 3.20.3
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-15 16:17:48 +00:00
2794b14cf1 resolves the hostname once and use it throughout the package
this is more efficient and fails faster on error.

patch bump
2026-03-11 04:04:53 +00:00
c46ca8a8c8 patch bump 2026-03-09 05:21:57 +00:00
f8b56b4a30 swap out implementation of {IRemote}.apply(). It now uses individual requests instead of script requests. 2026-03-09 05:21:32 +00:00
09259269d7 rename ratelimit decorator to script_ratelimit for clarity
script_ratelimit now defaults to None.

add note in README
2026-03-09 05:20:25 +00:00
242401e294 add VBANCMDPacketError to exception hierarchy
raise them when we fail to validate incoming packets.

add VbanRTRequestHeader post_init to validate the provided bps value.

VbanRTSubscribeHeader and VbanRTRequestHeader properties now return int type. They are then directly packed into the header.
2026-03-09 05:19:41 +00:00
98ec9b715f fixes bug receiving inconsistent level updates
patch bump
2026-03-07 15:44:32 +00:00
5f7b62a0e0 add missing cached_property 2026-03-07 14:29:32 +00:00
d1bcbfed6f minor bump 2026-03-07 14:24:44 +00:00
ab80bbf226 use host kwarg/env var in examples 2026-03-07 14:23:29 +00:00
ad58852a77 improve efficiency with cached properties and struct.unpack 2026-03-07 14:22:25 +00:00
5363584940 improve to_bytes efficiency with struct.pack 2026-03-07 14:20:31 +00:00
9f43ee18d3 add more enums so we can remove some of the constants
rename some of the packet classes

patch bump
2026-03-07 00:03:46 +00:00
3cde874a3c remove unnecessary assignment 2026-03-03 20:03:09 +00:00
3d01321be3 separate ping from pong
this separates concerns and allows the pong_timeout to strictly handle timeouts for pongs.

patch bump
2026-03-03 19:47:15 +00:00
2dd52a7258 move ping timeout logic into decorator
patch bump
2026-03-03 18:21:14 +00:00
28cbef5ef6 patch bump 2026-03-03 15:48:23 +00:00
5b3b35fca3 flag value 2026-03-03 15:48:09 +00:00
7b3149a1e1 patch bump 2026-03-03 15:38:11 +00:00
230d9f0eb3 upd test config 2026-03-03 15:37:38 +00:00
c9a505df0a convert Modes class to a Flag Enum type and rename it to ChannelModes
move it into vban_cmd.packet
2026-03-03 15:36:56 +00:00
3e3bec6d50 remove the sleep(), the @ratelimit decorator already handles this. 2026-03-03 15:31:31 +00:00
55b3125e10 fixes regression in apply()
patch bump
2026-03-02 23:52:06 +00:00
7b3340042c upd reference to ip 2026-03-02 23:26:38 +00:00
6ea0859180 patch bump 2026-03-02 23:25:03 +00:00
81ed963bea fix references to remote.ip 2026-03-02 23:24:09 +00:00
0b99b6a67f update references to ip kwarg 2026-03-02 23:21:57 +00:00
86d0aa91c3 add a ratelimit decorator to {VbanCmd}.sendtext()
ip kwarg renamed to host.
2026-03-02 23:20:45 +00:00
cf66ae252c minor bump 2026-03-02 20:50:12 +00:00
42f6f29d1e add 2.9.0 to CHANGELOG 2026-03-02 20:50:05 +00:00
a210766b7b improve framecounter thread safety 2026-03-02 20:44:08 +00:00
7d741d6e8b {VbanCmd}._ping() now causes login to fail fast if no pong is received. 2026-03-02 20:26:35 +00:00
8be9d3cb7f implement ping/pong dataclasses 2026-03-02 20:25:58 +00:00
23b99cb66b perform some path magic so Voicemeeter receives the entire path
patch bump
2026-03-01 21:29:09 +00:00
2fd7b8ad8b minor bump 2026-03-01 21:13:38 +00:00
c851cb5abe add Recorder section to README 2026-03-01 21:11:48 +00:00
dc681f50d0 add Recorder
add it to banana+potato
2026-03-01 21:10:10 +00:00
a0ec00652b reduce the level of logging for packet parse errors
patch bump
2026-03-01 17:22:06 +00:00
22 changed files with 993 additions and 457 deletions

2
.gitignore vendored
View File

@ -159,3 +159,5 @@ config.toml
vban.toml
.vscode/
PING_FEATURE.md

View File

@ -11,6 +11,16 @@ Before any major/minor/patch bump all unit tests will be run to verify they pass
- [x]
## [2.9.0] - 2026-03-02
### Added
- Recorder class, see [Recorder](https://github.com/onyx-and-iris/vban-cmd-python?tab=readme-ov-file#recorder) in README.
- Ping/pong implemented. If a pong is not received {VbanCmd}.login() will fail fast. This prevents the rt listener threads from starting up.
- It has the added benefit of automatically detecting the type of VBAN server (Voicemeeter or Matrix).
- A thread lock around the framecounter to improve thread safety since it can be accessed by both the main thread and the Producer thread.
## [2.7.0] - 2026-03-01
### Added

View File

@ -41,14 +41,14 @@ Load VBAN connection info from toml config. A valid `vban.toml` might look like
```toml
[connection]
ip = "gamepc.local"
host = "localhost"
port = 6980
streamname = "Command1"
```
It should be placed in \<user home directory\> / "Documents" / "Voicemeeter" / "configs"
Alternatively you may pass `ip`, `port`, `streamname` as keyword arguments.
Alternatively you may pass `host`, `port`, `streamname` as keyword arguments.
#### `__main__.py`
@ -85,7 +85,7 @@ def main():
KIND_ID = 'banana'
with vban_cmd.api(
KIND_ID, ip='gamepc.local', port=6980, streamname='Command1'
KIND_ID, host='localhost', port=6980, streamname='Command1'
) as vban:
do = ManyThings(vban)
do.things()
@ -349,6 +349,40 @@ vban.strip[0].fadeto(-10.3, 1000)
vban.bus[3].fadeby(-5.6, 500)
```
### Recorder
The following methods are available
- `play()`
- `stop()`
- `pause()`
- `record()`
- `ff()`
- `rew()`
- `load(filepath)`: raw string
- `goto(time_string)`: time string in format `hh:mm:ss`
The following properties are available
- `samplerate`: int, (22050, 24000, 32000, 44100, 48000, 88200, 96000, 176400, 192000)
- `bitresolution`: int, (8, 16, 24, 32)
- `channel`: int, from 1 to 8
- `kbps`: int, (32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320)
- `gain`: float, from -60.0 to 12.0
example:
```python
vban.recorder.play()
vban.recorder.stop()
# filepath as raw string
vban.recorder.load(r'C:\music\mytune.mp3')
# set the goto time to 1m 30s
vban.recorder.goto('00:01:30')
```
### Command
Certain 'special' commands are defined by the API as performing actions rather than setting values. The following methods are available:
@ -440,7 +474,7 @@ example:
import vban_cmd
opts = {
'ip': '<ip address>',
'host': '<ip address>',
'streamname': 'Command1',
'port': 6980,
}
@ -507,14 +541,15 @@ print(vban.event.get())
You may pass the following optional keyword arguments:
- `ip`: str='localhost', ip or hostname of remote machine
- `host`: str='localhost', ip or hostname of remote machine
- `port`: int=6980, vban udp port of remote machine.
- `streamname`: str='Command1', name of the stream to connect to.
- `bps`: int=256000, bps rate of the stream.
- `channel`: int=0, channel on which to send the UDP requests.
- `pdirty`: boolean=False, parameter updates
- `ldirty`: boolean=False, level updates
- `timeout`: int=5, amount of time (seconds) to wait for an incoming RT data packet (parameter states).
- `script_ratelimit`: float | None=None, ratelimit for vban.sendtext() specifically.
- `timeout`: int=5, timeout for socket operations.
- `disable_rt_listeners`: boolean=False, set `True` if you don't wish to receive RT packets.
- You can still send Matrix string requests ending with `?` and receive a response.
@ -557,7 +592,7 @@ import vban_cmd
logging.basicConfig(level=logging.DEBUG)
opts = {'ip': 'ip.local', 'port': 6980, 'streamname': 'Command1'}
opts = {'host': 'localhost', 'port': 6980, 'streamname': 'Command1'}
with vban_cmd.api('banana', **opts) as vban:
...
```

View File

@ -103,7 +103,7 @@ class App(tk.Tk):
def main():
KIND_ID = 'banana'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

View File

@ -94,7 +94,7 @@ class Observer:
def main():
KIND_ID = 'potato'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

View File

@ -25,7 +25,7 @@ class App:
def main():
KIND_ID = 'banana'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

42
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]]
name = "cachetools"
@ -55,7 +55,7 @@ description = "Backport of PEP 654 (exception groups)"
optional = false
python-versions = ">=3.7"
groups = ["dev"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"},
{file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"},
@ -66,21 +66,16 @@ test = ["pytest (>=6)"]
[[package]]
name = "filelock"
version = "3.16.1"
version = "3.20.3"
description = "A platform independent file lock."
optional = false
python-versions = ">=3.8"
python-versions = ">=3.10"
groups = ["dev"]
files = [
{file = "filelock-3.16.1-py3-none-any.whl", hash = "sha256:2082e5703d51fbf98ea75855d9d5527e33d8ff23099bec374a134febee6946b0"},
{file = "filelock-3.16.1.tar.gz", hash = "sha256:c249fbfcd5db47e5e2d6d62198e565475ee65e4831e2561c8e313fa7eb961435"},
{file = "filelock-3.20.3-py3-none-any.whl", hash = "sha256:4b0dda527ee31078689fc205ec4f1c1bf7d56cf88b6dc9426c4f230e46c2dce1"},
{file = "filelock-3.20.3.tar.gz", hash = "sha256:18c57ee915c7ec61cff0ecf7f0f869936c7c30191bb0cf406f1341778d0834e1"},
]
[package.extras]
docs = ["furo (>=2024.8.6)", "sphinx (>=8.0.2)", "sphinx-autodoc-typehints (>=2.4.1)"]
testing = ["covdefaults (>=2.3)", "coverage (>=7.6.1)", "diff-cover (>=9.2)", "pytest (>=8.3.3)", "pytest-asyncio (>=0.24)", "pytest-cov (>=5)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.26.4)"]
typing = ["typing-extensions (>=4.12.2)"]
[[package]]
name = "iniconfig"
version = "2.0.0"
@ -243,7 +238,7 @@ description = "A lil' TOML parser"
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"},
{file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"},
@ -309,37 +304,38 @@ test = ["devpi-process (>=1.0.2)", "pytest (>=8.3.3)", "pytest-mock (>=3.14)"]
[[package]]
name = "typing-extensions"
version = "4.12.2"
description = "Backported and Experimental Type Hints for Python 3.8+"
version = "4.15.0"
description = "Backported and Experimental Type Hints for Python 3.9+"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["dev"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
{file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
{file = "typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548"},
{file = "typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466"},
]
[[package]]
name = "virtualenv"
version = "20.29.0"
version = "20.36.1"
description = "Virtual Python Environment builder"
optional = false
python-versions = ">=3.8"
groups = ["dev"]
files = [
{file = "virtualenv-20.29.0-py3-none-any.whl", hash = "sha256:c12311863497992dc4b8644f8ea82d3b35bb7ef8ee82e6630d76d0197c39baf9"},
{file = "virtualenv-20.29.0.tar.gz", hash = "sha256:6345e1ff19d4b1296954cee076baaf58ff2a12a84a338c62b02eda39f20aa982"},
{file = "virtualenv-20.36.1-py3-none-any.whl", hash = "sha256:575a8d6b124ef88f6f51d56d656132389f961062a9177016a50e4f507bbcc19f"},
{file = "virtualenv-20.36.1.tar.gz", hash = "sha256:8befb5c81842c641f8ee658481e42641c68b5eab3521d8e092d18320902466ba"},
]
[package.dependencies]
distlib = ">=0.3.7,<1"
filelock = ">=3.12.2,<4"
filelock = {version = ">=3.20.1,<4", markers = "python_version >= \"3.10\""}
platformdirs = ">=3.9.1,<5"
typing-extensions = {version = ">=4.13.2", markers = "python_version < \"3.11\""}
[package.extras]
docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.2,!=7.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"]
test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8)", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10)"]
test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8) ; platform_python_implementation == \"PyPy\" or platform_python_implementation == \"GraalVM\" or platform_python_implementation == \"CPython\" and sys_platform == \"win32\" and python_version >= \"3.13\"", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10) ; platform_python_implementation == \"CPython\""]
[[package]]
name = "virtualenv-pyenv"

View File

@ -1,6 +1,6 @@
[project]
name = "vban-cmd"
version = "2.7.0"
version = "2.10.3"
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
license = { text = "MIT" }
@ -9,7 +9,7 @@ requires-python = ">=3.10"
dependencies = ["tomli (>=2.0.1,<3.0) ; python_version < '3.11'"]
[tool.poetry.requires-plugins]
poethepoet = "^0.35.0"
poethepoet = ">=0.42.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.4"

View File

@ -11,7 +11,7 @@ from vban_cmd.kinds import request_kind_map as kindmap
KIND_ID = os.environ.get('KIND', 'potato')
opts = {
'ip': os.getenv('VBANCMD_IP', 'localhost'),
'host': os.getenv('VBANCMD_HOST', 'localhost'),
'streamname': os.getenv('VBANCMD_STREAMNAME', 'Command1'),
'port': int(os.getenv('VBANCMD_PORT', 6980)),
}

View File

@ -1,6 +1,18 @@
from .packet.enums import ServiceTypes, SubProtocols
class VBANCMDError(Exception):
"""Base VBANCMD Exception class."""
class VBANCMDConnectionError(VBANCMDError):
"""Exception raised when connection/timeout errors occur"""
class VBANCMDPacketError(VBANCMDError):
"""Exception raised when packet parsing errors occur"""
def __init__(self, message: str, protocol: SubProtocols, type_: ServiceTypes):
super().__init__(message)
self.protocol = protocol
self.type = type_

View File

@ -1,4 +1,3 @@
import abc
import logging
from enum import IntEnum
from functools import cached_property
@ -11,6 +10,7 @@ from .error import VBANCMDError
from .kinds import KindMapClass
from .kinds import request_kind_map as kindmap
from .macrobutton import MacroButton
from .recorder import Recorder
from .strip import request_strip_obj as strip
from .vban import request_vban_obj as vban
from .vbancmd import VbanCmd
@ -26,7 +26,7 @@ class FactoryBuilder:
"""
BuilderProgress = IntEnum(
'BuilderProgress', 'strip bus command macrobutton vban', start=0
'BuilderProgress', 'strip bus command macrobutton vban recorder', start=0
)
def __init__(self, factory, kind: KindMapClass):
@ -38,6 +38,7 @@ class FactoryBuilder:
f'Finished building commands for {self._factory}',
f'Finished building macrobuttons for {self._factory}',
f'Finished building vban in/out streams for {self._factory}',
f'Finished building recorder for {self._factory}',
)
self.logger = logger.getChild(self.__class__.__name__)
@ -72,24 +73,30 @@ class FactoryBuilder:
self._factory.vban = vban(self._factory)
return self
def make_recorder(self):
self._factory.recorder = Recorder.make(self._factory)
return self
class FactoryBase(VbanCmd):
"""Base class for factories, subclasses VbanCmd."""
def __init__(self, kind_id: str, **kwargs):
defaultkwargs = {
'ip': 'localhost',
'host': 'localhost',
'port': 6980,
'streamname': 'Command1',
'bps': 256000,
'channel': 0,
'ratelimit': 0.01,
'timeout': 5,
'script_ratelimit': None, # if None or 0, no rate limit applied to script commands
'timeout': 5, # timeout on socket operations, in seconds
'disable_rt_listeners': False,
'sync': False,
'pdirty': False,
'ldirty': False,
}
if 'ip' in kwargs:
defaultkwargs['host'] = kwargs.pop('ip') # for backwards compatibility
if 'subs' in kwargs:
defaultkwargs |= kwargs.pop('subs') # for backwards compatibility
kwargs = defaultkwargs | kwargs
@ -114,11 +121,6 @@ class FactoryBase(VbanCmd):
+ f"({self.kind}, ip='{self.ip}', port={self.port}, streamname='{self.streamname}')"
)
@property
@abc.abstractmethod
def steps(self):
pass
@cached_property
def configs(self):
self._configs = configs(self.kind.name)
@ -166,7 +168,7 @@ class BananaFactory(FactoryBase):
@property
def steps(self) -> Iterable:
"""steps required to build the interface for a kind"""
return self._steps
return self._steps + (self.builder.make_recorder,)
class PotatoFactory(FactoryBase):
@ -188,7 +190,7 @@ class PotatoFactory(FactoryBase):
@property
def steps(self) -> Iterable:
"""steps required to build the interface for a kind"""
return self._steps
return self._steps + (self.builder.make_recorder,)
def vbancmd_factory(kind_id: str, **kwargs) -> VbanCmd:

View File

@ -1,83 +1,9 @@
import abc
import logging
import time
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class Modes:
"""Channel Modes"""
_mute: hex = 0x00000001
_solo: hex = 0x00000002
_mono: hex = 0x00000004
_mc: hex = 0x00000008
_amix: hex = 0x00000010
_repeat: hex = 0x00000020
_bmix: hex = 0x00000030
_composite: hex = 0x00000040
_tvmix: hex = 0x00000050
_upmix21: hex = 0x00000060
_upmix41: hex = 0x00000070
_upmix61: hex = 0x00000080
_centeronly: hex = 0x00000090
_lfeonly: hex = 0x000000A0
_rearonly: hex = 0x000000B0
_mask: hex = 0x000000F0
_on: hex = 0x00000100 # eq.on
_cross: hex = 0x00000200
_ab: hex = 0x00000800 # eq.ab
_busa: hex = 0x00001000
_busa1: hex = 0x00001000
_busa2: hex = 0x00002000
_busa3: hex = 0x00004000
_busa4: hex = 0x00008000
_busa5: hex = 0x00080000
_busb: hex = 0x00010000
_busb1: hex = 0x00010000
_busb2: hex = 0x00020000
_busb3: hex = 0x00040000
_pan0: hex = 0x00000000
_pancolor: hex = 0x00100000
_panmod: hex = 0x00200000
_panmask: hex = 0x00F00000
_postfx_r: hex = 0x01000000
_postfx_d: hex = 0x02000000
_postfx1: hex = 0x04000000
_postfx2: hex = 0x08000000
_sel: hex = 0x10000000
_monitor: hex = 0x20000000
@property
def modevals(self):
return (
val
for val in [
self._amix,
self._repeat,
self._bmix,
self._composite,
self._tvmix,
self._upmix21,
self._upmix41,
self._upmix61,
self._centeronly,
self._lfeonly,
self._rearonly,
]
)
class IRemote(abc.ABC):
"""
Common interface between base class and extended (higher) classes
@ -89,7 +15,6 @@ class IRemote(abc.ABC):
self._remote = remote
self.index = index
self.logger = logger.getChild(self.__class__.__name__)
self._modes = Modes()
def getter(self, param):
cmd = self._cmd(param)
@ -125,27 +50,16 @@ class IRemote(abc.ABC):
def fget(attr, val):
if attr == 'mode':
return (f'mode.{val}', 1)
elif attr == 'knob':
return ('', val)
return (attr, val)
return (getattr(self, attr), val, 1)
return (self, attr, val)
for attr, val in data.items():
if not isinstance(val, dict):
if attr in dir(self): # avoid calling getattr (with hasattr)
attr, val = fget(attr, val)
if isinstance(val, bool):
val = 1 if val else 0
self._remote.cache[self._cmd(attr)] = val
self._remote._script += f'{self._cmd(attr)}={val};'
target, attr, val = fget(attr, val)
setattr(target, attr, val)
else:
self.logger.error(f'invalid attribute {attr} for {self}')
else:
target = getattr(self, attr)
target.apply(val)
self._remote.sendtext(self._remote._script)
return self
def then_wait(self):
self._remote._script = str()
time.sleep(self._remote.DELAY)

View File

@ -1,6 +1,7 @@
from functools import partial
from .enums import NBS, BusModes
from .packet.enums import ChannelModes
from .util import cache_bool, cache_float, cache_int, cache_string
@ -27,7 +28,7 @@ def channel_bool_prop(param):
elif param.lower() == 'mc':
return channel_state.mc
else:
return channel_state.get_mode(getattr(self._modes, f'_{param.lower()}'))
return channel_state.get_mode(getattr(ChannelModes, param.upper()).value)
def fset(self, val):
self.setter(param, 1 if val else 0)
@ -55,7 +56,9 @@ def channel_int_prop(param):
bit_9 = (channel_state._state >> 9) & 1
return (bit_9 << 1) | bit_2
else:
return channel_state.get_mode_int(getattr(self._modes, f'_{param.lower()}'))
return channel_state.get_mode_int(
getattr(ChannelModes, param.upper()).value
)
def fset(self, val):
self.setter(param, val)
@ -89,7 +92,7 @@ def strip_output_prop(param):
strip_state = self.public_packets[NBS.zero].states.strip[self.index]
return strip_state.get_mode(getattr(self._modes, f'_bus{param.lower()}'))
return strip_state.get_mode(getattr(ChannelModes, f'BUS{param.upper()}').value)
def fset(self, val):
self.setter(param, 1 if val else 0)

83
vban_cmd/packet/enums.py Normal file
View File

@ -0,0 +1,83 @@
from enum import Flag
class SubProtocols(Flag):
"""Sub Protocols - Bit flags that can be combined"""
AUDIO = 0x00
SERIAL = 0x20
TEXT = 0x40
SERVICE = 0x60
MASK = 0xE0
class ServiceTypes(Flag):
"""Service Types - Bit flags that can be combined"""
PING = 0
PONG = 0
CHATUTF8 = 1
RTPACKETREGISTER = 32
RTPACKET = 33
REQUESTREPLY = 0x02 # A Matrix reply
FNCT_REPLY = 0x80 # An RTPacket reply
class StreamTypes(Flag):
"""Stream Types - Bit flags that can be combined"""
ASCII = 0x00
UTF8 = 0x10
WCHAR = 0x20
class ChannelModes(Flag):
"""Channel Modes - Bit flags that can be combined"""
MUTE = 0x00000001
SOLO = 0x00000002
MONO = 0x00000004
MC = 0x00000008
AMIX = 0x00000010
REPEAT = 0x00000020
BMIX = 0x00000030
COMPOSITE = 0x00000040
TVMIX = 0x00000050
UPMIX21 = 0x00000060
UPMIX41 = 0x00000070
UPMIX61 = 0x00000080
CENTERONLY = 0x00000090
LFEONLY = 0x000000A0
REARONLY = 0x000000B0
MASK = 0x000000F0
ON = 0x00000100 # eq.on
CROSS = 0x00000200
AB = 0x00000800 # eq.ab
BUSA = 0x00001000
BUSA1 = 0x00001000
BUSA2 = 0x00002000
BUSA3 = 0x00004000
BUSA4 = 0x00008000
BUSA5 = 0x00080000
BUSB = 0x00010000
BUSB1 = 0x00010000
BUSB2 = 0x00020000
BUSB3 = 0x00040000
PAN0 = 0x00000000
PANCOLOR = 0x00100000
PANMOD = 0x00200000
PANMASK = 0x00F00000
POSTFX_R = 0x01000000
POSTFX_D = 0x02000000
POSTFX1 = 0x04000000
POSTFX2 = 0x08000000
SEL = 0x10000000
MONITOR = 0x20000000

View File

@ -1,25 +1,124 @@
import struct
from dataclasses import dataclass
from vban_cmd.enums import NBS
from vban_cmd.error import VBANCMDPacketError
from vban_cmd.kinds import KindMapClass
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_MASK = 0xE0
VBAN_PROTOCOL_MASK = 0xE0
VBAN_SERVICE_REQUESTREPLY = 0x02
VBAN_SERVICE_FNCT_REPLY = 0x02
from .enums import ServiceTypes, StreamTypes, SubProtocols
PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
STREAMNAME_MAX_LENGTH = 16
# fmt: off
BPS_OPTS = [
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
1000000, 1500000, 2000000, 3000000,
]
# fmt: on
@dataclass
class VbanPacket:
"""Represents the header of an incoming VBAN data packet"""
class VbanPingHeader:
"""Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PING.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
return struct.pack(
'<4s4B16sI',
header.vban,
header.format_sr,
header.format_nbs,
header.format_nbc,
header.format_bit,
header.streamname,
header.framecounter,
)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PONG.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != ServiceTypes.PONG.value:
raise VBANCMDPacketError(
f'Not a PONG response packet: {parsed["format_nbc"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
)
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != ServiceTypes.PONG.value:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRTPacket:
"""Represents the header of an incoming RTPacket"""
nbs: NBS
_kind: KindMapClass
@ -49,10 +148,10 @@ class VbanPacket:
@dataclass
class VbanSubscribeHeader:
"""Represents the header of a subscription packet"""
class VbanRTSubscribeHeader:
"""Represents the header of an RT subscription packet"""
nbs: NBS = NBS.zero
_nbs: NBS = NBS.zero
name: str = 'Register-RTP'
timeout: int = 15
@ -61,38 +160,106 @@ class VbanSubscribeHeader:
return b'VBAN'
@property
def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
def sr(self) -> int:
return SubProtocols.SERVICE.value
@property
def format_nbs(self) -> bytes:
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
def nbs(self) -> int:
return self._nbs.value & 0xFF
@property
def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
def nbc(self) -> int:
return ServiceTypes.RTPACKETREGISTER.value
@property
def format_bit(self) -> bytes:
return (self.timeout & 0xFF).to_bytes(1, 'little')
def bit(self) -> int:
return self.timeout & 0xFF
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
header = cls(_nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
return struct.pack(
'<4s4B16sI',
header.vban,
header.sr,
header.nbs,
header.nbc,
header.bit,
header.streamname,
framecounter,
)
@dataclass
class VbanRTRequestHeader:
"""Represents the header of an RT request packet"""
name: str
bps: int
channel: int
framecounter: int = 0
def __post_init__(self):
if self.bps not in BPS_OPTS:
raise ValueError(
f'Invalid bps value: {self.bps}. Must be one of {BPS_OPTS}'
)
self.bps_index = BPS_OPTS.index(self.bps)
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> int:
return self.bps_index | SubProtocols.TEXT.value
@property
def nbs(self) -> int:
return 0
@property
def nbc(self) -> int:
return self.channel
@property
def bit(self) -> int:
return StreamTypes.UTF8.value
@property
def streamname(self) -> bytes:
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def to_bytes(cls, name: str, bps: int, channel: int, framecounter: int) -> bytes:
header = cls(name=name, bps=bps, channel=channel, framecounter=framecounter)
return struct.pack(
'<4s4B16sI',
header.vban,
header.sr,
header.nbs,
header.nbc,
header.bit,
header.streamname,
header.framecounter,
)
@classmethod
def encode_with_payload(
cls, name: str, bps: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps, channel, framecounter) + payload.encode()
def _parse_vban_service_header(data: bytes) -> dict:
@ -109,9 +276,12 @@ def _parse_vban_service_header(data: bytes) -> dict:
format_bit = data[7]
# Verify this is a service protocol packet
protocol = format_sr & VBAN_PROTOCOL_MASK
if protocol != VBAN_PROTOCOL_SERVICE:
raise ValueError(f'Not a service protocol packet: {protocol:02x}')
protocol = format_sr & SubProtocols.MASK.value
if protocol != SubProtocols.SERVICE.value:
raise VBANCMDPacketError(
f'Invalid protocol in service header: {protocol:02x}',
protocol=SubProtocols(protocol),
)
# Extract stream name and frame counter
name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore')
@ -128,13 +298,13 @@ def _parse_vban_service_header(data: bytes) -> dict:
@dataclass
class VbanResponseHeader:
"""Represents the header of a response packet"""
class VbanRTResponseHeader:
"""Represents the header of an RT response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_nbc: int = ServiceTypes.RTPACKET.value
format_bit: int = 0
framecounter: int = 0
@ -144,7 +314,9 @@ class VbanResponseHeader:
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def from_bytes(cls, data: bytes):
@ -152,9 +324,11 @@ class VbanResponseHeader:
parsed = _parse_vban_service_header(data)
# Validate this is an RTPacket response
if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET:
raise ValueError(
f'Not a RTPacket response packet: {parsed["format_nbc"]:02x}'
if parsed['format_nbc'] != ServiceTypes.RTPACKET.value:
raise VBANCMDPacketError(
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
)
return cls(**parsed)
@ -165,9 +339,9 @@ class VbanMatrixResponseHeader:
"""Represents the header of a matrix response packet"""
name: str = 'Request Reply'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = VBAN_SERVICE_FNCT_REPLY
format_nbc: int = VBAN_SERVICE_REQUESTREPLY
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = ServiceTypes.FNCT_REPLY.value
format_nbc: int = ServiceTypes.REQUESTREPLY.value
format_bit: int = 0
framecounter: int = 0
@ -177,16 +351,29 @@ class VbanMatrixResponseHeader:
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a matrix response packet from bytes."""
parsed = _parse_vban_service_header(data)
# Validate this is a service reply packet
if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY:
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
# Validate this is a service reply packet (dual encoding scheme)
if parsed['format_nbs'] != ServiceTypes.FNCT_REPLY.value:
raise VBANCMDPacketError(
f'Not a service reply packet: {parsed["format_nbs"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbs']),
)
if parsed['format_nbc'] != ServiceTypes.REQUESTREPLY.value:
raise VBANCMDPacketError(
f'Not a request reply packet: {parsed["format_nbc"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
)
return cls(**parsed)
@ -205,62 +392,3 @@ class VbanMatrixResponseHeader:
header = cls.from_bytes(data)
payload = cls.extract_payload(data)
return header, payload
@dataclass
class VbanRequestHeader:
"""Represents the header of a request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode() + bytes(16 - len(self.name))
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()

View File

@ -1,11 +1,14 @@
import struct
from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp
from .headers import VbanPacket
from .enums import ChannelModes
from .headers import VbanRTPacket
class Levels(NamedTuple):
@ -20,6 +23,13 @@ class ChannelState:
# Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little')
@classmethod
def from_int(cls, state_int: int):
"""Create ChannelState directly from integer for efficiency"""
instance = cls.__new__(cls)
instance._state = state_int
return instance
def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode"""
return (self._state & mode_value) != 0
@ -31,57 +41,57 @@ class ChannelState:
# Common boolean modes
@property
def mute(self) -> bool:
return (self._state & 0x00000001) != 0
return (self._state & ChannelModes.MUTE.value) != 0
@property
def solo(self) -> bool:
return (self._state & 0x00000002) != 0
return (self._state & ChannelModes.SOLO.value) != 0
@property
def mono(self) -> bool:
return (self._state & 0x00000004) != 0
return (self._state & ChannelModes.MONO.value) != 0
@property
def mc(self) -> bool:
return (self._state & 0x00000008) != 0
return (self._state & ChannelModes.MC.value) != 0
# EQ modes
@property
def eq_on(self) -> bool:
return (self._state & 0x00000100) != 0
return (self._state & ChannelModes.ON.value) != 0
@property
def eq_ab(self) -> bool:
return (self._state & 0x00000800) != 0
return (self._state & ChannelModes.AB.value) != 0
# Bus assignments (strip to bus routing)
@property
def busa1(self) -> bool:
return (self._state & 0x00001000) != 0
return (self._state & ChannelModes.BUSA1.value) != 0
@property
def busa2(self) -> bool:
return (self._state & 0x00002000) != 0
return (self._state & ChannelModes.BUSA2.value) != 0
@property
def busa3(self) -> bool:
return (self._state & 0x00004000) != 0
return (self._state & ChannelModes.BUSA3.value) != 0
@property
def busa4(self) -> bool:
return (self._state & 0x00008000) != 0
return (self._state & ChannelModes.BUSA4.value) != 0
@property
def busb1(self) -> bool:
return (self._state & 0x00010000) != 0
return (self._state & ChannelModes.BUSB1.value) != 0
@property
def busb2(self) -> bool:
return (self._state & 0x00020000) != 0
return (self._state & ChannelModes.BUSB2.value) != 0
@property
def busb3(self) -> bool:
return (self._state & 0x00040000) != 0
return (self._state & ChannelModes.BUSB3.value) != 0
class States(NamedTuple):
@ -95,8 +105,8 @@ class Labels(NamedTuple):
@dataclass
class VbanPacketNBS0(VbanPacket):
"""Represents the body of a VBAN data packet with ident:0"""
class VbanRTPacketNBS0(VbanRTPacket):
"""Represents the body of a VBAN RTPacket with ident:0"""
_inputLeveldB100: bytes
_outputLeveldB100: bytes
@ -146,32 +156,17 @@ class VbanPacketNBS0(VbanPacket):
def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed"""
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return (
self._stripState != other._stripState
or self._busState != other._busState
or self_gains != other_gains
or self._stripGaindB100Layer1 != other._stripGaindB100Layer1
or self._stripGaindB100Layer2 != other._stripGaindB100Layer2
or self._stripGaindB100Layer3 != other._stripGaindB100Layer3
or self._stripGaindB100Layer4 != other._stripGaindB100Layer4
or self._stripGaindB100Layer5 != other._stripGaindB100Layer5
or self._stripGaindB100Layer6 != other._stripGaindB100Layer6
or self._stripGaindB100Layer7 != other._stripGaindB100Layer7
or self._stripGaindB100Layer8 != other._stripGaindB100Layer8
or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60
@ -185,77 +180,54 @@ class VbanPacketNBS0(VbanPacket):
)
return any(self._strip_comp) or any(self._bus_comp)
@property
@cached_property
def strip_levels(self) -> tuple[float, ...]:
"""Returns strip levels in dB"""
return tuple(
round(
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._inputLeveldB100), 2)
)[: self._kind.num_strip_levels]
strip_raw = struct.unpack('<34h', self._inputLeveldB100)
return tuple(round(val * 0.01, 1) for val in strip_raw)[
: self._kind.num_strip_levels
]
@property
@cached_property
def bus_levels(self) -> tuple[float, ...]:
"""Returns bus levels in dB"""
return tuple(
round(
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._outputLeveldB100), 2)
)[: self._kind.num_bus_levels]
bus_raw = struct.unpack('<64h', self._outputLeveldB100)
return tuple(round(val * 0.01, 1) for val in bus_raw)[
: self._kind.num_bus_levels
]
@property
def levels(self) -> Levels:
"""Returns strip and bus levels as a namedtuple"""
return Levels(strip=self.strip_levels, bus=self.bus_levels)
@property
@cached_property
def states(self) -> States:
"""returns States object with processed strip and bus channel states"""
strip_states = struct.unpack('<8I', self._stripState)
bus_states = struct.unpack('<8I', self._busState)
return States(
strip=tuple(
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
strip=tuple(ChannelState.from_int(state) for state in strip_states),
bus=tuple(ChannelState.from_int(state) for state in bus_states),
)
@property
@cached_property
def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples"""
return tuple(
tuple(
round(
int.from_bytes(
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
'little',
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
layer_data = []
for layer in range(1, 9):
layer_bytes = getattr(self, f'_stripGaindB100Layer{layer}')
layer_raw = struct.unpack('<8h', layer_bytes)
layer_data.append(tuple(round(val * 0.01, 2) for val in layer_raw))
return tuple(layer_data)
@property
@cached_property
def busgain(self) -> tuple:
"""returns tuple of bus gains"""
return tuple(
round(
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
bus_gain_raw = struct.unpack('<8h', self._busGaindB100)
return tuple(round(val * 0.01, 2) for val in bus_gain_raw)
@property
@cached_property
def labels(self) -> Labels:
"""returns Labels namedtuple of strip and bus labels"""

View File

@ -1,11 +1,12 @@
import struct
from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from .headers import VbanPacket
from .headers import VbanRTPacket
VMPARAMSTRIP_SIZE = 174
@ -193,11 +194,15 @@ class VbanVMParamStrip:
_Pitch_formant_high=data[172:174],
)
@property
@cached_property
def mode(self) -> int:
return int.from_bytes(self._mode, 'little')
@property
@cached_property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@cached_property
def audibility(self) -> Audibility:
return Audibility(
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
@ -206,7 +211,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
)
@property
@cached_property
def positions(self) -> Positions:
return Positions(
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
@ -217,7 +222,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
)
@property
@cached_property
def eqgains(self) -> EqGains:
return EqGains(
*[
@ -230,7 +235,7 @@ class VbanVMParamStrip:
]
)
@property
@cached_property
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
return tuple(
ParametricEQSettings(
@ -243,7 +248,7 @@ class VbanVMParamStrip:
for i in range(6)
)
@property
@cached_property
def sends(self) -> Sends:
return Sends(
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
@ -252,11 +257,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
)
@property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@property
@cached_property
def compressor(self) -> CompressorSettings:
return CompressorSettings(
gain_in=round(
@ -276,7 +277,7 @@ class VbanVMParamStrip:
),
)
@property
@cached_property
def gate(self) -> GateSettings:
return GateSettings(
threshold_in=round(
@ -295,7 +296,7 @@ class VbanVMParamStrip:
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
)
@property
@cached_property
def denoiser(self) -> DenoiserSettings:
return DenoiserSettings(
threshold=round(
@ -303,7 +304,7 @@ class VbanVMParamStrip:
)
)
@property
@cached_property
def pitch(self) -> PitchSettings:
return PitchSettings(
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
@ -327,8 +328,8 @@ class VbanVMParamStrip:
@dataclass
class VbanPacketNBS1(VbanPacket):
"""Represents the body of a VBAN data packet with ident:1"""
class VbanRTPacketNBS1(VbanRTPacket):
"""Represents the body of a VBAN RTPacket with ident:1"""
strips: tuple[VbanVMParamStrip, ...]

126
vban_cmd/packet/ping0.py Normal file
View File

@ -0,0 +1,126 @@
import struct
from dataclasses import dataclass
from enum import Enum
from .headers import VbanPingHeader
# VBAN PING bitType constants
VBANPING_TYPE_RECEPTOR = 0x00000001 # Simple receptor
VBANPING_TYPE_TRANSMITTER = 0x00000002 # Simple Transmitter
VBANPING_TYPE_RECEPTORSPOT = 0x00000004 # SPOT receptor
VBANPING_TYPE_TRANSMITTERSPOT = 0x00000008 # SPOT transmitter
VBANPING_TYPE_VIRTUALDEVICE = 0x00000010 # Virtual Device
VBANPING_TYPE_VIRTUALMIXER = 0x00000020 # Virtual Mixer
VBANPING_TYPE_MATRIX = 0x00000040 # MATRIX
VBANPING_TYPE_DAW = 0x00000080 # Workstation
VBANPING_TYPE_SERVER = 0x01000000 # VBAN SERVER
# VBAN PING bitfeature constants
VBANPING_FEATURE_AUDIO = 0x00000001
VBANPING_FEATURE_AOIP = 0x00000002
VBANPING_FEATURE_VOIP = 0x00000004
VBANPING_FEATURE_SERIAL = 0x00000100
VBANPING_FEATURE_MIDI = 0x00000300
VBANPING_FEATURE_FRAME = 0x00001000
VBANPING_FEATURE_TXT = 0x00010000
class VbanServerType(Enum):
"""VBAN server types detected from PONG responses"""
UNKNOWN = 0
VOICEMEETER = VBANPING_TYPE_VIRTUALMIXER
MATRIX = VBANPING_TYPE_MATRIX
@dataclass
class VbanPing0Payload:
"""Represents the VBAN PING0 payload structure as defined in the VBAN protocol documentation."""
def __init__(self):
self.bit_type = VBANPING_TYPE_RECEPTOR
self.bit_feature = VBANPING_FEATURE_TXT
self.bit_feature_ex = 0x00000000
self.preferred_rate = 48000
self.min_rate = 8000
self.max_rate = 192000
self.color_rgb = 0x00FF0000
self.version = b'\x01\x02\x03\x04'
self.gps_position = b'\x00' * 8
self.user_position = b'\x00' * 8
self.lang_code = b'EN\x00\x00\x00\x00\x00\x00'
self.reserved = b'\x00' * 8
self.reserved_ex = b'\x00' * 64
self.distant_ip = b'\x00' * 32
self.distant_port = 0
self.distant_reserved = 0
self.device_name = b'VBAN-CMD-Python\x00'.ljust(64, b'\x00')
self.manufacturer_name = b'Python-VBAN\x00'.ljust(64, b'\x00')
self.application_name = b'vban-cmd\x00'.ljust(64, b'\x00')
self.host_name = b'localhost\x00'.ljust(64, b'\x00')
self.user_name = b'Python User\x00'.ljust(128, b'\x00')
self.user_comment = b'VBAN CMD Python Client\x00'.ljust(128, b'\x00')
@classmethod
def to_bytes(cls) -> bytes:
"""Convert payload to bytes"""
payload = cls()
return struct.pack(
'<7I4s8s8s8s8s64s32s2H64s64s64s64s128s128s',
payload.bit_type,
payload.bit_feature,
payload.bit_feature_ex,
payload.preferred_rate,
payload.min_rate,
payload.max_rate,
payload.color_rgb,
payload.version,
payload.gps_position,
payload.user_position,
payload.lang_code,
payload.reserved,
payload.reserved_ex,
payload.distant_ip,
payload.distant_port,
payload.distant_reserved,
payload.device_name,
payload.manufacturer_name,
payload.application_name,
payload.host_name,
payload.user_name,
payload.user_comment,
)
@classmethod
def create_packet(cls, framecounter: int) -> bytes:
"""Creates a complete PING packet with header and payload."""
data = bytearray()
data.extend(VbanPingHeader.to_bytes(framecounter))
data.extend(cls.to_bytes())
return bytes(data)
@staticmethod
def detect_server_type(pong_data: bytes) -> VbanServerType:
"""Detect server type from PONG response packet.
Args:
pong_data: Raw bytes from PONG response packet
Returns:
VbanServerType enum indicating the detected server type
"""
try:
if len(pong_data) >= 32:
frame_counter_bytes = pong_data[28:32]
frame_counter = int.from_bytes(frame_counter_bytes, 'little')
if frame_counter == VbanServerType.MATRIX.value:
return VbanServerType.MATRIX
elif frame_counter == VbanServerType.VOICEMEETER.value:
return VbanServerType.VOICEMEETER
return VbanServerType.UNKNOWN
except Exception:
return VbanServerType.UNKNOWN

138
vban_cmd/recorder.py Normal file
View File

@ -0,0 +1,138 @@
import os
import re
from .error import VBANCMDError
from .iremote import IRemote
from .meta import action_fn
class Recorder(IRemote):
"""
Implements the common interface
Defines concrete implementation for recorder
"""
@classmethod
def make(cls, remote):
"""
Factory function for recorder class.
Returns a Recorder class of a kind.
"""
Recorder_cls = type(
f'Recorder{remote.kind}',
(cls,),
{
**{
param: action_fn(param)
for param in [
'play',
'stop',
'pause',
'replay',
'record',
'ff',
'rew',
]
},
},
)
return Recorder_cls(remote)
def __str__(self):
return f'{type(self).__name__}'
@property
def identifier(self) -> str:
return 'recorder'
@property
def samplerate(self) -> int:
return
@samplerate.setter
def samplerate(self, val: int):
opts = (22050, 24000, 32000, 44100, 48000, 88200, 96000, 176400, 192000)
if val not in opts:
self.logger.warning(f'samplerate got: {val} but expected a value in {opts}')
self.setter('samplerate', val)
@property
def bitresolution(self) -> int:
return
@bitresolution.setter
def bitresolution(self, val: int):
opts = (8, 16, 24, 32)
if val not in opts:
self.logger.warning(
f'bitresolution got: {val} but expected a value in {opts}'
)
self.setter('bitresolution', val)
@property
def channel(self) -> int:
return
@channel.setter
def channel(self, val: int):
if not 1 <= val <= 8:
self.logger.warning(f'channel got: {val} but expected a value from 1 to 8')
self.setter('channel', val)
@property
def kbps(self):
return
@kbps.setter
def kbps(self, val: int):
opts = (32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320)
if val not in opts:
self.logger.warning(f'kbps got: {val} but expected a value in {opts}')
self.setter('kbps', val)
@property
def gain(self) -> float:
return
@gain.setter
def gain(self, val: float):
self.setter('gain', val)
def load(self, file: os.PathLike):
try:
# Convert to string, use forward slashes, and wrap in quotes for spaces
file_path = f'"{os.fspath(file).replace(chr(92), "/")}"'
self.setter('load', file_path)
except UnicodeError:
raise VBANCMDError('File full directory must be a raw string')
def goto(self, time_str):
def get_sec():
"""Get seconds from time string"""
h, m, s = time_str.split(':')
return int(h) * 3600 + int(m) * 60 + int(s)
time_str = str(time_str) # coerce the type
if (
re.match(
r'^(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d)$',
time_str,
)
is not None
):
self.setter('goto', get_sec())
else:
self.logger.warning(
"goto expects a string that matches the format 'hh:mm:ss'"
)
def filetype(self, val: str):
opts = {'wav': 1, 'aiff': 2, 'bwf': 3, 'mp3': 100}
try:
self.setter('filetype', opts[val.lower()])
except KeyError:
self.logger.warning(
f'filetype got: {val} but expected a value in {list(opts.keys())}'
)

View File

@ -1,5 +1,62 @@
import socket
import time
from typing import Iterator
from .error import VBANCMDConnectionError
def script_ratelimit(func):
"""script_ratelimit decorator for {VbanCmd}.sendtext, to prevent flooding the network with script requests."""
def wrapper(*args, **kwargs):
self, *rem = args
if self.script_ratelimit:
now = time.time()
elapsed = now - self._last_script_request_time
if elapsed < self.script_ratelimit:
time.sleep(self.script_ratelimit - elapsed)
self._last_script_request_time = time.time()
return func(*args, **kwargs)
return wrapper
def pong_timeout(func):
"""pong_timeout decorator for {VbanCmd}._handle_pong, to handle timeout logic and socket management."""
def wrapper(self, timeout: float = None):
if timeout is None:
timeout = min(self.timeout, 3.0)
original_timeout = self.sock.gettimeout()
self.sock.settimeout(0.5)
try:
start_time = time.time()
response_count = 0
while time.time() - start_time < timeout:
try:
response_count += 1
if func(self):
return
except socket.timeout:
continue
self.logger.debug(
f'PING timeout after {timeout}s, received {response_count} non-PONG packets'
)
raise VBANCMDConnectionError(
f'PING timeout: No response from {self.host}:{self.port} after {timeout}s'
)
finally:
self.sock.settimeout(original_timeout)
return wrapper
def cache_bool(func, param):
"""Check cache for a bool prop"""
@ -67,16 +124,11 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
"""
Generator function, accepts two tuples of dB values.
Evaluates equality of each member in both tuples.
Only ignores changes when levels are very quiet (below -72 dB).
Returns True when levels are equal (no change), False when different.
"""
for a, b in zip(t0, t1):
# If both values are very quiet (below -72dB), ignore small changes
if a <= -72.0 and b <= -72.0:
yield a == b # Both quiet, check if they're equal
else:
yield a != b # At least one has significant level, detect changes
yield a == b
def deep_merge(dict1, dict2):

View File

@ -5,14 +5,19 @@ import threading
import time
from pathlib import Path
from queue import Queue
from typing import Union
from typing import Mapping, Union
from .enums import NBS
from .error import VBANCMDError
from .error import VBANCMDConnectionError, VBANCMDError
from .event import Event
from .packet.headers import VbanMatrixResponseHeader, VbanRequestHeader
from .packet.headers import (
VbanMatrixResponseHeader,
VbanPongHeader,
VbanRTRequestHeader,
)
from .packet.ping0 import VbanPing0Payload, VbanServerType
from .subject import Subject
from .util import bump_framecounter, deep_merge
from .util import bump_framecounter, deep_merge, pong_timeout, script_ratelimit
from .worker import Producer, Subscriber, Updater
logger = logging.getLogger(__name__)
@ -22,37 +27,39 @@ class VbanCmd(abc.ABC):
"""Abstract Base Class for Voicemeeter VBAN Command Interfaces"""
DELAY = 0.001
# fmt: off
BPS_OPTS = [
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
1000000, 1500000, 2000000, 3000000,
]
# fmt: on
def __init__(self, **kwargs):
self.logger = logger.getChild(self.__class__.__name__)
self.event = Event({k: kwargs.pop(k) for k in ('pdirty', 'ldirty')})
if not kwargs['ip']:
if not kwargs['host']:
kwargs |= self._conn_from_toml()
for attr, val in kwargs.items():
setattr(self, attr, val)
try:
self._host_ip = socket.gethostbyname(self.host)
except socket.gaierror as e:
raise VBANCMDConnectionError(
f'Unable to resolve hostname {self.host}'
) from e
self._framecounter = 0
self._framecounter_lock = threading.Lock()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.settimeout(self.timeout)
self.subject = self.observer = Subject()
self.cache = {}
self._pdirty = False
self._ldirty = False
self._script = str()
self.stop_event = None
self.producer = None
self._last_script_request_time = 0
@property
@abc.abstractmethod
def __str__(self):
"""Ensure subclasses override str magic method"""
pass
def steps(self):
"""Steps required to build the interface for this Voicemeeter kind"""
def _conn_from_toml(self) -> dict:
try:
@ -86,7 +93,12 @@ class VbanCmd(abc.ABC):
self.logout()
def login(self) -> None:
"""Starts the subscriber and updater threads (unless disable_rt_listeners is True) and logs into Voicemeeter."""
"""Sends a PING packet to the VBAN server to verify connectivity and detect server type.
If the server is detected as Matrix, RT listeners will be disabled for compatibility.
"""
self._ping()
self._handle_pong()
if not self.disable_rt_listeners:
self.event.info()
@ -102,7 +114,7 @@ class VbanCmd(abc.ABC):
self.producer.start()
self.logger.info(
"Successfully logged into VBANCMD {kind} with ip='{ip}', port={port}, streamname='{streamname}'".format(
"Successfully logged into VBANCMD {kind} with host='{host}', port={port}, streamname='{streamname}'".format(
**self.__dict__
)
)
@ -120,25 +132,88 @@ class VbanCmd(abc.ABC):
def stopped(self):
return self.stop_event is None or self.stop_event.is_set()
def _get_next_framecounter(self) -> int:
"""Thread-safe method to get and increment framecounter."""
with self._framecounter_lock:
current = self._framecounter
self._framecounter = bump_framecounter(self._framecounter)
return current
def _ping(self):
"""Initiates the PING/PONG handshake with the VBAN server."""
try:
self.sock.sendto(
VbanPing0Payload.create_packet(self._get_next_framecounter()),
(self._host_ip, self.port),
)
self.logger.debug(f'PING sent to {self.host}:{self.port}')
except Exception as e:
raise VBANCMDConnectionError(f'PING failed: {e}') from e
@pong_timeout
def _handle_pong(self) -> bool:
"""Handles incoming packets during the PING/PONG handshake, looking for a valid PONG response to confirm connectivity and detect server type.
Returns True if a valid PONG is received, False otherwise."""
data, addr = self.sock.recvfrom(2048)
if VbanPongHeader.is_pong_response(data):
self.logger.debug(f'PONG received from {addr}, connectivity confirmed')
server_type = VbanPing0Payload.detect_server_type(data)
self._handle_server_type(server_type)
return True
else:
if len(data) >= 8:
if data[:4] == b'VBAN':
protocol = data[4] & 0xE0
nbc = data[6]
self.logger.debug(
f'Non-PONG VBAN packet: protocol=0x{protocol:02x}, nbc=0x{nbc:02x}'
)
else:
self.logger.debug('Non-VBAN packet received')
return False
def _handle_server_type(self, server_type: VbanServerType) -> None:
"""Handle the detected server type by adjusting settings accordingly."""
match server_type:
case VbanServerType.VOICEMEETER:
self.logger.debug(
'Detected Voicemeeter VBAN server - RT listeners supported'
)
case VbanServerType.MATRIX:
self.logger.info(
'Detected Matrix VBAN server - disabling RT listeners for compatibility'
)
self.disable_rt_listeners = True
case _:
self.logger.debug(
f'Unknown server type ({server_type}) - using default settings'
)
def _send_request(self, payload: str) -> None:
"""Sends a request packet over the network and bumps the framecounter."""
self.sock.sendto(
VbanRequestHeader.encode_with_payload(
VbanRTRequestHeader.encode_with_payload(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
bps=self.bps,
channel=self.channel,
framecounter=self._framecounter,
framecounter=self._get_next_framecounter(),
payload=payload,
),
(socket.gethostbyname(self.ip), self.port),
(self._host_ip, self.port),
)
self._framecounter = bump_framecounter(self._framecounter)
def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network."""
self._send_request(f'{cmd}={val};')
self.cache[cmd] = val
@script_ratelimit
def sendtext(self, script) -> str | None:
"""Sends a multiple parameter string over a network."""
self._send_request(script)
@ -146,14 +221,15 @@ class VbanCmd(abc.ABC):
if self.disable_rt_listeners and script.endswith(('?', '?;')):
try:
response = VbanMatrixResponseHeader.extract_payload(
self.sock.recv(1024)
)
return response
data, _ = self.sock.recvfrom(2048)
return VbanMatrixResponseHeader.extract_payload(data)
except ValueError as e:
self.logger.warning(f'Error extracting matrix response: {e}')
time.sleep(self.DELAY)
except TimeoutError as e:
self.logger.exception(f'Timeout waiting for matrix response: {e}')
raise VBANCMDConnectionError(
f'Timeout waiting for response from {self.host}:{self.port}'
) from e
@property
def type(self) -> str:
@ -185,12 +261,8 @@ class VbanCmd(abc.ABC):
while self.pdirty:
time.sleep(self.DELAY)
def apply(self, data: dict):
"""
Sets all parameters of a dict
minor delay between each recursion
"""
def apply(self, data: Mapping):
"""Set all parameters of a dict"""
def target(key):
match key.split('-'):
@ -210,7 +282,8 @@ class VbanCmd(abc.ABC):
raise ValueError(ERR_MSG)
return target[int(index)]
[target(key).apply(di).then_wait() for key, di in data.items()]
for key, di in data.items():
target(key).apply(di)
def apply_config(self, name):
"""applies a config from memory"""

View File

@ -1,19 +1,18 @@
import logging
import socket
import threading
import time
from .enums import NBS
from .error import VBANCMDConnectionError
from .error import VBANCMDConnectionError, VBANCMDPacketError
from .packet.enums import SubProtocols
from .packet.headers import (
HEADER_SIZE,
VbanPacket,
VbanResponseHeader,
VbanSubscribeHeader,
VbanRTPacket,
VbanRTResponseHeader,
VbanRTSubscribeHeader,
)
from .packet.nbs0 import VbanPacketNBS0
from .packet.nbs1 import VbanPacketNBS1
from .util import bump_framecounter
from .packet.nbs0 import VbanRTPacketNBS0
from .packet.nbs1 import VbanRTPacketNBS1
logger = logging.getLogger(__name__)
@ -26,24 +25,18 @@ class Subscriber(threading.Thread):
self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__)
self._framecounter = 0
def run(self):
while not self.stopped():
try:
for nbs in NBS:
sub_packet = VbanSubscribeHeader().to_bytes(nbs, self._framecounter)
self._remote.sock.sendto(
sub_packet, (self._remote.ip, self._remote.port)
sub_packet = VbanRTSubscribeHeader().to_bytes(
nbs, self._remote._get_next_framecounter()
)
self._remote.sock.sendto(
sub_packet, (self._remote._host_ip, self._remote.port)
)
self._framecounter = bump_framecounter(self._framecounter)
self.wait_until_stopped(10)
except socket.gaierror as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
f'unable to resolve hostname {self._remote.ip}'
) from e
self.logger.debug(f'terminating {self.name} thread')
def stopped(self):
@ -66,7 +59,6 @@ class Producer(threading.Thread):
self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__)
self._remote.sock.settimeout(self._remote.timeout)
self._remote._public_packets = [None] * (max(NBS) + 1)
_pp = self._get_rt()
self._remote._public_packets[_pp.nbs] = _pp
@ -75,43 +67,41 @@ class Producer(threading.Thread):
self._remote.cache['bus_level'],
) = self._remote.public_packets[NBS.zero].levels
def _get_rt(self) -> VbanPacket:
def _get_rt(self) -> VbanRTPacket:
"""Attempt to fetch data packet until a valid one found"""
while True:
if resp := self._fetch_rt_packet():
return resp
def _fetch_rt_packet(self) -> VbanPacket | None:
try:
data, _ = self._remote.sock.recvfrom(2048)
if len(data) < HEADER_SIZE:
return
continue
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
f'timeout waiting for response from {self._remote.ip}:{self._remote.port}'
f'timeout waiting for response from {self._remote.host}:{self._remote.port}'
) from e
try:
header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
except ValueError as e:
self.logger.warning(f'Error parsing response packet: {e}')
return None
header = VbanRTResponseHeader.from_bytes(data[:HEADER_SIZE])
except VBANCMDPacketError as e:
match e.protocol:
case SubProtocols.SERVICE:
# Silently ignore periodic SERVICE packets unrelated to vban-cmd
pass
case _:
self.logger.debug(f'Error parsing response packet: {e}')
continue
match header.format_nbs:
case NBS.zero:
return VbanPacketNBS0.from_bytes(
return VbanRTPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data
)
case NBS.one:
return VbanPacketNBS1.from_bytes(
return VbanRTPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data
)
return None
def stopped(self):
return self.stop_event.is_set()
@ -138,7 +128,6 @@ class Producer(threading.Thread):
self.queue.put('pdirty')
if self._remote.event.ldirty:
self.queue.put('ldirty')
# time.sleep(self._remote.ratelimit)
self.logger.debug(f'terminating {self.name} thread')
self.queue.put(None)