Compare commits

..

No commits in common. "dev" and "v2.9.5" have entirely different histories.
dev ... v2.9.5

17 changed files with 439 additions and 460 deletions

View File

@ -548,7 +548,7 @@ You may pass the following optional keyword arguments:
- `channel`: int=0, channel on which to send the UDP requests. - `channel`: int=0, channel on which to send the UDP requests.
- `pdirty`: boolean=False, parameter updates - `pdirty`: boolean=False, parameter updates
- `ldirty`: boolean=False, level updates - `ldirty`: boolean=False, level updates
- `script_ratelimit`: float | None=None, ratelimit for vban.sendtext() specifically. - `script_ratelimit`: float=0.05, default to 20 script requests per second. This affects vban.sendtext() specifically.
- `timeout`: int=5, timeout for socket operations. - `timeout`: int=5, timeout for socket operations.
- `disable_rt_listeners`: boolean=False, set `True` if you don't wish to receive RT packets. - `disable_rt_listeners`: boolean=False, set `True` if you don't wish to receive RT packets.
- You can still send Matrix string requests ending with `?` and receive a response. - You can still send Matrix string requests ending with `?` and receive a response.

View File

@ -103,7 +103,7 @@ class App(tk.Tk):
def main(): def main():
KIND_ID = 'banana' KIND_ID = 'banana'
conn = { conn = {
'host': os.environ.get('VBANCMD_HOST', 'localhost'), 'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)), 'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'), 'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
} }

View File

@ -94,7 +94,7 @@ class Observer:
def main(): def main():
KIND_ID = 'potato' KIND_ID = 'potato'
conn = { conn = {
'host': os.environ.get('VBANCMD_HOST', 'localhost'), 'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)), 'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'), 'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
} }

View File

@ -25,7 +25,7 @@ class App:
def main(): def main():
KIND_ID = 'banana' KIND_ID = 'banana'
conn = { conn = {
'host': os.environ.get('VBANCMD_HOST', 'localhost'), 'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)), 'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'), 'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
} }

