diff --git a/vban_cmd/packet.py b/vban_cmd/packet.py index 789f669..f6adf8a 100644 --- a/vban_cmd/packet.py +++ b/vban_cmd/packet.py @@ -1,5 +1,6 @@ from dataclasses import dataclass +from .enums import NBS from .kinds import KindMapClass from .util import comp @@ -8,15 +9,18 @@ VBAN_PROTOCOL_SERVICE = 0x60 VBAN_SERVICE_RTPACKETREGISTER = 32 VBAN_SERVICE_RTPACKET = 33 +VBAN_SERVICE_MASK = 0xE0 MAX_PACKET_SIZE = 1436 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 +VMPARAMSTRIP_SIZE = 174 @dataclass class VbanRtPacket: """Represents the body of a VBAN RT data packet""" + nbs: NBS _kind: KindMapClass _voicemeeterType: bytes # data[28:29] _reserved: bytes # data[29:30] @@ -24,6 +28,12 @@ class VbanRtPacket: _voicemeeterVersion: bytes # data[32:36] _optionBits: bytes # data[36:40] _samplerate: bytes # data[40:44] + + +@dataclass +class VbanRtPacketNBS0(VbanRtPacket): + """Represents the body of a VBAN RT data packet with NBS 0""" + _inputLeveldB100: bytes # data[44:112] _outputLeveldB100: bytes # data[112:240] _TransportBit: bytes # data[240:244] @@ -211,39 +221,177 @@ class VbanRtPacket: @dataclass -class SubscribeHeader: - """Represents the header an RT Packet Service subscription packet""" +class VbanVMParamStrip: + """Represents the VBAN_VMPARAMSTRIP_PACKET structure""" - name = 'Register RTP' + _mode: bytes # long = 4 bytes data[0:4] + _dblevel: bytes # float = 4 bytes data[4:8] + _audibility: bytes # short = 2 bytes data[8:10] + _pos3D_x: bytes # short = 2 bytes data[10:12] + _pos3D_y: bytes # short = 2 bytes data[12:14] + _posColor_x: bytes # short = 2 bytes data[14:16] + _posColor_y: bytes # short = 2 bytes data[16:18] + _EQgain1: bytes # short = 2 bytes data[18:20] + _EQgain2: bytes # short = 2 bytes data[20:22] + _EQgain3: bytes # short = 2 bytes data[22:24] + + # First channel parametric EQ + _PEQ_eqOn: bytes # 6 * char = 6 bytes data[24:30] + _PEQ_eqtype: bytes # 6 * char = 6 bytes data[30:36] + _PEQ_eqgain: bytes # 6 * float = 24 bytes data[36:60] + _PEQ_eqfreq: bytes # 6 * float = 24 bytes data[60:84] + _PEQ_eqq: bytes # 6 * float = 24 bytes data[84:108] + + _audibility_c: bytes # short = 2 bytes data[108:110] + _audibility_g: bytes # short = 2 bytes data[110:112] + _audibility_d: bytes # short = 2 bytes data[112:114] + _posMod_x: bytes # short = 2 bytes data[114:116] + _posMod_y: bytes # short = 2 bytes data[116:118] + _send_reverb: bytes # short = 2 bytes data[118:120] + _send_delay: bytes # short = 2 bytes data[120:122] + _send_fx1: bytes # short = 2 bytes data[122:124] + _send_fx2: bytes # short = 2 bytes data[124:126] + _dblimit: bytes # short = 2 bytes data[126:128] + _nKaraoke: bytes # short = 2 bytes data[128:130] + + _COMP_gain_in: bytes # short = 2 bytes data[130:132] + _COMP_attack_ms: bytes # short = 2 bytes data[132:134] + _COMP_release_ms: bytes # short = 2 bytes data[134:136] + _COMP_n_knee: bytes # short = 2 bytes data[136:138] + _COMP_comprate: bytes # short = 2 bytes data[138:140] + _COMP_threshold: bytes # short = 2 bytes data[140:142] + _COMP_c_enabled: bytes # short = 2 bytes data[142:144] + _COMP_c_auto: bytes # short = 2 bytes data[144:146] + _COMP_gain_out: bytes # short = 2 bytes data[146:148] + + _GATE_dBThreshold_in: bytes # short = 2 bytes data[148:150] + _GATE_dBDamping_max: bytes # short = 2 bytes data[150:152] + _GATE_BP_Sidechain: bytes # short = 2 bytes data[152:154] + _GATE_attack_ms: bytes # short = 2 bytes data[154:156] + _GATE_hold_ms: bytes # short = 2 bytes data[156:158] + _GATE_release_ms: bytes # short = 2 bytes data[158:160] + + _DenoiserThreshold: bytes # short = 2 bytes data[160:162] + _PitchEnabled: bytes # short = 2 bytes data[162:164] + _Pitch_DryWet: bytes # short = 2 bytes data[164:166] + _Pitch_Value: bytes # short = 2 bytes data[166:168] + _Pitch_formant_lo: bytes # short = 2 bytes data[168:170] + _Pitch_formant_med: bytes # short = 2 bytes data[170:172] + _Pitch_formant_high: bytes # short = 2 bytes data[172:174] + + @classmethod + def from_bytes(cls, data: bytes): + return cls( + _mode=data[0:4], + _dblevel=data[4:8], + _audibility=data[8:10], + _pos3D_x=data[10:12], + _pos3D_y=data[12:14], + _posColor_x=data[14:16], + _posColor_y=data[16:18], + _EQgain1=data[18:20], + _EQgain2=data[20:22], + _EQgain3=data[22:24], + _PEQ_eqOn=data[24:30], + _PEQ_eqtype=data[30:36], + _PEQ_eqgain=data[36:60], + _PEQ_eqfreq=data[60:84], + _PEQ_eqq=data[84:108], + _audibility_c=data[108:110], + _audibility_g=data[110:112], + _audibility_d=data[112:114], + _posMod_x=data[114:116], + _posMod_y=data[116:118], + _send_reverb=data[118:120], + _send_delay=data[120:122], + _send_fx1=data[122:124], + _send_fx2=data[124:126], + _dblimit=data[126:128], + _nKaraoke=data[128:130], + _COMP_gain_in=data[130:132], + _COMP_attack_ms=data[132:134], + _COMP_release_ms=data[134:136], + _COMP_n_knee=data[136:138], + _COMP_comprate=data[138:140], + _COMP_threshold=data[140:142], + _COMP_c_enabled=data[142:144], + _COMP_c_auto=data[144:146], + _COMP_gain_out=data[146:148], + _GATE_dBThreshold_in=data[148:150], + _GATE_dBDamping_max=data[150:152], + _GATE_BP_Sidechain=data[152:154], + _GATE_attack_ms=data[154:156], + _GATE_hold_ms=data[156:158], + _GATE_release_ms=data[158:160], + _DenoiserThreshold=data[160:162], + _PitchEnabled=data[162:164], + _Pitch_DryWet=data[164:166], + _Pitch_Value=data[166:168], + _Pitch_formant_lo=data[168:170], + _Pitch_formant_med=data[170:172], + _Pitch_formant_high=data[172:174], + ) + + @property + def mode(self) -> int: + return int.from_bytes(self._mode, 'little') + + @property + def eqgains(self) -> tuple[float, float, float]: + return tuple( + round( + int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True) + * 0.01, + 2, + ) + for i in range(1, 4) + ) + + @property + def karaoke(self) -> int: + return int.from_bytes(self._nKaraoke, 'little') + + +@dataclass +class VbanRtPacketNBS1(VbanRtPacket): + """Represents the body of a VBAN RT data packet with NBS 1""" + + strips: tuple[VbanVMParamStrip, ...] + + +@dataclass +class SubscribeHeader: + """Represents the header of an RT subscription packet""" + + ident: NBS = NBS.zero + name = 'Register-RTP' timeout = 15 vban: bytes = 'VBAN'.encode() format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, 'little') - format_nbs: bytes = (0).to_bytes(1, 'little') + format_nbs: bytes = (ident.value & 0xFF).to_bytes(1, 'little') format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, 'little') - format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, 'little') # timeout + format_bit: bytes = (timeout & 0xFF).to_bytes(1, 'little') # timeout streamname: bytes = name.encode('ascii') + bytes(16 - len(name)) - framecounter: bytes = (0).to_bytes(4, 'little') - @property - def header(self): - header = self.vban - header += self.format_sr - header += self.format_nbs - header += self.format_nbc - header += self.format_bit - header += self.streamname - header += self.framecounter - assert len(header) == HEADER_SIZE + 4, ( - f'expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE + 4} bytes total)' - ) - return header + @classmethod + def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes: + header = cls(ident=nbs) + data = bytearray() + data.extend(header.vban) + data.extend(header.format_sr) + data.extend(header.format_nbs) + data.extend(header.format_nbc) + data.extend(header.format_bit) + data.extend(header.streamname) + data.extend(framecounter.to_bytes(4, 'little')) + return bytes(data) @dataclass class VbanRtPacketHeader: - """Represents the header of a VBAN RT response packet""" + """Represents the header of an RT response packet""" - name = 'Voicemeeter-RTP' + name: str = 'Voicemeeter-RTP' vban: bytes = 'VBAN'.encode() format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, 'little') format_nbs: bytes = (0).to_bytes(1, 'little') @@ -251,21 +399,29 @@ class VbanRtPacketHeader: format_bit: bytes = (0).to_bytes(1, 'little') streamname: bytes = name.encode('ascii') + bytes(16 - len(name)) - @property - def header(self): - header = self.vban - header += self.format_sr - header += self.format_nbs - header += self.format_nbc - header += self.format_bit - header += self.streamname - assert len(header) == HEADER_SIZE, f'expected header size {HEADER_SIZE} bytes' - return header + @classmethod + def from_bytes(cls, data: bytes): + if len(data) < HEADER_SIZE: + raise ValueError('Data is too short to be a valid VbanRTPPacketHeader') + vban = data[0:4] + format_sr = data[4] + format_nbs = data[5] + format_nbc = data[6] + format_bit = data[7] + name = data[8:24].rstrip(b'\x00').decode('utf-8') + return cls( + name=name, + vban=vban, + format_sr=format_sr & VBAN_SERVICE_MASK, + format_nbs=format_nbs, + format_nbc=format_nbc, + format_bit=format_bit, + ) @dataclass class RequestHeader: - """Represents the header of a REQUEST RT PACKET""" + """Represents the header of an RT request packet""" name: str bps_index: int @@ -287,16 +443,19 @@ class RequestHeader: def streamname(self): return self.name.encode() + bytes(16 - len(self.name)) - @property - def header(self): - header = self.vban - header += self.sr - header += self.nbs - header += self.nbc - header += self.bit - header += self.streamname - header += self.framecounter - assert len(header) == HEADER_SIZE + 4, ( - f'expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE + 4} bytes total)' + @classmethod + def to_bytes( + cls, name: str, bps_index: int, channel: int, framecounter: int + ) -> bytes: + header = cls( + name=name, bps_index=bps_index, channel=channel, framecounter=framecounter ) - return header + data = bytearray() + data.extend(header.vban) + data.extend(header.sr) + data.extend(header.nbs) + data.extend(header.nbc) + data.extend(header.bit) + data.extend(header.streamname) + data.extend(header.framecounter.to_bytes(4, 'little')) + return bytes(data) diff --git a/vban_cmd/worker.py b/vban_cmd/worker.py index 3a05b8a..25848ee 100644 --- a/vban_cmd/worker.py +++ b/vban_cmd/worker.py @@ -2,10 +2,22 @@ import logging import socket import threading import time -from typing import Optional +from .enums import NBS from .error import VBANCMDConnectionError -from .packet import HEADER_SIZE, SubscribeHeader, VbanRtPacket, VbanRtPacketHeader +from .packet import ( + HEADER_SIZE, + VBAN_PROTOCOL_SERVICE, + VBAN_SERVICE_RTPACKET, + VMPARAMSTRIP_SIZE, + SubscribeHeader, + VbanRtPacket, + VbanRtPacketHeader, + VbanRtPacketNBS0, + VbanRtPacketNBS1, + VbanVMParamStrip, +) +from .util import bump_framecounter logger = logging.getLogger(__name__) @@ -18,18 +30,21 @@ class Subscriber(threading.Thread): self._remote = remote self.stop_event = stop_event self.logger = logger.getChild(self.__class__.__name__) - self.packet = SubscribeHeader() + self._framecounter = 0 def run(self): while not self.stopped(): try: - self._remote.sock.sendto( - self.packet.header, - (socket.gethostbyname(self._remote.ip), self._remote.port), - ) - self.packet.framecounter = ( - int.from_bytes(self.packet.framecounter, 'little') + 1 - ).to_bytes(4, 'little') + for nbs in NBS: + sub_packet = SubscribeHeader().to_bytes(nbs, self._framecounter) + self._remote.sock.sendto( + sub_packet, (self._remote.ip, self._remote.port) + ) + self._framecounter = bump_framecounter(self._framecounter) + self.logger.debug( + f'sent subscription for NBS {nbs.name} to {self._remote.ip}:{self._remote.port}' + ) + self.wait_until_stopped(10) except socket.gaierror as e: self.logger.exception(f'{type(e).__name__}: {e}') @@ -58,33 +73,47 @@ class Producer(threading.Thread): self.queue = queue self.stop_event = stop_event self.logger = logger.getChild(self.__class__.__name__) - self.packet_expected = VbanRtPacketHeader() self._remote.sock.settimeout(self._remote.timeout) - self._remote._public_packet = self._get_rt() + self._remote._public_packets = [None] * (max(NBS) + 1) + _pp = self._get_rt() + self._remote._public_packets[_pp.nbs] = _pp ( self._remote.cache['strip_level'], self._remote.cache['bus_level'], - ) = self._remote._get_levels(self._remote.public_packet) + ) = self._remote._get_levels(self._remote.public_packets[NBS.zero]) def _get_rt(self) -> VbanRtPacket: """Attempt to fetch data packet until a valid one found""" - def fget(): - data = None - while not data: - data = self._fetch_rt_packet() - return data + while True: + if resp := self._fetch_rt_packet(): + return resp - return fget() - - def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: + def _fetch_rt_packet(self) -> VbanRtPacket | None: try: data, _ = self._remote.sock.recvfrom(2048) - # do we have packet data? - if len(data) > HEADER_SIZE: - # is the packet of type VBAN RT response? - if self.packet_expected.header == data[:HEADER_SIZE]: - return VbanRtPacket( + if len(data) < HEADER_SIZE: + return + + response_header = VbanRtPacketHeader.from_bytes(data[:HEADER_SIZE]) + if ( + response_header.format_sr != VBAN_PROTOCOL_SERVICE + or response_header.format_nbc != VBAN_SERVICE_RTPACKET + ): + return + + match response_header.format_nbs: + case NBS.zero: + """ + self.logger.debug( + 'Received NB0 RTP Packet from %s, Size: %d bytes', + addr, + len(data), + ) + """ + + return VbanRtPacketNBS0( + nbs=NBS.zero, _kind=self._remote.kind, _voicemeeterType=data[28:29], _reserved=data[29:30], @@ -109,6 +138,36 @@ class Producer(threading.Thread): _stripLabelUTF8c60=data[452:932], _busLabelUTF8c60=data[932:1412], ) + + case NBS.one: + """ + self.logger.debug( + 'Received NB1 RTP Packet from %s, Size: %d bytes', + addr, + len(data), + ) + """ + + return VbanRtPacketNBS1( + nbs=NBS.one, + _kind=self._remote.kind, + _voicemeeterType=data[28:29], + _reserved=data[29:30], + _buffersize=data[30:32], + _voicemeeterVersion=data[32:36], + _optionBits=data[36:40], + _samplerate=data[40:44], + strips=tuple( + VbanVMParamStrip.from_bytes( + data[ + 44 + i * VMPARAMSTRIP_SIZE : 44 + + (i + 1) * VMPARAMSTRIP_SIZE + ] + ) + for i in range(self._remote.kind.num_strip) + ), + ) + return None except TimeoutError as e: self.logger.exception(f'{type(e).__name__}: {e}') raise VBANCMDConnectionError( @@ -120,14 +179,20 @@ class Producer(threading.Thread): def run(self): while not self.stopped(): + pdirty = ldirty = False _pp = self._get_rt() - pdirty = _pp.pdirty(self._remote.public_packet) - ldirty = _pp.ldirty( - self._remote.cache['strip_level'], self._remote.cache['bus_level'] - ) + match _pp.nbs: + case NBS.zero: + ldirty = _pp.ldirty( + self._remote.cache['strip_level'], + self._remote.cache['bus_level'], + ) + pdirty = _pp.pdirty(self._remote.public_packets[NBS.zero]) + case NBS.one: + pdirty = True if pdirty or ldirty: - self._remote._public_packet = _pp + self._remote._public_packets[_pp.nbs] = _pp self._remote._pdirty = pdirty self._remote._ldirty = ldirty @@ -166,15 +231,15 @@ class Updater(threading.Thread): self._remote.subject.notify(event) elif event == 'ldirty' and self._remote.ldirty: self._remote._strip_comp, self._remote._bus_comp = ( - self._remote._public_packet._strip_comp, - self._remote._public_packet._bus_comp, + self._remote._public_packets[NBS.zero]._strip_comp, + self._remote._public_packets[NBS.zero]._bus_comp, ) ( self._remote.cache['strip_level'], self._remote.cache['bus_level'], ) = ( - self._remote._public_packet.inputlevels, - self._remote._public_packet.outputlevels, + self._remote._public_packets[NBS.zero].inputlevels, + self._remote._public_packets[NBS.zero].outputlevels, ) self._remote.subject.notify(event) self.logger.debug(f'terminating {self.name} thread')