16 Commits

Author SHA1 Message Date
cbcca14481 rename until_stopped() to wait_until_stopped() 2023-08-05 13:36:36 +01:00
f584d53835 patch bump 2023-08-05 13:34:56 +01:00
72d182a488 use Threading.Event object to terminate threads
until_stopped() added to Subscriber thread
2023-08-04 23:13:58 +01:00
ee32f92914 add missing constants
add docstrings that describes data breakdown

move SubscribeHeader above  VbanRtPacketHeader

expand assert failure string
2023-08-04 23:06:51 +01:00
3b65035e50 add double click event for slider 2023-08-04 21:14:33 +01:00
c8b4bde49d patch bump 2023-08-04 16:33:48 +01:00
47e9203b1e use walrus 2023-08-04 16:21:57 +01:00
d48e7ecd79 Correct type annotations None type. 2023-08-02 17:19:08 +01:00
7e09a0d321 VBANCMDConnectionError now subclasses VBANCMDError 2023-08-02 15:45:25 +01:00
d41ee1a12a remove redundant __str__ overrides 2023-07-26 11:32:20 +01:00
1e499cd99d patch bump 2023-07-25 16:23:02 +01:00
9bf52b5c11 num_strip_levels, num_bus_levels added to KindMaps 2023-07-25 16:22:47 +01:00
77ba347e99 fix bus.eq.on example in readme 2023-07-15 08:17:18 +01:00
94fa33cebf md fix 2023-07-13 08:58:06 +01:00
ef105d878b fix logging example 2023-07-13 08:52:42 +01:00
956f759e73 add Logging section to README. 2023-07-13 08:50:24 +01:00
14 changed files with 176 additions and 120 deletions

View File

