mirror of
https://github.com/onyx-and-iris/vban-cmd-python.git
synced 2026-04-07 08:03:30 +00:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c46ca8a8c8 | |||
| f8b56b4a30 | |||
| 09259269d7 | |||
| 242401e294 | |||
| 98ec9b715f | |||
| 5f7b62a0e0 | |||
| d1bcbfed6f | |||
| ab80bbf226 | |||
| ad58852a77 | |||
| 5363584940 |
@@ -548,7 +548,7 @@ You may pass the following optional keyword arguments:
|
||||
- `channel`: int=0, channel on which to send the UDP requests.
|
||||
- `pdirty`: boolean=False, parameter updates
|
||||
- `ldirty`: boolean=False, level updates
|
||||
- `script_ratelimit`: float=0.05, default to 20 script requests per second. This affects vban.sendtext() specifically.
|
||||
- `script_ratelimit`: float | None=None, ratelimit for vban.sendtext() specifically.
|
||||
- `timeout`: int=5, timeout for socket operations.
|
||||
- `disable_rt_listeners`: boolean=False, set `True` if you don't wish to receive RT packets.
|
||||
- You can still send Matrix string requests ending with `?` and receive a response.
|
||||
|
||||
@@ -103,7 +103,7 @@ class App(tk.Tk):
|
||||
def main():
|
||||
KIND_ID = 'banana'
|
||||
conn = {
|
||||
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
|
||||
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
|
||||
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
|
||||
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
|
||||
}
|
||||
|
||||
@@ -94,7 +94,7 @@ class Observer:
|
||||
def main():
|
||||
KIND_ID = 'potato'
|
||||
conn = {
|
||||
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
|
||||
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
|
||||
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
|
||||
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ class App:
|
||||
def main():
|
||||
KIND_ID = 'banana'
|
||||
conn = {
|
||||
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
|
||||
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
|
||||
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
|
||||
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "vban-cmd"
|
||||
version = "2.9.7"
|
||||
version = "2.10.2"
|
||||
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
|
||||
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
|
||||
license = { text = "MIT" }
|
||||
|
||||
@@ -1,6 +1,18 @@
|
||||
from .packet.enums import ServiceTypes, SubProtocols
|
||||
|
||||
|
||||
class VBANCMDError(Exception):
|
||||
"""Base VBANCMD Exception class."""
|
||||
|
||||
|
||||
class VBANCMDConnectionError(VBANCMDError):
|
||||
"""Exception raised when connection/timeout errors occur"""
|
||||
|
||||
|
||||
class VBANCMDPacketError(VBANCMDError):
|
||||
"""Exception raised when packet parsing errors occur"""
|
||||
|
||||
def __init__(self, message: str, protocol: SubProtocols, type_: ServiceTypes):
|
||||
super().__init__(message)
|
||||
self.protocol = protocol
|
||||
self.type = type_
|
||||
|
||||
@@ -89,7 +89,7 @@ class FactoryBase(VbanCmd):
|
||||
'streamname': 'Command1',
|
||||
'bps': 256000,
|
||||
'channel': 0,
|
||||
'script_ratelimit': 0.05, # 20 commands per second, to avoid overloading Voicemeeter
|
||||
'script_ratelimit': None, # if None or 0, no rate limit applied to script commands
|
||||
'timeout': 5, # timeout on socket operations, in seconds
|
||||
'disable_rt_listeners': False,
|
||||
'sync': False,
|
||||
|
||||
@@ -48,26 +48,18 @@ class IRemote(abc.ABC):
|
||||
def apply(self, data):
|
||||
"""Sets all parameters of a dict for the channel."""
|
||||
|
||||
script = ''
|
||||
|
||||
def fget(attr, val):
|
||||
if attr == 'mode':
|
||||
return (f'mode.{val}', 1)
|
||||
elif attr == 'knob':
|
||||
return ('', val)
|
||||
return (attr, val)
|
||||
return (getattr(self, attr), val, 1)
|
||||
return (self, attr, val)
|
||||
|
||||
for attr, val in data.items():
|
||||
if not isinstance(val, dict):
|
||||
if attr in dir(self): # avoid calling getattr (with hasattr)
|
||||
attr, val = fget(attr, val)
|
||||
if isinstance(val, bool):
|
||||
val = 1 if val else 0
|
||||
|
||||
self._remote.cache[self._cmd(attr)] = val
|
||||
script += f'{self._cmd(attr)}={val};'
|
||||
target, attr, val = fget(attr, val)
|
||||
setattr(target, attr, val)
|
||||
else:
|
||||
self.logger.error(f'invalid attribute {attr} for {self}')
|
||||
else:
|
||||
target = getattr(self, attr)
|
||||
target.apply(val)
|
||||
|
||||
self._remote.sendtext(script)
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
|
||||
from vban_cmd.enums import NBS
|
||||
from vban_cmd.error import VBANCMDPacketError
|
||||
from vban_cmd.kinds import KindMapClass
|
||||
|
||||
from .enums import ServiceTypes, StreamTypes, SubProtocols
|
||||
@@ -9,6 +11,15 @@ PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
|
||||
MAX_PACKET_SIZE = 1436
|
||||
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
|
||||
|
||||
STREAMNAME_MAX_LENGTH = 16
|
||||
# fmt: off
|
||||
BPS_OPTS = [
|
||||
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
|
||||
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
|
||||
1000000, 1500000, 2000000, 3000000,
|
||||
]
|
||||
# fmt: on
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPingHeader:
|
||||
@@ -27,22 +38,25 @@ class VbanPingHeader:
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
|
||||
return self.name.encode('ascii')[:STREAMNAME_MAX_LENGTH].ljust(
|
||||
STREAMNAME_MAX_LENGTH, b'\x00'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def to_bytes(cls, framecounter: int = 0) -> bytes:
|
||||
"""Creates the PING header bytes only."""
|
||||
header = cls(framecounter=framecounter)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.format_sr.to_bytes(1, 'little'))
|
||||
data.extend(header.format_nbs.to_bytes(1, 'little'))
|
||||
data.extend(header.format_nbc.to_bytes(1, 'little'))
|
||||
data.extend(header.format_bit.to_bytes(1, 'little'))
|
||||
data.extend(header.streamname)
|
||||
data.extend(header.framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
return struct.pack(
|
||||
'<4s4B16sI',
|
||||
header.vban,
|
||||
header.format_sr,
|
||||
header.format_nbs,
|
||||
header.format_nbc,
|
||||
header.format_bit,
|
||||
header.streamname,
|
||||
header.framecounter,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -62,7 +76,9 @@ class VbanPongHeader:
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
|
||||
return self.name.encode('ascii')[:STREAMNAME_MAX_LENGTH].ljust(
|
||||
STREAMNAME_MAX_LENGTH, b'\x00'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
@@ -72,7 +88,11 @@ class VbanPongHeader:
|
||||
# PONG responses use the same service type as PING (0x00)
|
||||
# and are identified by having payload data
|
||||
if parsed['format_nbc'] != ServiceTypes.PONG.value:
|
||||
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
|
||||
raise VBANCMDPacketError(
|
||||
f'Not a PONG response packet: {parsed["format_nbc"]:02x}',
|
||||
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
|
||||
type_=ServiceTypes(parsed['format_nbc']),
|
||||
)
|
||||
|
||||
return cls(**parsed)
|
||||
|
||||
@@ -131,7 +151,7 @@ class VbanRTPacket:
|
||||
class VbanRTSubscribeHeader:
|
||||
"""Represents the header of an RT subscription packet"""
|
||||
|
||||
nbs: NBS = NBS.zero
|
||||
_nbs: NBS = NBS.zero
|
||||
name: str = 'Register-RTP'
|
||||
timeout: int = 15
|
||||
|
||||
@@ -140,38 +160,41 @@ class VbanRTSubscribeHeader:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def format_sr(self) -> bytes:
|
||||
return SubProtocols.SERVICE.value.to_bytes(1, 'little')
|
||||
def sr(self) -> int:
|
||||
return SubProtocols.SERVICE.value
|
||||
|
||||
@property
|
||||
def format_nbs(self) -> bytes:
|
||||
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
|
||||
def nbs(self) -> int:
|
||||
return self._nbs.value & 0xFF
|
||||
|
||||
@property
|
||||
def format_nbc(self) -> bytes:
|
||||
return ServiceTypes.RTPACKETREGISTER.value.to_bytes(1, 'little')
|
||||
def nbc(self) -> int:
|
||||
return ServiceTypes.RTPACKETREGISTER.value
|
||||
|
||||
@property
|
||||
def format_bit(self) -> bytes:
|
||||
return (self.timeout & 0xFF).to_bytes(1, 'little')
|
||||
def bit(self) -> int:
|
||||
return self.timeout & 0xFF
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
|
||||
STREAMNAME_MAX_LENGTH, b'\x00'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
|
||||
header = cls(nbs=nbs)
|
||||
header = cls(_nbs=nbs)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.format_sr)
|
||||
data.extend(header.format_nbs)
|
||||
data.extend(header.format_nbc)
|
||||
data.extend(header.format_bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
return struct.pack(
|
||||
'<4s4B16sI',
|
||||
header.vban,
|
||||
header.sr,
|
||||
header.nbs,
|
||||
header.nbc,
|
||||
header.bit,
|
||||
header.streamname,
|
||||
framecounter,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -179,58 +202,64 @@ class VbanRTRequestHeader:
|
||||
"""Represents the header of an RT request packet"""
|
||||
|
||||
name: str
|
||||
bps_index: int
|
||||
bps: int
|
||||
channel: int
|
||||
framecounter: int = 0
|
||||
|
||||
def __post_init__(self):
|
||||
if self.bps not in BPS_OPTS:
|
||||
raise ValueError(
|
||||
f'Invalid bps value: {self.bps}. Must be one of {BPS_OPTS}'
|
||||
)
|
||||
self.bps_index = BPS_OPTS.index(self.bps)
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def sr(self) -> bytes:
|
||||
return (self.bps_index | SubProtocols.TEXT.value).to_bytes(1, 'little')
|
||||
def sr(self) -> int:
|
||||
return self.bps_index | SubProtocols.TEXT.value
|
||||
|
||||
@property
|
||||
def nbs(self) -> bytes:
|
||||
return (0).to_bytes(1, 'little')
|
||||
def nbs(self) -> int:
|
||||
return 0
|
||||
|
||||
@property
|
||||
def nbc(self) -> bytes:
|
||||
return (self.channel).to_bytes(1, 'little')
|
||||
def nbc(self) -> int:
|
||||
return self.channel
|
||||
|
||||
@property
|
||||
def bit(self) -> bytes:
|
||||
return (StreamTypes.UTF8.value).to_bytes(1, 'little')
|
||||
def bit(self) -> int:
|
||||
return StreamTypes.UTF8.value
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode()[:16].ljust(16, b'\x00')
|
||||
|
||||
@classmethod
|
||||
def to_bytes(
|
||||
cls, name: str, bps_index: int, channel: int, framecounter: int
|
||||
) -> bytes:
|
||||
header = cls(
|
||||
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
|
||||
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
|
||||
STREAMNAME_MAX_LENGTH, b'\x00'
|
||||
)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.sr)
|
||||
data.extend(header.nbs)
|
||||
data.extend(header.nbc)
|
||||
data.extend(header.bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(header.framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
@classmethod
|
||||
def to_bytes(cls, name: str, bps: int, channel: int, framecounter: int) -> bytes:
|
||||
header = cls(name=name, bps=bps, channel=channel, framecounter=framecounter)
|
||||
|
||||
return struct.pack(
|
||||
'<4s4B16sI',
|
||||
header.vban,
|
||||
header.sr,
|
||||
header.nbs,
|
||||
header.nbc,
|
||||
header.bit,
|
||||
header.streamname,
|
||||
header.framecounter,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def encode_with_payload(
|
||||
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
|
||||
cls, name: str, bps: int, channel: int, framecounter: int, payload: str
|
||||
) -> bytes:
|
||||
"""Creates the complete packet with header and payload."""
|
||||
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()
|
||||
return cls.to_bytes(name, bps, channel, framecounter) + payload.encode()
|
||||
|
||||
|
||||
def _parse_vban_service_header(data: bytes) -> dict:
|
||||
@@ -249,7 +278,10 @@ def _parse_vban_service_header(data: bytes) -> dict:
|
||||
# Verify this is a service protocol packet
|
||||
protocol = format_sr & SubProtocols.MASK.value
|
||||
if protocol != SubProtocols.SERVICE.value:
|
||||
raise ValueError(f'Not a service protocol packet: {protocol:02x}')
|
||||
raise VBANCMDPacketError(
|
||||
f'Invalid protocol in service header: {protocol:02x}',
|
||||
protocol=SubProtocols(protocol),
|
||||
)
|
||||
|
||||
# Extract stream name and frame counter
|
||||
name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore')
|
||||
@@ -282,7 +314,9 @@ class VbanRTResponseHeader:
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
|
||||
STREAMNAME_MAX_LENGTH, b'\x00'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
@@ -291,8 +325,10 @@ class VbanRTResponseHeader:
|
||||
|
||||
# Validate this is an RTPacket response
|
||||
if parsed['format_nbc'] != ServiceTypes.RTPACKET.value:
|
||||
raise ValueError(
|
||||
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}'
|
||||
raise VBANCMDPacketError(
|
||||
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}',
|
||||
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
|
||||
type_=ServiceTypes(parsed['format_nbc']),
|
||||
)
|
||||
|
||||
return cls(**parsed)
|
||||
@@ -315,16 +351,29 @@ class VbanMatrixResponseHeader:
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
|
||||
return self.name.encode()[:STREAMNAME_MAX_LENGTH].ljust(
|
||||
STREAMNAME_MAX_LENGTH, b'\x00'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
"""Parse a matrix response packet from bytes."""
|
||||
parsed = _parse_vban_service_header(data)
|
||||
|
||||
# Validate this is a service reply packet
|
||||
# Validate this is a service reply packet (dual encoding scheme)
|
||||
if parsed['format_nbs'] != ServiceTypes.FNCT_REPLY.value:
|
||||
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
|
||||
raise VBANCMDPacketError(
|
||||
f'Not a service reply packet: {parsed["format_nbs"]:02x}',
|
||||
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
|
||||
type_=ServiceTypes(parsed['format_nbs']),
|
||||
)
|
||||
|
||||
if parsed['format_nbc'] != ServiceTypes.REQUESTREPLY.value:
|
||||
raise VBANCMDPacketError(
|
||||
f'Not a request reply packet: {parsed["format_nbc"]:02x}',
|
||||
protocol=SubProtocols(parsed['format_sr'] & SubProtocols.MASK.value),
|
||||
type_=ServiceTypes(parsed['format_nbc']),
|
||||
)
|
||||
|
||||
return cls(**parsed)
|
||||
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
from functools import cached_property
|
||||
from typing import NamedTuple
|
||||
|
||||
from vban_cmd.enums import NBS
|
||||
@@ -21,6 +23,13 @@ class ChannelState:
|
||||
# Convert 4-byte state to integer once for efficient lookups
|
||||
self._state = int.from_bytes(state_bytes, 'little')
|
||||
|
||||
@classmethod
|
||||
def from_int(cls, state_int: int):
|
||||
"""Create ChannelState directly from integer for efficiency"""
|
||||
instance = cls.__new__(cls)
|
||||
instance._state = state_int
|
||||
return instance
|
||||
|
||||
def get_mode(self, mode_value: int) -> bool:
|
||||
"""Get boolean state for a specific mode"""
|
||||
return (self._state & mode_value) != 0
|
||||
@@ -147,32 +156,17 @@ class VbanRTPacketNBS0(VbanRTPacket):
|
||||
|
||||
def pdirty(self, other) -> bool:
|
||||
"""True iff any defined parameter has changed"""
|
||||
|
||||
self_gains = (
|
||||
self._stripGaindB100Layer1
|
||||
+ self._stripGaindB100Layer2
|
||||
+ self._stripGaindB100Layer3
|
||||
+ self._stripGaindB100Layer4
|
||||
+ self._stripGaindB100Layer5
|
||||
+ self._stripGaindB100Layer6
|
||||
+ self._stripGaindB100Layer7
|
||||
+ self._stripGaindB100Layer8
|
||||
)
|
||||
other_gains = (
|
||||
other._stripGaindB100Layer1
|
||||
+ other._stripGaindB100Layer2
|
||||
+ other._stripGaindB100Layer3
|
||||
+ other._stripGaindB100Layer4
|
||||
+ other._stripGaindB100Layer5
|
||||
+ other._stripGaindB100Layer6
|
||||
+ other._stripGaindB100Layer7
|
||||
+ other._stripGaindB100Layer8
|
||||
)
|
||||
|
||||
return (
|
||||
self._stripState != other._stripState
|
||||
or self._busState != other._busState
|
||||
or self_gains != other_gains
|
||||
or self._stripGaindB100Layer1 != other._stripGaindB100Layer1
|
||||
or self._stripGaindB100Layer2 != other._stripGaindB100Layer2
|
||||
or self._stripGaindB100Layer3 != other._stripGaindB100Layer3
|
||||
or self._stripGaindB100Layer4 != other._stripGaindB100Layer4
|
||||
or self._stripGaindB100Layer5 != other._stripGaindB100Layer5
|
||||
or self._stripGaindB100Layer6 != other._stripGaindB100Layer6
|
||||
or self._stripGaindB100Layer7 != other._stripGaindB100Layer7
|
||||
or self._stripGaindB100Layer8 != other._stripGaindB100Layer8
|
||||
or self._busGaindB100 != other._busGaindB100
|
||||
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
|
||||
or self._busLabelUTF8c60 != other._busLabelUTF8c60
|
||||
@@ -186,77 +180,54 @@ class VbanRTPacketNBS0(VbanRTPacket):
|
||||
)
|
||||
return any(self._strip_comp) or any(self._bus_comp)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def strip_levels(self) -> tuple[float, ...]:
|
||||
"""Returns strip levels in dB"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
1,
|
||||
)
|
||||
for i in range(0, len(self._inputLeveldB100), 2)
|
||||
)[: self._kind.num_strip_levels]
|
||||
strip_raw = struct.unpack('<34h', self._inputLeveldB100)
|
||||
return tuple(round(val * 0.01, 1) for val in strip_raw)[
|
||||
: self._kind.num_strip_levels
|
||||
]
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def bus_levels(self) -> tuple[float, ...]:
|
||||
"""Returns bus levels in dB"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
1,
|
||||
)
|
||||
for i in range(0, len(self._outputLeveldB100), 2)
|
||||
)[: self._kind.num_bus_levels]
|
||||
bus_raw = struct.unpack('<64h', self._outputLeveldB100)
|
||||
return tuple(round(val * 0.01, 1) for val in bus_raw)[
|
||||
: self._kind.num_bus_levels
|
||||
]
|
||||
|
||||
@property
|
||||
def levels(self) -> Levels:
|
||||
"""Returns strip and bus levels as a namedtuple"""
|
||||
return Levels(strip=self.strip_levels, bus=self.bus_levels)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def states(self) -> States:
|
||||
"""returns States object with processed strip and bus channel states"""
|
||||
strip_states = struct.unpack('<8I', self._stripState)
|
||||
bus_states = struct.unpack('<8I', self._busState)
|
||||
return States(
|
||||
strip=tuple(
|
||||
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
|
||||
),
|
||||
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
|
||||
strip=tuple(ChannelState.from_int(state) for state in strip_states),
|
||||
bus=tuple(ChannelState.from_int(state) for state in bus_states),
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def gainlayers(self) -> tuple:
|
||||
"""returns tuple of all strip gain layers as tuples"""
|
||||
return tuple(
|
||||
tuple(
|
||||
round(
|
||||
int.from_bytes(
|
||||
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
|
||||
'little',
|
||||
signed=True,
|
||||
)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
for layer in range(1, 9)
|
||||
)
|
||||
layer_data = []
|
||||
for layer in range(1, 9):
|
||||
layer_bytes = getattr(self, f'_stripGaindB100Layer{layer}')
|
||||
layer_raw = struct.unpack('<8h', layer_bytes)
|
||||
layer_data.append(tuple(round(val * 0.01, 2) for val in layer_raw))
|
||||
return tuple(layer_data)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def busgain(self) -> tuple:
|
||||
"""returns tuple of bus gains"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
bus_gain_raw = struct.unpack('<8h', self._busGaindB100)
|
||||
return tuple(round(val * 0.01, 2) for val in bus_gain_raw)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def labels(self) -> Labels:
|
||||
"""returns Labels namedtuple of strip and bus labels"""
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
from functools import cached_property
|
||||
from typing import NamedTuple
|
||||
|
||||
from vban_cmd.enums import NBS
|
||||
@@ -193,11 +194,15 @@ class VbanVMParamStrip:
|
||||
_Pitch_formant_high=data[172:174],
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def mode(self) -> int:
|
||||
return int.from_bytes(self._mode, 'little')
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def karaoke(self) -> int:
|
||||
return int.from_bytes(self._nKaraoke, 'little')
|
||||
|
||||
@cached_property
|
||||
def audibility(self) -> Audibility:
|
||||
return Audibility(
|
||||
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
|
||||
@@ -206,7 +211,7 @@ class VbanVMParamStrip:
|
||||
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def positions(self) -> Positions:
|
||||
return Positions(
|
||||
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
|
||||
@@ -217,7 +222,7 @@ class VbanVMParamStrip:
|
||||
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def eqgains(self) -> EqGains:
|
||||
return EqGains(
|
||||
*[
|
||||
@@ -230,7 +235,7 @@ class VbanVMParamStrip:
|
||||
]
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
|
||||
return tuple(
|
||||
ParametricEQSettings(
|
||||
@@ -243,7 +248,7 @@ class VbanVMParamStrip:
|
||||
for i in range(6)
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def sends(self) -> Sends:
|
||||
return Sends(
|
||||
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
|
||||
@@ -252,11 +257,7 @@ class VbanVMParamStrip:
|
||||
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def karaoke(self) -> int:
|
||||
return int.from_bytes(self._nKaraoke, 'little')
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def compressor(self) -> CompressorSettings:
|
||||
return CompressorSettings(
|
||||
gain_in=round(
|
||||
@@ -276,7 +277,7 @@ class VbanVMParamStrip:
|
||||
),
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def gate(self) -> GateSettings:
|
||||
return GateSettings(
|
||||
threshold_in=round(
|
||||
@@ -295,7 +296,7 @@ class VbanVMParamStrip:
|
||||
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def denoiser(self) -> DenoiserSettings:
|
||||
return DenoiserSettings(
|
||||
threshold=round(
|
||||
@@ -303,7 +304,7 @@ class VbanVMParamStrip:
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
@cached_property
|
||||
def pitch(self) -> PitchSettings:
|
||||
return PitchSettings(
|
||||
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
@@ -65,30 +66,31 @@ class VbanPing0Payload:
|
||||
"""Convert payload to bytes"""
|
||||
payload = cls()
|
||||
|
||||
data = bytearray()
|
||||
data.extend(payload.bit_type.to_bytes(4, 'little'))
|
||||
data.extend(payload.bit_feature.to_bytes(4, 'little'))
|
||||
data.extend(payload.bit_feature_ex.to_bytes(4, 'little'))
|
||||
data.extend(payload.preferred_rate.to_bytes(4, 'little'))
|
||||
data.extend(payload.min_rate.to_bytes(4, 'little'))
|
||||
data.extend(payload.max_rate.to_bytes(4, 'little'))
|
||||
data.extend(payload.color_rgb.to_bytes(4, 'little'))
|
||||
data.extend(payload.version)
|
||||
data.extend(payload.gps_position)
|
||||
data.extend(payload.user_position)
|
||||
data.extend(payload.lang_code)
|
||||
data.extend(payload.reserved)
|
||||
data.extend(payload.reserved_ex)
|
||||
data.extend(payload.distant_ip)
|
||||
data.extend(payload.distant_port.to_bytes(2, 'little'))
|
||||
data.extend(payload.distant_reserved.to_bytes(2, 'little'))
|
||||
data.extend(payload.device_name)
|
||||
data.extend(payload.manufacturer_name)
|
||||
data.extend(payload.application_name)
|
||||
data.extend(payload.host_name)
|
||||
data.extend(payload.user_name)
|
||||
data.extend(payload.user_comment)
|
||||
return bytes(data)
|
||||
return struct.pack(
|
||||
'<7I4s8s8s8s8s64s32s2H64s64s64s64s128s128s',
|
||||
payload.bit_type,
|
||||
payload.bit_feature,
|
||||
payload.bit_feature_ex,
|
||||
payload.preferred_rate,
|
||||
payload.min_rate,
|
||||
payload.max_rate,
|
||||
payload.color_rgb,
|
||||
payload.version,
|
||||
payload.gps_position,
|
||||
payload.user_position,
|
||||
payload.lang_code,
|
||||
payload.reserved,
|
||||
payload.reserved_ex,
|
||||
payload.distant_ip,
|
||||
payload.distant_port,
|
||||
payload.distant_reserved,
|
||||
payload.device_name,
|
||||
payload.manufacturer_name,
|
||||
payload.application_name,
|
||||
payload.host_name,
|
||||
payload.user_name,
|
||||
payload.user_comment,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def create_packet(cls, framecounter: int) -> bytes:
|
||||
|
||||
@@ -5,12 +5,12 @@ from typing import Iterator
|
||||
from .error import VBANCMDConnectionError
|
||||
|
||||
|
||||
def ratelimit(func):
|
||||
"""ratelimit decorator for {VbanCmd}.sendtext, to prevent flooding the network with script requests."""
|
||||
def script_ratelimit(func):
|
||||
"""script_ratelimit decorator for {VbanCmd}.sendtext, to prevent flooding the network with script requests."""
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
self, *rem = args
|
||||
if self.script_ratelimit > 0:
|
||||
if self.script_ratelimit:
|
||||
now = time.time()
|
||||
elapsed = now - self._last_script_request_time
|
||||
if elapsed < self.script_ratelimit:
|
||||
@@ -124,16 +124,11 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
|
||||
"""
|
||||
Generator function, accepts two tuples of dB values.
|
||||
|
||||
Evaluates equality of each member in both tuples.
|
||||
Only ignores changes when levels are very quiet (below -72 dB).
|
||||
Returns True when levels are equal (no change), False when different.
|
||||
"""
|
||||
|
||||
for a, b in zip(t0, t1):
|
||||
# If both values are very quiet (below -72dB), ignore small changes
|
||||
if a <= -72.0 and b <= -72.0:
|
||||
yield a == b # Both quiet, check if they're equal
|
||||
else:
|
||||
yield a != b # At least one has significant level, detect changes
|
||||
yield a == b
|
||||
|
||||
|
||||
def deep_merge(dict1, dict2):
|
||||
|
||||
@@ -17,7 +17,7 @@ from .packet.headers import (
|
||||
)
|
||||
from .packet.ping0 import VbanPing0Payload, VbanServerType
|
||||
from .subject import Subject
|
||||
from .util import bump_framecounter, deep_merge, pong_timeout, ratelimit
|
||||
from .util import bump_framecounter, deep_merge, pong_timeout, script_ratelimit
|
||||
from .worker import Producer, Subscriber, Updater
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -27,13 +27,6 @@ class VbanCmd(abc.ABC):
|
||||
"""Abstract Base Class for Voicemeeter VBAN Command Interfaces"""
|
||||
|
||||
DELAY = 0.001
|
||||
# fmt: off
|
||||
BPS_OPTS = [
|
||||
0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
|
||||
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800, 921600,
|
||||
1000000, 1500000, 2000000, 3000000,
|
||||
]
|
||||
# fmt: on
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
@@ -203,7 +196,7 @@ class VbanCmd(abc.ABC):
|
||||
self.sock.sendto(
|
||||
VbanRTRequestHeader.encode_with_payload(
|
||||
name=self.streamname,
|
||||
bps_index=self.BPS_OPTS.index(self.bps),
|
||||
bps=self.bps,
|
||||
channel=self.channel,
|
||||
framecounter=self._get_next_framecounter(),
|
||||
payload=payload,
|
||||
@@ -216,7 +209,7 @@ class VbanCmd(abc.ABC):
|
||||
self._send_request(f'{cmd}={val};')
|
||||
self.cache[cmd] = val
|
||||
|
||||
@ratelimit
|
||||
@script_ratelimit
|
||||
def sendtext(self, script) -> str | None:
|
||||
"""Sends a multiple parameter string over a network."""
|
||||
self._send_request(script)
|
||||
|
||||
@@ -3,7 +3,8 @@ import threading
|
||||
import time
|
||||
|
||||
from .enums import NBS
|
||||
from .error import VBANCMDConnectionError
|
||||
from .error import VBANCMDConnectionError, VBANCMDPacketError
|
||||
from .packet.enums import SubProtocols
|
||||
from .packet.headers import (
|
||||
HEADER_SIZE,
|
||||
VbanRTPacket,
|
||||
@@ -81,8 +82,13 @@ class Producer(threading.Thread):
|
||||
|
||||
try:
|
||||
header = VbanRTResponseHeader.from_bytes(data[:HEADER_SIZE])
|
||||
except ValueError as e:
|
||||
self.logger.debug(f'Error parsing response packet: {e}')
|
||||
except VBANCMDPacketError as e:
|
||||
match e.protocol:
|
||||
case SubProtocols.SERVICE:
|
||||
# Silently ignore periodic SERVICE packets unrelated to vban-cmd
|
||||
pass
|
||||
case _:
|
||||
self.logger.debug(f'Error parsing response packet: {e}')
|
||||
continue
|
||||
|
||||
match header.format_nbs:
|
||||
|
||||
Reference in New Issue
Block a user