From 9f43ee18d367b4b9797a07550a68d466dab3c7fe Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Sat, 7 Mar 2026 00:03:46 +0000 Subject: [PATCH] add more enums so we can remove some of the constants rename some of the packet classes patch bump --- pyproject.toml | 2 +- vban_cmd/packet/enums.py | 30 +++ vban_cmd/packet/headers.py | 413 ++++++++++++++++++------------------- vban_cmd/packet/nbs0.py | 6 +- vban_cmd/packet/nbs1.py | 6 +- vban_cmd/vbancmd.py | 4 +- vban_cmd/worker.py | 20 +- 7 files changed, 250 insertions(+), 231 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c80caf6..3ffb2ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "vban-cmd" -version = "2.9.6" +version = "2.9.7" description = "Python interface for the VBAN RT Packet Service (Sendtext)" authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }] license = { text = "MIT" } diff --git a/vban_cmd/packet/enums.py b/vban_cmd/packet/enums.py index d5081d9..3e78d07 100644 --- a/vban_cmd/packet/enums.py +++ b/vban_cmd/packet/enums.py @@ -1,6 +1,36 @@ from enum import Flag +class SubProtocols(Flag): + """Sub Protocols - Bit flags that can be combined""" + + AUDIO = 0x00 + SERIAL = 0x20 + TEXT = 0x40 + SERVICE = 0x60 + MASK = 0xE0 + + +class ServiceTypes(Flag): + """Service Types - Bit flags that can be combined""" + + PING = 0 + PONG = 0 + CHATUTF8 = 1 + RTPACKETREGISTER = 32 + RTPACKET = 33 + REQUESTREPLY = 0x02 # A Matrix reply + FNCT_REPLY = 0x80 # An RTPacket reply + + +class StreamTypes(Flag): + """Stream Types - Bit flags that can be combined""" + + ASCII = 0x00 + UTF8 = 0x10 + WCHAR = 0x20 + + class ChannelModes(Flag): """Channel Modes - Bit flags that can be combined""" diff --git a/vban_cmd/packet/headers.py b/vban_cmd/packet/headers.py index 65a0543..c7107b6 100644 --- a/vban_cmd/packet/headers.py +++ b/vban_cmd/packet/headers.py @@ -3,222 +3,21 @@ from dataclasses import dataclass from vban_cmd.enums import NBS from vban_cmd.kinds import KindMapClass -VBAN_PROTOCOL_TXT = 0x40 -VBAN_PROTOCOL_SERVICE = 0x60 - -VBAN_SERVICE_RTPACKETREGISTER = 32 -VBAN_SERVICE_RTPACKET = 33 -VBAN_SERVICE_PING = 0 -VBAN_SERVICE_PONG = 0 # PONG uses same service type as PING -VBAN_SERVICE_MASK = 0xE0 -VBAN_PROTOCOL_MASK = 0xE0 -VBAN_SERVICE_REQUESTREPLY = 0x02 -VBAN_SERVICE_FNCT_REPLY = 0x02 +from .enums import ServiceTypes, StreamTypes, SubProtocols PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes - MAX_PACKET_SIZE = 1436 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 -@dataclass -class VbanPacket: - """Represents the header of an incoming VBAN data packet""" - - nbs: NBS - _kind: KindMapClass - _voicemeeterType: bytes - _reserved: bytes - _buffersize: bytes - _voicemeeterVersion: bytes - _optionBits: bytes - _samplerate: bytes - - @property - def voicemeetertype(self) -> str: - """returns voicemeeter type as a string""" - return ['', 'basic', 'banana', 'potato'][ - int.from_bytes(self._voicemeeterType, 'little') - ] - - @property - def voicemeeterversion(self) -> tuple: - """returns voicemeeter version as a tuple""" - return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1)) - - @property - def samplerate(self) -> int: - """returns samplerate as an int""" - return int.from_bytes(self._samplerate, 'little') - - -@dataclass -class VbanSubscribeHeader: - """Represents the header of a subscription packet""" - - nbs: NBS = NBS.zero - name: str = 'Register-RTP' - timeout: int = 15 - - @property - def vban(self) -> bytes: - return b'VBAN' - - @property - def format_sr(self) -> bytes: - return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little') - - @property - def format_nbs(self) -> bytes: - return (self.nbs.value & 0xFF).to_bytes(1, 'little') - - @property - def format_nbc(self) -> bytes: - return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little') - - @property - def format_bit(self) -> bytes: - return (self.timeout & 0xFF).to_bytes(1, 'little') - - @property - def streamname(self) -> bytes: - return self.name.encode('ascii') + bytes(16 - len(self.name)) - - @classmethod - def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes: - header = cls(nbs=nbs) - - data = bytearray() - data.extend(header.vban) - data.extend(header.format_sr) - data.extend(header.format_nbs) - data.extend(header.format_nbc) - data.extend(header.format_bit) - data.extend(header.streamname) - data.extend(framecounter.to_bytes(4, 'little')) - return bytes(data) - - -def _parse_vban_service_header(data: bytes) -> dict: - """Common parsing and validation for VBAN service protocol headers.""" - if len(data) < HEADER_SIZE: - raise ValueError('Data is too short to be a valid VBAN header') - - if data[:4] != b'VBAN': - raise ValueError('Invalid VBAN magic bytes') - - format_sr = data[4] - format_nbs = data[5] - format_nbc = data[6] - format_bit = data[7] - - # Verify this is a service protocol packet - protocol = format_sr & VBAN_PROTOCOL_MASK - if protocol != VBAN_PROTOCOL_SERVICE: - raise ValueError(f'Not a service protocol packet: {protocol:02x}') - - # Extract stream name and frame counter - name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore') - framecounter = int.from_bytes(data[24:28], 'little') - - return { - 'format_sr': format_sr, - 'format_nbs': format_nbs, - 'format_nbc': format_nbc, - 'format_bit': format_bit, - 'name': name, - 'framecounter': framecounter, - } - - -@dataclass -class VbanResponseHeader: - """Represents the header of a response packet""" - - name: str = 'Voicemeeter-RTP' - format_sr: int = VBAN_PROTOCOL_SERVICE - format_nbs: int = 0 - format_nbc: int = VBAN_SERVICE_RTPACKET - format_bit: int = 0 - framecounter: int = 0 - - @property - def vban(self) -> bytes: - return b'VBAN' - - @property - def streamname(self) -> bytes: - return self.name.encode('ascii') + bytes(16 - len(self.name)) - - @classmethod - def from_bytes(cls, data: bytes): - """Parse a VbanResponseHeader from bytes.""" - parsed = _parse_vban_service_header(data) - - # Validate this is an RTPacket response - if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET: - raise ValueError( - f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}' - ) - - return cls(**parsed) - - -@dataclass -class VbanMatrixResponseHeader: - """Represents the header of a matrix response packet""" - - name: str = 'Request Reply' - format_sr: int = VBAN_PROTOCOL_SERVICE - format_nbs: int = VBAN_SERVICE_FNCT_REPLY - format_nbc: int = VBAN_SERVICE_REQUESTREPLY - format_bit: int = 0 - framecounter: int = 0 - - @property - def vban(self) -> bytes: - return b'VBAN' - - @property - def streamname(self) -> bytes: - return self.name.encode('ascii')[:16].ljust(16, b'\x00') - - @classmethod - def from_bytes(cls, data: bytes): - """Parse a matrix response packet from bytes.""" - parsed = _parse_vban_service_header(data) - - # Validate this is a service reply packet - if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY: - raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}') - - return cls(**parsed) - - @classmethod - def extract_payload(cls, data: bytes) -> str: - """Extract the text payload from a matrix response packet.""" - if len(data) <= HEADER_SIZE: - return '' - - payload_bytes = data[HEADER_SIZE:] - return payload_bytes.rstrip(b'\x00').decode('utf-8', errors='ignore') - - @classmethod - def parse_response(cls, data: bytes) -> tuple['VbanMatrixResponseHeader', str]: - """Parse a complete matrix response packet returning header and payload.""" - header = cls.from_bytes(data) - payload = cls.extract_payload(data) - return header, payload - - @dataclass class VbanPingHeader: """Represents the header of a PING packet""" name: str = 'PING0' - format_sr: int = VBAN_PROTOCOL_SERVICE + format_sr: int = SubProtocols.SERVICE.value format_nbs: int = 0 - format_nbc: int = VBAN_SERVICE_PING + format_nbc: int = ServiceTypes.PING.value format_bit: int = 0 framecounter: int = 0 @@ -251,9 +50,9 @@ class VbanPongHeader: """Represents the header of a PONG response packet""" name: str = 'PING0' - format_sr: int = VBAN_PROTOCOL_SERVICE + format_sr: int = SubProtocols.SERVICE.value format_nbs: int = 0 - format_nbc: int = VBAN_SERVICE_PONG + format_nbc: int = ServiceTypes.PONG.value format_bit: int = 0 framecounter: int = 0 @@ -272,7 +71,7 @@ class VbanPongHeader: # PONG responses use the same service type as PING (0x00) # and are identified by having payload data - if parsed['format_nbc'] != VBAN_SERVICE_PONG: + if parsed['format_nbc'] != ServiceTypes.PONG.value: raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}') return cls(**parsed) @@ -284,7 +83,7 @@ class VbanPongHeader: parsed = _parse_vban_service_header(data) # Validate this is a service protocol packet with PING/PONG service type - if parsed['format_nbc'] != VBAN_SERVICE_PONG: + if parsed['format_nbc'] != ServiceTypes.PONG.value: return False if parsed['name'] not in ['PING0', 'VBAN Service']: @@ -298,8 +97,86 @@ class VbanPongHeader: @dataclass -class VbanRequestHeader: - """Represents the header of a request packet""" +class VbanRTPacket: + """Represents the header of an incoming RTPacket""" + + nbs: NBS + _kind: KindMapClass + _voicemeeterType: bytes + _reserved: bytes + _buffersize: bytes + _voicemeeterVersion: bytes + _optionBits: bytes + _samplerate: bytes + + @property + def voicemeetertype(self) -> str: + """returns voicemeeter type as a string""" + return ['', 'basic', 'banana', 'potato'][ + int.from_bytes(self._voicemeeterType, 'little') + ] + + @property + def voicemeeterversion(self) -> tuple: + """returns voicemeeter version as a tuple""" + return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1)) + + @property + def samplerate(self) -> int: + """returns samplerate as an int""" + return int.from_bytes(self._samplerate, 'little') + + +@dataclass +class VbanRTSubscribeHeader: + """Represents the header of an RT subscription packet""" + + nbs: NBS = NBS.zero + name: str = 'Register-RTP' + timeout: int = 15 + + @property + def vban(self) -> bytes: + return b'VBAN' + + @property + def format_sr(self) -> bytes: + return SubProtocols.SERVICE.value.to_bytes(1, 'little') + + @property + def format_nbs(self) -> bytes: + return (self.nbs.value & 0xFF).to_bytes(1, 'little') + + @property + def format_nbc(self) -> bytes: + return ServiceTypes.RTPACKETREGISTER.value.to_bytes(1, 'little') + + @property + def format_bit(self) -> bytes: + return (self.timeout & 0xFF).to_bytes(1, 'little') + + @property + def streamname(self) -> bytes: + return self.name.encode('ascii') + bytes(16 - len(self.name)) + + @classmethod + def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes: + header = cls(nbs=nbs) + + data = bytearray() + data.extend(header.vban) + data.extend(header.format_sr) + data.extend(header.format_nbs) + data.extend(header.format_nbc) + data.extend(header.format_bit) + data.extend(header.streamname) + data.extend(framecounter.to_bytes(4, 'little')) + return bytes(data) + + +@dataclass +class VbanRTRequestHeader: + """Represents the header of an RT request packet""" name: str bps_index: int @@ -312,7 +189,7 @@ class VbanRequestHeader: @property def sr(self) -> bytes: - return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little') + return (self.bps_index | SubProtocols.TEXT.value).to_bytes(1, 'little') @property def nbs(self) -> bytes: @@ -324,7 +201,7 @@ class VbanRequestHeader: @property def bit(self) -> bytes: - return (0x10).to_bytes(1, 'little') + return (StreamTypes.UTF8.value).to_bytes(1, 'little') @property def streamname(self) -> bytes: @@ -354,3 +231,115 @@ class VbanRequestHeader: ) -> bytes: """Creates the complete packet with header and payload.""" return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode() + + +def _parse_vban_service_header(data: bytes) -> dict: + """Common parsing and validation for VBAN service protocol headers.""" + if len(data) < HEADER_SIZE: + raise ValueError('Data is too short to be a valid VBAN header') + + if data[:4] != b'VBAN': + raise ValueError('Invalid VBAN magic bytes') + + format_sr = data[4] + format_nbs = data[5] + format_nbc = data[6] + format_bit = data[7] + + # Verify this is a service protocol packet + protocol = format_sr & SubProtocols.MASK.value + if protocol != SubProtocols.SERVICE.value: + raise ValueError(f'Not a service protocol packet: {protocol:02x}') + + # Extract stream name and frame counter + name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore') + framecounter = int.from_bytes(data[24:28], 'little') + + return { + 'format_sr': format_sr, + 'format_nbs': format_nbs, + 'format_nbc': format_nbc, + 'format_bit': format_bit, + 'name': name, + 'framecounter': framecounter, + } + + +@dataclass +class VbanRTResponseHeader: + """Represents the header of an RT response packet""" + + name: str = 'Voicemeeter-RTP' + format_sr: int = SubProtocols.SERVICE.value + format_nbs: int = 0 + format_nbc: int = ServiceTypes.RTPACKET.value + format_bit: int = 0 + framecounter: int = 0 + + @property + def vban(self) -> bytes: + return b'VBAN' + + @property + def streamname(self) -> bytes: + return self.name.encode('ascii') + bytes(16 - len(self.name)) + + @classmethod + def from_bytes(cls, data: bytes): + """Parse a VbanResponseHeader from bytes.""" + parsed = _parse_vban_service_header(data) + + # Validate this is an RTPacket response + if parsed['format_nbc'] != ServiceTypes.RTPACKET.value: + raise ValueError( + f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}' + ) + + return cls(**parsed) + + +@dataclass +class VbanMatrixResponseHeader: + """Represents the header of a matrix response packet""" + + name: str = 'Request Reply' + format_sr: int = SubProtocols.SERVICE.value + format_nbs: int = ServiceTypes.FNCT_REPLY.value + format_nbc: int = ServiceTypes.REQUESTREPLY.value + format_bit: int = 0 + framecounter: int = 0 + + @property + def vban(self) -> bytes: + return b'VBAN' + + @property + def streamname(self) -> bytes: + return self.name.encode('ascii')[:16].ljust(16, b'\x00') + + @classmethod + def from_bytes(cls, data: bytes): + """Parse a matrix response packet from bytes.""" + parsed = _parse_vban_service_header(data) + + # Validate this is a service reply packet + if parsed['format_nbs'] != ServiceTypes.FNCT_REPLY.value: + raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}') + + return cls(**parsed) + + @classmethod + def extract_payload(cls, data: bytes) -> str: + """Extract the text payload from a matrix response packet.""" + if len(data) <= HEADER_SIZE: + return '' + + payload_bytes = data[HEADER_SIZE:] + return payload_bytes.rstrip(b'\x00').decode('utf-8', errors='ignore') + + @classmethod + def parse_response(cls, data: bytes) -> tuple['VbanMatrixResponseHeader', str]: + """Parse a complete matrix response packet returning header and payload.""" + header = cls.from_bytes(data) + payload = cls.extract_payload(data) + return header, payload diff --git a/vban_cmd/packet/nbs0.py b/vban_cmd/packet/nbs0.py index 3e3e2a8..bee65b6 100644 --- a/vban_cmd/packet/nbs0.py +++ b/vban_cmd/packet/nbs0.py @@ -6,7 +6,7 @@ from vban_cmd.kinds import KindMapClass from vban_cmd.util import comp from .enums import ChannelModes -from .headers import VbanPacket +from .headers import VbanRTPacket class Levels(NamedTuple): @@ -96,8 +96,8 @@ class Labels(NamedTuple): @dataclass -class VbanPacketNBS0(VbanPacket): - """Represents the body of a VBAN data packet with ident:0""" +class VbanRTPacketNBS0(VbanRTPacket): + """Represents the body of a VBAN RTPacket with ident:0""" _inputLeveldB100: bytes _outputLeveldB100: bytes diff --git a/vban_cmd/packet/nbs1.py b/vban_cmd/packet/nbs1.py index 901dcda..f268deb 100644 --- a/vban_cmd/packet/nbs1.py +++ b/vban_cmd/packet/nbs1.py @@ -5,7 +5,7 @@ from typing import NamedTuple from vban_cmd.enums import NBS from vban_cmd.kinds import KindMapClass -from .headers import VbanPacket +from .headers import VbanRTPacket VMPARAMSTRIP_SIZE = 174 @@ -327,8 +327,8 @@ class VbanVMParamStrip: @dataclass -class VbanPacketNBS1(VbanPacket): - """Represents the body of a VBAN data packet with ident:1""" +class VbanRTPacketNBS1(VbanRTPacket): + """Represents the body of a VBAN RTPacket with ident:1""" strips: tuple[VbanVMParamStrip, ...] diff --git a/vban_cmd/vbancmd.py b/vban_cmd/vbancmd.py index b29b1b6..acd9191 100644 --- a/vban_cmd/vbancmd.py +++ b/vban_cmd/vbancmd.py @@ -13,7 +13,7 @@ from .event import Event from .packet.headers import ( VbanMatrixResponseHeader, VbanPongHeader, - VbanRequestHeader, + VbanRTRequestHeader, ) from .packet.ping0 import VbanPing0Payload, VbanServerType from .subject import Subject @@ -201,7 +201,7 @@ class VbanCmd(abc.ABC): def _send_request(self, payload: str) -> None: """Sends a request packet over the network and bumps the framecounter.""" self.sock.sendto( - VbanRequestHeader.encode_with_payload( + VbanRTRequestHeader.encode_with_payload( name=self.streamname, bps_index=self.BPS_OPTS.index(self.bps), channel=self.channel, diff --git a/vban_cmd/worker.py b/vban_cmd/worker.py index d2e0365..6836e8e 100644 --- a/vban_cmd/worker.py +++ b/vban_cmd/worker.py @@ -6,12 +6,12 @@ from .enums import NBS from .error import VBANCMDConnectionError from .packet.headers import ( HEADER_SIZE, - VbanPacket, - VbanResponseHeader, - VbanSubscribeHeader, + VbanRTPacket, + VbanRTResponseHeader, + VbanRTSubscribeHeader, ) -from .packet.nbs0 import VbanPacketNBS0 -from .packet.nbs1 import VbanPacketNBS1 +from .packet.nbs0 import VbanRTPacketNBS0 +from .packet.nbs1 import VbanRTPacketNBS1 logger = logging.getLogger(__name__) @@ -28,7 +28,7 @@ class Subscriber(threading.Thread): def run(self): while not self.stopped(): for nbs in NBS: - sub_packet = VbanSubscribeHeader().to_bytes( + sub_packet = VbanRTSubscribeHeader().to_bytes( nbs, self._remote._get_next_framecounter() ) self._remote.sock.sendto( @@ -66,7 +66,7 @@ class Producer(threading.Thread): self._remote.cache['bus_level'], ) = self._remote.public_packets[NBS.zero].levels - def _get_rt(self) -> VbanPacket: + def _get_rt(self) -> VbanRTPacket: """Attempt to fetch data packet until a valid one found""" while True: try: @@ -80,19 +80,19 @@ class Producer(threading.Thread): ) from e try: - header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE]) + header = VbanRTResponseHeader.from_bytes(data[:HEADER_SIZE]) except ValueError as e: self.logger.debug(f'Error parsing response packet: {e}') continue match header.format_nbs: case NBS.zero: - return VbanPacketNBS0.from_bytes( + return VbanRTPacketNBS0.from_bytes( nbs=NBS.zero, kind=self._remote.kind, data=data ) case NBS.one: - return VbanPacketNBS1.from_bytes( + return VbanRTPacketNBS1.from_bytes( nbs=NBS.one, kind=self._remote.kind, data=data )