import struct from dataclasses import dataclass from typing import NamedTuple from .enums import NBS from .kinds import KindMapClass from .util import comp VBAN_PROTOCOL_TXT = 0x40 VBAN_PROTOCOL_SERVICE = 0x60 VBAN_SERVICE_RTPACKETREGISTER = 32 VBAN_SERVICE_RTPACKET = 33 VBAN_SERVICE_MASK = 0xE0 MAX_PACKET_SIZE = 1436 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 VMPARAMSTRIP_SIZE = 174 @dataclass class VbanRtPacket: """Represents the body of a VBAN RT data packet""" nbs: NBS _kind: KindMapClass _voicemeeterType: bytes _reserved: bytes _buffersize: bytes _voicemeeterVersion: bytes _optionBits: bytes _samplerate: bytes @dataclass class VbanRtPacketNBS0(VbanRtPacket): """Represents the body of a VBAN RT data packet with NBS 0""" _inputLeveldB100: bytes _outputLeveldB100: bytes _TransportBit: bytes _stripState: bytes _busState: bytes _stripGaindB100Layer1: bytes _stripGaindB100Layer2: bytes _stripGaindB100Layer3: bytes _stripGaindB100Layer4: bytes _stripGaindB100Layer5: bytes _stripGaindB100Layer6: bytes _stripGaindB100Layer7: bytes _stripGaindB100Layer8: bytes _busGaindB100: bytes _stripLabelUTF8c60: bytes _busLabelUTF8c60: bytes @classmethod def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes): return cls( nbs=nbs, _kind=kind, _voicemeeterType=data[28:29], _reserved=data[29:30], _buffersize=data[30:32], _voicemeeterVersion=data[32:36], _optionBits=data[36:40], _samplerate=data[40:44], _inputLeveldB100=data[44:112], _outputLeveldB100=data[112:240], _TransportBit=data[240:244], _stripState=data[244:276], _busState=data[276:308], _stripGaindB100Layer1=data[308:324], _stripGaindB100Layer2=data[324:340], _stripGaindB100Layer3=data[340:356], _stripGaindB100Layer4=data[356:372], _stripGaindB100Layer5=data[372:388], _stripGaindB100Layer6=data[388:404], _stripGaindB100Layer7=data[404:420], _stripGaindB100Layer8=data[420:436], _busGaindB100=data[436:452], _stripLabelUTF8c60=data[452:932], _busLabelUTF8c60=data[932:1412], ) def _generate_levels(self, levelarray) -> tuple: return tuple( int.from_bytes(levelarray[i : i + 2], 'little') for i in range(0, len(levelarray), 2) ) @property def strip_levels(self): return self._generate_levels(self._inputLeveldB100) @property def bus_levels(self): return self._generate_levels(self._outputLeveldB100) def pdirty(self, other) -> bool: """True iff any defined parameter has changed""" return not ( self._stripState == other._stripState and self._busState == other._busState and self._stripGaindB100Layer1 == other._stripGaindB100Layer1 and self._stripGaindB100Layer2 == other._stripGaindB100Layer2 and self._stripGaindB100Layer3 == other._stripGaindB100Layer3 and self._stripGaindB100Layer4 == other._stripGaindB100Layer4 and self._stripGaindB100Layer5 == other._stripGaindB100Layer5 and self._stripGaindB100Layer6 == other._stripGaindB100Layer6 and self._stripGaindB100Layer7 == other._stripGaindB100Layer7 and self._stripGaindB100Layer8 == other._stripGaindB100Layer8 and self._busGaindB100 == other._busGaindB100 and self._stripLabelUTF8c60 == other._stripLabelUTF8c60 and self._busLabelUTF8c60 == other._busLabelUTF8c60 ) def ldirty(self, strip_cache, bus_cache) -> bool: self._strip_comp, self._bus_comp = ( tuple(not val for val in comp(strip_cache, self.strip_levels)), tuple(not val for val in comp(bus_cache, self.bus_levels)), ) return any(any(li) for li in (self._strip_comp, self._bus_comp)) @property def voicemeetertype(self) -> str: """returns voicemeeter type as a string""" type_ = ('basic', 'banana', 'potato') return type_[int.from_bytes(self._voicemeeterType, 'little') - 1] @property def voicemeeterversion(self) -> tuple: """returns voicemeeter version as a tuple""" return tuple( reversed( tuple( int.from_bytes(self._voicemeeterVersion[i : i + 1], 'little') for i in range(4) ) ) ) @property def samplerate(self) -> int: """returns samplerate as an int""" return int.from_bytes(self._samplerate, 'little') @property def inputlevels(self) -> tuple: """returns the entire level array across all inputs for a kind""" return self.strip_levels[0 : self._kind.num_strip_levels] @property def outputlevels(self) -> tuple: """returns the entire level array across all outputs for a kind""" return self.bus_levels[0 : self._kind.num_bus_levels] @property def stripstate(self) -> tuple: """returns tuple of strip states accessable through bit modes""" return tuple(self._stripState[i : i + 4] for i in range(0, 32, 4)) @property def busstate(self) -> tuple: """returns tuple of bus states accessable through bit modes""" return tuple(self._busState[i : i + 4] for i in range(0, 32, 4)) """ these functions return an array of gainlayers[i] across all strips ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...] """ @property def gainlayers(self) -> tuple: """returns tuple of all strip gain layers as tuples""" return tuple( tuple( round( int.from_bytes( getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2], 'little', signed=True, ) * 0.01, 2, ) for i in range(0, 16, 2) ) for layer in range(1, 9) ) @property def busgain(self) -> tuple: """returns tuple of bus gains""" return tuple( round( int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True) * 0.01, 2, ) for i in range(0, 16, 2) ) @property def striplabels(self) -> tuple: """returns tuple of strip labels""" return tuple( self._stripLabelUTF8c60[i : i + 60].decode().split('\x00')[0] for i in range(0, 480, 60) ) @property def buslabels(self) -> tuple: """returns tuple of bus labels""" return tuple( self._busLabelUTF8c60[i : i + 60].decode().split('\x00')[0] for i in range(0, 480, 60) ) class Audibility(NamedTuple): knob: float comp: float gate: float denoiser: float class Positions(NamedTuple): pan_x: float pan_y: float color_x: float color_y: float fx1: float fx2: float class EqGains(NamedTuple): bass: float mid: float treble: float class ParametricEQSettings(NamedTuple): on: bool type: int gain: float freq: float q: float class Sends(NamedTuple): reverb: float delay: float fx1: float fx2: float class CompressorSettings(NamedTuple): gain_in: float attack_ms: float release_ms: float n_knee: float ratio: float threshold: float c_enabled: bool makeup: bool gain_out: float class GateSettings(NamedTuple): threshold_in: float damping_max: float bp_sidechain: bool attack_ms: float hold_ms: float release_ms: float class DenoiserSettings(NamedTuple): threshold: float class PitchSettings(NamedTuple): enabled: bool dry_wet: float value: float formant_lo: float formant_med: float formant_high: float @dataclass class VbanVMParamStrip: """Represents the VBAN_VMPARAMSTRIP_PACKET structure""" _mode: bytes _dblevel: bytes _audibility: bytes _pos3D_x: bytes _pos3D_y: bytes _posColor_x: bytes _posColor_y: bytes _EQgain1: bytes _EQgain2: bytes _EQgain3: bytes # First channel parametric EQ _PEQ_eqOn: bytes _PEQ_eqtype: bytes _PEQ_eqgain: bytes _PEQ_eqfreq: bytes _PEQ_eqq: bytes _audibility_c: bytes _audibility_g: bytes _audibility_d: bytes _posMod_x: bytes _posMod_y: bytes _send_reverb: bytes _send_delay: bytes _send_fx1: bytes _send_fx2: bytes _dblimit: bytes _nKaraoke: bytes _COMP_gain_in: bytes _COMP_attack_ms: bytes _COMP_release_ms: bytes _COMP_n_knee: bytes _COMP_comprate: bytes _COMP_threshold: bytes _COMP_c_enabled: bytes _COMP_c_auto: bytes _COMP_gain_out: bytes _GATE_dBThreshold_in: bytes _GATE_dBDamping_max: bytes _GATE_BP_Sidechain: bytes _GATE_attack_ms: bytes _GATE_hold_ms: bytes _GATE_release_ms: bytes _DenoiserThreshold: bytes _PitchEnabled: bytes _Pitch_DryWet: bytes _Pitch_Value: bytes _Pitch_formant_lo: bytes _Pitch_formant_med: bytes _Pitch_formant_high: bytes @classmethod def from_bytes(cls, data: bytes): return cls( _mode=data[0:4], _dblevel=data[4:8], _audibility=data[8:10], _pos3D_x=data[10:12], _pos3D_y=data[12:14], _posColor_x=data[14:16], _posColor_y=data[16:18], _EQgain1=data[18:20], _EQgain2=data[20:22], _EQgain3=data[22:24], _PEQ_eqOn=data[24:30], _PEQ_eqtype=data[30:36], _PEQ_eqgain=data[36:60], _PEQ_eqfreq=data[60:84], _PEQ_eqq=data[84:108], _audibility_c=data[108:110], _audibility_g=data[110:112], _audibility_d=data[112:114], _posMod_x=data[114:116], _posMod_y=data[116:118], _send_reverb=data[118:120], _send_delay=data[120:122], _send_fx1=data[122:124], _send_fx2=data[124:126], _dblimit=data[126:128], _nKaraoke=data[128:130], _COMP_gain_in=data[130:132], _COMP_attack_ms=data[132:134], _COMP_release_ms=data[134:136], _COMP_n_knee=data[136:138], _COMP_comprate=data[138:140], _COMP_threshold=data[140:142], _COMP_c_enabled=data[142:144], _COMP_c_auto=data[144:146], _COMP_gain_out=data[146:148], _GATE_dBThreshold_in=data[148:150], _GATE_dBDamping_max=data[150:152], _GATE_BP_Sidechain=data[152:154], _GATE_attack_ms=data[154:156], _GATE_hold_ms=data[156:158], _GATE_release_ms=data[158:160], _DenoiserThreshold=data[160:162], _PitchEnabled=data[162:164], _Pitch_DryWet=data[164:166], _Pitch_Value=data[166:168], _Pitch_formant_lo=data[168:170], _Pitch_formant_med=data[170:172], _Pitch_formant_high=data[172:174], ) @property def mode(self) -> int: return int.from_bytes(self._mode, 'little') @property def audibility(self) -> Audibility: return Audibility( round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._audibility_c, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._audibility_g, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2), ) @property def positions(self) -> Positions: return Positions( round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._pos3D_y, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._posColor_x, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._posColor_y, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._posMod_x, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2), ) @property def eqgains(self) -> EqGains: return EqGains( *[ round( int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True) * 0.01, 2, ) for i in range(1, 4) ] ) @property def parametric_eq(self) -> tuple[ParametricEQSettings, ...]: return tuple( ParametricEQSettings( on=bool(int.from_bytes(self._PEQ_eqOn[i : i + 1], 'little')), type=int.from_bytes(self._PEQ_eqtype[i : i + 1], 'little'), freq=struct.unpack(' Sends: return Sends( round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._send_delay, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._send_fx1, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2), ) @property def karaoke(self) -> int: return int.from_bytes(self._nKaraoke, 'little') @property def compressor(self) -> CompressorSettings: return CompressorSettings( gain_in=round( int.from_bytes(self._COMP_gain_in, 'little', signed=True) * 0.01, 2 ), attack_ms=round(int.from_bytes(self._COMP_attack_ms, 'little') * 0.1, 2), release_ms=round(int.from_bytes(self._COMP_release_ms, 'little') * 0.1, 2), n_knee=round(int.from_bytes(self._COMP_n_knee, 'little') * 0.01, 2), ratio=round(int.from_bytes(self._COMP_comprate, 'little') * 0.01, 2), threshold=round( int.from_bytes(self._COMP_threshold, 'little', signed=True) * 0.01, 2 ), c_enabled=bool(int.from_bytes(self._COMP_c_enabled, 'little')), makeup=bool(int.from_bytes(self._COMP_c_auto, 'little')), gain_out=round( int.from_bytes(self._COMP_gain_out, 'little', signed=True) * 0.01, 2 ), ) @property def gate(self) -> GateSettings: return GateSettings( threshold_in=round( int.from_bytes(self._GATE_dBThreshold_in, 'little', signed=True) * 0.01, 2, ), damping_max=round( int.from_bytes(self._GATE_dBDamping_max, 'little', signed=True) * 0.01, 2, ), bp_sidechain=round( int.from_bytes(self._GATE_BP_Sidechain, 'little') * 0.1, 2 ), attack_ms=round(int.from_bytes(self._GATE_attack_ms, 'little') * 0.1, 2), hold_ms=round(int.from_bytes(self._GATE_hold_ms, 'little') * 0.1, 2), release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2), ) @property def denoiser(self) -> DenoiserSettings: return DenoiserSettings( threshold=round( int.from_bytes(self._DenoiserThreshold, 'little', signed=True) * 0.01, 2 ) ) @property def pitch(self) -> PitchSettings: return PitchSettings( enabled=bool(int.from_bytes(self._PitchEnabled, 'little')), dry_wet=round( int.from_bytes(self._Pitch_DryWet, 'little', signed=True) * 0.01, 2 ), value=round( int.from_bytes(self._Pitch_Value, 'little', signed=True) * 0.01, 2 ), formant_lo=round( int.from_bytes(self._Pitch_formant_lo, 'little', signed=True) * 0.01, 2 ), formant_med=round( int.from_bytes(self._Pitch_formant_med, 'little', signed=True) * 0.01, 2 ), formant_high=round( int.from_bytes(self._Pitch_formant_high, 'little', signed=True) * 0.01, 2, ), ) @dataclass class VbanRtPacketNBS1(VbanRtPacket): """Represents the body of a VBAN RT data packet with NBS 1""" strips: tuple[VbanVMParamStrip, ...] @classmethod def from_bytes( cls, nbs: NBS, kind: KindMapClass, data: bytes, ): return cls( nbs=nbs, _kind=kind, _voicemeeterType=data[28:29], _reserved=data[29:30], _buffersize=data[30:32], _voicemeeterVersion=data[32:36], _optionBits=data[36:40], _samplerate=data[40:44], strips=tuple( VbanVMParamStrip.from_bytes( data[44 + i * VMPARAMSTRIP_SIZE : 44 + (i + 1) * VMPARAMSTRIP_SIZE] ) for i in range(16) ), ) @dataclass class SubscribeHeader: """Represents the header of an RT subscription packet""" nbs: NBS = NBS.zero name: str = 'Register-RTP' timeout: int = 15 @property def vban(self) -> bytes: return b'VBAN' @property def format_sr(self) -> bytes: return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little') @property def format_nbs(self) -> bytes: return (self.nbs.value & 0xFF).to_bytes(1, 'little') @property def format_nbc(self) -> bytes: return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little') @property def format_bit(self) -> bytes: return (self.timeout & 0xFF).to_bytes(1, 'little') @property def streamname(self) -> bytes: return self.name.encode('ascii') + bytes(16 - len(self.name)) @classmethod def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes: header = cls(nbs=nbs) data = bytearray() data.extend(header.vban) data.extend(header.format_sr) data.extend(header.format_nbs) data.extend(header.format_nbc) data.extend(header.format_bit) data.extend(header.streamname) data.extend(framecounter.to_bytes(4, 'little')) return bytes(data) @dataclass class VbanRtPacketHeader: """Represents the header of an RT response packet""" name: str = 'Voicemeeter-RTP' format_sr: int = VBAN_PROTOCOL_SERVICE format_nbs: int = 0 format_nbc: int = VBAN_SERVICE_RTPACKET format_bit: int = 0 @property def vban(self) -> bytes: return b'VBAN' @property def streamname(self) -> bytes: return self.name.encode('ascii') + bytes(16 - len(self.name)) @classmethod def from_bytes(cls, data: bytes): if len(data) < HEADER_SIZE: raise ValueError('Data is too short to be a valid VbanRTPPacketHeader') name = data[8:24].rstrip(b'\x00').decode('utf-8') return cls( name=name, format_sr=data[4] & VBAN_SERVICE_MASK, format_nbs=data[5], format_nbc=data[6], format_bit=data[7], ) @dataclass class RequestHeader: """Represents the header of an RT request packet""" name: str bps_index: int channel: int framecounter: int = 0 @property def vban(self) -> bytes: return b'VBAN' @property def sr(self) -> bytes: return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little') @property def nbs(self) -> bytes: return (0).to_bytes(1, 'little') @property def nbc(self) -> bytes: return (self.channel).to_bytes(1, 'little') @property def bit(self) -> bytes: return (0x10).to_bytes(1, 'little') @property def streamname(self) -> bytes: return self.name.encode() + bytes(16 - len(self.name)) @classmethod def to_bytes( cls, name: str, bps_index: int, channel: int, framecounter: int ) -> bytes: header = cls( name=name, bps_index=bps_index, channel=channel, framecounter=framecounter ) data = bytearray() data.extend(header.vban) data.extend(header.sr) data.extend(header.nbs) data.extend(header.nbc) data.extend(header.bit) data.extend(header.streamname) data.extend(header.framecounter.to_bytes(4, 'little')) return bytes(data)