revert move data slices

strip_leves, bus_levels properties added to VbanRtPacket
This commit is contained in:
onyx-and-iris 2023-06-25 16:15:32 +01:00
parent 54041503c9
commit 0970bfe0b5
2 changed files with 61 additions and 31 deletions

View File

@ -1,5 +1,6 @@
from dataclasses import dataclass from dataclasses import dataclass
from .kinds import KindMapClass
from .util import comp from .util import comp
VBAN_SERVICE_RTPACKETREGISTER = 32 VBAN_SERVICE_RTPACKETREGISTER = 32
@ -12,32 +13,29 @@ HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4
class VbanRtPacket: class VbanRtPacket:
"""Represents the body of a VBAN RT data packet""" """Represents the body of a VBAN RT data packet"""
def __init__(self, kind=None, data=None): _kind: KindMapClass
self._kind = kind _voicemeeterType: bytes
self._voicemeeterType = data[28:29] _reserved: bytes
self._reserved = data[29:30] _buffersize: bytes
self._buffersize = data[30:32] _voicemeeterVersion: bytes
self._voicemeeterVersion = data[32:36] _optionBits: bytes
self._optionBits = data[36:40] _samplerate: bytes
self._samplerate = data[40:44] _inputLeveldB100: bytes
self._inputLeveldB100 = data[44:112] _outputLeveldB100: bytes
self._outputLeveldB100 = data[112:240] _TransportBit: bytes
self._TransportBit = data[240:244] _stripState: bytes
self._stripState = data[244:276] _busState: bytes
self._busState = data[276:308] _stripGaindB100Layer1: bytes
self._stripGaindB100Layer1 = data[308:324] _stripGaindB100Layer2: bytes
self._stripGaindB100Layer2 = data[324:340] _stripGaindB100Layer3: bytes
self._stripGaindB100Layer3 = data[340:356] _stripGaindB100Layer4: bytes
self._stripGaindB100Layer4 = data[356:372] _stripGaindB100Layer5: bytes
self._stripGaindB100Layer5 = data[372:388] _stripGaindB100Layer6: bytes
self._stripGaindB100Layer6 = data[388:404] _stripGaindB100Layer7: bytes
self._stripGaindB100Layer7 = data[404:420] _stripGaindB100Layer8: bytes
self._stripGaindB100Layer8 = data[420:436] _busGaindB100: bytes
self._busGaindB100 = data[436:452] _stripLabelUTF8c60: bytes
self._stripLabelUTF8c60 = data[452:932] _busLabelUTF8c60: bytes
self._busLabelUTF8c60 = data[932:1412]
self._strip_level = self._generate_levels(self._inputLeveldB100)
self._bus_level = self._generate_levels(self._outputLeveldB100)
def _generate_levels(self, levelarray) -> tuple: def _generate_levels(self, levelarray) -> tuple:
return tuple( return tuple(
@ -45,6 +43,14 @@ class VbanRtPacket:
for i in range(0, len(levelarray), 2) for i in range(0, len(levelarray), 2)
) )
@property
def strip_levels(self):
return self._generate_levels(self._inputLeveldB100)
@property
def bus_levels(self):
return self._generate_levels(self._outputLeveldB100)
def pdirty(self, other) -> bool: def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed""" """True iff any defined parameter has changed"""
@ -66,8 +72,8 @@ class VbanRtPacket:
def ldirty(self, strip_cache, bus_cache) -> bool: def ldirty(self, strip_cache, bus_cache) -> bool:
self._strip_comp, self._bus_comp = ( self._strip_comp, self._bus_comp = (
tuple(not val for val in comp(strip_cache, self._strip_level)), tuple(not val for val in comp(strip_cache, self.strip_levels)),
tuple(not val for val in comp(bus_cache, self._bus_level)), tuple(not val for val in comp(bus_cache, self.bus_levels)),
) )
return any(any(l) for l in (self._strip_comp, self._bus_comp)) return any(any(l) for l in (self._strip_comp, self._bus_comp))
@ -97,12 +103,12 @@ class VbanRtPacket:
@property @property
def inputlevels(self) -> tuple: def inputlevels(self) -> tuple:
"""returns the entire level array across all inputs for a kind""" """returns the entire level array across all inputs for a kind"""
return self._strip_level[0 : (2 * self._kind.phys_in + 8 * self._kind.virt_in)] return self.strip_levels[0 : (2 * self._kind.phys_in + 8 * self._kind.virt_in)]
@property @property
def outputlevels(self) -> tuple: def outputlevels(self) -> tuple:
"""returns the entire level array across all outputs for a kind""" """returns the entire level array across all outputs for a kind"""
return self._bus_level[0 : 8 * self._kind.num_bus] return self.bus_levels[0 : 8 * self._kind.num_bus]
@property @property
def stripstate(self) -> tuple: def stripstate(self) -> tuple:

View File

@ -72,7 +72,31 @@ class Producer(threading.Thread):
if len(data) > HEADER_SIZE: if len(data) > HEADER_SIZE:
# check if packet is of type rt packet response # check if packet is of type rt packet response
if self.packet_expected.header == data[: HEADER_SIZE - 4]: if self.packet_expected.header == data[: HEADER_SIZE - 4]:
return VbanRtPacket(kind=self._remote.kind, data=data) return VbanRtPacket(
_kind=self._remote.kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
_inputLeveldB100=data[44:112],
_outputLeveldB100=data[112:240],
_TransportBit=data[240:244],
_stripState=data[244:276],
_busState=data[276:308],
_stripGaindB100Layer1=data[308:324],
_stripGaindB100Layer2=data[324:340],
_stripGaindB100Layer3=data[340:356],
_stripGaindB100Layer4=data[356:372],
_stripGaindB100Layer5=data[372:388],
_stripGaindB100Layer6=data[388:404],
_stripGaindB100Layer7=data[404:420],
_stripGaindB100Layer8=data[420:436],
_busGaindB100=data[436:452],
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
except TimeoutError as e: except TimeoutError as e:
self.logger.exception(f"{type(e).__name__}: {e}") self.logger.exception(f"{type(e).__name__}: {e}")
raise VBANCMDConnectionError( raise VBANCMDConnectionError(