add more enums so we can remove some of the constants

rename some of the packet classes

patch bump
This commit is contained in:
onyx-and-iris 2026-03-07 00:03:46 +00:00
parent 3cde874a3c
commit 9f43ee18d3
7 changed files with 250 additions and 231 deletions

View File

@ -1,6 +1,6 @@
[project] [project]
name = "vban-cmd" name = "vban-cmd"
version = "2.9.6" version = "2.9.7"
description = "Python interface for the VBAN RT Packet Service (Sendtext)" description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }] authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
license = { text = "MIT" } license = { text = "MIT" }

View File

@ -1,6 +1,36 @@
from enum import Flag from enum import Flag
class SubProtocols(Flag):
"""Sub Protocols - Bit flags that can be combined"""
AUDIO = 0x00
SERIAL = 0x20
TEXT = 0x40
SERVICE = 0x60
MASK = 0xE0
class ServiceTypes(Flag):
"""Service Types - Bit flags that can be combined"""
PING = 0
PONG = 0
CHATUTF8 = 1
RTPACKETREGISTER = 32
RTPACKET = 33
REQUESTREPLY = 0x02 # A Matrix reply
FNCT_REPLY = 0x80 # An RTPacket reply
class StreamTypes(Flag):
"""Stream Types - Bit flags that can be combined"""
ASCII = 0x00
UTF8 = 0x10
WCHAR = 0x20
class ChannelModes(Flag): class ChannelModes(Flag):
"""Channel Modes - Bit flags that can be combined""" """Channel Modes - Bit flags that can be combined"""

View File

