add login,logout, caching system, dirty parameter to base class

add login, logout functions to base class.

add caching to setters and fetch caching to getters.

add isdirty function to rt packet dataclass for defining dirty flag.

remove sync kwarg.
This commit is contained in:
onyx-and-iris 2022-03-20 12:25:50 +00:00
parent af647f2483
commit 6c914dafbe
7 changed files with 152 additions and 102 deletions

View File

@ -2,7 +2,7 @@ from .errors import VMCMDErrors
from . import channel
from .channel import Channel
from . import kinds
from .meta import bus_mode_prop
from .meta import bus_mode_prop, bus_bool_prop
class OutputBus(Channel):
""" Base class for output buses. """
@ -24,55 +24,27 @@ class OutputBus(Channel):
def identifier(self):
return f'Bus[{self.index}]'
@property
def mute(self) -> bool:
return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._mute == 0
mute = bus_bool_prop('mute')
@mute.setter
def mute(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('mute is a boolean parameter')
self.setter('mute', 1 if val else 0)
mono = bus_bool_prop('mono')
@property
def mono(self) -> bool:
return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._mono == 0
eq = bus_bool_prop('eq.On')
@mono.setter
def mono(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('mono is a boolean parameter')
self.setter('mono', 1 if val else 0)
@property
def eq(self) -> bool:
return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._eq == 0
@eq.setter
def eq(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise ('eq is a boolean parameter')
self.setter('eq.On', 1 if val else 0)
@property
def eq_ab(self) -> bool:
return not int.from_bytes(self.public_packet.busstate[self.index], 'little') & self._modes._eqb == 0
@eq_ab.setter
def eq_ab(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('eq_ab is a boolean parameter')
self.setter('eq.ab', 1 if val else 0)
eq_ab = bus_bool_prop('eq.ab')
@property
def label(self) -> str:
return self.public_packet.buslabels[self.index]
val = self.getter('label')
if val is None:
val = self.public_packet.buslabels[self.index]
self._remote.cache[f'{self.identifier}.label'] = [val, False]
return val
@label.setter
def label(self, val: str):
if not isinstance(val, str):
raise VMCMDErrors('label is a string parameter')
self.setter('Label', val)
self.setter('label', val)
@property
def gain(self) -> float:
@ -84,7 +56,12 @@ class OutputBus(Channel):
return 0
else:
return ((1 << 16) - 1) - val
return round((fget() * 0.01), 1)
val = self.getter('gain')
if val is None:
val = round((fget() * 0.01), 1)
self._remote.cache[f'{self.identifier}.gain'] = [val, False]
return round(val, 1)
@gain.setter
def gain(self, val: float):

View File

@ -1,6 +1,7 @@
import abc
from .errors import VMCMDErrors
from dataclasses import dataclass
from time import sleep
@dataclass
class Modes:
@ -25,9 +26,9 @@ class Modes:
_mask: hex=0x000000F0
_eq: hex=0x00000100
_eq_on: hex=0x00000100
_cross: hex=0x00000200
_eqb: hex=0x00000800
_eq_ab: hex=0x00000800
_busa: hex=0x00001000
_busa1: hex=0x00001000
@ -70,6 +71,15 @@ class Channel(abc.ABC):
self.index = index
self._modes = Modes()
def getter(self, param):
cmd = f'{self.identifier}.{param}'
if cmd in self._remote.cache and self._remote.cache[cmd][1]:
for _ in range(2):
if self._remote.pdirty:
val = self._remote.cache.pop(f'{self.identifier}.{param}')[0]
return val
sleep(0.001)
def setter(self, param, val):
""" Sends a string request RT packet. """
self._remote.set_rt(f'{self.identifier}', param, val)

View File

@ -31,6 +31,18 @@ class VBAN_VMRT_Packet_Data:
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
def isdirty(self, other):
""" defines the dirty flag """
if \
self._stripState == other._stripState and \
self._busState == other._busState and \
self._stripLabelUTF8c60 == other._stripLabelUTF8c60 and \
self._busLabelUTF8c60 == other._busLabelUTF8c60 and \
self._stripGaindB100Layer1 == other._stripGaindB100Layer1 and \
self._busGaindB100 == other._busGaindB100:
return False
return True
@property
def voicemeetertype(self) -> str:
""" returns voicemeeter type as a string """

View File

@ -1,10 +1,45 @@
from .errors import VMCMDErrors
from time import sleep
def strip_bool_prop(param):
""" A strip bool prop. """
def fget(self):
val = self.getter(param)
if val is None:
val = not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & getattr(self._modes, f'_{param}') == 0
self._remote.cache[f'{self.identifier}.{param}'] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors(f'{param} is a boolean parameter')
self.setter(param, 1 if val else 0)
return property(fget, fset)
def bus_bool_prop(param):
""" A bus bool prop. """
def fget(self):
val = self.getter(param)
if val is None:
val = not int.from_bytes(self.public_packet.busstate[self.index], 'little') & getattr(self._modes, f'_{param.replace(".", "_").lower()}') == 0
self._remote.cache[f'{self.identifier}.{param}'] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors(f'{param} is a boolean parameter')
self.setter(param, 1 if val else 0)
return property(fget, fset)
def strip_output_prop(param):
""" A strip output prop. """
def fget(self):
return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & getattr(self._modes, f'_bus{param.lower()}') == 0
val = self.getter(param)
if val is None:
val = not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & getattr(self._modes, f'_bus{param.lower()}') == 0
self._remote.cache[f'{self.identifier}.{param}'] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f'{param} is a boolean parameter')
@ -30,7 +65,12 @@ def bus_mode_prop(param):
'rearonly': (False, True, True, True, False, True, True, True, True, True, True, True),
}
vals = tuple(not int.from_bytes(data.busstate[self.index], 'little') & val == 0 for val in self._modes.modevals)
return vals == modes[param.lower()]
val = self.getter(f'mode.{param}')
if val is None:
val = vals == modes[param.lower()]
self._remote.cache[f'{self.identifier}.mode.{param}'] = [val, False]
return val
return val == 1
def fset(self, val):
if not isinstance(val, bool) and val not in (0, 1):
raise VMCMDErrors(f'mode.{param} is a boolean parameter')

View File

@ -2,7 +2,7 @@ from .errors import VMCMDErrors
from . import channel
from .channel import Channel
from . import kinds
from .meta import strip_output_prop
from .meta import strip_output_prop, strip_bool_prop
class InputStrip(Channel):
""" Base class for input strips. """
@ -24,35 +24,11 @@ class InputStrip(Channel):
def identifier(self):
return f'Strip[{self.index}]'
@property
def mono(self) -> bool:
return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._mono == 0
mono = strip_bool_prop('mono')
@mono.setter
def mono(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('mono is a boolean parameter')
self.setter('mono', 1 if val else 0)
solo = strip_bool_prop('solo')
@property
def solo(self) -> bool:
return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._solo == 0
@solo.setter
def solo(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('solo is a boolean parameter')
self.setter('solo', 1 if val else 0)
@property
def mute(self) -> bool:
return not int.from_bytes(self.public_packet.stripstate[self.index], 'little') & self._modes._mute == 0
@mute.setter
def mute(self, val: bool):
if not isinstance(val, bool) and val not in (0,1):
raise VMCMDErrors('mute is a boolean parameter')
self.setter('mute', 1 if val else 0)
mute = strip_bool_prop('mute')
@property
def limit(self) -> int:
@ -66,7 +42,11 @@ class InputStrip(Channel):
@property
def label(self) -> str:
return self.public_packet.striplabels[self.index]
val = self.getter('label')
if val is None:
val = self.public_packet.striplabels[self.index]
self._remote.cache[f'{self.identifier}.label'] = [val, False]
return val
@label.setter
def label(self, val: str):
@ -76,7 +56,11 @@ class InputStrip(Channel):
@property
def gain(self) -> float:
return self.gainlayer[0].gain
val = self.getter('GainLayer[0]')
if val is None:
val = self.gainlayer[0].gain
self._remote.cache[f'{self.identifier}.GainLayer[0]'] = [val, False]
return round(val, 1)
@gain.setter
def gain(self, val: float):
@ -175,7 +159,12 @@ class GainLayer(InputStrip):
return 0
else:
return ((1 << 16) - 1) - val
return round((fget() * 0.01), 1)
val = self.getter(f'GainLayer[{self._i}]')
if val is None:
val = round((fget() * 0.01), 1)
self._remote.cache[f'{self.identifier}.GainLayer[{self._i}]'] = [val, False]
return val
return round(val, 1)
@gain.setter
def gain(self, val: float):

8
vbancmd/util.py Normal file
View File

@ -0,0 +1,8 @@
def cache(func):
""" check if recently cached was an updated value """
def wrapper(*args, **kwargs):
# setup cache check
res = func(*args, **kwargs)
# update cache
return res
return wrapper

View File

@ -26,7 +26,6 @@ class VbanCmd(abc.ABC):
self._channel = kwargs['channel']
self._delay = kwargs['delay']
self._ratelimiter = kwargs['ratelimiter']
self._sync = kwargs['sync']
self._bps_opts = \
[0, 110, 150, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 31250,
38400, 57600, 115200, 128000, 230400, 250000, 256000, 460800,921600,
@ -52,25 +51,19 @@ class VbanCmd(abc.ABC):
self.ready_to_read, self.ready_to_write, in_error = select.select(is_readable, is_writable, is_error, 60)
self._public_packet = None
self.running = True
self._pdirty = False
self.cache = {}
def __enter__(self):
"""
Start listening for RT Packets
Start background threads:
register to RT service
keep public packet updated.
"""
self._rt_packet_socket.bind((socket.gethostbyname(socket.gethostname()), self._port))
worker = Thread(target=self._send_register_rt, daemon=True)
worker.start()
self._public_packet = self._get_rt()
if self._sync:
worker2 = Thread(target=self._keepupdated, daemon=True)
worker2.start()
self.login()
return self
def _send_register_rt(self):
"""
Continuously register to the RT Packet Service
This function to be run in its own thread.
"""
while self.running:
if self._rt_register_socket in self.ready_to_write:
self._rt_register_socket.sendto(
@ -81,7 +74,7 @@ class VbanCmd(abc.ABC):
sleep(10)
def _fetch_rt_packet(self) -> Optional[VBAN_VMRT_Packet_Data]:
""" Returns only a valid RT Data Packet. May Return None """
""" Returns a valid RT Data Packet or None """
if self._rt_packet_socket in self.ready_to_write:
data, _ = self._rt_packet_socket.recvfrom(1024*1024*2)
# check for packet data
@ -113,6 +106,11 @@ class VbanCmd(abc.ABC):
_busLabelUTF8c60=data[932:1412],
)
@property
def pdirty(self):
""" True iff a parameter has changed """
return self._pdirty
@property
def public_packet(self):
return self._public_packet
@ -124,12 +122,15 @@ class VbanCmd(abc.ABC):
"""
Continously update public packet in background.
This function to be run in its own thread.
Set parameter dirty flag.
Update public packet only if new private packet is found.
This function to be run in its own thread.
"""
while self.running:
private_packet = self._get_rt()
self._pdirty = private_packet.isdirty(self.public_packet)
if not private_packet.__eq__(self.public_packet):
self.public_packet = private_packet
@ -143,9 +144,7 @@ class VbanCmd(abc.ABC):
return fget()
def set_rt(self, id_: str, param: Optional[str]=None, val: Optional[Union[int, float]]=None):
"""
Sends a string request command over a network.
"""
""" Sends a string request command over a network. """
cmd = id_ if not param and val else f'{id_}.{param}={val}'
if self._sendrequest_string_socket in self.ready_to_write:
self._sendrequest_string_socket.sendto(
@ -153,12 +152,11 @@ class VbanCmd(abc.ABC):
)
count = int.from_bytes(self._text_header.framecounter, 'little') + 1
self._text_header.framecounter = count.to_bytes(4, 'little')
sleep(self._ratelimiter)
self.cache[f'{id_}.{param}'] = [val, True]
sleep(0.018)
def sendtext(self, cmd):
"""
Sends a multiple parameter string over a network.
"""
""" Sends a multiple parameter string over a network. """
self.set_rt(cmd)
sleep(self._delay)
@ -198,7 +196,23 @@ class VbanCmd(abc.ABC):
raise ValueError(obj)
target.apply(submapping)
def close(self):
def login(self):
"""
Start listening for RT Packets
Start background threads:
Register to RT service
Keep public packet updated.
"""
self._rt_packet_socket.bind((socket.gethostbyname(socket.gethostname()), self._port))
worker = Thread(target=self._send_register_rt, daemon=True)
worker.start()
self._public_packet = self._get_rt()
worker2 = Thread(target=self._keepupdated, daemon=True)
worker2.start()
def logout(self):
""" sets thread flag, closes sockets """
self.running = False
sleep(0.2)
@ -207,7 +221,7 @@ class VbanCmd(abc.ABC):
self._rt_packet_socket.close()
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
self.logout()
def _make_remote(kind: NamedTuple) -> VbanCmd:
@ -220,7 +234,7 @@ def _make_remote(kind: NamedTuple) -> VbanCmd:
def init(self, **kwargs):
defaultkwargs = {
'ip': None, 'port': 6990, 'streamname': 'Command1', 'bps': 0,
'channel': 0, 'delay': 0.001, 'ratelimiter': 0.035, 'sync': True
'channel': 0, 'delay': 0.001, 'ratelimiter': 0.018
}
kwargs = defaultkwargs | kwargs
VbanCmd.__init__(self, **kwargs)