14 Commits

Author SHA1 Message Date
3d01321be3 separate ping from pong
this separates concerns and allows the pong_timeout to strictly handle timeouts for pongs.

patch bump
2026-03-03 19:47:15 +00:00
2dd52a7258 move ping timeout logic into decorator
patch bump
2026-03-03 18:21:14 +00:00
28cbef5ef6 patch bump 2026-03-03 15:48:23 +00:00
5b3b35fca3 flag value 2026-03-03 15:48:09 +00:00
7b3149a1e1 patch bump 2026-03-03 15:38:11 +00:00
230d9f0eb3 upd test config 2026-03-03 15:37:38 +00:00
c9a505df0a convert Modes class to a Flag Enum type and rename it to ChannelModes
move it into vban_cmd.packet
2026-03-03 15:36:56 +00:00
3e3bec6d50 remove the sleep(), the @ratelimit decorator already handles this. 2026-03-03 15:31:31 +00:00
55b3125e10 fixes regression in apply()
patch bump
2026-03-02 23:52:06 +00:00
7b3340042c upd reference to ip 2026-03-02 23:26:38 +00:00
6ea0859180 patch bump 2026-03-02 23:25:03 +00:00
81ed963bea fix references to remote.ip 2026-03-02 23:24:09 +00:00
0b99b6a67f update references to ip kwarg 2026-03-02 23:21:57 +00:00
86d0aa91c3 add a ratelimit decorator to {VbanCmd}.sendtext()
ip kwarg renamed to host.
2026-03-02 23:20:45 +00:00
11 changed files with 205 additions and 201 deletions

View File