@@ -8,7 +8,7 @@
# VBAN CMD # VBAN CMD
This python interface allows you to get and set Voicemeeter parameter values over a network. This python interface allows you to transmit Voicemeeter parameters over a network.
It may be used standalone or to extend the [Voicemeeter Remote Python API](https://github.com/onyx-and-iris/voicemeeter-api-python) It may be used standalone or to extend the [Voicemeeter Remote Python API](https://github.com/onyx-and-iris/voicemeeter-api-python)
@@ -251,7 +251,6 @@ The following properties are available.
example: example:
```python ```python
vban.bus[4].eq = true
print(vban.bus[0].label) print(vban.bus[0].label)
``` ```
@@ -262,6 +261,10 @@ The following properties are available.
- `on`: boolean - `on`: boolean
- `ab`: boolean - `ab`: boolean
```python
vban.bus[4].eq.on = true
```
##### Modes ##### Modes
The following properties are available. The following properties are available.
@@ -506,12 +509,27 @@ Returns a `VbanRtPacket`. Designed to be used internally by the interface but av
States not guaranteed to be current (requires use of dirty parameters to confirm). States not guaranteed to be current (requires use of dirty parameters to confirm).
### `Errors` ## Errors
- `errors.VBANCMDError`: Exception raised when general errors occur. - `errors.VBANCMDError`: Exception raised when general errors occur.
- `errors.VBANCMDConnectionError`: Exception raised when connection/timeout errors occur. - `errors.VBANCMDConnectionError`: Exception raised when connection/timeout errors occur.
### `Tests` ## Logging
It's possible to see the messages sent by the interface's setters and getters, may be useful for debugging.
example:
```python
import vban_cmd
logging.basicConfig(level=logging.DEBUG)
opts = {"ip": "ip.local", "port": 6980, "streamname": "Command1"}
with vban_cmd.api('banana', **opts) as vban:
...
```
## Tests
First make sure you installed the [development dependencies](https://github.com/onyx-and-iris/vban-cmd-python#installation) First make sure you installed the [development dependencies](https://github.com/onyx-and-iris/vban-cmd-python#installation)

View File

@@ -8,6 +8,8 @@ from tkinter import ttk
class App(tk.Tk): class App(tk.Tk):
INDEX = 3
def __init__(self, vban): def __init__(self, vban):
super().__init__() super().__init__()
self.vban = vban self.vban = vban
@@ -15,8 +17,8 @@ class App(tk.Tk):
self.vban.observer.add(self.on_ldirty) self.vban.observer.add(self.on_ldirty)
# create widget variables # create widget variables
self.button_var = tk.BooleanVar(value=vban.strip[3].mute) self.button_var = tk.BooleanVar(value=vban.strip[self.INDEX].mute)
self.slider_var = tk.DoubleVar(value=vban.strip[3].gain) self.slider_var = tk.DoubleVar(value=vban.strip[self.INDEX].gain)
self.meter_var = tk.DoubleVar(value=self._get_level()) self.meter_var = tk.DoubleVar(value=self._get_level())
self.gainlabel_var = tk.StringVar(value=self.slider_var.get()) self.gainlabel_var = tk.StringVar(value=self.slider_var.get())
@@ -24,11 +26,12 @@ class App(tk.Tk):
self.style = ttk.Style() self.style = ttk.Style()
self.style.theme_use("clam") self.style.theme_use("clam")
self.style.configure( self.style.configure(
"Mute.TButton", foreground="#cd5c5c" if vban.strip[3].mute else "#5a5a5a" "Mute.TButton",
foreground="#cd5c5c" if vban.strip[self.INDEX].mute else "#5a5a5a",
) )
# create labelframe and grid it onto the mainframe # create labelframe and grid it onto the mainframe
self.labelframe = tk.LabelFrame(text=self.vban.strip[3].label) self.labelframe = tk.LabelFrame(text=self.vban.strip[self.INDEX].label)
self.labelframe.grid(padx=1) self.labelframe.grid(padx=1)
# create slider and grid it onto the labelframe # create slider and grid it onto the labelframe
@@ -44,6 +47,7 @@ class App(tk.Tk):
column=0, column=0,
row=0, row=0,
) )
slider.bind("<Double-Button-1>", self.on_button_double_click)
# create level meter and grid it onto the labelframe # create level meter and grid it onto the labelframe
level_meter = ttk.Progressbar( level_meter = ttk.Progressbar(
@@ -72,18 +76,23 @@ class App(tk.Tk):
def on_slider_move(self, *args): def on_slider_move(self, *args):
val = round(self.slider_var.get(), 1) val = round(self.slider_var.get(), 1)
self.vban.strip[3].gain = val self.vban.strip[self.INDEX].gain = val
self.gainlabel_var.set(val) self.gainlabel_var.set(val)
def on_button_press(self): def on_button_press(self):
self.button_var.set(not self.button_var.get()) self.button_var.set(not self.button_var.get())
self.vban.strip[3].mute = self.button_var.get() self.vban.strip[self.INDEX].mute = self.button_var.get()
self.style.configure( self.style.configure(
"Mute.TButton", foreground="#cd5c5c" if self.button_var.get() else "#5a5a5a" "Mute.TButton", foreground="#cd5c5c" if self.button_var.get() else "#5a5a5a"
) )
def on_button_double_click(self, e):
self.slider_var.set(0)
self.gainlabel_var.set(0)
self.vban.strip[self.INDEX].gain = 0
def _get_level(self): def _get_level(self):
val = max(self.vban.strip[3].levels.prefader) val = max(self.vban.strip[self.INDEX].levels.prefader)
return 0 if self.button_var.get() else 72 + val - 12 + self.slider_var.get() return 0 if self.button_var.get() else 72 + val - 12 + self.slider_var.get()
def on_ldirty(self): def on_ldirty(self):

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "vban-cmd" name = "vban-cmd"
version = "2.3.2" version = "2.4.3"
description = "Python interface for the VBAN RT Packet Service (Sendtext)" description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = ["onyx-and-iris <code@onyxandiris.online>"] authors = ["onyx-and-iris <code@onyxandiris.online>"]
license = "MIT" license = "MIT"

View File

@@ -102,7 +102,7 @@ class BusLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty: if not self._remote.stopped() and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]] for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]]

View File

@@ -1,6 +1,6 @@
class VBANCMDError(Exception): class VBANCMDError(Exception):
"""Exception raised when general errors occur""" """Base VBANCMD Exception class. Raised when general errors occur"""
class VBANCMDConnectionError(Exception): class VBANCMDConnectionError(VBANCMDError):
"""Exception raised when connection/timeout errors occur""" """Exception raised when connection/timeout errors occur"""

View File

