7 Commits

Author SHA1 Message Date
5f7b62a0e0 add missing cached_property 2026-03-07 14:29:32 +00:00
d1bcbfed6f minor bump 2026-03-07 14:24:44 +00:00
ab80bbf226 use host kwarg/env var in examples 2026-03-07 14:23:29 +00:00
ad58852a77 improve efficiency with cached properties and struct.unpack 2026-03-07 14:22:25 +00:00
5363584940 improve to_bytes efficiency with struct.pack 2026-03-07 14:20:31 +00:00
9f43ee18d3 add more enums so we can remove some of the constants
rename some of the packet classes

patch bump
2026-03-07 00:03:46 +00:00
3cde874a3c remove unnecessary assignment 2026-03-03 20:03:09 +00:00
11 changed files with 316 additions and 318 deletions

View File

@@ -103,7 +103,7 @@ class App(tk.Tk):
def main(): def main():
KIND_ID = 'banana' KIND_ID = 'banana'
conn = { conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'), 'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)), 'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'), 'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
} }

View File

@@ -94,7 +94,7 @@ class Observer:
def main(): def main():
KIND_ID = 'potato' KIND_ID = 'potato'
conn = { conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'), 'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)), 'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'), 'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
} }

View File

@@ -25,7 +25,7 @@ class App:
def main(): def main():
KIND_ID = 'banana' KIND_ID = 'banana'
conn = { conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'), 'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)), 'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'), 'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
} }

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "vban-cmd" name = "vban-cmd"
version = "2.9.6" version = "2.10.0"
description = "Python interface for the VBAN RT Packet Service (Sendtext)" description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }] authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
license = { text = "MIT" } license = { text = "MIT" }

View File

@@ -1,6 +1,36 @@
from enum import Flag from enum import Flag
class SubProtocols(Flag):
"""Sub Protocols - Bit flags that can be combined"""
AUDIO = 0x00
SERIAL = 0x20
TEXT = 0x40
SERVICE = 0x60
MASK = 0xE0
class ServiceTypes(Flag):
"""Service Types - Bit flags that can be combined"""
PING = 0
PONG = 0
CHATUTF8 = 1
RTPACKETREGISTER = 32
RTPACKET = 33
REQUESTREPLY = 0x02 # A Matrix reply
FNCT_REPLY = 0x80 # An RTPacket reply
class StreamTypes(Flag):
"""Stream Types - Bit flags that can be combined"""
ASCII = 0x00
UTF8 = 0x10
WCHAR = 0x20
class ChannelModes(Flag): class ChannelModes(Flag):
"""Channel Modes - Bit flags that can be combined""" """Channel Modes - Bit flags that can be combined"""

View File

