Compare commits

...

4 Commits

Author SHA1 Message Date
ee32f92914 add missing constants
add docstrings that describes data breakdown

move SubscribeHeader above  VbanRtPacketHeader

expand assert failure string
2023-08-04 23:06:51 +01:00
3b65035e50 add double click event for slider 2023-08-04 21:14:33 +01:00
c8b4bde49d patch bump 2023-08-04 16:33:48 +01:00
47e9203b1e use walrus 2023-08-04 16:21:57 +01:00
4 changed files with 83 additions and 70 deletions

View File

@ -8,6 +8,8 @@ from tkinter import ttk
class App(tk.Tk):
INDEX = 3
def __init__(self, vban):
super().__init__()
self.vban = vban
@ -15,8 +17,8 @@ class App(tk.Tk):
self.vban.observer.add(self.on_ldirty)
# create widget variables
self.button_var = tk.BooleanVar(value=vban.strip[3].mute)
self.slider_var = tk.DoubleVar(value=vban.strip[3].gain)
self.button_var = tk.BooleanVar(value=vban.strip[self.INDEX].mute)
self.slider_var = tk.DoubleVar(value=vban.strip[self.INDEX].gain)
self.meter_var = tk.DoubleVar(value=self._get_level())
self.gainlabel_var = tk.StringVar(value=self.slider_var.get())
@ -24,11 +26,12 @@ class App(tk.Tk):
self.style = ttk.Style()
self.style.theme_use("clam")
self.style.configure(
"Mute.TButton", foreground="#cd5c5c" if vban.strip[3].mute else "#5a5a5a"
"Mute.TButton",
foreground="#cd5c5c" if vban.strip[self.INDEX].mute else "#5a5a5a",
)
# create labelframe and grid it onto the mainframe
self.labelframe = tk.LabelFrame(text=self.vban.strip[3].label)
self.labelframe = tk.LabelFrame(text=self.vban.strip[self.INDEX].label)
self.labelframe.grid(padx=1)
# create slider and grid it onto the labelframe
@ -44,6 +47,7 @@ class App(tk.Tk):
column=0,
row=0,
)
slider.bind("<Double-Button-1>", self.on_button_double_click)
# create level meter and grid it onto the labelframe
level_meter = ttk.Progressbar(
@ -72,18 +76,23 @@ class App(tk.Tk):
def on_slider_move(self, *args):
val = round(self.slider_var.get(), 1)
self.vban.strip[3].gain = val
self.vban.strip[self.INDEX].gain = val
self.gainlabel_var.set(val)
def on_button_press(self):
self.button_var.set(not self.button_var.get())
self.vban.strip[3].mute = self.button_var.get()
self.vban.strip[self.INDEX].mute = self.button_var.get()
self.style.configure(
"Mute.TButton", foreground="#cd5c5c" if self.button_var.get() else "#5a5a5a"
)
def on_button_double_click(self, e):
self.slider_var.set(0)
self.gainlabel_var.set(0)
self.vban.strip[self.INDEX].gain = 0
def _get_level(self):
val = max(self.vban.strip[3].levels.prefader)
val = max(self.vban.strip[self.INDEX].levels.prefader)
return 0 if self.button_var.get() else 72 + val - 12 + self.slider_var.get()
def on_ldirty(self):

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "vban-cmd"
version = "2.4.1"
version = "2.4.2"
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = ["onyx-and-iris <code@onyxandiris.online>"]
license = "MIT"

View File

@ -3,10 +3,14 @@ from dataclasses import dataclass
from .kinds import KindMapClass
from .util import comp
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass
@ -14,28 +18,28 @@ class VbanRtPacket:
"""Represents the body of a VBAN RT data packet"""
_kind: KindMapClass
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
_stripGaindB100Layer1: bytes
_stripGaindB100Layer2: bytes
_stripGaindB100Layer3: bytes
_stripGaindB100Layer4: bytes
_stripGaindB100Layer5: bytes
_stripGaindB100Layer6: bytes
_stripGaindB100Layer7: bytes
_stripGaindB100Layer8: bytes
_busGaindB100: bytes
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
_voicemeeterType: bytes # data[28:29]
_reserved: bytes # data[29:30]
_buffersize: bytes # data[30:32]
_voicemeeterVersion: bytes # data[32:36]
_optionBits: bytes # data[36:40]
_samplerate: bytes # data[40:44]
_inputLeveldB100: bytes # data[44:112]
_outputLeveldB100: bytes # data[112:240]
_TransportBit: bytes # data[240:244]
_stripState: bytes # data[244:276]
_busState: bytes # data[276:308]
_stripGaindB100Layer1: bytes # data[308:324]
_stripGaindB100Layer2: bytes # data[324:340]
_stripGaindB100Layer3: bytes # data[340:356]
_stripGaindB100Layer4: bytes # data[356:372]
_stripGaindB100Layer5: bytes # data[372:388]
_stripGaindB100Layer6: bytes # data[388:404]
_stripGaindB100Layer7: bytes # data[404:420]
_stripGaindB100Layer8: bytes # data[420:436]
_busGaindB100: bytes # data[436:452]
_stripLabelUTF8c60: bytes # data[452:932]
_busLabelUTF8c60: bytes # data[932:1412]
def _generate_levels(self, levelarray) -> tuple:
return tuple(
@ -206,13 +210,42 @@ class VbanRtPacket:
)
@dataclass
class SubscribeHeader:
"""Represents the header an RT Packet Service subscription packet"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert (
len(header) == HEADER_SIZE + 4
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
return header
@dataclass
class VbanRtPacketHeader:
"""Represents the header of VBAN RT data packet"""
"""Represents the header of a VBAN RT response packet"""
name = "Voicemeeter-RTP"
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little")
format_bit: bytes = (0).to_bytes(1, "little")
@ -226,13 +259,13 @@ class VbanRtPacketHeader:
header += self.format_nbc
header += self.format_bit
header += self.streamname
assert len(header) == HEADER_SIZE - 4, f"Header expected {HEADER_SIZE-4} bytes"
assert len(header) == HEADER_SIZE, f"expected header size {HEADER_SIZE} bytes"
return header
@dataclass
class RequestHeader:
"""Represents a REQUEST RT PACKET header"""
"""Represents the header of an REQUEST RT PACKET"""
name: str
bps_index: int
@ -244,7 +277,7 @@ class RequestHeader:
@property
def sr(self):
return (0x40 + self.bps_index).to_bytes(1, "little")
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
@property
def nbc(self):
@ -263,32 +296,7 @@ class RequestHeader:
header += self.bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
return header
@dataclass
class SubscribeHeader:
"""Represents a packet used to subscribe to the RT Packet Service"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
assert (
len(header) == HEADER_SIZE + 4
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
return header

View File

@ -150,12 +150,7 @@ class Updater(threading.Thread):
Generate _strip_comp, _bus_comp and update level cache if ldirty.
"""
while True:
event = self.queue.get()
if event is None:
self.logger.debug(f"terminating {self.name} thread")
break
while event := self.queue.get():
if event == "pdirty" and self._remote.pdirty:
self._remote.subject.notify(event)
elif event == "ldirty" and self._remote.ldirty:
@ -171,3 +166,4 @@ class Updater(threading.Thread):
self._remote._public_packet.outputlevels,
)
self._remote.subject.notify(event)
self.logger.debug(f"terminating {self.name} thread")