42
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. # This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand.
[[package]] [[package]]
name = "cachetools" name = "cachetools"
@ -55,7 +55,7 @@ description = "Backport of PEP 654 (exception groups)"
optional = false optional = false
python-versions = ">=3.7" python-versions = ">=3.7"
groups = ["dev"] groups = ["dev"]
markers = "python_version == \"3.10\"" markers = "python_version < \"3.11\""
files = [ files = [
{file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"},
{file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"}, {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"},
@ -66,16 +66,21 @@ test = ["pytest (>=6)"]
[[package]] [[package]]
name = "filelock" name = "filelock"
version = "3.20.3" version = "3.16.1"
description = "A platform independent file lock." description = "A platform independent file lock."
optional = false optional = false
python-versions = ">=3.10" python-versions = ">=3.8"
groups = ["dev"] groups = ["dev"]
files = [ files = [
{file = "filelock-3.20.3-py3-none-any.whl", hash = "sha256:4b0dda527ee31078689fc205ec4f1c1bf7d56cf88b6dc9426c4f230e46c2dce1"}, {file = "filelock-3.16.1-py3-none-any.whl", hash = "sha256:2082e5703d51fbf98ea75855d9d5527e33d8ff23099bec374a134febee6946b0"},
{file = "filelock-3.20.3.tar.gz", hash = "sha256:18c57ee915c7ec61cff0ecf7f0f869936c7c30191bb0cf406f1341778d0834e1"}, {file = "filelock-3.16.1.tar.gz", hash = "sha256:c249fbfcd5db47e5e2d6d62198e565475ee65e4831e2561c8e313fa7eb961435"},
] ]
[package.extras]
docs = ["furo (>=2024.8.6)", "sphinx (>=8.0.2)", "sphinx-autodoc-typehints (>=2.4.1)"]
testing = ["covdefaults (>=2.3)", "coverage (>=7.6.1)", "diff-cover (>=9.2)", "pytest (>=8.3.3)", "pytest-asyncio (>=0.24)", "pytest-cov (>=5)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.26.4)"]
typing = ["typing-extensions (>=4.12.2)"]
[[package]] [[package]]
name = "iniconfig" name = "iniconfig"
version = "2.0.0" version = "2.0.0"
@ -238,7 +243,7 @@ description = "A lil' TOML parser"
optional = false optional = false
python-versions = ">=3.8" python-versions = ">=3.8"
groups = ["main", "dev"] groups = ["main", "dev"]
markers = "python_version == \"3.10\"" markers = "python_version < \"3.11\""
files = [ files = [
{file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"},
{file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"}, {file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"},
@ -304,38 +309,37 @@ test = ["devpi-process (>=1.0.2)", "pytest (>=8.3.3)", "pytest-mock (>=3.14)"]
[[package]] [[package]]
name = "typing-extensions" name = "typing-extensions"
version = "4.15.0" version = "4.12.2"
description = "Backported and Experimental Type Hints for Python 3.9+" description = "Backported and Experimental Type Hints for Python 3.8+"
optional = false optional = false
python-versions = ">=3.9" python-versions = ">=3.8"
groups = ["dev"] groups = ["dev"]
markers = "python_version == \"3.10\"" markers = "python_version < \"3.11\""
files = [ files = [
{file = "typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548"}, {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
{file = "typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466"}, {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
] ]
[[package]] [[package]]
name = "virtualenv" name = "virtualenv"
version = "20.36.1" version = "20.29.0"
description = "Virtual Python Environment builder" description = "Virtual Python Environment builder"
optional = false optional = false
python-versions = ">=3.8" python-versions = ">=3.8"
groups = ["dev"] groups = ["dev"]
files = [ files = [
{file = "virtualenv-20.36.1-py3-none-any.whl", hash = "sha256:575a8d6b124ef88f6f51d56d656132389f961062a9177016a50e4f507bbcc19f"}, {file = "virtualenv-20.29.0-py3-none-any.whl", hash = "sha256:c12311863497992dc4b8644f8ea82d3b35bb7ef8ee82e6630d76d0197c39baf9"},
{file = "virtualenv-20.36.1.tar.gz", hash = "sha256:8befb5c81842c641f8ee658481e42641c68b5eab3521d8e092d18320902466ba"}, {file = "virtualenv-20.29.0.tar.gz", hash = "sha256:6345e1ff19d4b1296954cee076baaf58ff2a12a84a338c62b02eda39f20aa982"},
] ]
[package.dependencies] [package.dependencies]
distlib = ">=0.3.7,<1" distlib = ">=0.3.7,<1"
filelock = {version = ">=3.20.1,<4", markers = "python_version >= \"3.10\""} filelock = ">=3.12.2,<4"
platformdirs = ">=3.9.1,<5" platformdirs = ">=3.9.1,<5"
typing-extensions = {version = ">=4.13.2", markers = "python_version < \"3.11\""}
[package.extras] [package.extras]
docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.2,!=7.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"] docs = ["furo (>=2023.7.26)", "proselint (>=0.13)", "sphinx (>=7.1.2,!=7.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.6)"]
test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8) ; platform_python_implementation == \"PyPy\" or platform_python_implementation == \"GraalVM\" or platform_python_implementation == \"CPython\" and sys_platform == \"win32\" and python_version >= \"3.13\"", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10) ; platform_python_implementation == \"CPython\""] test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.4)", "pytest-env (>=0.8.2)", "pytest-freezer (>=0.4.8)", "pytest-mock (>=3.11.1)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=68)", "time-machine (>=2.10)"]
[[package]] [[package]]
name = "virtualenv-pyenv" name = "virtualenv-pyenv"

View File

@ -1,6 +1,6 @@
[project] [project]
name = "vban-cmd" name = "vban-cmd"
version = "2.10.3" version = "2.9.5"
description = "Python interface for the VBAN RT Packet Service (Sendtext)" description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }] authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
license = { text = "MIT" } license = { text = "MIT" }

View File

@ -1,18 +1,6 @@
from .packet.enums import ServiceTypes, SubProtocols
class VBANCMDError(Exception): class VBANCMDError(Exception):
"""Base VBANCMD Exception class.""" """Base VBANCMD Exception class."""
class VBANCMDConnectionError(VBANCMDError): class VBANCMDConnectionError(VBANCMDError):
"""Exception raised when connection/timeout errors occur""" """Exception raised when connection/timeout errors occur"""
class VBANCMDPacketError(VBANCMDError):
"""Exception raised when packet parsing errors occur"""
def __init__(self, message: str, protocol: SubProtocols, type_: ServiceTypes):
super().__init__(message)
self.protocol = protocol
self.type = type_

View File

@ -1,3 +1,4 @@
import abc
import logging import logging
from enum import IntEnum from enum import IntEnum
from functools import cached_property from functools import cached_property
@ -88,7 +89,7 @@ class FactoryBase(VbanCmd):
'streamname': 'Command1', 'streamname': 'Command1',
'bps': 256000, 'bps': 256000,
'channel': 0, 'channel': 0,
'script_ratelimit': None, # if None or 0, no rate limit applied to script commands 'script_ratelimit': 0.05, # 20 commands per second, to avoid overloading Voicemeeter
'timeout': 5, # timeout on socket operations, in seconds 'timeout': 5, # timeout on socket operations, in seconds
'disable_rt_listeners': False, 'disable_rt_listeners': False,
'sync': False, 'sync': False,
@ -121,6 +122,11 @@ class FactoryBase(VbanCmd):
+ f"({self.kind}, ip='{self.ip}', port={self.port}, streamname='{self.streamname}')" + f"({self.kind}, ip='{self.ip}', port={self.port}, streamname='{self.streamname}')"
) )
@property
@abc.abstractmethod
def steps(self):
pass
@cached_property @cached_property
def configs(self): def configs(self):
self._configs = configs(self.kind.name) self._configs = configs(self.kind.name)

View File

@ -48,18 +48,26 @@ class IRemote(abc.ABC):
def apply(self, data): def apply(self, data):
"""Sets all parameters of a dict for the channel.""" """Sets all parameters of a dict for the channel."""
script = ''
def fget(attr, val): def fget(attr, val):
if attr == 'mode': if attr == 'mode':
return (getattr(self, attr), val, 1) return (f'mode.{val}', 1)
return (self, attr, val) elif attr == 'knob':
return ('', val)
return (attr, val)
for attr, val in data.items(): for attr, val in data.items():
if not isinstance(val, dict): if not isinstance(val, dict):
if attr in dir(self): # avoid calling getattr (with hasattr) if attr in dir(self): # avoid calling getattr (with hasattr)
target, attr, val = fget(attr, val) attr, val = fget(attr, val)
setattr(target, attr, val) if isinstance(val, bool):
else: val = 1 if val else 0
self.logger.error(f'invalid attribute {attr} for {self}')
self._remote.cache[self._cmd(attr)] = val
script += f'{self._cmd(attr)}={val};'
else: else:
target = getattr(self, attr) target = getattr(self, attr)
target.apply(val) target.apply(val)
self._remote.sendtext(script)