@@ -2,7 +2,7 @@ import logging
from abc import abstractmethod from abc import abstractmethod
from enum import IntEnum from enum import IntEnum
from functools import cached_property from functools import cached_property
from typing import Iterable, NoReturn from typing import Iterable
from .bus import request_bus_obj as bus from .bus import request_bus_obj as bus
from .command import Command from .command import Command
@@ -41,7 +41,7 @@ class FactoryBuilder:
) )
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
def _pinfo(self, name: str) -> NoReturn: def _pinfo(self, name: str) -> None:
"""prints progress status for each step""" """prints progress status for each step"""
name = name.split("_")[1] name = name.split("_")[1]
self.logger.info(self._info[int(getattr(self.BuilderProgress, name))]) self.logger.info(self._info[int(getattr(self.BuilderProgress, name))])

View File

@@ -110,6 +110,7 @@ class IRemote(metaclass=ABCMeta):
cmd += (f".{param}",) cmd += (f".{param}",)
return "".join(cmd) return "".join(cmd)
@property
@abstractmethod @abstractmethod
def identifier(self): def identifier(self):
pass pass

View File

@@ -53,6 +53,14 @@ class KindMapClass(metaclass=SingletonType):
def num_bus(self): def num_bus(self):
return sum(self.outs) return sum(self.outs)
@property
def num_strip_levels(self) -> int:
return 2 * self.phys_in + 8 * self.virt_in
@property
def num_bus_levels(self) -> int:
return 8 * (self.phys_out + self.virt_out)
def __str__(self) -> str: def __str__(self) -> str:
return self.name.capitalize() return self.name.capitalize()

View File

@@ -9,7 +9,7 @@ class MacroButton(IRemote):
@property @property
def identifier(self): def identifier(self):
return f"button[{self.index}]" return f"command.button[{self.index}]"
@property @property
def state(self) -> bool: def state(self) -> bool:

View File

