add missing constants

add docstrings that describes data breakdown

move SubscribeHeader above  VbanRtPacketHeader

expand assert failure string
This commit is contained in:
onyx-and-iris 2023-08-04 23:06:51 +01:00
parent 3b65035e50
commit ee32f92914

View File

@ -3,10 +3,14 @@ from dataclasses import dataclass
from .kinds import KindMapClass from .kinds import KindMapClass
from .util import comp from .util import comp
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32 VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33 VBAN_SERVICE_RTPACKET = 33
MAX_PACKET_SIZE = 1436 MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass @dataclass
@ -14,28 +18,28 @@ class VbanRtPacket:
"""Represents the body of a VBAN RT data packet""" """Represents the body of a VBAN RT data packet"""
_kind: KindMapClass _kind: KindMapClass
_voicemeeterType: bytes _voicemeeterType: bytes # data[28:29]
_reserved: bytes _reserved: bytes # data[29:30]
_buffersize: bytes _buffersize: bytes # data[30:32]
_voicemeeterVersion: bytes _voicemeeterVersion: bytes # data[32:36]
_optionBits: bytes _optionBits: bytes # data[36:40]
_samplerate: bytes _samplerate: bytes # data[40:44]
_inputLeveldB100: bytes _inputLeveldB100: bytes # data[44:112]
_outputLeveldB100: bytes _outputLeveldB100: bytes # data[112:240]
_TransportBit: bytes _TransportBit: bytes # data[240:244]
_stripState: bytes _stripState: bytes # data[244:276]
_busState: bytes _busState: bytes # data[276:308]
_stripGaindB100Layer1: bytes _stripGaindB100Layer1: bytes # data[308:324]
_stripGaindB100Layer2: bytes _stripGaindB100Layer2: bytes # data[324:340]
_stripGaindB100Layer3: bytes _stripGaindB100Layer3: bytes # data[340:356]
_stripGaindB100Layer4: bytes _stripGaindB100Layer4: bytes # data[356:372]
_stripGaindB100Layer5: bytes _stripGaindB100Layer5: bytes # data[372:388]
_stripGaindB100Layer6: bytes _stripGaindB100Layer6: bytes # data[388:404]
_stripGaindB100Layer7: bytes _stripGaindB100Layer7: bytes # data[404:420]
_stripGaindB100Layer8: bytes _stripGaindB100Layer8: bytes # data[420:436]
_busGaindB100: bytes _busGaindB100: bytes # data[436:452]
_stripLabelUTF8c60: bytes _stripLabelUTF8c60: bytes # data[452:932]
_busLabelUTF8c60: bytes _busLabelUTF8c60: bytes # data[932:1412]
def _generate_levels(self, levelarray) -> tuple: def _generate_levels(self, levelarray) -> tuple:
return tuple( return tuple(
@ -206,13 +210,42 @@ class VbanRtPacket:
) )
@dataclass
class SubscribeHeader:
"""Represents the header an RT Packet Service subscription packet"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert (
len(header) == HEADER_SIZE + 4
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
return header
@dataclass @dataclass
class VbanRtPacketHeader: class VbanRtPacketHeader:
"""Represents the header of VBAN RT data packet""" """Represents the header of a VBAN RT response packet"""
name = "Voicemeeter-RTP" name = "Voicemeeter-RTP"
vban: bytes = "VBAN".encode() vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little") format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little") format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little") format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little")
format_bit: bytes = (0).to_bytes(1, "little") format_bit: bytes = (0).to_bytes(1, "little")
@ -226,13 +259,13 @@ class VbanRtPacketHeader:
header += self.format_nbc header += self.format_nbc
header += self.format_bit header += self.format_bit
header += self.streamname header += self.streamname
assert len(header) == HEADER_SIZE - 4, f"Header expected {HEADER_SIZE-4} bytes" assert len(header) == HEADER_SIZE, f"expected header size {HEADER_SIZE} bytes"
return header return header
@dataclass @dataclass
class RequestHeader: class RequestHeader:
"""Represents a REQUEST RT PACKET header""" """Represents the header of an REQUEST RT PACKET"""
name: str name: str
bps_index: int bps_index: int
@ -244,7 +277,7 @@ class RequestHeader:
@property @property
def sr(self): def sr(self):
return (0x40 + self.bps_index).to_bytes(1, "little") return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
@property @property
def nbc(self): def nbc(self):
@ -263,32 +296,7 @@ class RequestHeader:
header += self.bit header += self.bit
header += self.streamname header += self.streamname
header += self.framecounter header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes" assert (
return header len(header) == HEADER_SIZE + 4
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
@dataclass
class SubscribeHeader:
"""Represents a packet used to subscribe to the RT Packet Service"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
return header return header