mirror of
https://github.com/onyx-and-iris/xair-api-python.git
synced 2024-11-15 17:40:57 +00:00
minor refactors
This commit is contained in:
parent
e4dc4d0b13
commit
6944ba5128
@ -5,7 +5,7 @@ from .meta import geq_prop
|
|||||||
from .util import _get_fader_val, _set_fader_val, lin_get, lin_set, log_get, log_set
|
from .util import _get_fader_val, _set_fader_val, lin_get, lin_set, log_get, log_set
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Classes shared by /ch, /rtn, /rt/aux, /bus, /fxsend, /lr
|
Classes shared by /ch, /rtn, /rtn/aux, /bus, /fxsend, /lr
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@ -3,7 +3,7 @@ import logging
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional, Union
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import tomllib
|
import tomllib
|
||||||
@ -31,15 +31,12 @@ class OSCClientServer(BlockingOSCUDPServer):
|
|||||||
super().__init__(("", 0), dispatcher)
|
super().__init__(("", 0), dispatcher)
|
||||||
self.xr_address = address
|
self.xr_address = address
|
||||||
|
|
||||||
def send_message(self, address: str, value: str):
|
def send_message(self, address: str, vals: Optional[Union[str, list]]):
|
||||||
builder = OscMessageBuilder(address=address)
|
builder = OscMessageBuilder(address=address)
|
||||||
if value is None:
|
vals = vals if vals is not None else []
|
||||||
values = list()
|
if not isinstance(vals, list):
|
||||||
elif isinstance(value, list):
|
vals = [vals]
|
||||||
values = value
|
for val in vals:
|
||||||
else:
|
|
||||||
values = [value]
|
|
||||||
for val in values:
|
|
||||||
builder.add_arg(val)
|
builder.add_arg(val)
|
||||||
msg = builder.build()
|
msg = builder.build()
|
||||||
self.socket.sendto(msg.dgram, self.xr_address)
|
self.socket.sendto(msg.dgram, self.xr_address)
|
||||||
@ -80,14 +77,13 @@ class XAirRemote(abc.ABC):
|
|||||||
def validate_connection(self):
|
def validate_connection(self):
|
||||||
self.send("/xinfo")
|
self.send("/xinfo")
|
||||||
time.sleep(self._CONNECT_TIMEOUT)
|
time.sleep(self._CONNECT_TIMEOUT)
|
||||||
if len(self.info_response) > 0:
|
if not self.info_response:
|
||||||
|
raise XAirRemoteError(
|
||||||
|
"Error: Failed to setup OSC connection to mixer. Please check for correct ip address."
|
||||||
|
)
|
||||||
print(
|
print(
|
||||||
f"Successfully connected to {self.info_response[2]} at {self.info_response[0]}."
|
f"Successfully connected to {self.info_response[2]} at {self.info_response[0]}."
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
print(
|
|
||||||
"Error: Failed to setup OSC connection to mixer. Please check for correct ip address."
|
|
||||||
)
|
|
||||||
|
|
||||||
def run_server(self):
|
def run_server(self):
|
||||||
self.server.serve_forever()
|
self.server.serve_forever()
|
||||||
|
Loading…
Reference in New Issue
Block a user