fix regression causing pdirty update to fail.

patch bump
This commit is contained in:
onyx-and-iris 2022-09-03 20:35:37 +01:00
parent 4ef3d1f225
commit aadfbd3925
3 changed files with 64 additions and 32 deletions

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "vban-cmd" name = "vban-cmd"
version = "1.4.0" version = "1.4.1"
description = "Python interface for the VBAN RT Packet Service (Sendtext)" description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = ["onyx-and-iris <code@onyxandiris.online>"] authors = ["onyx-and-iris <code@onyxandiris.online>"]
license = "MIT" license = "MIT"

View File

@ -11,29 +11,28 @@ HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4
class VbanRtPacket: class VbanRtPacket:
"""Represents the body of a VBAN RT data packet""" """Represents the body of a VBAN RT data packet"""
def __init__(self, data): _voicemeeterType: bytes
self._voicemeeterType: bytes = data[28:29] _reserved: bytes
self._reserved: bytes = data[29:30] _buffersize: bytes
self._buffersize: bytes = data[30:32] _voicemeeterVersion: bytes
self._voicemeeterVersion: bytes = data[32:36] _optionBits: bytes
self._optionBits: bytes = data[36:40] _samplerate: bytes
self._samplerate: bytes = data[40:44] _inputLeveldB100: bytes
self._inputLeveldB100: bytes = data[44:112] _outputLeveldB100: bytes
self._outputLeveldB100: bytes = data[112:240] _TransportBit: bytes
self._TransportBit: bytes = data[240:244] _stripState: bytes
self._stripState: bytes = data[244:276] _busState: bytes
self._busState: bytes = data[276:308] _stripGaindB100Layer1: bytes
self._stripGaindB100Layer1: bytes = data[308:324] _stripGaindB100Layer2: bytes
self._stripGaindB100Layer2: bytes = data[324:340] _stripGaindB100Layer3: bytes
self._stripGaindB100Layer3: bytes = data[340:356] _stripGaindB100Layer4: bytes
self._stripGaindB100Layer4: bytes = data[356:372] _stripGaindB100Layer5: bytes
self._stripGaindB100Layer5: bytes = data[372:388] _stripGaindB100Layer6: bytes
self._stripGaindB100Layer6: bytes = data[388:404] _stripGaindB100Layer7: bytes
self._stripGaindB100Layer7: bytes = data[404:420] _stripGaindB100Layer8: bytes
self._stripGaindB100Layer8: bytes = data[420:436] _busGaindB100: bytes
self._busGaindB100: bytes = data[436:452] _stripLabelUTF8c60: bytes
self._stripLabelUTF8c60: bytes = data[452:932] _busLabelUTF8c60: bytes
self._busLabelUTF8c60: bytes = data[932:1412]
def pdirty(self, other): def pdirty(self, other):
"""True iff any defined parameter has changed""" """True iff any defined parameter has changed"""

View File

@ -1,6 +1,7 @@
import socket import socket
import threading import threading
import time import time
from typing import Optional
from .packet import HEADER_SIZE, SubscribeHeader, VbanRtPacket, VbanRtPacketHeader from .packet import HEADER_SIZE, SubscribeHeader, VbanRtPacket, VbanRtPacketHeader
from .util import Socket from .util import Socket
@ -46,17 +47,48 @@ class Updater(threading.Thread):
self.packet_expected = VbanRtPacketHeader() self.packet_expected = VbanRtPacketHeader()
self._remote._public_packet = self._get_rt() self._remote._public_packet = self._get_rt()
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
data, _ = self._remote.socks[Socket.response].recvfrom(2048)
# check for packet data
if len(data) > HEADER_SIZE:
# check if packet is of type rt packet response
if self.packet_expected.header == data[: HEADER_SIZE - 4]:
return VbanRtPacket(
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
_inputLeveldB100=data[44:112],
_outputLeveldB100=data[112:240],
_TransportBit=data[240:244],
_stripState=data[244:276],
_busState=data[276:308],
_stripGaindB100Layer1=data[308:324],
_stripGaindB100Layer2=data[324:340],
_stripGaindB100Layer3=data[340:356],
_stripGaindB100Layer4=data[356:372],
_stripGaindB100Layer5=data[372:388],
_stripGaindB100Layer6=data[388:404],
_stripGaindB100Layer7=data[404:420],
_stripGaindB100Layer8=data[420:436],
_busGaindB100=data[436:452],
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
def _get_rt(self) -> VbanRtPacket: def _get_rt(self) -> VbanRtPacket:
"""Attempt to fetch data packet until a valid one found""" """Attempt to fetch data packet until a valid one found"""
while True: def fget():
data, _ = self._remote.socks[Socket.response].recvfrom(2048) data = False
# check for packet data while not data:
if len(data) > HEADER_SIZE: data = self._fetch_rt_packet()
# check if packet is of type rt packet response time.sleep(self._remote.DELAY)
if self.packet_expected.header == data[: HEADER_SIZE - 4]: return data
return VbanRtPacket(data)
time.sleep(self._remote.DELAY) return fget()
def update(self): def update(self):
print(f"Listening for {', '.join(self._remote.event.get())} events") print(f"Listening for {', '.join(self._remote.event.get())} events")
@ -72,6 +104,7 @@ class Updater(threading.Thread):
_pp _pp
) )
self._remote._pdirty = _pp.pdirty(self._remote.public_packet) self._remote._pdirty = _pp.pdirty(self._remote.public_packet)
print(self._remote.pdirty)
if self._remote.event.ldirty and self._remote.ldirty: if self._remote.event.ldirty and self._remote.ldirty:
self._remote.cache["strip_level"] = self._remote._strip_buf self._remote.cache["strip_level"] = self._remote._strip_buf