@ -3,222 +3,21 @@ from dataclasses import dataclass
from vban_cmd.enums import NBS from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass from vban_cmd.kinds import KindMapClass
VBAN_PROTOCOL_TXT = 0x40 from .enums import ServiceTypes, StreamTypes, SubProtocols
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_PING = 0
VBAN_SERVICE_PONG = 0 # PONG uses same service type as PING
VBAN_SERVICE_MASK = 0xE0
VBAN_PROTOCOL_MASK = 0xE0
VBAN_SERVICE_REQUESTREPLY = 0x02
VBAN_SERVICE_FNCT_REPLY = 0x02
PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
MAX_PACKET_SIZE = 1436 MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass
class VbanPacket:
"""Represents the header of an incoming VBAN data packet"""
nbs: NBS
_kind: KindMapClass
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
@property
def voicemeetertype(self) -> str:
"""returns voicemeeter type as a string"""
return ['', 'basic', 'banana', 'potato'][
int.from_bytes(self._voicemeeterType, 'little')
]
@property
def voicemeeterversion(self) -> tuple:
"""returns voicemeeter version as a tuple"""
return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1))
@property
def samplerate(self) -> int:
"""returns samplerate as an int"""
return int.from_bytes(self._samplerate, 'little')
@dataclass
class VbanSubscribeHeader:
"""Represents the header of a subscription packet"""
nbs: NBS = NBS.zero
name: str = 'Register-RTP'
timeout: int = 15
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
@property
def format_nbs(self) -> bytes:
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
@property
def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
@property
def format_bit(self) -> bytes:
return (self.timeout & 0xFF).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
def _parse_vban_service_header(data: bytes) -> dict:
"""Common parsing and validation for VBAN service protocol headers."""
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VBAN header')
if data[:4] != b'VBAN':
raise ValueError('Invalid VBAN magic bytes')
format_sr = data[4]
format_nbs = data[5]
format_nbc = data[6]
format_bit = data[7]
# Verify this is a service protocol packet
protocol = format_sr & VBAN_PROTOCOL_MASK
if protocol != VBAN_PROTOCOL_SERVICE:
raise ValueError(f'Not a service protocol packet: {protocol:02x}')
# Extract stream name and frame counter
name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore')
framecounter = int.from_bytes(data[24:28], 'little')
return {
'format_sr': format_sr,
'format_nbs': format_nbs,
'format_nbc': format_nbc,
'format_bit': format_bit,
'name': name,
'framecounter': framecounter,
}
@dataclass
class VbanResponseHeader:
"""Represents the header of a response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a VbanResponseHeader from bytes."""
parsed = _parse_vban_service_header(data)
# Validate this is an RTPacket response
if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET:
raise ValueError(
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}'
)
return cls(**parsed)
@dataclass
class VbanMatrixResponseHeader:
"""Represents the header of a matrix response packet"""
name: str = 'Request Reply'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = VBAN_SERVICE_FNCT_REPLY
format_nbc: int = VBAN_SERVICE_REQUESTREPLY
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a matrix response packet from bytes."""
parsed = _parse_vban_service_header(data)
# Validate this is a service reply packet
if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY:
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
return cls(**parsed)
@classmethod
def extract_payload(cls, data: bytes) -> str:
"""Extract the text payload from a matrix response packet."""
if len(data) <= HEADER_SIZE:
return ''
payload_bytes = data[HEADER_SIZE:]
return payload_bytes.rstrip(b'\x00').decode('utf-8', errors='ignore')
@classmethod
def parse_response(cls, data: bytes) -> tuple['VbanMatrixResponseHeader', str]:
"""Parse a complete matrix response packet returning header and payload."""
header = cls.from_bytes(data)
payload = cls.extract_payload(data)
return header, payload
@dataclass @dataclass
class VbanPingHeader: class VbanPingHeader:
"""Represents the header of a PING packet""" """Represents the header of a PING packet"""
name: str = 'PING0' name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0 format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PING format_nbc: int = ServiceTypes.PING.value
format_bit: int = 0 format_bit: int = 0
framecounter: int = 0 framecounter: int = 0
@ -251,9 +50,9 @@ class VbanPongHeader:
"""Represents the header of a PONG response packet""" """Represents the header of a PONG response packet"""
name: str = 'PING0' name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0 format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PONG format_nbc: int = ServiceTypes.PONG.value
format_bit: int = 0 format_bit: int = 0
framecounter: int = 0 framecounter: int = 0
@ -272,7 +71,7 @@ class VbanPongHeader:
# PONG responses use the same service type as PING (0x00) # PONG responses use the same service type as PING (0x00)
# and are identified by having payload data # and are identified by having payload data
if parsed['format_nbc'] != VBAN_SERVICE_PONG: if parsed['format_nbc'] != ServiceTypes.PONG.value:
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}') raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
return cls(**parsed) return cls(**parsed)
@ -284,7 +83,7 @@ class VbanPongHeader:
parsed = _parse_vban_service_header(data) parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type # Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != VBAN_SERVICE_PONG: if parsed['format_nbc'] != ServiceTypes.PONG.value:
return False return False
if parsed['name'] not in ['PING0', 'VBAN Service']: if parsed['name'] not in ['PING0', 'VBAN Service']:
@ -298,8 +97,86 @@ class VbanPongHeader:
@dataclass @dataclass
class VbanRequestHeader: class VbanRTPacket:
"""Represents the header of a request packet""" """Represents the header of an incoming RTPacket"""
nbs: NBS
_kind: KindMapClass
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
@property
def voicemeetertype(self) -> str:
"""returns voicemeeter type as a string"""
return ['', 'basic', 'banana', 'potato'][
int.from_bytes(self._voicemeeterType, 'little')
]
@property
def voicemeeterversion(self) -> tuple:
"""returns voicemeeter version as a tuple"""
return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1))
@property
def samplerate(self) -> int:
"""returns samplerate as an int"""
return int.from_bytes(self._samplerate, 'little')
@dataclass
class VbanRTSubscribeHeader:
"""Represents the header of an RT subscription packet"""
nbs: NBS = NBS.zero
name: str = 'Register-RTP'
timeout: int = 15
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def format_sr(self) -> bytes:
return SubProtocols.SERVICE.value.to_bytes(1, 'little')
@property
def format_nbs(self) -> bytes:
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
@property
def format_nbc(self) -> bytes:
return ServiceTypes.RTPACKETREGISTER.value.to_bytes(1, 'little')
@property
def format_bit(self) -> bytes:
return (self.timeout & 0xFF).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanRTRequestHeader:
"""Represents the header of an RT request packet"""
name: str name: str
bps_index: int bps_index: int
@ -312,7 +189,7 @@ class VbanRequestHeader:
@property @property
def sr(self) -> bytes: def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little') return (self.bps_index | SubProtocols.TEXT.value).to_bytes(1, 'little')
@property @property
def nbs(self) -> bytes: def nbs(self) -> bytes:
@ -324,7 +201,7 @@ class VbanRequestHeader:
@property @property
def bit(self) -> bytes: def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little') return (StreamTypes.UTF8.value).to_bytes(1, 'little')
@property @property
def streamname(self) -> bytes: def streamname(self) -> bytes:
@ -354,3 +231,115 @@ class VbanRequestHeader:
) -> bytes: ) -> bytes:
"""Creates the complete packet with header and payload.""" """Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode() return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()
def _parse_vban_service_header(data: bytes) -> dict:
"""Common parsing and validation for VBAN service protocol headers."""
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VBAN header')
if data[:4] != b'VBAN':
raise ValueError('Invalid VBAN magic bytes')
format_sr = data[4]
format_nbs = data[5]
format_nbc = data[6]
format_bit = data[7]
# Verify this is a service protocol packet
protocol = format_sr & SubProtocols.MASK.value
if protocol != SubProtocols.SERVICE.value:
raise ValueError(f'Not a service protocol packet: {protocol:02x}')
# Extract stream name and frame counter
name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore')
framecounter = int.from_bytes(data[24:28], 'little')
return {
'format_sr': format_sr,
'format_nbs': format_nbs,
'format_nbc': format_nbc,
'format_bit': format_bit,
'name': name,
'framecounter': framecounter,
}
@dataclass
class VbanRTResponseHeader:
"""Represents the header of an RT response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.RTPACKET.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a VbanResponseHeader from bytes."""
parsed = _parse_vban_service_header(data)
# Validate this is an RTPacket response
if parsed['format_nbc'] != ServiceTypes.RTPACKET.value:
raise ValueError(
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}'
)
return cls(**parsed)
@dataclass
class VbanMatrixResponseHeader:
"""Represents the header of a matrix response packet"""
name: str = 'Request Reply'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = ServiceTypes.FNCT_REPLY.value
format_nbc: int = ServiceTypes.REQUESTREPLY.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a matrix response packet from bytes."""
parsed = _parse_vban_service_header(data)
# Validate this is a service reply packet
if parsed['format_nbs'] != ServiceTypes.FNCT_REPLY.value:
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
return cls(**parsed)
@classmethod
def extract_payload(cls, data: bytes) -> str:
"""Extract the text payload from a matrix response packet."""
if len(data) <= HEADER_SIZE:
return ''
payload_bytes = data[HEADER_SIZE:]
return payload_bytes.rstrip(b'\x00').decode('utf-8', errors='ignore')
@classmethod
def parse_response(cls, data: bytes) -> tuple['VbanMatrixResponseHeader', str]:
"""Parse a complete matrix response packet returning header and payload."""
header = cls.from_bytes(data)
payload = cls.extract_payload(data)
return header, payload

View File

@ -6,7 +6,7 @@ from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp from vban_cmd.util import comp
from .enums import ChannelModes from .enums import ChannelModes
from .headers import VbanPacket from .headers import VbanRTPacket
class Levels(NamedTuple): class Levels(NamedTuple):
@ -96,8 +96,8 @@ class Labels(NamedTuple):
@dataclass @dataclass
class VbanPacketNBS0(VbanPacket): class VbanRTPacketNBS0(VbanRTPacket):
"""Represents the body of a VBAN data packet with ident:0""" """Represents the body of a VBAN RTPacket with ident:0"""
_inputLeveldB100: bytes _inputLeveldB100: bytes
_outputLeveldB100: bytes _outputLeveldB100: bytes

View File

@ -5,7 +5,7 @@ from typing import NamedTuple
from vban_cmd.enums import NBS from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass from vban_cmd.kinds import KindMapClass
from .headers import VbanPacket from .headers import VbanRTPacket
VMPARAMSTRIP_SIZE = 174 VMPARAMSTRIP_SIZE = 174
@ -327,8 +327,8 @@ class VbanVMParamStrip:
@dataclass @dataclass
class VbanPacketNBS1(VbanPacket): class VbanRTPacketNBS1(VbanRTPacket):
"""Represents the body of a VBAN data packet with ident:1""" """Represents the body of a VBAN RTPacket with ident:1"""
strips: tuple[VbanVMParamStrip, ...] strips: tuple[VbanVMParamStrip, ...]

View File

@ -13,7 +13,7 @@ from .event import Event
from .packet.headers import ( from .packet.headers import (
VbanMatrixResponseHeader, VbanMatrixResponseHeader,
VbanPongHeader, VbanPongHeader,
VbanRequestHeader, VbanRTRequestHeader,
) )
from .packet.ping0 import VbanPing0Payload, VbanServerType from .packet.ping0 import VbanPing0Payload, VbanServerType
from .subject import Subject from .subject import Subject
@ -201,7 +201,7 @@ class VbanCmd(abc.ABC):
def _send_request(self, payload: str) -> None: def _send_request(self, payload: str) -> None:
"""Sends a request packet over the network and bumps the framecounter.""" """Sends a request packet over the network and bumps the framecounter."""
self.sock.sendto( self.sock.sendto(
VbanRequestHeader.encode_with_payload( VbanRTRequestHeader.encode_with_payload(
name=self.streamname, name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps), bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel, channel=self.channel,

View File

@ -6,12 +6,12 @@ from .enums import NBS
from .error import VBANCMDConnectionError from .error import VBANCMDConnectionError
from .packet.headers import ( from .packet.headers import (
HEADER_SIZE, HEADER_SIZE,
VbanPacket, VbanRTPacket,
VbanResponseHeader, VbanRTResponseHeader,
VbanSubscribeHeader, VbanRTSubscribeHeader,
) )
from .packet.nbs0 import VbanPacketNBS0 from .packet.nbs0 import VbanRTPacketNBS0
from .packet.nbs1 import VbanPacketNBS1 from .packet.nbs1 import VbanRTPacketNBS1
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -28,7 +28,7 @@ class Subscriber(threading.Thread):
def run(self): def run(self):
while not self.stopped(): while not self.stopped():
for nbs in NBS: for nbs in NBS:
sub_packet = VbanSubscribeHeader().to_bytes( sub_packet = VbanRTSubscribeHeader().to_bytes(
nbs, self._remote._get_next_framecounter() nbs, self._remote._get_next_framecounter()
) )
self._remote.sock.sendto( self._remote.sock.sendto(
@ -66,7 +66,7 @@ class Producer(threading.Thread):
self._remote.cache['bus_level'], self._remote.cache['bus_level'],
) = self._remote.public_packets[NBS.zero].levels ) = self._remote.public_packets[NBS.zero].levels
def _get_rt(self) -> VbanPacket: def _get_rt(self) -> VbanRTPacket:
"""Attempt to fetch data packet until a valid one found""" """Attempt to fetch data packet until a valid one found"""
while True: while True:
try: try:
@ -80,19 +80,19 @@ class Producer(threading.Thread):
) from e ) from e
try: try:
header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE]) header = VbanRTResponseHeader.from_bytes(data[:HEADER_SIZE])
except ValueError as e: except ValueError as e:
self.logger.debug(f'Error parsing response packet: {e}') self.logger.debug(f'Error parsing response packet: {e}')
continue continue
match header.format_nbs: match header.format_nbs:
case NBS.zero: case NBS.zero:
return VbanPacketNBS0.from_bytes( return VbanRTPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data nbs=NBS.zero, kind=self._remote.kind, data=data
) )
case NBS.one: case NBS.one:
return VbanPacketNBS1.from_bytes( return VbanRTPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data nbs=NBS.one, kind=self._remote.kind, data=data
) )