9 Commits

Author SHA1 Message Date
cf66ae252c minor bump 2026-03-02 20:50:12 +00:00
42f6f29d1e add 2.9.0 to CHANGELOG 2026-03-02 20:50:05 +00:00
a210766b7b improve framecounter thread safety 2026-03-02 20:44:08 +00:00
7d741d6e8b {VbanCmd}._ping() now causes login to fail fast if no pong is received. 2026-03-02 20:26:35 +00:00
8be9d3cb7f implement ping/pong dataclasses 2026-03-02 20:25:58 +00:00
23b99cb66b perform some path magic so Voicemeeter receives the entire path
patch bump
2026-03-01 21:29:09 +00:00
2fd7b8ad8b minor bump 2026-03-01 21:13:38 +00:00
c851cb5abe add Recorder section to README 2026-03-01 21:11:48 +00:00
dc681f50d0 add Recorder
add it to banana+potato
2026-03-01 21:10:10 +00:00
10 changed files with 553 additions and 56 deletions

2
.gitignore vendored
View File

@@ -159,3 +159,5 @@ config.toml
vban.toml
.vscode/
PING_FEATURE.md

View File

@@ -11,6 +11,16 @@ Before any major/minor/patch bump all unit tests will be run to verify they pass
- [x]
## [2.9.0] - 2026-03-02
### Added
- Recorder class, see [Recorder](https://github.com/onyx-and-iris/vban-cmd-python?tab=readme-ov-file#recorder) in README.
- Ping/pong implemented. If a pong is not received {VbanCmd}.login() will fail fast. This prevents the rt listener threads from starting up.
- It has the added benefit of automatically detecting the type of VBAN server (Voicemeeter or Matrix).
- A thread lock around the framecounter to improve thread safety since it can be accessed by both the main thread and the Producer thread.
## [2.7.0] - 2026-03-01
### Added

View File

@@ -349,6 +349,40 @@ vban.strip[0].fadeto(-10.3, 1000)
vban.bus[3].fadeby(-5.6, 500)
```
### Recorder
The following methods are available
- `play()`
- `stop()`
- `pause()`
- `record()`
- `ff()`
- `rew()`
- `load(filepath)`: raw string
- `goto(time_string)`: time string in format `hh:mm:ss`
The following properties are available
- `samplerate`: int, (22050, 24000, 32000, 44100, 48000, 88200, 96000, 176400, 192000)
- `bitresolution`: int, (8, 16, 24, 32)
- `channel`: int, from 1 to 8
- `kbps`: int, (32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320)
- `gain`: float, from -60.0 to 12.0
example:
```python
vban.recorder.play()
vban.recorder.stop()
# filepath as raw string
vban.recorder.load(r'C:\music\mytune.mp3')
# set the goto time to 1m 30s
vban.recorder.goto('00:01:30')
```
### Command
Certain 'special' commands are defined by the API as performing actions rather than setting values. The following methods are available:

View File

@@ -1,6 +1,6 @@
[project]
name = "vban-cmd"
version = "2.7.1"
version = "2.9.0"
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
license = { text = "MIT" }
@@ -9,7 +9,7 @@ requires-python = ">=3.10"
dependencies = ["tomli (>=2.0.1,<3.0) ; python_version < '3.11'"]
[tool.poetry.requires-plugins]
poethepoet = "^0.35.0"
poethepoet = ">=0.42.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.4"

View File

@@ -11,6 +11,7 @@ from .error import VBANCMDError
from .kinds import KindMapClass
from .kinds import request_kind_map as kindmap
from .macrobutton import MacroButton
from .recorder import Recorder
from .strip import request_strip_obj as strip
from .vban import request_vban_obj as vban
from .vbancmd import VbanCmd
@@ -26,7 +27,7 @@ class FactoryBuilder:
"""
BuilderProgress = IntEnum(
'BuilderProgress', 'strip bus command macrobutton vban', start=0
'BuilderProgress', 'strip bus command macrobutton vban recorder', start=0
)
def __init__(self, factory, kind: KindMapClass):
@@ -38,6 +39,7 @@ class FactoryBuilder:
f'Finished building commands for {self._factory}',
f'Finished building macrobuttons for {self._factory}',
f'Finished building vban in/out streams for {self._factory}',
f'Finished building recorder for {self._factory}',
)
self.logger = logger.getChild(self.__class__.__name__)
@@ -72,6 +74,10 @@ class FactoryBuilder:
self._factory.vban = vban(self._factory)
return self
def make_recorder(self):
self._factory.recorder = Recorder.make(self._factory)
return self
class FactoryBase(VbanCmd):
"""Base class for factories, subclasses VbanCmd."""
@@ -166,7 +172,7 @@ class BananaFactory(FactoryBase):
@property
def steps(self) -> Iterable:
"""steps required to build the interface for a kind"""
return self._steps
return self._steps + (self.builder.make_recorder,)
class PotatoFactory(FactoryBase):
@@ -188,7 +194,7 @@ class PotatoFactory(FactoryBase):
@property
def steps(self) -> Iterable:
"""steps required to build the interface for a kind"""
return self._steps
return self._steps + (self.builder.make_recorder,)
def vbancmd_factory(kind_id: str, **kwargs) -> VbanCmd:

View File

@@ -8,11 +8,15 @@ VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_PING = 0
VBAN_SERVICE_PONG = 0 # PONG uses same service type as PING
VBAN_SERVICE_MASK = 0xE0
VBAN_PROTOCOL_MASK = 0xE0
VBAN_SERVICE_REQUESTREPLY = 0x02
VBAN_SERVICE_FNCT_REPLY = 0x02
PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@@ -207,6 +211,92 @@ class VbanMatrixResponseHeader:
return header, payload
@dataclass
class VbanPingHeader:
"""Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PING
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr.to_bytes(1, 'little'))
data.extend(header.format_nbs.to_bytes(1, 'little'))
data.extend(header.format_nbc.to_bytes(1, 'little'))
data.extend(header.format_bit.to_bytes(1, 'little'))
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PONG
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRequestHeader:
"""Represents the header of a request packet"""
@@ -238,7 +328,7 @@ class VbanRequestHeader:
@property
def streamname(self) -> bytes:
return self.name.encode() + bytes(16 - len(self.name))
return self.name.encode()[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(

124
vban_cmd/packet/ping0.py Normal file
View File

@@ -0,0 +1,124 @@
from dataclasses import dataclass
from enum import Enum
from .headers import VbanPingHeader
# VBAN PING bitType constants
VBANPING_TYPE_RECEPTOR = 0x00000001 # Simple receptor
VBANPING_TYPE_TRANSMITTER = 0x00000002 # Simple Transmitter
VBANPING_TYPE_RECEPTORSPOT = 0x00000004 # SPOT receptor
VBANPING_TYPE_TRANSMITTERSPOT = 0x00000008 # SPOT transmitter
VBANPING_TYPE_VIRTUALDEVICE = 0x00000010 # Virtual Device
VBANPING_TYPE_VIRTUALMIXER = 0x00000020 # Virtual Mixer
VBANPING_TYPE_MATRIX = 0x00000040 # MATRIX
VBANPING_TYPE_DAW = 0x00000080 # Workstation
VBANPING_TYPE_SERVER = 0x01000000 # VBAN SERVER
# VBAN PING bitfeature constants
VBANPING_FEATURE_AUDIO = 0x00000001
VBANPING_FEATURE_AOIP = 0x00000002
VBANPING_FEATURE_VOIP = 0x00000004
VBANPING_FEATURE_SERIAL = 0x00000100
VBANPING_FEATURE_MIDI = 0x00000300
VBANPING_FEATURE_FRAME = 0x00001000
VBANPING_FEATURE_TXT = 0x00010000
class VbanServerType(Enum):
"""VBAN server types detected from PONG responses"""
UNKNOWN = 0
VOICEMEETER = VBANPING_TYPE_VIRTUALMIXER
MATRIX = VBANPING_TYPE_MATRIX
@dataclass
class VbanPing0Payload:
"""Represents the VBAN PING0 payload structure as defined in the VBAN protocol documentation."""
def __init__(self):
self.bit_type = VBANPING_TYPE_RECEPTOR
self.bit_feature = VBANPING_FEATURE_TXT
self.bit_feature_ex = 0x00000000
self.preferred_rate = 48000
self.min_rate = 8000
self.max_rate = 192000
self.color_rgb = 0x00FF0000
self.version = b'\x01\x02\x03\x04'
self.gps_position = b'\x00' * 8
self.user_position = b'\x00' * 8
self.lang_code = b'EN\x00\x00\x00\x00\x00\x00'
self.reserved = b'\x00' * 8
self.reserved_ex = b'\x00' * 64
self.distant_ip = b'\x00' * 32
self.distant_port = 0
self.distant_reserved = 0
self.device_name = b'VBAN-CMD-Python\x00'.ljust(64, b'\x00')
self.manufacturer_name = b'Python-VBAN\x00'.ljust(64, b'\x00')
self.application_name = b'vban-cmd\x00'.ljust(64, b'\x00')
self.host_name = b'localhost\x00'.ljust(64, b'\x00')
self.user_name = b'Python User\x00'.ljust(128, b'\x00')
self.user_comment = b'VBAN CMD Python Client\x00'.ljust(128, b'\x00')
@classmethod
def to_bytes(cls) -> bytes:
"""Convert payload to bytes"""
payload = cls()
data = bytearray()
data.extend(payload.bit_type.to_bytes(4, 'little'))
data.extend(payload.bit_feature.to_bytes(4, 'little'))
data.extend(payload.bit_feature_ex.to_bytes(4, 'little'))
data.extend(payload.preferred_rate.to_bytes(4, 'little'))
data.extend(payload.min_rate.to_bytes(4, 'little'))
data.extend(payload.max_rate.to_bytes(4, 'little'))
data.extend(payload.color_rgb.to_bytes(4, 'little'))
data.extend(payload.version)
data.extend(payload.gps_position)
data.extend(payload.user_position)
data.extend(payload.lang_code)
data.extend(payload.reserved)
data.extend(payload.reserved_ex)
data.extend(payload.distant_ip)
data.extend(payload.distant_port.to_bytes(2, 'little'))
data.extend(payload.distant_reserved.to_bytes(2, 'little'))
data.extend(payload.device_name)
data.extend(payload.manufacturer_name)
data.extend(payload.application_name)
data.extend(payload.host_name)
data.extend(payload.user_name)
data.extend(payload.user_comment)
return bytes(data)
@classmethod
def create_packet(cls, framecounter: int) -> bytes:
"""Creates a complete PING packet with header and payload."""
data = bytearray()
data.extend(VbanPingHeader.to_bytes(framecounter))
data.extend(cls.to_bytes())
return bytes(data)
@staticmethod
def detect_server_type(pong_data: bytes) -> VbanServerType:
"""Detect server type from PONG response packet.
Args:
pong_data: Raw bytes from PONG response packet
Returns:
VbanServerType enum indicating the detected server type
"""
try:
if len(pong_data) >= 32:
frame_counter_bytes = pong_data[28:32]
frame_counter = int.from_bytes(frame_counter_bytes, 'little')
if frame_counter == VbanServerType.MATRIX.value:
return VbanServerType.MATRIX
elif frame_counter == VbanServerType.VOICEMEETER.value:
return VbanServerType.VOICEMEETER
return VbanServerType.UNKNOWN
except Exception:
return VbanServerType.UNKNOWN

138
vban_cmd/recorder.py Normal file
View File

@@ -0,0 +1,138 @@
import os
import re
from .error import VBANCMDError
from .iremote import IRemote
from .meta import action_fn
class Recorder(IRemote):
"""
Implements the common interface
Defines concrete implementation for recorder
"""
@classmethod
def make(cls, remote):
"""
Factory function for recorder class.
Returns a Recorder class of a kind.
"""
Recorder_cls = type(
f'Recorder{remote.kind}',
(cls,),
{
**{
param: action_fn(param)
for param in [
'play',
'stop',
'pause',
'replay',
'record',
'ff',
'rew',
]
},
},
)
return Recorder_cls(remote)
def __str__(self):
return f'{type(self).__name__}'
@property
def identifier(self) -> str:
return 'recorder'
@property
def samplerate(self) -> int:
return
@samplerate.setter
def samplerate(self, val: int):
opts = (22050, 24000, 32000, 44100, 48000, 88200, 96000, 176400, 192000)
if val not in opts:
self.logger.warning(f'samplerate got: {val} but expected a value in {opts}')
self.setter('samplerate', val)
@property
def bitresolution(self) -> int:
return
@bitresolution.setter
def bitresolution(self, val: int):
opts = (8, 16, 24, 32)
if val not in opts:
self.logger.warning(
f'bitresolution got: {val} but expected a value in {opts}'
)
self.setter('bitresolution', val)
@property
def channel(self) -> int:
return
@channel.setter
def channel(self, val: int):
if not 1 <= val <= 8:
self.logger.warning(f'channel got: {val} but expected a value from 1 to 8')
self.setter('channel', val)
@property
def kbps(self):
return
@kbps.setter
def kbps(self, val: int):
opts = (32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320)
if val not in opts:
self.logger.warning(f'kbps got: {val} but expected a value in {opts}')
self.setter('kbps', val)
@property
def gain(self) -> float:
return
@gain.setter
def gain(self, val: float):
self.setter('gain', val)
def load(self, file: os.PathLike):
try:
# Convert to string, use forward slashes, and wrap in quotes for spaces
file_path = f'"{os.fspath(file).replace(chr(92), "/")}"'
self.setter('load', file_path)
except UnicodeError:
raise VBANCMDError('File full directory must be a raw string')
def goto(self, time_str):
def get_sec():
"""Get seconds from time string"""
h, m, s = time_str.split(':')
return int(h) * 3600 + int(m) * 60 + int(s)
time_str = str(time_str) # coerce the type
if (
re.match(
r'^(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d)$',
time_str,
)
is not None
):
self.setter('goto', get_sec())
else:
self.logger.warning(
"goto expects a string that matches the format 'hh:mm:ss'"
)
def filetype(self, val: str):
opts = {'wav': 1, 'aiff': 2, 'bwf': 3, 'mp3': 100}
try:
self.setter('filetype', opts[val.lower()])
except KeyError:
self.logger.warning(
f'filetype got: {val} but expected a value in {list(opts.keys())}'
)

View File

@@ -8,9 +8,14 @@ from queue import Queue
from typing import Union
from .enums import NBS
from .error import VBANCMDError
from .error import VBANCMDConnectionError, VBANCMDError
from .event import Event
from .packet.headers import VbanMatrixResponseHeader, VbanRequestHeader
from .packet.headers import (
VbanMatrixResponseHeader,
VbanPongHeader,
VbanRequestHeader,
)
from .packet.ping0 import VbanPing0Payload, VbanServerType
from .subject import Subject
from .util import bump_framecounter, deep_merge
from .worker import Producer, Subscriber, Updater
@@ -39,8 +44,10 @@ class VbanCmd(abc.ABC):
setattr(self, attr, val)
self._framecounter = 0
self._framecounter_lock = threading.Lock()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.settimeout(self.timeout)
self.subject = self.observer = Subject()
self.cache = {}
self._pdirty = False
@@ -86,7 +93,11 @@ class VbanCmd(abc.ABC):
self.logout()
def login(self) -> None:
"""Starts the subscriber and updater threads (unless disable_rt_listeners is True) and logs into Voicemeeter."""
"""Sends a PING packet to the VBAN server to verify connectivity and detect server type.
If the server is detected as Matrix, RT listeners will be disabled for compatibility.
"""
self._ping()
if not self.disable_rt_listeners:
self.event.info()
@@ -120,6 +131,95 @@ class VbanCmd(abc.ABC):
def stopped(self):
return self.stop_event is None or self.stop_event.is_set()
def _get_next_framecounter(self) -> int:
"""Thread-safe method to get and increment framecounter."""
with self._framecounter_lock:
current = self._framecounter
self._framecounter = bump_framecounter(self._framecounter)
return current
def _ping(self, timeout: float = None) -> None:
"""Send a PING packet and wait for PONG response to verify connectivity."""
if timeout is None:
timeout = min(self.timeout, 3.0)
ping_packet = VbanPing0Payload.create_packet(self._get_next_framecounter())
original_timeout = self.sock.gettimeout()
self.sock.settimeout(0.5)
try:
self.sock.sendto(ping_packet, (socket.gethostbyname(self.ip), self.port))
self.logger.debug(f'PING sent to {self.ip}:{self.port}')
start_time = time.time()
response_count = 0
while time.time() - start_time < timeout:
try:
data, addr = self.sock.recvfrom(2048)
response_count += 1
self.logger.debug(
f'Received packet #{response_count} from {addr}: {len(data)} bytes'
)
self.logger.debug(
f'Response header: {data[: min(32, len(data))].hex()}'
)
if VbanPongHeader.is_pong_response(data):
self.logger.debug(
f'PONG received from {addr}, connectivity confirmed'
)
server_type = VbanPing0Payload.detect_server_type(data)
self._handle_server_type(server_type)
return # Exit after successful PONG response
else:
if len(data) >= 8:
if data[:4] == b'VBAN':
protocol = data[4] & 0xE0
nbc = data[6]
self.logger.debug(
f'Non-PONG VBAN packet: protocol=0x{protocol:02x}, nbc=0x{nbc:02x}'
)
else:
self.logger.debug('Non-VBAN packet received')
except socket.timeout:
continue
self.logger.debug(
f'PING timeout after {timeout}s, received {response_count} non-PONG packets'
)
raise VBANCMDConnectionError(
f'PING timeout: No response from {self.ip}:{self.port} after {timeout}s'
)
except socket.gaierror as e:
raise VBANCMDConnectionError(f'Unable to resolve hostname {self.ip}') from e
except Exception as e:
raise VBANCMDConnectionError(f'PING failed: {e}') from e
finally:
self.sock.settimeout(original_timeout)
def _handle_server_type(self, server_type: VbanServerType) -> None:
"""Handle the detected server type by adjusting settings accordingly."""
match server_type:
case VbanServerType.VOICEMEETER:
self.logger.debug(
'Detected Voicemeeter VBAN server - RT listeners supported'
)
case VbanServerType.MATRIX:
self.logger.info(
'Detected Matrix VBAN server - disabling RT listeners for compatibility'
)
self.disable_rt_listeners = True
case _:
self.logger.debug(
f'Unknown server type ({server_type}) - using default settings'
)
def _send_request(self, payload: str) -> None:
"""Sends a request packet over the network and bumps the framecounter."""
self.sock.sendto(
@@ -127,12 +227,11 @@ class VbanCmd(abc.ABC):
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
framecounter=self._get_next_framecounter(),
payload=payload,
),
(socket.gethostbyname(self.ip), self.port),
)
self._framecounter = bump_framecounter(self._framecounter)
def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network."""
@@ -146,12 +245,16 @@ class VbanCmd(abc.ABC):
if self.disable_rt_listeners and script.endswith(('?', '?;')):
try:
response = VbanMatrixResponseHeader.extract_payload(
self.sock.recv(1024)
)
return response
data, _ = self.sock.recvfrom(2048)
payload = VbanMatrixResponseHeader.extract_payload(data)
except ValueError as e:
self.logger.warning(f'Error extracting matrix response: {e}')
except TimeoutError as e:
self.logger.exception(f'Timeout waiting for matrix response: {e}')
raise VBANCMDConnectionError(
f'Timeout waiting for response from {self.ip}:{self.port}'
) from e
return payload
time.sleep(self.DELAY)

View File

@@ -1,5 +1,4 @@
import logging
import socket
import threading
import time
@@ -13,7 +12,6 @@ from .packet.headers import (
)
from .packet.nbs0 import VbanPacketNBS0
from .packet.nbs1 import VbanPacketNBS1
from .util import bump_framecounter
logger = logging.getLogger(__name__)
@@ -26,24 +24,24 @@ class Subscriber(threading.Thread):
self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__)
self._framecounter = 0
def run(self):
while not self.stopped():
try:
for nbs in NBS:
sub_packet = VbanSubscribeHeader().to_bytes(nbs, self._framecounter)
sub_packet = VbanSubscribeHeader().to_bytes(
nbs, self._remote._get_next_framecounter()
)
self._remote.sock.sendto(
sub_packet, (self._remote.ip, self._remote.port)
)
self._framecounter = bump_framecounter(self._framecounter)
self.wait_until_stopped(10)
except socket.gaierror as e:
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
f'unable to resolve hostname {self._remote.ip}'
f'timeout sending subscription to {self._remote.ip}:{self._remote.port}'
) from e
self.wait_until_stopped(10)
self.logger.debug(f'terminating {self.name} thread')
def stopped(self):
@@ -66,7 +64,6 @@ class Producer(threading.Thread):
self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__)
self._remote.sock.settimeout(self._remote.timeout)
self._remote._public_packets = [None] * (max(NBS) + 1)
_pp = self._get_rt()
self._remote._public_packets[_pp.nbs] = _pp
@@ -77,16 +74,11 @@ class Producer(threading.Thread):
def _get_rt(self) -> VbanPacket:
"""Attempt to fetch data packet until a valid one found"""
while True:
if resp := self._fetch_rt_packet():
return resp
def _fetch_rt_packet(self) -> VbanPacket | None:
try:
data, _ = self._remote.sock.recvfrom(2048)
if len(data) < HEADER_SIZE:
return
continue
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
@@ -97,7 +89,7 @@ class Producer(threading.Thread):
header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
except ValueError as e:
self.logger.debug(f'Error parsing response packet: {e}')
return None
continue
match header.format_nbs:
case NBS.zero:
@@ -110,8 +102,6 @@ class Producer(threading.Thread):
nbs=NBS.one, kind=self._remote.kind, data=data
)
return None
def stopped(self):
return self.stop_event.is_set()