View File

@ -1,36 +1,6 @@
from enum import Flag from enum import Flag
class SubProtocols(Flag):
"""Sub Protocols - Bit flags that can be combined"""
AUDIO = 0x00
SERIAL = 0x20
TEXT = 0x40
SERVICE = 0x60
MASK = 0xE0
class ServiceTypes(Flag):
"""Service Types - Bit flags that can be combined"""
PING = 0
PONG = 0
CHATUTF8 = 1
RTPACKETREGISTER = 32
RTPACKET = 33
REQUESTREPLY = 0x02 # A Matrix reply
FNCT_REPLY = 0x80 # An RTPacket reply
class StreamTypes(Flag):
"""Stream Types - Bit flags that can be combined"""
ASCII = 0x00
UTF8 = 0x10
WCHAR = 0x20
class ChannelModes(Flag): class ChannelModes(Flag):
"""Channel Modes - Bit flags that can be combined""" """Channel Modes - Bit flags that can be combined"""

View File

@ -1,124 +1,29 @@
import struct
from dataclasses import dataclass from dataclasses import dataclass
from vban_cmd.enums import NBS from vban_cmd.enums import NBS
from vban_cmd.error import VBANCMDPacketError
from vban_cmd.kinds import KindMapClass from vban_cmd.kinds import KindMapClass
from .enums import ServiceTypes, StreamTypes, SubProtocols VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_PING = 0
VBAN_SERVICE_PONG = 0 # PONG uses same service type as PING
VBAN_SERVICE_MASK = 0xE0
VBAN_PROTOCOL_MASK = 0xE0
VBAN_SERVICE_REQUESTREPLY = 0x02
VBAN_SERVICE_FNCT_REPLY = 0x02
PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
MAX_PACKET_SIZE = 1436 MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
STREAMNAME_MAX_LENGTH = 16
# fmt: off
BPS_OPTS = [
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
1000000, 1500000, 2000000, 3000000,
]
# fmt: on
@dataclass @dataclass
class VbanPingHeader: class VbanPacket:
"""Represents the header of a PING packet""" """Represents the header of an incoming VBAN data packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PING.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
return struct.pack(
'<4s4B16sI',
header.vban,
header.format_sr,
header.format_nbs,
header.format_nbc,
header.format_bit,
header.streamname,
header.framecounter,
)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PONG.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != ServiceTypes.PONG.value:
raise VBANCMDPacketError(
f'Not a PONG response packet: {parsed["format_nbc"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
)
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != ServiceTypes.PONG.value:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRTPacket:
"""Represents the header of an incoming RTPacket"""
nbs: NBS nbs: NBS
_kind: KindMapClass _kind: KindMapClass
@ -148,10 +53,10 @@ class VbanRTPacket:
@dataclass @dataclass
class VbanRTSubscribeHeader: class VbanSubscribeHeader:
"""Represents the header of an RT subscription packet""" """Represents the header of a subscription packet"""
_nbs: NBS = NBS.zero nbs: NBS = NBS.zero
name: str = 'Register-RTP' name: str = 'Register-RTP'
timeout: int = 15 timeout: int = 15
@ -160,106 +65,38 @@ class VbanRTSubscribeHeader:
return b'VBAN' return b'VBAN'
@property @property
def sr(self) -> int: def format_sr(self) -> bytes:
return SubProtocols.SERVICE.value return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
@property @property
def nbs(self) -> int: def format_nbs(self) -> bytes:
return self._nbs.value & 0xFF return (self.nbs.value & 0xFF).to_bytes(1, 'little')
@property @property
def nbc(self) -> int: def format_nbc(self) -> bytes:
return ServiceTypes.RTPACKETREGISTER.value return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
@property @property
def bit(self) -> int: def format_bit(self) -> bytes:
return self.timeout & 0xFF return (self.timeout & 0xFF).to_bytes(1, 'little')
@property @property
def streamname(self) -> bytes: def streamname(self) -> bytes:
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust( return self.name.encode('ascii') + bytes(16 - len(self.name))
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod @classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes: def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(_nbs=nbs) header = cls(nbs=nbs)
return struct.pack( data = bytearray()
'<4s4B16sI', data.extend(header.vban)
header.vban, data.extend(header.format_sr)
header.sr, data.extend(header.format_nbs)
header.nbs, data.extend(header.format_nbc)
header.nbc, data.extend(header.format_bit)
header.bit, data.extend(header.streamname)
header.streamname, data.extend(framecounter.to_bytes(4, 'little'))
framecounter, return bytes(data)
)
@dataclass
class VbanRTRequestHeader:
"""Represents the header of an RT request packet"""
name: str
bps: int
channel: int
framecounter: int = 0
def __post_init__(self):
if self.bps not in BPS_OPTS:
raise ValueError(
f'Invalid bps value: {self.bps}. Must be one of {BPS_OPTS}'
)
self.bps_index = BPS_OPTS.index(self.bps)
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> int:
return self.bps_index | SubProtocols.TEXT.value
@property
def nbs(self) -> int:
return 0
@property
def nbc(self) -> int:
return self.channel
@property
def bit(self) -> int:
return StreamTypes.UTF8.value
@property
def streamname(self) -> bytes:
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod
def to_bytes(cls, name: str, bps: int, channel: int, framecounter: int) -> bytes:
header = cls(name=name, bps=bps, channel=channel, framecounter=framecounter)
return struct.pack(
'<4s4B16sI',
header.vban,
header.sr,
header.nbs,
header.nbc,
header.bit,
header.streamname,
header.framecounter,
)
@classmethod
def encode_with_payload(
cls, name: str, bps: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps, channel, framecounter) + payload.encode()
def _parse_vban_service_header(data: bytes) -> dict: def _parse_vban_service_header(data: bytes) -> dict:
@ -276,12 +113,9 @@ def _parse_vban_service_header(data: bytes) -> dict:
format_bit = data[7] format_bit = data[7]
# Verify this is a service protocol packet # Verify this is a service protocol packet
protocol = format_sr & SubProtocols.MASK.value protocol = format_sr & VBAN_PROTOCOL_MASK
if protocol != SubProtocols.SERVICE.value: if protocol != VBAN_PROTOCOL_SERVICE:
raise VBANCMDPacketError( raise ValueError(f'Not a service protocol packet: {protocol:02x}')
f'Invalid protocol in service header: {protocol:02x}',
protocol=SubProtocols(protocol),
)
# Extract stream name and frame counter # Extract stream name and frame counter
name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore') name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore')
@ -298,13 +132,13 @@ def _parse_vban_service_header(data: bytes) -> dict:
@dataclass @dataclass
class VbanRTResponseHeader: class VbanResponseHeader:
"""Represents the header of an RT response packet""" """Represents the header of a response packet"""
name: str = 'Voicemeeter-RTP' name: str = 'Voicemeeter-RTP'
format_sr: int = SubProtocols.SERVICE.value format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0 format_nbs: int = 0
format_nbc: int = ServiceTypes.RTPACKET.value format_nbc: int = VBAN_SERVICE_RTPACKET
format_bit: int = 0 format_bit: int = 0
framecounter: int = 0 framecounter: int = 0
@ -314,9 +148,7 @@ class VbanRTResponseHeader:
@property @property
def streamname(self) -> bytes: def streamname(self) -> bytes:
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust( return self.name.encode('ascii') + bytes(16 - len(self.name))
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod @classmethod
def from_bytes(cls, data: bytes): def from_bytes(cls, data: bytes):
@ -324,11 +156,9 @@ class VbanRTResponseHeader:
parsed = _parse_vban_service_header(data) parsed = _parse_vban_service_header(data)
# Validate this is an RTPacket response # Validate this is an RTPacket response
if parsed['format_nbc'] != ServiceTypes.RTPACKET.value: if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET:
raise VBANCMDPacketError( raise ValueError(
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}', f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}'
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
) )
return cls(**parsed) return cls(**parsed)
@ -339,9 +169,9 @@ class VbanMatrixResponseHeader:
"""Represents the header of a matrix response packet""" """Represents the header of a matrix response packet"""
name: str = 'Request Reply' name: str = 'Request Reply'
format_sr: int = SubProtocols.SERVICE.value format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = ServiceTypes.FNCT_REPLY.value format_nbs: int = VBAN_SERVICE_FNCT_REPLY
format_nbc: int = ServiceTypes.REQUESTREPLY.value format_nbc: int = VBAN_SERVICE_REQUESTREPLY
format_bit: int = 0 format_bit: int = 0
framecounter: int = 0 framecounter: int = 0
@ -351,29 +181,16 @@ class VbanMatrixResponseHeader:
@property @property
def streamname(self) -> bytes: def streamname(self) -> bytes:
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust( return self.name.encode('ascii')[:16].ljust(16, b'\x00')
STREAMNAME_MAX_LENGTH, b'\x00'
)
@classmethod @classmethod
def from_bytes(cls, data: bytes): def from_bytes(cls, data: bytes):
"""Parse a matrix response packet from bytes.""" """Parse a matrix response packet from bytes."""
parsed = _parse_vban_service_header(data) parsed = _parse_vban_service_header(data)
# Validate this is a service reply packet (dual encoding scheme) # Validate this is a service reply packet
if parsed['format_nbs'] != ServiceTypes.FNCT_REPLY.value: if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY:
raise VBANCMDPacketError( raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
f'Not a service reply packet: {parsed["format_nbs"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbs']),
)
if parsed['format_nbc'] != ServiceTypes.REQUESTREPLY.value:
raise VBANCMDPacketError(
f'Not a request reply packet: {parsed["format_nbc"]:02x}',
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
type_=ServiceTypes(parsed['format_nbc']),
)
return cls(**parsed) return cls(**parsed)
@ -392,3 +209,148 @@ class VbanMatrixResponseHeader:
header = cls.from_bytes(data) header = cls.from_bytes(data)
payload = cls.extract_payload(data) payload = cls.extract_payload(data)
return header, payload return header, payload
@dataclass
class VbanPingHeader:
"""Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PING
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr.to_bytes(1, 'little'))
data.extend(header.format_nbs.to_bytes(1, 'little'))
data.extend(header.format_nbc.to_bytes(1, 'little'))
data.extend(header.format_bit.to_bytes(1, 'little'))
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PONG
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRequestHeader:
"""Represents the header of a request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode()[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()

View File

@ -1,6 +1,4 @@
import struct
from dataclasses import dataclass from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple from typing import NamedTuple
from vban_cmd.enums import NBS from vban_cmd.enums import NBS
@ -8,7 +6,7 @@ from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp from vban_cmd.util import comp
from .enums import ChannelModes from .enums import ChannelModes
from .headers import VbanRTPacket from .headers import VbanPacket
class Levels(NamedTuple): class Levels(NamedTuple):
@ -23,13 +21,6 @@ class ChannelState:
# Convert 4-byte state to integer once for efficient lookups # Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little') self._state = int.from_bytes(state_bytes, 'little')
@classmethod
def from_int(cls, state_int: int):
"""Create ChannelState directly from integer for efficiency"""
instance = cls.__new__(cls)
instance._state = state_int
return instance
def get_mode(self, mode_value: int) -> bool: def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode""" """Get boolean state for a specific mode"""
return (self._state & mode_value) != 0 return (self._state & mode_value) != 0
@ -105,8 +96,8 @@ class Labels(NamedTuple):
@dataclass @dataclass
class VbanRTPacketNBS0(VbanRTPacket): class VbanPacketNBS0(VbanPacket):
"""Represents the body of a VBAN RTPacket with ident:0""" """Represents the body of a VBAN data packet with ident:0"""
_inputLeveldB100: bytes _inputLeveldB100: bytes
_outputLeveldB100: bytes _outputLeveldB100: bytes
@ -156,17 +147,32 @@ class VbanRTPacketNBS0(VbanRTPacket):
def pdirty(self, other) -> bool: def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed""" """True iff any defined parameter has changed"""
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return ( return (
self._stripState != other._stripState self._stripState != other._stripState
or self._busState != other._busState or self._busState != other._busState
or self._stripGaindB100Layer1 != other._stripGaindB100Layer1 or self_gains != other_gains
or self._stripGaindB100Layer2 != other._stripGaindB100Layer2
or self._stripGaindB100Layer3 != other._stripGaindB100Layer3
or self._stripGaindB100Layer4 != other._stripGaindB100Layer4
or self._stripGaindB100Layer5 != other._stripGaindB100Layer5
or self._stripGaindB100Layer6 != other._stripGaindB100Layer6
or self._stripGaindB100Layer7 != other._stripGaindB100Layer7
or self._stripGaindB100Layer8 != other._stripGaindB100Layer8
or self._busGaindB100 != other._busGaindB100 or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60 or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60 or self._busLabelUTF8c60 != other._busLabelUTF8c60
@ -180,54 +186,77 @@ class VbanRTPacketNBS0(VbanRTPacket):
) )
return any(self._strip_comp) or any(self._bus_comp) return any(self._strip_comp) or any(self._bus_comp)
@cached_property @property
def strip_levels(self) -> tuple[float, ...]: def strip_levels(self) -> tuple[float, ...]:
"""Returns strip levels in dB""" """Returns strip levels in dB"""
strip_raw = struct.unpack('<34h', self._inputLeveldB100) return tuple(
return tuple(round(val * 0.01, 1) for val in strip_raw)[ round(
: self._kind.num_strip_levels int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
] * 0.01,
1,
)
for i in range(0, len(self._inputLeveldB100), 2)
)[: self._kind.num_strip_levels]
@cached_property @property
def bus_levels(self) -> tuple[float, ...]: def bus_levels(self) -> tuple[float, ...]:
"""Returns bus levels in dB""" """Returns bus levels in dB"""
bus_raw = struct.unpack('<64h', self._outputLeveldB100) return tuple(
return tuple(round(val * 0.01, 1) for val in bus_raw)[ round(
: self._kind.num_bus_levels int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
] * 0.01,
1,
)
for i in range(0, len(self._outputLeveldB100), 2)
)[: self._kind.num_bus_levels]
@property @property
def levels(self) -> Levels: def levels(self) -> Levels:
"""Returns strip and bus levels as a namedtuple""" """Returns strip and bus levels as a namedtuple"""
return Levels(strip=self.strip_levels, bus=self.bus_levels) return Levels(strip=self.strip_levels, bus=self.bus_levels)
@cached_property @property
def states(self) -> States: def states(self) -> States:
"""returns States object with processed strip and bus channel states""" """returns States object with processed strip and bus channel states"""
strip_states = struct.unpack('<8I', self._stripState)
bus_states = struct.unpack('<8I', self._busState)
return States( return States(
strip=tuple(ChannelState.from_int(state) for state in strip_states), strip=tuple(
bus=tuple(ChannelState.from_int(state) for state in bus_states), ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
) )
@cached_property @property
def gainlayers(self) -> tuple: def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples""" """returns tuple of all strip gain layers as tuples"""
layer_data = [] return tuple(
for layer in range(1, 9): tuple(
layer_bytes = getattr(self, f'_stripGaindB100Layer{layer}') round(
layer_raw = struct.unpack('<8h', layer_bytes) int.from_bytes(
layer_data.append(tuple(round(val * 0.01, 2) for val in layer_raw)) getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
return tuple(layer_data) 'little',
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
@cached_property @property
def busgain(self) -> tuple: def busgain(self) -> tuple:
"""returns tuple of bus gains""" """returns tuple of bus gains"""
bus_gain_raw = struct.unpack('<8h', self._busGaindB100) return tuple(
return tuple(round(val * 0.01, 2) for val in bus_gain_raw) round(
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
@cached_property @property
def labels(self) -> Labels: def labels(self) -> Labels:
"""returns Labels namedtuple of strip and bus labels""" """returns Labels namedtuple of strip and bus labels"""

View File

@ -1,12 +1,11 @@
import struct import struct
from dataclasses import dataclass from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple from typing import NamedTuple
from vban_cmd.enums import NBS from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass from vban_cmd.kinds import KindMapClass
from .headers import VbanRTPacket from .headers import VbanPacket
VMPARAMSTRIP_SIZE = 174 VMPARAMSTRIP_SIZE = 174
@ -194,15 +193,11 @@ class VbanVMParamStrip:
_Pitch_formant_high=data[172:174], _Pitch_formant_high=data[172:174],
) )
@cached_property @property
def mode(self) -> int: def mode(self) -> int:
return int.from_bytes(self._mode, 'little') return int.from_bytes(self._mode, 'little')
@cached_property @property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@cached_property
def audibility(self) -> Audibility: def audibility(self) -> Audibility:
return Audibility( return Audibility(
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
@ -211,7 +206,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
) )
@cached_property @property
def positions(self) -> Positions: def positions(self) -> Positions:
return Positions( return Positions(
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
@ -222,7 +217,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
) )
@cached_property @property
def eqgains(self) -> EqGains: def eqgains(self) -> EqGains:
return EqGains( return EqGains(
*[ *[
@ -235,7 +230,7 @@ class VbanVMParamStrip:
] ]
) )
@cached_property @property
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]: def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
return tuple( return tuple(
ParametricEQSettings( ParametricEQSettings(
@ -248,7 +243,7 @@ class VbanVMParamStrip:
for i in range(6) for i in range(6)
) )
@cached_property @property
def sends(self) -> Sends: def sends(self) -> Sends:
return Sends( return Sends(
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
@ -257,7 +252,11 @@ class VbanVMParamStrip:
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
) )
@cached_property @property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@property
def compressor(self) -> CompressorSettings: def compressor(self) -> CompressorSettings:
return CompressorSettings( return CompressorSettings(
gain_in=round( gain_in=round(
@ -277,7 +276,7 @@ class VbanVMParamStrip:
), ),
) )
@cached_property @property
def gate(self) -> GateSettings: def gate(self) -> GateSettings:
return GateSettings( return GateSettings(
threshold_in=round( threshold_in=round(
@ -296,7 +295,7 @@ class VbanVMParamStrip:
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2), release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
) )
@cached_property @property
def denoiser(self) -> DenoiserSettings: def denoiser(self) -> DenoiserSettings:
return DenoiserSettings( return DenoiserSettings(
threshold=round( threshold=round(
@ -304,7 +303,7 @@ class VbanVMParamStrip:
) )
) )
@cached_property @property
def pitch(self) -> PitchSettings: def pitch(self) -> PitchSettings:
return PitchSettings( return PitchSettings(
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')), enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
@ -328,8 +327,8 @@ class VbanVMParamStrip:
@dataclass @dataclass
class VbanRTPacketNBS1(VbanRTPacket): class VbanPacketNBS1(VbanPacket):
"""Represents the body of a VBAN RTPacket with ident:1""" """Represents the body of a VBAN data packet with ident:1"""
strips: tuple[VbanVMParamStrip, ...] strips: tuple[VbanVMParamStrip, ...]

View File

@ -1,4 +1,3 @@
import struct
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
@ -66,31 +65,30 @@ class VbanPing0Payload:
"""Convert payload to bytes""" """Convert payload to bytes"""
payload = cls() payload = cls()
return struct.pack( data = bytearray()
'<7I4s8s8s8s8s64s32s2H64s64s64s64s128s128s', data.extend(payload.bit_type.to_bytes(4, 'little'))
payload.bit_type, data.extend(payload.bit_feature.to_bytes(4, 'little'))
payload.bit_feature, data.extend(payload.bit_feature_ex.to_bytes(4, 'little'))
payload.bit_feature_ex, data.extend(payload.preferred_rate.to_bytes(4, 'little'))
payload.preferred_rate, data.extend(payload.min_rate.to_bytes(4, 'little'))
payload.min_rate, data.extend(payload.max_rate.to_bytes(4, 'little'))
payload.max_rate, data.extend(payload.color_rgb.to_bytes(4, 'little'))
payload.color_rgb, data.extend(payload.version)
payload.version, data.extend(payload.gps_position)
payload.gps_position, data.extend(payload.user_position)
payload.user_position, data.extend(payload.lang_code)
payload.lang_code, data.extend(payload.reserved)
payload.reserved, data.extend(payload.reserved_ex)
payload.reserved_ex, data.extend(payload.distant_ip)
payload.distant_ip, data.extend(payload.distant_port.to_bytes(2, 'little'))
payload.distant_port, data.extend(payload.distant_reserved.to_bytes(2, 'little'))
payload.distant_reserved, data.extend(payload.device_name)
payload.device_name, data.extend(payload.manufacturer_name)
payload.manufacturer_name, data.extend(payload.application_name)
payload.application_name, data.extend(payload.host_name)
payload.host_name, data.extend(payload.user_name)
payload.user_name, data.extend(payload.user_comment)
payload.user_comment, return bytes(data)
)
@classmethod @classmethod
def create_packet(cls, framecounter: int) -> bytes: def create_packet(cls, framecounter: int) -> bytes:

View File

@ -5,12 +5,12 @@ from typing import Iterator
from .error import VBANCMDConnectionError from .error import VBANCMDConnectionError
def script_ratelimit(func): def ratelimit(func):
"""script_ratelimit decorator for {VbanCmd}.sendtext, to prevent flooding the network with script requests.""" """ratelimit decorator for {VbanCmd}.sendtext, to prevent flooding the network with script requests."""
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
self, *rem = args self, *rem = args
if self.script_ratelimit: if self.script_ratelimit > 0:
now = time.time() now = time.time()
elapsed = now - self._last_script_request_time elapsed = now - self._last_script_request_time
if elapsed < self.script_ratelimit: if elapsed < self.script_ratelimit:
@ -21,8 +21,8 @@ def script_ratelimit(func):
return wrapper return wrapper
def pong_timeout(func): def ping_timeout(func):
"""pong_timeout decorator for {VbanCmd}._handle_pong, to handle timeout logic and socket management.""" """ping_timeout decorator for {VbanCmd}._ping, to handle timeout logic and socket management."""
def wrapper(self, timeout: float = None): def wrapper(self, timeout: float = None):
if timeout is None: if timeout is None:
@ -32,14 +32,25 @@ def pong_timeout(func):
self.sock.settimeout(0.5) self.sock.settimeout(0.5)
try: try:
func(self)
start_time = time.time() start_time = time.time()
response_count = 0 response_count = 0
while time.time() - start_time < timeout: while time.time() - start_time < timeout:
try: try:
data, addr = self.sock.recvfrom(2048)
response_count += 1 response_count += 1
if func(self): self.logger.debug(
f'Received packet #{response_count} from {addr}: {len(data)} bytes'
)
self.logger.debug(
f'Response header: {data[: min(32, len(data))].hex()}'
)
result = func(self, data, addr)
if result is True:
return return
except socket.timeout: except socket.timeout:
@ -124,11 +135,16 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
""" """
Generator function, accepts two tuples of dB values. Generator function, accepts two tuples of dB values.
Returns True when levels are equal (no change), False when different. Evaluates equality of each member in both tuples.
Only ignores changes when levels are very quiet (below -72 dB).
""" """
for a, b in zip(t0, t1): for a, b in zip(t0, t1):
yield a == b # If both values are very quiet (below -72dB), ignore small changes
if a <= -72.0 and b <= -72.0:
yield a == b # Both quiet, check if they're equal
else:
yield a != b # At least one has significant level, detect changes
def deep_merge(dict1, dict2): def deep_merge(dict1, dict2):

View File

@ -13,11 +13,11 @@ from .event import Event
from .packet.headers import ( from .packet.headers import (
VbanMatrixResponseHeader, VbanMatrixResponseHeader,
VbanPongHeader, VbanPongHeader,
VbanRTRequestHeader, VbanRequestHeader,
) )
from .packet.ping0 import VbanPing0Payload, VbanServerType from .packet.ping0 import VbanPing0Payload, VbanServerType
from .subject import Subject from .subject import Subject
from .util import bump_framecounter, deep_merge, pong_timeout, script_ratelimit from .util import bump_framecounter, deep_merge, ping_timeout, ratelimit
from .worker import Producer, Subscriber, Updater from .worker import Producer, Subscriber, Updater
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -27,6 +27,13 @@ class VbanCmd(abc.ABC):
"""Abstract Base Class for Voicemeeter VBAN Command Interfaces""" """Abstract Base Class for Voicemeeter VBAN Command Interfaces"""
DELAY = 0.001 DELAY = 0.001
# fmt: off
BPS_OPTS = [
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
1000000, 1500000, 2000000, 3000000,
]
# fmt: on
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
@ -36,13 +43,6 @@ class VbanCmd(abc.ABC):
for attr, val in kwargs.items(): for attr, val in kwargs.items():
setattr(self, attr, val) setattr(self, attr, val)
try:
self._host_ip = socket.gethostbyname(self.host)
except socket.gaierror as e:
raise VBANCMDConnectionError(
f'Unable to resolve hostname {self.host}'
) from e
self._framecounter = 0 self._framecounter = 0
self._framecounter_lock = threading.Lock() self._framecounter_lock = threading.Lock()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -56,10 +56,10 @@ class VbanCmd(abc.ABC):
self.producer = None self.producer = None
self._last_script_request_time = 0 self._last_script_request_time = 0
@property
@abc.abstractmethod @abc.abstractmethod
def steps(self): def __str__(self):
"""Steps required to build the interface for this Voicemeeter kind""" """Ensure subclasses override str magic method"""
pass
def _conn_from_toml(self) -> dict: def _conn_from_toml(self) -> dict:
try: try:
@ -97,7 +97,6 @@ class VbanCmd(abc.ABC):
If the server is detected as Matrix, RT listeners will be disabled for compatibility. If the server is detected as Matrix, RT listeners will be disabled for compatibility.
""" """
self._ping() self._ping()
self._handle_pong()
if not self.disable_rt_listeners: if not self.disable_rt_listeners:
self.event.info() self.event.info()
@ -139,24 +138,30 @@ class VbanCmd(abc.ABC):
self._framecounter = bump_framecounter(self._framecounter) self._framecounter = bump_framecounter(self._framecounter)
return current return current
def _ping(self): @ping_timeout
"""Initiates the PING/PONG handshake with the VBAN server.""" def _ping(self, data=None, addr=None) -> bool:
"""Handles the PING/PONG handshake with the VBAN server, including timeout logic and server type detection.
If data and addr are None, it sends a PING packet. If a PONG response is received, it returns True.
If a non-PONG packet is received, it logs the packet details and continues waiting until timeout"""
if data is None and addr is None:
ping_packet = VbanPing0Payload.create_packet(self._get_next_framecounter())
try: try:
self.sock.sendto( self.sock.sendto(
VbanPing0Payload.create_packet(self._get_next_framecounter()), ping_packet, (socket.gethostbyname(self.host), self.port)
(self._host_ip, self.port),
) )
self.logger.debug(f'PING sent to {self.host}:{self.port}') self.logger.debug(f'PING sent to {self.host}:{self.port}')
except socket.gaierror as e:
raise VBANCMDConnectionError(
f'Unable to resolve hostname {self.host}'
) from e
except Exception as e: except Exception as e:
raise VBANCMDConnectionError(f'PING failed: {e}') from e raise VBANCMDConnectionError(f'PING failed: {e}') from e
@pong_timeout return False
def _handle_pong(self) -> bool:
"""Handles incoming packets during the PING/PONG handshake, looking for a valid PONG response to confirm connectivity and detect server type.
Returns True if a valid PONG is received, False otherwise."""
data, addr = self.sock.recvfrom(2048)
if VbanPongHeader.is_pong_response(data): if VbanPongHeader.is_pong_response(data):
self.logger.debug(f'PONG received from {addr}, connectivity confirmed') self.logger.debug(f'PONG received from {addr}, connectivity confirmed')
@ -198,14 +203,14 @@ class VbanCmd(abc.ABC):
def _send_request(self, payload: str) -> None: def _send_request(self, payload: str) -> None:
"""Sends a request packet over the network and bumps the framecounter.""" """Sends a request packet over the network and bumps the framecounter."""
self.sock.sendto( self.sock.sendto(
VbanRTRequestHeader.encode_with_payload( VbanRequestHeader.encode_with_payload(
name=self.streamname, name=self.streamname,
bps=self.bps, bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel, channel=self.channel,
framecounter=self._get_next_framecounter(), framecounter=self._get_next_framecounter(),
payload=payload, payload=payload,
), ),
(self._host_ip, self.port), (socket.gethostbyname(self.host), self.port),
) )
def _set_rt(self, cmd: str, val: Union[str, float]): def _set_rt(self, cmd: str, val: Union[str, float]):
@ -213,7 +218,7 @@ class VbanCmd(abc.ABC):
self._send_request(f'{cmd}={val};') self._send_request(f'{cmd}={val};')
self.cache[cmd] = val self.cache[cmd] = val
@script_ratelimit @ratelimit
def sendtext(self, script) -> str | None: def sendtext(self, script) -> str | None:
"""Sends a multiple parameter string over a network.""" """Sends a multiple parameter string over a network."""
self._send_request(script) self._send_request(script)

View File

@ -3,16 +3,15 @@ import threading
import time import time
from .enums import NBS from .enums import NBS
from .error import VBANCMDConnectionError, VBANCMDPacketError from .error import VBANCMDConnectionError
from .packet.enums import SubProtocols
from .packet.headers import ( from .packet.headers import (
HEADER_SIZE, HEADER_SIZE,
VbanRTPacket, VbanPacket,
VbanRTResponseHeader, VbanResponseHeader,
VbanRTSubscribeHeader, VbanSubscribeHeader,
) )
from .packet.nbs0 import VbanRTPacketNBS0 from .packet.nbs0 import VbanPacketNBS0
from .packet.nbs1 import VbanRTPacketNBS1 from .packet.nbs1 import VbanPacketNBS1
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -29,11 +28,11 @@ class Subscriber(threading.Thread):
def run(self): def run(self):
while not self.stopped(): while not self.stopped():
for nbs in NBS: for nbs in NBS:
sub_packet = VbanRTSubscribeHeader().to_bytes( sub_packet = VbanSubscribeHeader().to_bytes(
nbs, self._remote._get_next_framecounter() nbs, self._remote._get_next_framecounter()
) )
self._remote.sock.sendto( self._remote.sock.sendto(
sub_packet, (self._remote._host_ip, self._remote.port) sub_packet, (self._remote.host, self._remote.port)
) )
self.wait_until_stopped(10) self.wait_until_stopped(10)
@ -67,7 +66,7 @@ class Producer(threading.Thread):
self._remote.cache['bus_level'], self._remote.cache['bus_level'],
) = self._remote.public_packets[NBS.zero].levels ) = self._remote.public_packets[NBS.zero].levels
def _get_rt(self) -> VbanRTPacket: def _get_rt(self) -> VbanPacket:
"""Attempt to fetch data packet until a valid one found""" """Attempt to fetch data packet until a valid one found"""
while True: while True:
try: try:
@ -81,24 +80,19 @@ class Producer(threading.Thread):
) from e ) from e
try: try:
header = VbanRTResponseHeader.from_bytes(data[:HEADER_SIZE]) header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
except VBANCMDPacketError as e: except ValueError as e:
match e.protocol:
case SubProtocols.SERVICE:
# Silently ignore periodic SERVICE packets unrelated to vban-cmd
pass
case _:
self.logger.debug(f'Error parsing response packet: {e}') self.logger.debug(f'Error parsing response packet: {e}')
continue continue
match header.format_nbs: match header.format_nbs:
case NBS.zero: case NBS.zero:
return VbanRTPacketNBS0.from_bytes( return VbanPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data nbs=NBS.zero, kind=self._remote.kind, data=data
) )
case NBS.one: case NBS.one:
return VbanRTPacketNBS1.from_bytes( return VbanPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data nbs=NBS.one, kind=self._remote.kind, data=data
) )