xair-api-python/xair_api/xair.py

194 lines
6.2 KiB
Python
Raw Normal View History

2022-04-05 20:05:55 +01:00
import abc
import logging
2022-04-05 20:05:55 +01:00
import threading
import time
2022-04-05 20:05:55 +01:00
from pathlib import Path
2022-11-07 15:22:59 +00:00
from typing import Optional, Union
try:
import tomllib
except ModuleNotFoundError:
import tomli as tomllib
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_message_builder import OscMessageBuilder
from pythonosc.osc_server import BlockingOSCUDPServer
2022-04-05 20:05:55 +01:00
from . import adapter, kinds, util
2022-04-05 20:05:55 +01:00
from .bus import Bus
from .config import Config
from .dca import DCA
from .errors import XAirRemoteConnectionTimeoutError, XAirRemoteError
from .fx import FX, FXSend
2025-01-03 10:19:06 +00:00
from .headamp import HeadAmp
from .kinds import KindMap
from .lr import LR
from .rtn import AuxRtn, FxRtn
from .strip import Strip
2022-04-05 20:05:55 +01:00
logger = logging.getLogger(__name__)
2022-04-05 20:05:55 +01:00
class OSCClientServer(BlockingOSCUDPServer):
def __init__(self, address: str, dispatcher: Dispatcher):
2025-01-15 10:54:52 +00:00
super().__init__(('', 0), dispatcher)
2022-04-05 20:05:55 +01:00
self.xr_address = address
2022-11-07 15:22:59 +00:00
def send_message(self, address: str, vals: Optional[Union[str, list]]):
2022-04-05 20:05:55 +01:00
builder = OscMessageBuilder(address=address)
2022-11-07 15:22:59 +00:00
vals = vals if vals is not None else []
if not isinstance(vals, list):
vals = [vals]
for val in vals:
2022-04-05 20:05:55 +01:00
builder.add_arg(val)
msg = builder.build()
self.socket.sendto(msg.dgram, self.xr_address)
class XAirRemote(abc.ABC):
"""Handles the communication with the mixer via the OSC protocol"""
2022-04-05 20:05:55 +01:00
_info_response = []
2022-04-05 20:05:55 +01:00
def __init__(self, **kwargs):
dispatcher = Dispatcher()
dispatcher.set_default_handler(self.msg_handler)
2025-01-15 10:54:52 +00:00
self.xair_ip = kwargs['ip'] or self._ip_from_toml()
self.xair_port = kwargs['port']
self._delay = kwargs['delay']
self.connect_timeout = kwargs['connect_timeout']
self.logger = logger.getChild(self.__class__.__name__)
if not self.xair_ip:
2025-01-15 10:54:52 +00:00
raise XAirRemoteError('No valid ip detected')
self.server = OSCClientServer((self.xair_ip, self.xair_port), dispatcher)
2022-04-05 20:05:55 +01:00
def __enter__(self):
self.worker = threading.Thread(target=self.run_server, daemon=True)
2022-04-05 20:05:55 +01:00
self.worker.start()
self.validate_connection()
return self
def _ip_from_toml(self) -> str:
2025-01-15 10:54:52 +00:00
filepath = Path.cwd() / 'config.toml'
with open(filepath, 'rb') as f:
conn = tomllib.load(f)
2025-01-15 10:54:52 +00:00
return conn['connection'].get('ip')
2022-04-05 20:05:55 +01:00
@util.timeout
2022-04-05 20:05:55 +01:00
def validate_connection(self):
2025-01-15 10:54:52 +00:00
if not self.query('/xinfo'):
raise XAirRemoteConnectionTimeoutError(self.xair_ip, self.xair_port)
self.logger.info(
2025-01-15 10:54:52 +00:00
f'Successfully connected to {self.info_response[2]} at {self.info_response[0]}.'
2022-11-07 15:22:59 +00:00
)
2022-04-05 20:05:55 +01:00
@property
def info_response(self):
return self._info_response
2022-04-05 20:05:55 +01:00
def run_server(self):
self.server.serve_forever()
def msg_handler(self, addr, *data):
self.logger.debug(f"received: {addr} {data if data else ''}")
self._info_response = data[:]
2022-04-05 20:05:55 +01:00
def send(self, addr: str, param: Optional[str] = None):
2022-11-07 21:05:27 +00:00
self.logger.debug(f"sending: {addr} {param if param is not None else ''}")
self.server.send_message(addr, param)
2022-04-05 20:05:55 +01:00
def query(self, address):
2022-04-05 20:05:55 +01:00
self.send(address)
time.sleep(self._delay)
2022-04-05 20:05:55 +01:00
return self.info_response
def __exit__(self, exc_type, exc_value, exc_tr):
self.server.shutdown()
def _make_remote(kind: KindMap) -> XAirRemote:
2022-04-05 20:05:55 +01:00
"""
Creates a new XAIR remote class.
2022-04-05 20:05:55 +01:00
2022-10-28 21:07:14 +01:00
The returned class will subclass XAirRemote.
2022-04-05 20:05:55 +01:00
"""
def init_x32(self, *args, **kwargs):
defaultkwargs = {
2025-01-15 10:54:52 +00:00
'ip': None,
'port': 10023,
'delay': 0.02,
'connect_timeout': 2,
}
kwargs = defaultkwargs | kwargs
XAirRemote.__init__(self, *args, **kwargs)
self.kind = kind
self.mainst = adapter.MainStereo.make(self)
self.mainmono = adapter.MainMono.make(self)
self.matrix = tuple(
adapter.Matrix.make(self, i) for i in range(kind.num_matrix)
)
self.strip = tuple(Strip.make(self, i) for i in range(kind.num_strip))
self.bus = tuple(adapter.Bus.make(self, i) for i in range(kind.num_bus))
self.dca = tuple(DCA(self, i) for i in range(kind.num_dca))
self.fx = tuple(FX(self, i) for i in range(kind.num_fx))
self.fxreturn = tuple(adapter.FxRtn.make(self, i) for i in range(kind.num_fx))
self.auxin = tuple(adapter.AuxRtn.make(self, i) for i in range(kind.num_auxrtn))
self.config = Config.make(self)
2025-01-03 10:19:06 +00:00
self.headamp = tuple(adapter.HeadAmp(self, i) for i in range(kind.num_headamp))
def init_xair(self, *args, **kwargs):
defaultkwargs = {
2025-01-15 10:54:52 +00:00
'ip': None,
'port': 10024,
'delay': 0.02,
'connect_timeout': 2,
}
2022-04-05 20:05:55 +01:00
kwargs = defaultkwargs | kwargs
XAirRemote.__init__(self, *args, **kwargs)
2022-04-05 20:05:55 +01:00
self.kind = kind
self.lr = LR.make(self)
self.strip = tuple(Strip.make(self, i) for i in range(kind.num_strip))
self.bus = tuple(Bus.make(self, i) for i in range(kind.num_bus))
self.dca = tuple(DCA(self, i) for i in range(kind.num_dca))
self.fx = tuple(FX(self, i) for i in range(kind.num_fx))
2022-04-05 20:05:55 +01:00
self.fxsend = tuple(FXSend.make(self, i) for i in range(kind.num_fx))
self.fxreturn = tuple(FxRtn.make(self, i) for i in range(kind.num_fx))
self.auxreturn = AuxRtn.make(self)
self.config = Config.make(self)
2025-01-03 10:19:06 +00:00
self.headamp = tuple(HeadAmp(self, i) for i in range(kind.num_strip))
2025-01-15 10:54:52 +00:00
if kind.id_ == 'X32':
return type(
2025-01-15 10:54:52 +00:00
f'XAirRemote{kind}',
(XAirRemote,),
{
2025-01-15 10:54:52 +00:00
'__init__': init_x32,
},
)
2022-04-05 20:05:55 +01:00
return type(
2025-01-15 10:54:52 +00:00
f'XAirRemote{kind}',
(XAirRemote,),
2022-04-05 20:05:55 +01:00
{
2025-01-15 10:54:52 +00:00
'__init__': init_xair,
2022-04-05 20:05:55 +01:00
},
)
_remotes = {kind.id_: _make_remote(kind) for kind in kinds.all}
def request_remote_obj(kind_id: str, *args, **kwargs) -> XAirRemote:
"""
Interface entry point. Wraps factory expression and handles errors
Returns a reference to an XAirRemote class of a kind
"""
XAIRREMOTE_cls = None
try:
XAIRREMOTE_cls = _remotes[kind_id]
except KeyError as e:
raise XAirRemoteError(f"Unknown mixer kind '{kind_id}'") from e
return XAIRREMOTE_cls(*args, **kwargs)