add VbanVMParamStrip defining the VMPARAMSTRIP_PACKET struct.

This commit is contained in:
onyx-and-iris 2026-01-17 09:35:33 +00:00
parent 91feccc509
commit 51394c0076
2 changed files with 303 additions and 79 deletions

View File

@ -1,5 +1,6 @@
from dataclasses import dataclass
from .enums import NBS
from .kinds import KindMapClass
from .util import comp
@ -8,15 +9,18 @@ VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_MASK = 0xE0
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
VMPARAMSTRIP_SIZE = 174
@dataclass
class VbanRtPacket:
"""Represents the body of a VBAN RT data packet"""
nbs: NBS
_kind: KindMapClass
_voicemeeterType: bytes # data[28:29]
_reserved: bytes # data[29:30]
@ -24,6 +28,12 @@ class VbanRtPacket:
_voicemeeterVersion: bytes # data[32:36]
_optionBits: bytes # data[36:40]
_samplerate: bytes # data[40:44]
@dataclass
class VbanRtPacketNBS0(VbanRtPacket):
"""Represents the body of a VBAN RT data packet with NBS 0"""
_inputLeveldB100: bytes # data[44:112]
_outputLeveldB100: bytes # data[112:240]
_TransportBit: bytes # data[240:244]
@ -211,39 +221,177 @@ class VbanRtPacket:
@dataclass
class SubscribeHeader:
"""Represents the header an RT Packet Service subscription packet"""
class VbanVMParamStrip:
"""Represents the VBAN_VMPARAMSTRIP_PACKET structure"""
name = 'Register RTP'
_mode: bytes # long = 4 bytes data[0:4]
_dblevel: bytes # float = 4 bytes data[4:8]
_audibility: bytes # short = 2 bytes data[8:10]
_pos3D_x: bytes # short = 2 bytes data[10:12]
_pos3D_y: bytes # short = 2 bytes data[12:14]
_posColor_x: bytes # short = 2 bytes data[14:16]
_posColor_y: bytes # short = 2 bytes data[16:18]
_EQgain1: bytes # short = 2 bytes data[18:20]
_EQgain2: bytes # short = 2 bytes data[20:22]
_EQgain3: bytes # short = 2 bytes data[22:24]
# First channel parametric EQ
_PEQ_eqOn: bytes # 6 * char = 6 bytes data[24:30]
_PEQ_eqtype: bytes # 6 * char = 6 bytes data[30:36]
_PEQ_eqgain: bytes # 6 * float = 24 bytes data[36:60]
_PEQ_eqfreq: bytes # 6 * float = 24 bytes data[60:84]
_PEQ_eqq: bytes # 6 * float = 24 bytes data[84:108]
_audibility_c: bytes # short = 2 bytes data[108:110]
_audibility_g: bytes # short = 2 bytes data[110:112]
_audibility_d: bytes # short = 2 bytes data[112:114]
_posMod_x: bytes # short = 2 bytes data[114:116]
_posMod_y: bytes # short = 2 bytes data[116:118]
_send_reverb: bytes # short = 2 bytes data[118:120]
_send_delay: bytes # short = 2 bytes data[120:122]
_send_fx1: bytes # short = 2 bytes data[122:124]
_send_fx2: bytes # short = 2 bytes data[124:126]
_dblimit: bytes # short = 2 bytes data[126:128]
_nKaraoke: bytes # short = 2 bytes data[128:130]
_COMP_gain_in: bytes # short = 2 bytes data[130:132]
_COMP_attack_ms: bytes # short = 2 bytes data[132:134]
_COMP_release_ms: bytes # short = 2 bytes data[134:136]
_COMP_n_knee: bytes # short = 2 bytes data[136:138]
_COMP_comprate: bytes # short = 2 bytes data[138:140]
_COMP_threshold: bytes # short = 2 bytes data[140:142]
_COMP_c_enabled: bytes # short = 2 bytes data[142:144]
_COMP_c_auto: bytes # short = 2 bytes data[144:146]
_COMP_gain_out: bytes # short = 2 bytes data[146:148]
_GATE_dBThreshold_in: bytes # short = 2 bytes data[148:150]
_GATE_dBDamping_max: bytes # short = 2 bytes data[150:152]
_GATE_BP_Sidechain: bytes # short = 2 bytes data[152:154]
_GATE_attack_ms: bytes # short = 2 bytes data[154:156]
_GATE_hold_ms: bytes # short = 2 bytes data[156:158]
_GATE_release_ms: bytes # short = 2 bytes data[158:160]
_DenoiserThreshold: bytes # short = 2 bytes data[160:162]
_PitchEnabled: bytes # short = 2 bytes data[162:164]
_Pitch_DryWet: bytes # short = 2 bytes data[164:166]
_Pitch_Value: bytes # short = 2 bytes data[166:168]
_Pitch_formant_lo: bytes # short = 2 bytes data[168:170]
_Pitch_formant_med: bytes # short = 2 bytes data[170:172]
_Pitch_formant_high: bytes # short = 2 bytes data[172:174]
@classmethod
def from_bytes(cls, data: bytes):
return cls(
_mode=data[0:4],
_dblevel=data[4:8],
_audibility=data[8:10],
_pos3D_x=data[10:12],
_pos3D_y=data[12:14],
_posColor_x=data[14:16],
_posColor_y=data[16:18],
_EQgain1=data[18:20],
_EQgain2=data[20:22],
_EQgain3=data[22:24],
_PEQ_eqOn=data[24:30],
_PEQ_eqtype=data[30:36],
_PEQ_eqgain=data[36:60],
_PEQ_eqfreq=data[60:84],
_PEQ_eqq=data[84:108],
_audibility_c=data[108:110],
_audibility_g=data[110:112],
_audibility_d=data[112:114],
_posMod_x=data[114:116],
_posMod_y=data[116:118],
_send_reverb=data[118:120],
_send_delay=data[120:122],
_send_fx1=data[122:124],
_send_fx2=data[124:126],
_dblimit=data[126:128],
_nKaraoke=data[128:130],
_COMP_gain_in=data[130:132],
_COMP_attack_ms=data[132:134],
_COMP_release_ms=data[134:136],
_COMP_n_knee=data[136:138],
_COMP_comprate=data[138:140],
_COMP_threshold=data[140:142],
_COMP_c_enabled=data[142:144],
_COMP_c_auto=data[144:146],
_COMP_gain_out=data[146:148],
_GATE_dBThreshold_in=data[148:150],
_GATE_dBDamping_max=data[150:152],
_GATE_BP_Sidechain=data[152:154],
_GATE_attack_ms=data[154:156],
_GATE_hold_ms=data[156:158],
_GATE_release_ms=data[158:160],
_DenoiserThreshold=data[160:162],
_PitchEnabled=data[162:164],
_Pitch_DryWet=data[164:166],
_Pitch_Value=data[166:168],
_Pitch_formant_lo=data[168:170],
_Pitch_formant_med=data[170:172],
_Pitch_formant_high=data[172:174],
)
@property
def mode(self) -> int:
return int.from_bytes(self._mode, 'little')
@property
def eqgains(self) -> tuple[float, float, float]:
return tuple(
round(
int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True)
* 0.01,
2,
)
for i in range(1, 4)
)
@property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@dataclass
class VbanRtPacketNBS1(VbanRtPacket):
"""Represents the body of a VBAN RT data packet with NBS 1"""
strips: tuple[VbanVMParamStrip, ...]
@dataclass
class SubscribeHeader:
"""Represents the header of an RT subscription packet"""
ident: NBS = NBS.zero
name = 'Register-RTP'
timeout = 15
vban: bytes = 'VBAN'.encode()
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, 'little')
format_nbs: bytes = (0).to_bytes(1, 'little')
format_nbs: bytes = (ident.value & 0xFF).to_bytes(1, 'little')
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, 'little')
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, 'little') # timeout
format_bit: bytes = (timeout & 0xFF).to_bytes(1, 'little') # timeout
streamname: bytes = name.encode('ascii') + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, 'little')
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE + 4, (
f'expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE + 4} bytes total)'
)
return header
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(ident=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanRtPacketHeader:
"""Represents the header of a VBAN RT response packet"""
"""Represents the header of an RT response packet"""
name = 'Voicemeeter-RTP'
name: str = 'Voicemeeter-RTP'
vban: bytes = 'VBAN'.encode()
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, 'little')
format_nbs: bytes = (0).to_bytes(1, 'little')
@ -251,21 +399,29 @@ class VbanRtPacketHeader:
format_bit: bytes = (0).to_bytes(1, 'little')
streamname: bytes = name.encode('ascii') + bytes(16 - len(name))
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
assert len(header) == HEADER_SIZE, f'expected header size {HEADER_SIZE} bytes'
return header
@classmethod
def from_bytes(cls, data: bytes):
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VbanRTPPacketHeader')
vban = data[0:4]
format_sr = data[4]
format_nbs = data[5]
format_nbc = data[6]
format_bit = data[7]
name = data[8:24].rstrip(b'\x00').decode('utf-8')
return cls(
name=name,
vban=vban,
format_sr=format_sr & VBAN_SERVICE_MASK,
format_nbs=format_nbs,
format_nbc=format_nbc,
format_bit=format_bit,
)
@dataclass
class RequestHeader:
"""Represents the header of a REQUEST RT PACKET"""
"""Represents the header of an RT request packet"""
name: str
bps_index: int
@ -287,16 +443,19 @@ class RequestHeader:
def streamname(self):
return self.name.encode() + bytes(16 - len(self.name))
@property
def header(self):
header = self.vban
header += self.sr
header += self.nbs
header += self.nbc
header += self.bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE + 4, (
f'expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE + 4} bytes total)'
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
return header
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)