@@ -41,14 +41,14 @@ Load VBAN connection info from toml config. A valid `vban.toml` might look like
```toml
[connection]
ip = "gamepc.local"
host = "localhost"
port = 6980
streamname = "Command1"
```
It should be placed in \<user home directory\> / "Documents" / "Voicemeeter" / "configs"
Alternatively you may pass `ip`, `port`, `streamname` as keyword arguments.
Alternatively you may pass `host`, `port`, `streamname` as keyword arguments.
#### `__main__.py`
@@ -85,7 +85,7 @@ def main():
KIND_ID = 'banana'
with vban_cmd.api(
KIND_ID, ip='gamepc.local', port=6980, streamname='Command1'
KIND_ID, host='localhost', port=6980, streamname='Command1'
) as vban:
do = ManyThings(vban)
do.things()
@@ -474,7 +474,7 @@ example:
import vban_cmd
opts = {
'ip': '<ip address>',
'host': '<ip address>',
'streamname': 'Command1',
'port': 6980,
}
@@ -541,14 +541,15 @@ print(vban.event.get())
You may pass the following optional keyword arguments:
- `ip`: str='localhost', ip or hostname of remote machine
- `host`: str='localhost', ip or hostname of remote machine
- `port`: int=6980, vban udp port of remote machine.
- `streamname`: str='Command1', name of the stream to connect to.
- `bps`: int=256000, bps rate of the stream.
- `channel`: int=0, channel on which to send the UDP requests.
- `pdirty`: boolean=False, parameter updates
- `ldirty`: boolean=False, level updates
- `timeout`: int=5, amount of time (seconds) to wait for an incoming RT data packet (parameter states).
- `script_ratelimit`: float=0.05, default to 20 script requests per second. This affects vban.sendtext() specifically.
- `timeout`: int=5, timeout for socket operations.
- `disable_rt_listeners`: boolean=False, set `True` if you don't wish to receive RT packets.
- You can still send Matrix string requests ending with `?` and receive a response.
@@ -591,7 +592,7 @@ import vban_cmd
logging.basicConfig(level=logging.DEBUG)
opts = {'ip': 'ip.local', 'port': 6980, 'streamname': 'Command1'}
opts = {'host': 'localhost', 'port': 6980, 'streamname': 'Command1'}
with vban_cmd.api('banana', **opts) as vban:
...
```

View File

@@ -1,6 +1,6 @@
[project]
name = "vban-cmd"
version = "2.9.0"
version = "2.9.6"
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
license = { text = "MIT" }

View File

@@ -11,7 +11,7 @@ from vban_cmd.kinds import request_kind_map as kindmap
KIND_ID = os.environ.get('KIND', 'potato')
opts = {
'ip': os.getenv('VBANCMD_IP', 'localhost'),
'host': os.getenv('VBANCMD_HOST', 'localhost'),
'streamname': os.getenv('VBANCMD_STREAMNAME', 'Command1'),
'port': int(os.getenv('VBANCMD_PORT', 6980)),
}

View File

@@ -84,18 +84,20 @@ class FactoryBase(VbanCmd):
def __init__(self, kind_id: str, **kwargs):
defaultkwargs = {
'ip': 'localhost',
'host': 'localhost',
'port': 6980,
'streamname': 'Command1',
'bps': 256000,
'channel': 0,
'ratelimit': 0.01,
'timeout': 5,
'script_ratelimit': 0.05, # 20 commands per second, to avoid overloading Voicemeeter
'timeout': 5, # timeout on socket operations, in seconds
'disable_rt_listeners': False,
'sync': False,
'pdirty': False,
'ldirty': False,
}
if 'ip' in kwargs:
defaultkwargs['host'] = kwargs.pop('ip') # for backwards compatibility
if 'subs' in kwargs:
defaultkwargs |= kwargs.pop('subs') # for backwards compatibility
kwargs = defaultkwargs | kwargs

View File

@@ -1,83 +1,9 @@
import abc
import logging
import time
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class Modes:
"""Channel Modes"""
_mute: hex = 0x00000001
_solo: hex = 0x00000002
_mono: hex = 0x00000004
_mc: hex = 0x00000008
_amix: hex = 0x00000010
_repeat: hex = 0x00000020
_bmix: hex = 0x00000030
_composite: hex = 0x00000040
_tvmix: hex = 0x00000050
_upmix21: hex = 0x00000060
_upmix41: hex = 0x00000070
_upmix61: hex = 0x00000080
_centeronly: hex = 0x00000090
_lfeonly: hex = 0x000000A0
_rearonly: hex = 0x000000B0
_mask: hex = 0x000000F0
_on: hex = 0x00000100 # eq.on
_cross: hex = 0x00000200
_ab: hex = 0x00000800 # eq.ab
_busa: hex = 0x00001000
_busa1: hex = 0x00001000
_busa2: hex = 0x00002000
_busa3: hex = 0x00004000
_busa4: hex = 0x00008000
_busa5: hex = 0x00080000
_busb: hex = 0x00010000
_busb1: hex = 0x00010000
_busb2: hex = 0x00020000
_busb3: hex = 0x00040000
_pan0: hex = 0x00000000
_pancolor: hex = 0x00100000
_panmod: hex = 0x00200000
_panmask: hex = 0x00F00000
_postfx_r: hex = 0x01000000
_postfx_d: hex = 0x02000000
_postfx1: hex = 0x04000000
_postfx2: hex = 0x08000000
_sel: hex = 0x10000000
_monitor: hex = 0x20000000
@property
def modevals(self):
return (
val
for val in [
self._amix,
self._repeat,
self._bmix,
self._composite,
self._tvmix,
self._upmix21,
self._upmix41,
self._upmix61,
self._centeronly,
self._lfeonly,
self._rearonly,
]
)
class IRemote(abc.ABC):
"""
Common interface between base class and extended (higher) classes
@@ -89,7 +15,6 @@ class IRemote(abc.ABC):
self._remote = remote
self.index = index
self.logger = logger.getChild(self.__class__.__name__)
self._modes = Modes()
def getter(self, param):
cmd = self._cmd(param)
@@ -123,6 +48,8 @@ class IRemote(abc.ABC):
def apply(self, data):
"""Sets all parameters of a dict for the channel."""
script = ''
def fget(attr, val):
if attr == 'mode':
return (f'mode.{val}', 1)
@@ -138,14 +65,9 @@ class IRemote(abc.ABC):
val = 1 if val else 0
self._remote.cache[self._cmd(attr)] = val
self._remote._script += f'{self._cmd(attr)}={val};'
script += f'{self._cmd(attr)}={val};'
else:
target = getattr(self, attr)
target.apply(val)
self._remote.sendtext(self._remote._script)
return self
def then_wait(self):
self._remote._script = str()
time.sleep(self._remote.DELAY)
self._remote.sendtext(script)

View File

@@ -1,6 +1,7 @@
from functools import partial
from .enums import NBS, BusModes
from .packet.enums import ChannelModes
from .util import cache_bool, cache_float, cache_int, cache_string
@@ -27,7 +28,7 @@ def channel_bool_prop(param):
elif param.lower() == 'mc':
return channel_state.mc
else:
return channel_state.get_mode(getattr(self._modes, f'_{param.lower()}'))
return channel_state.get_mode(getattr(ChannelModes, param.upper()).value)
def fset(self, val):
self.setter(param, 1 if val else 0)
@@ -55,7 +56,9 @@ def channel_int_prop(param):
bit_9 = (channel_state._state >> 9) & 1
return (bit_9 << 1) | bit_2
else:
return channel_state.get_mode_int(getattr(self._modes, f'_{param.lower()}'))
return channel_state.get_mode_int(
getattr(ChannelModes, param.upper()).value
)
def fset(self, val):
self.setter(param, val)
@@ -89,7 +92,7 @@ def strip_output_prop(param):
strip_state = self.public_packets[NBS.zero].states.strip[self.index]
return strip_state.get_mode(getattr(self._modes, f'_bus{param.lower()}'))
return strip_state.get_mode(getattr(ChannelModes, f'BUS{param.upper()}').value)
def fset(self, val):
self.setter(param, 1 if val else 0)

53
vban_cmd/packet/enums.py Normal file
View File

@@ -0,0 +1,53 @@
from enum import Flag
class ChannelModes(Flag):
"""Channel Modes - Bit flags that can be combined"""
MUTE = 0x00000001
SOLO = 0x00000002
MONO = 0x00000004
MC = 0x00000008
AMIX = 0x00000010
REPEAT = 0x00000020
BMIX = 0x00000030
COMPOSITE = 0x00000040
TVMIX = 0x00000050
UPMIX21 = 0x00000060
UPMIX41 = 0x00000070
UPMIX61 = 0x00000080
CENTERONLY = 0x00000090
LFEONLY = 0x000000A0
REARONLY = 0x000000B0
MASK = 0x000000F0
ON = 0x00000100 # eq.on
CROSS = 0x00000200
AB = 0x00000800 # eq.ab
BUSA = 0x00001000
BUSA1 = 0x00001000
BUSA2 = 0x00002000
BUSA3 = 0x00004000
BUSA4 = 0x00008000
BUSA5 = 0x00080000
BUSB = 0x00010000
BUSB1 = 0x00010000
BUSB2 = 0x00020000
BUSB3 = 0x00040000
PAN0 = 0x00000000
PANCOLOR = 0x00100000
PANMOD = 0x00200000
PANMASK = 0x00F00000
POSTFX_R = 0x01000000
POSTFX_D = 0x02000000
POSTFX1 = 0x04000000
POSTFX2 = 0x08000000
SEL = 0x10000000
MONITOR = 0x20000000

View File

@@ -5,6 +5,7 @@ from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp
from .enums import ChannelModes
from .headers import VbanPacket
@@ -31,57 +32,57 @@ class ChannelState:
# Common boolean modes
@property
def mute(self) -> bool:
return (self._state & 0x00000001) != 0
return (self._state & ChannelModes.MUTE.value) != 0
@property
def solo(self) -> bool:
return (self._state & 0x00000002) != 0
return (self._state & ChannelModes.SOLO.value) != 0
@property
def mono(self) -> bool:
return (self._state & 0x00000004) != 0
return (self._state & ChannelModes.MONO.value) != 0
@property
def mc(self) -> bool:
return (self._state & 0x00000008) != 0
return (self._state & ChannelModes.MC.value) != 0
# EQ modes
@property
def eq_on(self) -> bool:
return (self._state & 0x00000100) != 0
return (self._state & ChannelModes.ON.value) != 0
@property
def eq_ab(self) -> bool:
return (self._state & 0x00000800) != 0
return (self._state & ChannelModes.AB.value) != 0
# Bus assignments (strip to bus routing)
@property
def busa1(self) -> bool:
return (self._state & 0x00001000) != 0
return (self._state & ChannelModes.BUSA1.value) != 0
@property
def busa2(self) -> bool:
return (self._state & 0x00002000) != 0
return (self._state & ChannelModes.BUSA2.value) != 0
@property
def busa3(self) -> bool:
return (self._state & 0x00004000) != 0
return (self._state & ChannelModes.BUSA3.value) != 0
@property
def busa4(self) -> bool:
return (self._state & 0x00008000) != 0
return (self._state & ChannelModes.BUSA4.value) != 0
@property
def busb1(self) -> bool:
return (self._state & 0x00010000) != 0
return (self._state & ChannelModes.BUSB1.value) != 0
@property
def busb2(self) -> bool:
return (self._state & 0x00020000) != 0
return (self._state & ChannelModes.BUSB2.value) != 0
@property
def busb3(self) -> bool:
return (self._state & 0x00040000) != 0
return (self._state & ChannelModes.BUSB3.value) != 0
class States(NamedTuple):

View File

@@ -1,5 +1,62 @@
import socket
import time
from typing import Iterator
from .error import VBANCMDConnectionError
def ratelimit(func):
"""ratelimit decorator for {VbanCmd}.sendtext, to prevent flooding the network with script requests."""
def wrapper(*args, **kwargs):
self, *rem = args
if self.script_ratelimit > 0:
now = time.time()
elapsed = now - self._last_script_request_time
if elapsed < self.script_ratelimit:
time.sleep(self.script_ratelimit - elapsed)
self._last_script_request_time = time.time()
return func(*args, **kwargs)
return wrapper
def pong_timeout(func):
"""pong_timeout decorator for {VbanCmd}._handle_pong, to handle timeout logic and socket management."""
def wrapper(self, timeout: float = None):
if timeout is None:
timeout = min(self.timeout, 3.0)
original_timeout = self.sock.gettimeout()
self.sock.settimeout(0.5)
try:
start_time = time.time()
response_count = 0
while time.time() - start_time < timeout:
try:
response_count += 1
if func(self):
return
except socket.timeout:
continue
self.logger.debug(
f'PING timeout after {timeout}s, received {response_count} non-PONG packets'
)
raise VBANCMDConnectionError(
f'PING timeout: No response from {self.host}:{self.port} after {timeout}s'
)
finally:
self.sock.settimeout(original_timeout)
return wrapper
def cache_bool(func, param):
"""Check cache for a bool prop"""

View File

@@ -5,7 +5,7 @@ import threading
import time
from pathlib import Path
from queue import Queue
from typing import Union
from typing import Mapping, Union
from .enums import NBS
from .error import VBANCMDConnectionError, VBANCMDError
@@ -17,7 +17,7 @@ from .packet.headers import (
)
from .packet.ping0 import VbanPing0Payload, VbanServerType
from .subject import Subject
from .util import bump_framecounter, deep_merge
from .util import bump_framecounter, deep_merge, pong_timeout, ratelimit
from .worker import Producer, Subscriber, Updater
logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ class VbanCmd(abc.ABC):
def __init__(self, **kwargs):
self.logger = logger.getChild(self.__class__.__name__)
self.event = Event({k: kwargs.pop(k) for k in ('pdirty', 'ldirty')})
if not kwargs['ip']:
if not kwargs['host']:
kwargs |= self._conn_from_toml()
for attr, val in kwargs.items():
setattr(self, attr, val)
@@ -52,14 +52,13 @@ class VbanCmd(abc.ABC):
self.cache = {}
self._pdirty = False
self._ldirty = False
self._script = str()
self.stop_event = None
self.producer = None
self._last_script_request_time = 0
@abc.abstractmethod
def __str__(self):
"""Ensure subclasses override str magic method"""
pass
def _conn_from_toml(self) -> dict:
try:
@@ -97,6 +96,7 @@ class VbanCmd(abc.ABC):
If the server is detected as Matrix, RT listeners will be disabled for compatibility.
"""
self._ping()
self._handle_pong()
if not self.disable_rt_listeners:
self.event.info()
@@ -113,7 +113,7 @@ class VbanCmd(abc.ABC):
self.producer.start()
self.logger.info(
"Successfully logged into VBANCMD {kind} with ip='{ip}', port={port}, streamname='{streamname}'".format(
"Successfully logged into VBANCMD {kind} with host='{host}', port={port}, streamname='{streamname}'".format(
**self.__dict__
)
)
@@ -138,43 +138,35 @@ class VbanCmd(abc.ABC):
self._framecounter = bump_framecounter(self._framecounter)
return current
def _ping(self, timeout: float = None) -> None:
"""Send a PING packet and wait for PONG response to verify connectivity."""
if timeout is None:
timeout = min(self.timeout, 3.0)
def _ping(self):
"""Initiates the PING/PONG handshake with the VBAN server."""
ping_packet = VbanPing0Payload.create_packet(self._get_next_framecounter())
original_timeout = self.sock.gettimeout()
self.sock.settimeout(0.5)
try:
self.sock.sendto(ping_packet, (socket.gethostbyname(self.ip), self.port))
self.logger.debug(f'PING sent to {self.ip}:{self.port}')
self.sock.sendto(ping_packet, (socket.gethostbyname(self.host), self.port))
self.logger.debug(f'PING sent to {self.host}:{self.port}')
start_time = time.time()
response_count = 0
while time.time() - start_time < timeout:
try:
except socket.gaierror as e:
raise VBANCMDConnectionError(
f'Unable to resolve hostname {self.host}'
) from e
except Exception as e:
raise VBANCMDConnectionError(f'PING failed: {e}') from e
@pong_timeout
def _handle_pong(self) -> bool:
"""Handles incoming packets during the PING/PONG handshake, looking for a valid PONG response to confirm connectivity and detect server type.
Returns True if a valid PONG is received, False otherwise."""
data, addr = self.sock.recvfrom(2048)
response_count += 1
self.logger.debug(
f'Received packet #{response_count} from {addr}: {len(data)} bytes'
)
self.logger.debug(
f'Response header: {data[: min(32, len(data))].hex()}'
)
if VbanPongHeader.is_pong_response(data):
self.logger.debug(
f'PONG received from {addr}, connectivity confirmed'
)
self.logger.debug(f'PONG received from {addr}, connectivity confirmed')
server_type = VbanPing0Payload.detect_server_type(data)
self._handle_server_type(server_type)
return # Exit after successful PONG response
return True
else:
if len(data) >= 8:
if data[:4] == b'VBAN':
@@ -186,22 +178,7 @@ class VbanCmd(abc.ABC):
else:
self.logger.debug('Non-VBAN packet received')
except socket.timeout:
continue
self.logger.debug(
f'PING timeout after {timeout}s, received {response_count} non-PONG packets'
)
raise VBANCMDConnectionError(
f'PING timeout: No response from {self.ip}:{self.port} after {timeout}s'
)
except socket.gaierror as e:
raise VBANCMDConnectionError(f'Unable to resolve hostname {self.ip}') from e
except Exception as e:
raise VBANCMDConnectionError(f'PING failed: {e}') from e
finally:
self.sock.settimeout(original_timeout)
return False
def _handle_server_type(self, server_type: VbanServerType) -> None:
"""Handle the detected server type by adjusting settings accordingly."""
@@ -230,7 +207,7 @@ class VbanCmd(abc.ABC):
framecounter=self._get_next_framecounter(),
payload=payload,
),
(socket.gethostbyname(self.ip), self.port),
(socket.gethostbyname(self.host), self.port),
)
def _set_rt(self, cmd: str, val: Union[str, float]):
@@ -238,6 +215,7 @@ class VbanCmd(abc.ABC):
self._send_request(f'{cmd}={val};')
self.cache[cmd] = val
@ratelimit
def sendtext(self, script) -> str | None:
"""Sends a multiple parameter string over a network."""
self._send_request(script)
@@ -246,17 +224,14 @@ class VbanCmd(abc.ABC):
if self.disable_rt_listeners and script.endswith(('?', '?;')):
try:
data, _ = self.sock.recvfrom(2048)
payload = VbanMatrixResponseHeader.extract_payload(data)
return VbanMatrixResponseHeader.extract_payload(data)
except ValueError as e:
self.logger.warning(f'Error extracting matrix response: {e}')
except TimeoutError as e:
self.logger.exception(f'Timeout waiting for matrix response: {e}')
raise VBANCMDConnectionError(
f'Timeout waiting for response from {self.ip}:{self.port}'
f'Timeout waiting for response from {self.host}:{self.port}'
) from e
return payload
time.sleep(self.DELAY)
@property
def type(self) -> str:
@@ -288,12 +263,8 @@ class VbanCmd(abc.ABC):
while self.pdirty:
time.sleep(self.DELAY)
def apply(self, data: dict):
"""
Sets all parameters of a dict
minor delay between each recursion
"""
def apply(self, data: Mapping):
"""Set all parameters of a dict"""
def target(key):
match key.split('-'):
@@ -313,7 +284,8 @@ class VbanCmd(abc.ABC):
raise ValueError(ERR_MSG)
return target[int(index)]
[target(key).apply(di).then_wait() for key, di in data.items()]
for key, di in data.items():
target(key).apply(di)
def apply_config(self, name):
"""applies a config from memory"""

View File

@@ -27,19 +27,13 @@ class Subscriber(threading.Thread):
def run(self):
while not self.stopped():
try:
for nbs in NBS:
sub_packet = VbanSubscribeHeader().to_bytes(
nbs, self._remote._get_next_framecounter()
)
self._remote.sock.sendto(
sub_packet, (self._remote.ip, self._remote.port)
sub_packet, (self._remote.host, self._remote.port)
)
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
f'timeout sending subscription to {self._remote.ip}:{self._remote.port}'
) from e
self.wait_until_stopped(10)
self.logger.debug(f'terminating {self.name} thread')
@@ -82,7 +76,7 @@ class Producer(threading.Thread):
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
f'timeout waiting for response from {self._remote.ip}:{self._remote.port}'
f'timeout waiting for response from {self._remote.host}:{self._remote.port}'
) from e
try:
@@ -128,7 +122,6 @@ class Producer(threading.Thread):
self.queue.put('pdirty')
if self._remote.event.ldirty:
self.queue.put('ldirty')
# time.sleep(self._remote.ratelimit)
self.logger.debug(f'terminating {self.name} thread')
self.queue.put(None)