@@ -3,10 +3,14 @@ from dataclasses import dataclass
from .kinds import KindMapClass from .kinds import KindMapClass
from .util import comp from .util import comp
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32 VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33 VBAN_SERVICE_RTPACKET = 33
MAX_PACKET_SIZE = 1436 MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass @dataclass
@@ -14,28 +18,28 @@ class VbanRtPacket:
"""Represents the body of a VBAN RT data packet""" """Represents the body of a VBAN RT data packet"""
_kind: KindMapClass _kind: KindMapClass
_voicemeeterType: bytes _voicemeeterType: bytes # data[28:29]
_reserved: bytes _reserved: bytes # data[29:30]
_buffersize: bytes _buffersize: bytes # data[30:32]
_voicemeeterVersion: bytes _voicemeeterVersion: bytes # data[32:36]
_optionBits: bytes _optionBits: bytes # data[36:40]
_samplerate: bytes _samplerate: bytes # data[40:44]
_inputLeveldB100: bytes _inputLeveldB100: bytes # data[44:112]
_outputLeveldB100: bytes _outputLeveldB100: bytes # data[112:240]
_TransportBit: bytes _TransportBit: bytes # data[240:244]
_stripState: bytes _stripState: bytes # data[244:276]
_busState: bytes _busState: bytes # data[276:308]
_stripGaindB100Layer1: bytes _stripGaindB100Layer1: bytes # data[308:324]
_stripGaindB100Layer2: bytes _stripGaindB100Layer2: bytes # data[324:340]
_stripGaindB100Layer3: bytes _stripGaindB100Layer3: bytes # data[340:356]
_stripGaindB100Layer4: bytes _stripGaindB100Layer4: bytes # data[356:372]
_stripGaindB100Layer5: bytes _stripGaindB100Layer5: bytes # data[372:388]
_stripGaindB100Layer6: bytes _stripGaindB100Layer6: bytes # data[388:404]
_stripGaindB100Layer7: bytes _stripGaindB100Layer7: bytes # data[404:420]
_stripGaindB100Layer8: bytes _stripGaindB100Layer8: bytes # data[420:436]
_busGaindB100: bytes _busGaindB100: bytes # data[436:452]
_stripLabelUTF8c60: bytes _stripLabelUTF8c60: bytes # data[452:932]
_busLabelUTF8c60: bytes _busLabelUTF8c60: bytes # data[932:1412]
def _generate_levels(self, levelarray) -> tuple: def _generate_levels(self, levelarray) -> tuple:
return tuple( return tuple(
@@ -103,12 +107,12 @@ class VbanRtPacket:
@property @property
def inputlevels(self) -> tuple: def inputlevels(self) -> tuple:
"""returns the entire level array across all inputs for a kind""" """returns the entire level array across all inputs for a kind"""
return self.strip_levels[0 : (2 * self._kind.phys_in + 8 * self._kind.virt_in)] return self.strip_levels[0 : self._kind.num_strip_levels]
@property @property
def outputlevels(self) -> tuple: def outputlevels(self) -> tuple:
"""returns the entire level array across all outputs for a kind""" """returns the entire level array across all outputs for a kind"""
return self.bus_levels[0 : 8 * self._kind.num_bus] return self.bus_levels[0 : self._kind.num_bus_levels]
@property @property
def stripstate(self) -> tuple: def stripstate(self) -> tuple:
@@ -206,13 +210,42 @@ class VbanRtPacket:
) )
@dataclass
class SubscribeHeader:
"""Represents the header an RT Packet Service subscription packet"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert (
len(header) == HEADER_SIZE + 4
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
return header
@dataclass @dataclass
class VbanRtPacketHeader: class VbanRtPacketHeader:
"""Represents the header of VBAN RT data packet""" """Represents the header of a VBAN RT response packet"""
name = "Voicemeeter-RTP" name = "Voicemeeter-RTP"
vban: bytes = "VBAN".encode() vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little") format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little") format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little") format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little")
format_bit: bytes = (0).to_bytes(1, "little") format_bit: bytes = (0).to_bytes(1, "little")
@@ -226,13 +259,13 @@ class VbanRtPacketHeader:
header += self.format_nbc header += self.format_nbc
header += self.format_bit header += self.format_bit
header += self.streamname header += self.streamname
assert len(header) == HEADER_SIZE - 4, f"Header expected {HEADER_SIZE-4} bytes" assert len(header) == HEADER_SIZE, f"expected header size {HEADER_SIZE} bytes"
return header return header
@dataclass @dataclass
class RequestHeader: class RequestHeader:
"""Represents a REQUEST RT PACKET header""" """Represents the header of an REQUEST RT PACKET"""
name: str name: str
bps_index: int bps_index: int
@@ -244,7 +277,7 @@ class RequestHeader:
@property @property
def sr(self): def sr(self):
return (0x40 + self.bps_index).to_bytes(1, "little") return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
@property @property
def nbc(self): def nbc(self):
@@ -263,32 +296,7 @@ class RequestHeader:
header += self.bit header += self.bit
header += self.streamname header += self.streamname
header += self.framecounter header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes" assert (
return header len(header) == HEADER_SIZE + 4
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
@dataclass
class SubscribeHeader:
"""Represents a packet used to subscribe to the RT Packet Service"""
name = "Register RTP"
timeout = 15
vban: bytes = "VBAN".encode()
format_sr: bytes = (0x60).to_bytes(1, "little")
format_nbs: bytes = (0).to_bytes(1, "little")
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
framecounter: bytes = (0).to_bytes(4, "little")
@property
def header(self):
header = self.vban
header += self.format_sr
header += self.format_nbs
header += self.format_nbc
header += self.format_bit
header += self.streamname
header += self.framecounter
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
return header return header

View File

@@ -296,7 +296,7 @@ class StripLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty: if not self._remote.stopped() and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["strip_level"][ for i in self._remote.cache["strip_level"][

View File

@@ -135,18 +135,15 @@ class VbanInstream(VbanStream):
class VbanAudioInstream(VbanInstream): class VbanAudioInstream(VbanInstream):
def __str__(self): """Represents a VBAN Audio Instream"""
return f"{type(self).__name__}{self._remote.kind}{self.index}"
class VbanMidiInstream(VbanInstream): class VbanMidiInstream(VbanInstream):
def __str__(self): """Represents a VBAN Midi Instream"""
return f"{type(self).__name__}{self._remote.kind}{self.index}"
class VbanTextInstream(VbanInstream): class VbanTextInstream(VbanInstream):
def __str__(self): """Represents a VBAN Text Instream"""
return f"{type(self).__name__}{self._remote.kind}{self.index}"
class VbanOutstream(VbanStream): class VbanOutstream(VbanStream):
@@ -165,13 +162,11 @@ class VbanOutstream(VbanStream):
class VbanAudioOutstream(VbanOutstream): class VbanAudioOutstream(VbanOutstream):
def __str__(self): """Represents a VBAN Audio Outstream"""
return f"{type(self).__name__}{self._remote.kind}{self.index}"
class VbanMidiOutstream(VbanOutstream): class VbanMidiOutstream(VbanOutstream):
def __str__(self): """Represents a VBAN Midi Outstream"""
return f"{type(self).__name__}{self._remote.kind}{self.index}"
def _make_stream_pair(remote, kind): def _make_stream_pair(remote, kind):

View File

@@ -1,5 +1,6 @@
import logging import logging
import socket import socket
import threading
import time import time
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from pathlib import Path from pathlib import Path
@@ -85,19 +86,20 @@ class VbanCmd(metaclass=ABCMeta):
self.login() self.login()
return self return self
def login(self): def login(self) -> None:
"""Starts the subscriber and updater threads (unless in outbound mode)""" """Starts the subscriber and updater threads (unless in outbound mode)"""
if not self.outbound: if not self.outbound:
self.running = True
self.event.info() self.event.info()
self.subscriber = Subscriber(self) self.stop_event = threading.Event()
self.stop_event.clear()
self.subscriber = Subscriber(self, self.stop_event)
self.subscriber.start() self.subscriber.start()
queue = Queue() queue = Queue()
self.updater = Updater(self, queue) self.updater = Updater(self, queue)
self.updater.start() self.updater.start()
self.producer = Producer(self, queue) self.producer = Producer(self, queue, self.stop_event)
self.producer.start() self.producer.start()
self.logger.info( self.logger.info(
@@ -106,6 +108,9 @@ class VbanCmd(metaclass=ABCMeta):
) )
) )
def stopped(self):
return self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]): def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network.""" """Sends a string request command over a network."""
self.socks[Socket.request].sendto( self.socks[Socket.request].sendto(
@@ -154,7 +159,7 @@ class VbanCmd(metaclass=ABCMeta):
def public_packet(self): def public_packet(self):
return self._public_packet return self._public_packet
def clear_dirty(self): def clear_dirty(self) -> None:
while self.pdirty: while self.pdirty:
time.sleep(self.DELAY) time.sleep(self.DELAY)
@@ -212,11 +217,13 @@ class VbanCmd(metaclass=ABCMeta):
self.apply(config) self.apply(config)
self.logger.info(f"Profile '{name}' applied!") self.logger.info(f"Profile '{name}' applied!")
def logout(self): def logout(self) -> None:
self.running = False if not self.stopped():
time.sleep(0.2) self.logger.debug("events thread shutdown started")
self.stop_event.set()
self.subscriber.join() # wait for subscriber thread to complete cycle
[sock.close() for sock in self.socks] [sock.close() for sock in self.socks]
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}") self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")
def __exit__(self, exc_type, exc_value, exc_traceback): def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
self.logout() self.logout()