View File

@ -2,10 +2,22 @@ import logging
import socket
import threading
import time
from typing import Optional
from .enums import NBS
from .error import VBANCMDConnectionError
from .packet import HEADER_SIZE, SubscribeHeader, VbanRtPacket, VbanRtPacketHeader
from .packet import (
HEADER_SIZE,
VBAN_PROTOCOL_SERVICE,
VBAN_SERVICE_RTPACKET,
VMPARAMSTRIP_SIZE,
SubscribeHeader,
VbanRtPacket,
VbanRtPacketHeader,
VbanRtPacketNBS0,
VbanRtPacketNBS1,
VbanVMParamStrip,
)
from .util import bump_framecounter
logger = logging.getLogger(__name__)
@ -18,18 +30,21 @@ class Subscriber(threading.Thread):
self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__)
self.packet = SubscribeHeader()
self._framecounter = 0
def run(self):
while not self.stopped():
try:
self._remote.sock.sendto(
self.packet.header,
(socket.gethostbyname(self._remote.ip), self._remote.port),
)
self.packet.framecounter = (
int.from_bytes(self.packet.framecounter, 'little') + 1
).to_bytes(4, 'little')
for nbs in NBS:
sub_packet = SubscribeHeader().to_bytes(nbs, self._framecounter)
self._remote.sock.sendto(
sub_packet, (self._remote.ip, self._remote.port)
)
self._framecounter = bump_framecounter(self._framecounter)
self.logger.debug(
f'sent subscription for NBS {nbs.name} to {self._remote.ip}:{self._remote.port}'
)
self.wait_until_stopped(10)
except socket.gaierror as e:
self.logger.exception(f'{type(e).__name__}: {e}')
@ -58,33 +73,47 @@ class Producer(threading.Thread):
self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__)
self.packet_expected = VbanRtPacketHeader()
self._remote.sock.settimeout(self._remote.timeout)
self._remote._public_packet = self._get_rt()
self._remote._public_packets = [None] * (max(NBS) + 1)
_pp = self._get_rt()
self._remote._public_packets[_pp.nbs] = _pp
(
self._remote.cache['strip_level'],
self._remote.cache['bus_level'],
) = self._remote._get_levels(self._remote.public_packet)
) = self._remote._get_levels(self._remote.public_packets[NBS.zero])
def _get_rt(self) -> VbanRtPacket:
"""Attempt to fetch data packet until a valid one found"""
def fget():
data = None
while not data:
data = self._fetch_rt_packet()
return data
while True:
if resp := self._fetch_rt_packet():
return resp
return fget()
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
def _fetch_rt_packet(self) -> VbanRtPacket | None:
try:
data, _ = self._remote.sock.recvfrom(2048)
# do we have packet data?
if len(data) > HEADER_SIZE:
# is the packet of type VBAN RT response?
if self.packet_expected.header == data[:HEADER_SIZE]:
return VbanRtPacket(
if len(data) < HEADER_SIZE:
return
response_header = VbanRtPacketHeader.from_bytes(data[:HEADER_SIZE])
if (
response_header.format_sr != VBAN_PROTOCOL_SERVICE
or response_header.format_nbc != VBAN_SERVICE_RTPACKET
):
return
match response_header.format_nbs:
case NBS.zero:
"""
self.logger.debug(
'Received NB0 RTP Packet from %s, Size: %d bytes',
addr,
len(data),
)
"""
return VbanRtPacketNBS0(
nbs=NBS.zero,
_kind=self._remote.kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
@ -109,6 +138,36 @@ class Producer(threading.Thread):
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
case NBS.one:
"""
self.logger.debug(
'Received NB1 RTP Packet from %s, Size: %d bytes',
addr,
len(data),
)
"""
return VbanRtPacketNBS1(
nbs=NBS.one,
_kind=self._remote.kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
strips=tuple(
VbanVMParamStrip.from_bytes(
data[
44 + i * VMPARAMSTRIP_SIZE : 44
+ (i + 1) * VMPARAMSTRIP_SIZE
]
)
for i in range(self._remote.kind.num_strip)
),
)
return None
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
@ -120,14 +179,20 @@ class Producer(threading.Thread):
def run(self):
while not self.stopped():
pdirty = ldirty = False
_pp = self._get_rt()
pdirty = _pp.pdirty(self._remote.public_packet)
ldirty = _pp.ldirty(
self._remote.cache['strip_level'], self._remote.cache['bus_level']
)
match _pp.nbs:
case NBS.zero:
ldirty = _pp.ldirty(
self._remote.cache['strip_level'],
self._remote.cache['bus_level'],
)
pdirty = _pp.pdirty(self._remote.public_packets[NBS.zero])
case NBS.one:
pdirty = True
if pdirty or ldirty:
self._remote._public_packet = _pp
self._remote._public_packets[_pp.nbs] = _pp
self._remote._pdirty = pdirty
self._remote._ldirty = ldirty
@ -166,15 +231,15 @@ class Updater(threading.Thread):
self._remote.subject.notify(event)
elif event == 'ldirty' and self._remote.ldirty:
self._remote._strip_comp, self._remote._bus_comp = (
self._remote._public_packet._strip_comp,
self._remote._public_packet._bus_comp,
self._remote._public_packets[NBS.zero]._strip_comp,
self._remote._public_packets[NBS.zero]._bus_comp,
)
(
self._remote.cache['strip_level'],
self._remote.cache['bus_level'],
) = (
self._remote._public_packet.inputlevels,
self._remote._public_packet.outputlevels,
self._remote._public_packets[NBS.zero].inputlevels,
self._remote._public_packets[NBS.zero].outputlevels,
)
self._remote.subject.notify(event)
self.logger.debug(f'terminating {self.name} thread')