mirror of
https://github.com/onyx-and-iris/xair-api-python.git
synced 2024-11-23 13:20:58 +00:00
191 lines
6.0 KiB
Python
191 lines
6.0 KiB
Python
import abc
|
|
import logging
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional, Union
|
|
|
|
try:
|
|
import tomllib
|
|
except ModuleNotFoundError:
|
|
import tomli as tomllib
|
|
|
|
from pythonosc.dispatcher import Dispatcher
|
|
from pythonosc.osc_message_builder import OscMessageBuilder
|
|
from pythonosc.osc_server import BlockingOSCUDPServer
|
|
|
|
from . import adapter, kinds, util
|
|
from .bus import Bus
|
|
from .config import Config
|
|
from .dca import DCA
|
|
from .errors import XAirRemoteConnectionTimeoutError, XAirRemoteError
|
|
from .fx import FX, FXSend
|
|
from .kinds import KindMap
|
|
from .lr import LR
|
|
from .rtn import AuxRtn, FxRtn
|
|
from .strip import Strip
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OSCClientServer(BlockingOSCUDPServer):
|
|
def __init__(self, address: str, dispatcher: Dispatcher):
|
|
super().__init__(("", 0), dispatcher)
|
|
self.xr_address = address
|
|
|
|
def send_message(self, address: str, vals: Optional[Union[str, list]]):
|
|
builder = OscMessageBuilder(address=address)
|
|
vals = vals if vals is not None else []
|
|
if not isinstance(vals, list):
|
|
vals = [vals]
|
|
for val in vals:
|
|
builder.add_arg(val)
|
|
msg = builder.build()
|
|
self.socket.sendto(msg.dgram, self.xr_address)
|
|
|
|
|
|
class XAirRemote(abc.ABC):
|
|
"""Handles the communication with the mixer via the OSC protocol"""
|
|
|
|
_info_response = []
|
|
|
|
def __init__(self, **kwargs):
|
|
dispatcher = Dispatcher()
|
|
dispatcher.set_default_handler(self.msg_handler)
|
|
self.xair_ip = kwargs["ip"] or self._ip_from_toml()
|
|
self.xair_port = kwargs["port"]
|
|
self._delay = kwargs["delay"]
|
|
self.connect_timeout = kwargs["connect_timeout"]
|
|
self.logger = logger.getChild(self.__class__.__name__)
|
|
if not self.xair_ip:
|
|
raise XAirRemoteError("No valid ip detected")
|
|
self.server = OSCClientServer((self.xair_ip, self.xair_port), dispatcher)
|
|
|
|
def __enter__(self):
|
|
self.worker = threading.Thread(target=self.run_server, daemon=True)
|
|
self.worker.start()
|
|
self.validate_connection()
|
|
return self
|
|
|
|
def _ip_from_toml(self) -> str:
|
|
filepath = Path.cwd() / "config.toml"
|
|
with open(filepath, "rb") as f:
|
|
conn = tomllib.load(f)
|
|
return conn["connection"].get("ip")
|
|
|
|
@util.timeout
|
|
def validate_connection(self):
|
|
if not self.query("/xinfo"):
|
|
raise XAirRemoteConnectionTimeoutError(self.xair_ip, self.xair_port)
|
|
self.logger.info(
|
|
f"Successfully connected to {self.info_response[2]} at {self.info_response[0]}."
|
|
)
|
|
|
|
@property
|
|
def info_response(self):
|
|
return self._info_response
|
|
|
|
def run_server(self):
|
|
self.server.serve_forever()
|
|
|
|
def msg_handler(self, addr, *data):
|
|
self.logger.debug(f"received: {addr} {data if data else ''}")
|
|
self._info_response = data[:]
|
|
|
|
def send(self, addr: str, param: Optional[str] = None):
|
|
self.logger.debug(f"sending: {addr} {param if param is not None else ''}")
|
|
self.server.send_message(addr, param)
|
|
|
|
def query(self, address):
|
|
self.send(address)
|
|
time.sleep(self._delay)
|
|
return self.info_response
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_tr):
|
|
self.server.shutdown()
|
|
|
|
|
|
def _make_remote(kind: KindMap) -> XAirRemote:
|
|
"""
|
|
Creates a new XAIR remote class.
|
|
|
|
The returned class will subclass XAirRemote.
|
|
"""
|
|
|
|
def init_x32(self, *args, **kwargs):
|
|
defaultkwargs = {
|
|
"ip": None,
|
|
"port": 10023,
|
|
"delay": 0.02,
|
|
"connect_timeout": 2,
|
|
}
|
|
kwargs = defaultkwargs | kwargs
|
|
XAirRemote.__init__(self, *args, **kwargs)
|
|
self.kind = kind
|
|
self.mainst = adapter.MainStereo.make(self)
|
|
self.mainmono = adapter.MainMono.make(self)
|
|
self.matrix = tuple(
|
|
adapter.Matrix.make(self, i) for i in range(kind.num_matrix)
|
|
)
|
|
self.strip = tuple(Strip.make(self, i) for i in range(kind.num_strip))
|
|
self.bus = tuple(adapter.Bus.make(self, i) for i in range(kind.num_bus))
|
|
self.dca = tuple(DCA(self, i) for i in range(kind.num_dca))
|
|
self.fx = tuple(FX(self, i) for i in range(kind.num_fx))
|
|
self.fxreturn = tuple(adapter.FxRtn.make(self, i) for i in range(kind.num_fx))
|
|
self.auxin = tuple(adapter.AuxRtn.make(self, i) for i in range(kind.num_auxrtn))
|
|
self.config = Config.make(self)
|
|
|
|
def init_xair(self, *args, **kwargs):
|
|
defaultkwargs = {
|
|
"ip": None,
|
|
"port": 10024,
|
|
"delay": 0.02,
|
|
"connect_timeout": 2,
|
|
}
|
|
kwargs = defaultkwargs | kwargs
|
|
XAirRemote.__init__(self, *args, **kwargs)
|
|
self.kind = kind
|
|
self.lr = LR.make(self)
|
|
self.strip = tuple(Strip.make(self, i) for i in range(kind.num_strip))
|
|
self.bus = tuple(Bus.make(self, i) for i in range(kind.num_bus))
|
|
self.dca = tuple(DCA(self, i) for i in range(kind.num_dca))
|
|
self.fx = tuple(FX(self, i) for i in range(kind.num_fx))
|
|
self.fxsend = tuple(FXSend.make(self, i) for i in range(kind.num_fx))
|
|
self.fxreturn = tuple(FxRtn.make(self, i) for i in range(kind.num_fx))
|
|
self.auxreturn = AuxRtn.make(self)
|
|
self.config = Config.make(self)
|
|
|
|
if kind.id_ == "X32":
|
|
return type(
|
|
f"XAirRemote{kind}",
|
|
(XAirRemote,),
|
|
{
|
|
"__init__": init_x32,
|
|
},
|
|
)
|
|
return type(
|
|
f"XAirRemote{kind}",
|
|
(XAirRemote,),
|
|
{
|
|
"__init__": init_xair,
|
|
},
|
|
)
|
|
|
|
|
|
_remotes = {kind.id_: _make_remote(kind) for kind in kinds.all}
|
|
|
|
|
|
def request_remote_obj(kind_id: str, *args, **kwargs) -> XAirRemote:
|
|
"""
|
|
Interface entry point. Wraps factory expression and handles errors
|
|
|
|
Returns a reference to an XAirRemote class of a kind
|
|
"""
|
|
|
|
XAIRREMOTE_cls = None
|
|
try:
|
|
XAIRREMOTE_cls = _remotes[kind_id]
|
|
except KeyError as e:
|
|
raise XAirRemoteError(f"Unknown mixer kind '{kind_id}'") from e
|
|
return XAIRREMOTE_cls(*args, **kwargs)
|