View File

@@ -14,14 +14,15 @@ logger = logging.getLogger(__name__)
class Subscriber(threading.Thread): class Subscriber(threading.Thread):
"""fire a subscription packet every 10 seconds""" """fire a subscription packet every 10 seconds"""
def __init__(self, remote): def __init__(self, remote, stop_event):
super().__init__(name="subscriber", daemon=True) super().__init__(name="subscriber", daemon=False)
self._remote = remote self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet = SubscribeHeader() self.packet = SubscribeHeader()
def run(self): def run(self):
while self._remote.running: while not self.stopped():
try: try:
self._remote.socks[Socket.register].sendto( self._remote.socks[Socket.register].sendto(
self.packet.header, self.packet.header,
@@ -30,23 +31,39 @@ class Subscriber(threading.Thread):
self.packet.framecounter = ( self.packet.framecounter = (
int.from_bytes(self.packet.framecounter, "little") + 1 int.from_bytes(self.packet.framecounter, "little") + 1
).to_bytes(4, "little") ).to_bytes(4, "little")
time.sleep(10) self.wait_until_stopped(10)
except socket.gaierror as e: except socket.gaierror as e:
self.logger.exception(f"{type(e).__name__}: {e}") self.logger.exception(f"{type(e).__name__}: {e}")
raise VBANCMDConnectionError( raise VBANCMDConnectionError(
f"unable to resolve hostname {self._remote.ip}" f"unable to resolve hostname {self._remote.ip}"
) from e ) from e
self.logger.debug(f"terminating {self.name} thread")
def stopped(self):
return self.stop_event.is_set()
def wait_until_stopped(self, timeout, period=0.2):
must_end = time.time() + timeout
while time.time() < must_end:
if self.stopped():
break
time.sleep(period)
class Producer(threading.Thread): class Producer(threading.Thread):
"""Continously send job queue to the Updater thread at a rate of self._remote.ratelimit.""" """Continously send job queue to the Updater thread at a rate of self._remote.ratelimit."""
def __init__(self, remote, queue): def __init__(self, remote, queue, stop_event):
super().__init__(name="producer", daemon=True) super().__init__(name="producer", daemon=False)
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet_expected = VbanRtPacketHeader() self.packet_expected = VbanRtPacketHeader()
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._public_packet = self._get_rt() self._remote._public_packet = self._get_rt()
( (
self._remote.cache["strip_level"], self._remote.cache["strip_level"],
@@ -60,7 +77,6 @@ class Producer(threading.Thread):
data = None data = None
while not data: while not data:
data = self._fetch_rt_packet() data = self._fetch_rt_packet()
time.sleep(self._remote.DELAY)
return data return data
return fget() return fget()
@@ -68,10 +84,10 @@ class Producer(threading.Thread):
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
try: try:
data, _ = self._remote.socks[Socket.response].recvfrom(2048) data, _ = self._remote.socks[Socket.response].recvfrom(2048)
# check for packet data # do we have packet data?
if len(data) > HEADER_SIZE: if len(data) > HEADER_SIZE:
# check if packet is of type rt packet response # is the packet of type VBAN RT response?
if self.packet_expected.header == data[: HEADER_SIZE - 4]: if self.packet_expected.header == data[:HEADER_SIZE]:
return VbanRtPacket( return VbanRtPacket(
_kind=self._remote.kind, _kind=self._remote.kind,
_voicemeeterType=data[28:29], _voicemeeterType=data[28:29],
@@ -103,8 +119,11 @@ class Producer(threading.Thread):
f"timeout waiting for RtPacket from {self._remote.ip}" f"timeout waiting for RtPacket from {self._remote.ip}"
) from e ) from e
def stopped(self):
return self.stop_event.is_set()
def run(self): def run(self):
while self._remote.running: while not self.stopped():
_pp = self._get_rt() _pp = self._get_rt()
pdirty = _pp.pdirty(self._remote.public_packet) pdirty = _pp.pdirty(self._remote.public_packet)
ldirty = _pp.ldirty( ldirty = _pp.ldirty(
@@ -137,13 +156,8 @@ class Updater(threading.Thread):
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self._remote.socks[Socket.response].settimeout(self._remote.timeout) self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels)
self._remote.socks[Socket.response].bind( self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels)
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
p_in, v_in = self._remote.kind.ins
self._remote._strip_comp = [False] * (2 * p_in + 8 * v_in)
self._remote._bus_comp = [False] * (self._remote.kind.num_bus * 8)
def run(self): def run(self):
""" """
@@ -151,12 +165,7 @@ class Updater(threading.Thread):
Generate _strip_comp, _bus_comp and update level cache if ldirty. Generate _strip_comp, _bus_comp and update level cache if ldirty.
""" """
while True: while event := self.queue.get():
event = self.queue.get()
if event is None:
self.logger.debug(f"terminating {self.name} thread")
break
if event == "pdirty" and self._remote.pdirty: if event == "pdirty" and self._remote.pdirty:
self._remote.subject.notify(event) self._remote.subject.notify(event)
elif event == "ldirty" and self._remote.ldirty: elif event == "ldirty" and self._remote.ldirty:
@@ -172,3 +181,4 @@ class Updater(threading.Thread):
self._remote._public_packet.outputlevels, self._remote._public_packet.outputlevels,
) )
self._remote.subject.notify(event) self._remote.subject.notify(event)
self.logger.debug(f"terminating {self.name} thread")