@@ -1,29 +1,106 @@
import struct
from dataclasses import dataclass from dataclasses import dataclass
from vban_cmd.enums import NBS from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass from vban_cmd.kinds import KindMapClass
VBAN_PROTOCOL_TXT = 0x40 from .enums import ServiceTypes, StreamTypes, SubProtocols
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_PING = 0
VBAN_SERVICE_PONG = 0 # PONG uses same service type as PING
VBAN_SERVICE_MASK = 0xE0
VBAN_PROTOCOL_MASK = 0xE0
VBAN_SERVICE_REQUESTREPLY = 0x02
VBAN_SERVICE_FNCT_REPLY = 0x02
PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
MAX_PACKET_SIZE = 1436 MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass @dataclass
class VbanPacket: class VbanPingHeader:
"""Represents the header of an incoming VBAN data packet""" """Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PING.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
return struct.pack(
'<4s4B16sI',
header.vban,
header.format_sr,
header.format_nbs,
header.format_nbc,
header.format_bit,
header.streamname,
header.framecounter,
)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PONG.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != ServiceTypes.PONG.value:
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != ServiceTypes.PONG.value:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRTPacket:
"""Represents the header of an incoming RTPacket"""
nbs: NBS nbs: NBS
_kind: KindMapClass _kind: KindMapClass
@@ -53,8 +130,8 @@ class VbanPacket:
@dataclass @dataclass
class VbanSubscribeHeader: class VbanRTSubscribeHeader:
"""Represents the header of a subscription packet""" """Represents the header of an RT subscription packet"""
nbs: NBS = NBS.zero nbs: NBS = NBS.zero
name: str = 'Register-RTP' name: str = 'Register-RTP'
@@ -66,7 +143,7 @@ class VbanSubscribeHeader:
@property @property
def format_sr(self) -> bytes: def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little') return SubProtocols.SERVICE.value.to_bytes(1, 'little')
@property @property
def format_nbs(self) -> bytes: def format_nbs(self) -> bytes:
@@ -74,7 +151,7 @@ class VbanSubscribeHeader:
@property @property
def format_nbc(self) -> bytes: def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little') return ServiceTypes.RTPACKETREGISTER.value.to_bytes(1, 'little')
@property @property
def format_bit(self) -> bytes: def format_bit(self) -> bytes:
@@ -88,15 +165,76 @@ class VbanSubscribeHeader:
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes: def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs) header = cls(nbs=nbs)
data = bytearray() return struct.pack(
data.extend(header.vban) '<4s4B16sI',
data.extend(header.format_sr) header.vban,
data.extend(header.format_nbs) header.format_sr[0],
data.extend(header.format_nbc) header.format_nbs[0],
data.extend(header.format_bit) header.format_nbc[0],
data.extend(header.streamname) header.format_bit[0],
data.extend(framecounter.to_bytes(4, 'little')) header.streamname,
return bytes(data) framecounter,
)
@dataclass
class VbanRTRequestHeader:
"""Represents the header of an RT request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (self.bps_index | SubProtocols.TEXT.value).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (StreamTypes.UTF8.value).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode()[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
return struct.pack(
'<4s4B16sI',
header.vban,
header.sr[0],
header.nbs[0],
header.nbc[0],
header.bit[0],
header.streamname,
header.framecounter,
)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()
def _parse_vban_service_header(data: bytes) -> dict: def _parse_vban_service_header(data: bytes) -> dict:
@@ -113,8 +251,8 @@ def _parse_vban_service_header(data: bytes) -> dict:
format_bit = data[7] format_bit = data[7]
# Verify this is a service protocol packet # Verify this is a service protocol packet
protocol = format_sr & VBAN_PROTOCOL_MASK protocol = format_sr & SubProtocols.MASK.value
if protocol != VBAN_PROTOCOL_SERVICE: if protocol != SubProtocols.SERVICE.value:
raise ValueError(f'Not a service protocol packet: {protocol:02x}') raise ValueError(f'Not a service protocol packet: {protocol:02x}')
# Extract stream name and frame counter # Extract stream name and frame counter
@@ -132,13 +270,13 @@ def _parse_vban_service_header(data: bytes) -> dict:
@dataclass @dataclass
class VbanResponseHeader: class VbanRTResponseHeader:
"""Represents the header of a response packet""" """Represents the header of an RT response packet"""
name: str = 'Voicemeeter-RTP' name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0 format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET format_nbc: int = ServiceTypes.RTPACKET.value
format_bit: int = 0 format_bit: int = 0
framecounter: int = 0 framecounter: int = 0
@@ -156,7 +294,7 @@ class VbanResponseHeader:
parsed = _parse_vban_service_header(data) parsed = _parse_vban_service_header(data)
# Validate this is an RTPacket response # Validate this is an RTPacket response
if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET: if parsed['format_nbc'] != ServiceTypes.RTPACKET.value:
raise ValueError( raise ValueError(
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}' f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}'
) )
@@ -169,9 +307,9 @@ class VbanMatrixResponseHeader:
"""Represents the header of a matrix response packet""" """Represents the header of a matrix response packet"""
name: str = 'Request Reply' name: str = 'Request Reply'
format_sr: int = VBAN_PROTOCOL_SERVICE format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = VBAN_SERVICE_FNCT_REPLY format_nbs: int = ServiceTypes.FNCT_REPLY.value
format_nbc: int = VBAN_SERVICE_REQUESTREPLY format_nbc: int = ServiceTypes.REQUESTREPLY.value
format_bit: int = 0 format_bit: int = 0
framecounter: int = 0 framecounter: int = 0
@@ -189,7 +327,7 @@ class VbanMatrixResponseHeader:
parsed = _parse_vban_service_header(data) parsed = _parse_vban_service_header(data)
# Validate this is a service reply packet # Validate this is a service reply packet
if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY: if parsed['format_nbs'] != ServiceTypes.FNCT_REPLY.value:
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}') raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
return cls(**parsed) return cls(**parsed)
@@ -209,148 +347,3 @@ class VbanMatrixResponseHeader:
header = cls.from_bytes(data) header = cls.from_bytes(data)
payload = cls.extract_payload(data) payload = cls.extract_payload(data)
return header, payload return header, payload
@dataclass
class VbanPingHeader:
"""Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PING
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr.to_bytes(1, 'little'))
data.extend(header.format_nbs.to_bytes(1, 'little'))
data.extend(header.format_nbc.to_bytes(1, 'little'))
data.extend(header.format_bit.to_bytes(1, 'little'))
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PONG
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRequestHeader:
"""Represents the header of a request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode()[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()

View File

@@ -1,4 +1,6 @@
import struct
from dataclasses import dataclass from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple from typing import NamedTuple
from vban_cmd.enums import NBS from vban_cmd.enums import NBS
@@ -6,7 +8,7 @@ from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp from vban_cmd.util import comp
from .enums import ChannelModes from .enums import ChannelModes
from .headers import VbanPacket from .headers import VbanRTPacket
class Levels(NamedTuple): class Levels(NamedTuple):
@@ -21,6 +23,13 @@ class ChannelState:
# Convert 4-byte state to integer once for efficient lookups # Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little') self._state = int.from_bytes(state_bytes, 'little')
@classmethod
def from_int(cls, state_int: int):
"""Create ChannelState directly from integer for efficiency"""
instance = cls.__new__(cls)
instance._state = state_int
return instance
def get_mode(self, mode_value: int) -> bool: def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode""" """Get boolean state for a specific mode"""
return (self._state & mode_value) != 0 return (self._state & mode_value) != 0
@@ -96,8 +105,8 @@ class Labels(NamedTuple):
@dataclass @dataclass
class VbanPacketNBS0(VbanPacket): class VbanRTPacketNBS0(VbanRTPacket):
"""Represents the body of a VBAN data packet with ident:0""" """Represents the body of a VBAN RTPacket with ident:0"""
_inputLeveldB100: bytes _inputLeveldB100: bytes
_outputLeveldB100: bytes _outputLeveldB100: bytes
@@ -147,32 +156,17 @@ class VbanPacketNBS0(VbanPacket):
def pdirty(self, other) -> bool: def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed""" """True iff any defined parameter has changed"""
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return ( return (
self._stripState != other._stripState self._stripState != other._stripState
or self._busState != other._busState or self._busState != other._busState
or self_gains != other_gains or self._stripGaindB100Layer1 != other._stripGaindB100Layer1
or self._stripGaindB100Layer2 != other._stripGaindB100Layer2
or self._stripGaindB100Layer3 != other._stripGaindB100Layer3
or self._stripGaindB100Layer4 != other._stripGaindB100Layer4
or self._stripGaindB100Layer5 != other._stripGaindB100Layer5
or self._stripGaindB100Layer6 != other._stripGaindB100Layer6
or self._stripGaindB100Layer7 != other._stripGaindB100Layer7
or self._stripGaindB100Layer8 != other._stripGaindB100Layer8
or self._busGaindB100 != other._busGaindB100 or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60 or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60 or self._busLabelUTF8c60 != other._busLabelUTF8c60
@@ -186,77 +180,54 @@ class VbanPacketNBS0(VbanPacket):
) )
return any(self._strip_comp) or any(self._bus_comp) return any(self._strip_comp) or any(self._bus_comp)
@property @cached_property
def strip_levels(self) -> tuple[float, ...]: def strip_levels(self) -> tuple[float, ...]:
"""Returns strip levels in dB""" """Returns strip levels in dB"""
return tuple( strip_raw = struct.unpack('<34h', self._inputLeveldB100)
round( return tuple(round(val * 0.01, 1) for val in strip_raw)[
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True) : self._kind.num_strip_levels
* 0.01, ]
1,
)
for i in range(0, len(self._inputLeveldB100), 2)
)[: self._kind.num_strip_levels]
@property @cached_property
def bus_levels(self) -> tuple[float, ...]: def bus_levels(self) -> tuple[float, ...]:
"""Returns bus levels in dB""" """Returns bus levels in dB"""
return tuple( bus_raw = struct.unpack('<64h', self._outputLeveldB100)
round( return tuple(round(val * 0.01, 1) for val in bus_raw)[
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True) : self._kind.num_bus_levels
* 0.01, ]
1,
)
for i in range(0, len(self._outputLeveldB100), 2)
)[: self._kind.num_bus_levels]
@property @property
def levels(self) -> Levels: def levels(self) -> Levels:
"""Returns strip and bus levels as a namedtuple""" """Returns strip and bus levels as a namedtuple"""
return Levels(strip=self.strip_levels, bus=self.bus_levels) return Levels(strip=self.strip_levels, bus=self.bus_levels)
@property @cached_property
def states(self) -> States: def states(self) -> States:
"""returns States object with processed strip and bus channel states""" """returns States object with processed strip and bus channel states"""
strip_states = struct.unpack('<8I', self._stripState)
bus_states = struct.unpack('<8I', self._busState)
return States( return States(
strip=tuple( strip=tuple(ChannelState.from_int(state) for state in strip_states),
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4) bus=tuple(ChannelState.from_int(state) for state in bus_states),
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
) )
@property @cached_property
def gainlayers(self) -> tuple: def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples""" """returns tuple of all strip gain layers as tuples"""
return tuple( layer_data = []
tuple( for layer in range(1, 9):
round( layer_bytes = getattr(self, f'_stripGaindB100Layer{layer}')
int.from_bytes( layer_raw = struct.unpack('<8h', layer_bytes)
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2], layer_data.append(tuple(round(val * 0.01, 2) for val in layer_raw))
'little', return tuple(layer_data)
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
@property @cached_property
def busgain(self) -> tuple: def busgain(self) -> tuple:
"""returns tuple of bus gains""" """returns tuple of bus gains"""
return tuple( bus_gain_raw = struct.unpack('<8h', self._busGaindB100)
round( return tuple(round(val * 0.01, 2) for val in bus_gain_raw)
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
@property @cached_property
def labels(self) -> Labels: def labels(self) -> Labels:
"""returns Labels namedtuple of strip and bus labels""" """returns Labels namedtuple of strip and bus labels"""

View File

@@ -1,11 +1,12 @@
import struct import struct
from dataclasses import dataclass from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple from typing import NamedTuple
from vban_cmd.enums import NBS from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass from vban_cmd.kinds import KindMapClass
from .headers import VbanPacket from .headers import VbanRTPacket
VMPARAMSTRIP_SIZE = 174 VMPARAMSTRIP_SIZE = 174
@@ -193,11 +194,15 @@ class VbanVMParamStrip:
_Pitch_formant_high=data[172:174], _Pitch_formant_high=data[172:174],
) )
@property @cached_property
def mode(self) -> int: def mode(self) -> int:
return int.from_bytes(self._mode, 'little') return int.from_bytes(self._mode, 'little')
@property @cached_property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@cached_property
def audibility(self) -> Audibility: def audibility(self) -> Audibility:
return Audibility( return Audibility(
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
@@ -206,7 +211,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
) )
@property @cached_property
def positions(self) -> Positions: def positions(self) -> Positions:
return Positions( return Positions(
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
@@ -217,7 +222,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
) )
@property @cached_property
def eqgains(self) -> EqGains: def eqgains(self) -> EqGains:
return EqGains( return EqGains(
*[ *[
@@ -230,7 +235,7 @@ class VbanVMParamStrip:
] ]
) )
@property @cached_property
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]: def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
return tuple( return tuple(
ParametricEQSettings( ParametricEQSettings(
@@ -243,7 +248,7 @@ class VbanVMParamStrip:
for i in range(6) for i in range(6)
) )
@property @cached_property
def sends(self) -> Sends: def sends(self) -> Sends:
return Sends( return Sends(
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
@@ -252,11 +257,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2), round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
) )
@property @cached_property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@property
def compressor(self) -> CompressorSettings: def compressor(self) -> CompressorSettings:
return CompressorSettings( return CompressorSettings(
gain_in=round( gain_in=round(
@@ -276,7 +277,7 @@ class VbanVMParamStrip:
), ),
) )
@property @cached_property
def gate(self) -> GateSettings: def gate(self) -> GateSettings:
return GateSettings( return GateSettings(
threshold_in=round( threshold_in=round(
@@ -295,7 +296,7 @@ class VbanVMParamStrip:
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2), release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
) )
@property @cached_property
def denoiser(self) -> DenoiserSettings: def denoiser(self) -> DenoiserSettings:
return DenoiserSettings( return DenoiserSettings(
threshold=round( threshold=round(
@@ -303,7 +304,7 @@ class VbanVMParamStrip:
) )
) )
@property @cached_property
def pitch(self) -> PitchSettings: def pitch(self) -> PitchSettings:
return PitchSettings( return PitchSettings(
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')), enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
@@ -327,8 +328,8 @@ class VbanVMParamStrip:
@dataclass @dataclass
class VbanPacketNBS1(VbanPacket): class VbanRTPacketNBS1(VbanRTPacket):
"""Represents the body of a VBAN data packet with ident:1""" """Represents the body of a VBAN RTPacket with ident:1"""
strips: tuple[VbanVMParamStrip, ...] strips: tuple[VbanVMParamStrip, ...]

View File

@@ -1,3 +1,4 @@
import struct
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
@@ -65,30 +66,31 @@ class VbanPing0Payload:
"""Convert payload to bytes""" """Convert payload to bytes"""
payload = cls() payload = cls()
data = bytearray() return struct.pack(
data.extend(payload.bit_type.to_bytes(4, 'little')) '<7I4s8s8s8s8s64s32s2H64s64s64s64s128s128s',
data.extend(payload.bit_feature.to_bytes(4, 'little')) payload.bit_type,
data.extend(payload.bit_feature_ex.to_bytes(4, 'little')) payload.bit_feature,
data.extend(payload.preferred_rate.to_bytes(4, 'little')) payload.bit_feature_ex,
data.extend(payload.min_rate.to_bytes(4, 'little')) payload.preferred_rate,
data.extend(payload.max_rate.to_bytes(4, 'little')) payload.min_rate,
data.extend(payload.color_rgb.to_bytes(4, 'little')) payload.max_rate,
data.extend(payload.version) payload.color_rgb,
data.extend(payload.gps_position) payload.version,
data.extend(payload.user_position) payload.gps_position,
data.extend(payload.lang_code) payload.user_position,
data.extend(payload.reserved) payload.lang_code,
data.extend(payload.reserved_ex) payload.reserved,
data.extend(payload.distant_ip) payload.reserved_ex,
data.extend(payload.distant_port.to_bytes(2, 'little')) payload.distant_ip,
data.extend(payload.distant_reserved.to_bytes(2, 'little')) payload.distant_port,
data.extend(payload.device_name) payload.distant_reserved,
data.extend(payload.manufacturer_name) payload.device_name,
data.extend(payload.application_name) payload.manufacturer_name,
data.extend(payload.host_name) payload.application_name,
data.extend(payload.user_name) payload.host_name,
data.extend(payload.user_comment) payload.user_name,
return bytes(data) payload.user_comment,
)
@classmethod @classmethod
def create_packet(cls, framecounter: int) -> bytes: def create_packet(cls, framecounter: int) -> bytes:

View File

@@ -13,7 +13,7 @@ from .event import Event
from .packet.headers import ( from .packet.headers import (
VbanMatrixResponseHeader, VbanMatrixResponseHeader,
VbanPongHeader, VbanPongHeader,
VbanRequestHeader, VbanRTRequestHeader,
) )
from .packet.ping0 import VbanPing0Payload, VbanServerType from .packet.ping0 import VbanPing0Payload, VbanServerType
from .subject import Subject from .subject import Subject
@@ -140,10 +140,11 @@ class VbanCmd(abc.ABC):
def _ping(self): def _ping(self):
"""Initiates the PING/PONG handshake with the VBAN server.""" """Initiates the PING/PONG handshake with the VBAN server."""
ping_packet = VbanPing0Payload.create_packet(self._get_next_framecounter())
try: try:
self.sock.sendto(ping_packet, (socket.gethostbyname(self.host), self.port)) self.sock.sendto(
VbanPing0Payload.create_packet(self._get_next_framecounter()),
(socket.gethostbyname(self.host), self.port),
)
self.logger.debug(f'PING sent to {self.host}:{self.port}') self.logger.debug(f'PING sent to {self.host}:{self.port}')
except socket.gaierror as e: except socket.gaierror as e:
@@ -200,7 +201,7 @@ class VbanCmd(abc.ABC):
def _send_request(self, payload: str) -> None: def _send_request(self, payload: str) -> None:
"""Sends a request packet over the network and bumps the framecounter.""" """Sends a request packet over the network and bumps the framecounter."""
self.sock.sendto( self.sock.sendto(
VbanRequestHeader.encode_with_payload( VbanRTRequestHeader.encode_with_payload(
name=self.streamname, name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps), bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel, channel=self.channel,

View File

@@ -6,12 +6,12 @@ from .enums import NBS
from .error import VBANCMDConnectionError from .error import VBANCMDConnectionError
from .packet.headers import ( from .packet.headers import (
HEADER_SIZE, HEADER_SIZE,
VbanPacket, VbanRTPacket,
VbanResponseHeader, VbanRTResponseHeader,
VbanSubscribeHeader, VbanRTSubscribeHeader,
) )
from .packet.nbs0 import VbanPacketNBS0 from .packet.nbs0 import VbanRTPacketNBS0
from .packet.nbs1 import VbanPacketNBS1 from .packet.nbs1 import VbanRTPacketNBS1
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -28,7 +28,7 @@ class Subscriber(threading.Thread):
def run(self): def run(self):
while not self.stopped(): while not self.stopped():
for nbs in NBS: for nbs in NBS:
sub_packet = VbanSubscribeHeader().to_bytes( sub_packet = VbanRTSubscribeHeader().to_bytes(
nbs, self._remote._get_next_framecounter() nbs, self._remote._get_next_framecounter()
) )
self._remote.sock.sendto( self._remote.sock.sendto(
@@ -66,7 +66,7 @@ class Producer(threading.Thread):
self._remote.cache['bus_level'], self._remote.cache['bus_level'],
) = self._remote.public_packets[NBS.zero].levels ) = self._remote.public_packets[NBS.zero].levels
def _get_rt(self) -> VbanPacket: def _get_rt(self) -> VbanRTPacket:
"""Attempt to fetch data packet until a valid one found""" """Attempt to fetch data packet until a valid one found"""
while True: while True:
try: try:
@@ -80,19 +80,19 @@ class Producer(threading.Thread):
) from e ) from e
try: try:
header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE]) header = VbanRTResponseHeader.from_bytes(data[:HEADER_SIZE])
except ValueError as e: except ValueError as e:
self.logger.debug(f'Error parsing response packet: {e}') self.logger.debug(f'Error parsing response packet: {e}')
continue continue
match header.format_nbs: match header.format_nbs:
case NBS.zero: case NBS.zero:
return VbanPacketNBS0.from_bytes( return VbanRTPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data nbs=NBS.zero, kind=self._remote.kind, data=data
) )
case NBS.one: case NBS.one:
return VbanPacketNBS1.from_bytes( return VbanRTPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data nbs=NBS.one, kind=self._remote.kind, data=data
) )