2022-04-05 20:05:55 +01:00
|
|
|
import abc
|
|
|
|
import threading
|
2022-08-07 23:55:51 +01:00
|
|
|
import time
|
2022-04-05 20:05:55 +01:00
|
|
|
from pathlib import Path
|
2022-09-05 17:06:11 +01:00
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
try:
|
|
|
|
import tomllib
|
|
|
|
except ModuleNotFoundError:
|
|
|
|
import tomli as tomllib
|
2022-08-07 23:55:51 +01:00
|
|
|
|
|
|
|
from pythonosc.dispatcher import Dispatcher
|
|
|
|
from pythonosc.osc_message_builder import OscMessageBuilder
|
|
|
|
from pythonosc.osc_server import BlockingOSCUDPServer
|
2022-04-05 20:05:55 +01:00
|
|
|
|
|
|
|
from . import kinds
|
|
|
|
from .bus import Bus
|
|
|
|
from .config import Config
|
2022-08-07 23:55:51 +01:00
|
|
|
from .dca import DCA
|
|
|
|
from .errors import XAirRemoteError
|
|
|
|
from .fx import FXReturn, FXSend
|
|
|
|
from .kinds import KindMap
|
|
|
|
from .lr import LR
|
2022-04-05 20:05:55 +01:00
|
|
|
from .rtn import Aux, Rtn
|
2022-08-07 23:55:51 +01:00
|
|
|
from .strip import Strip
|
2022-04-05 20:05:55 +01:00
|
|
|
|
|
|
|
|
|
|
|
class OSCClientServer(BlockingOSCUDPServer):
|
2022-08-07 23:55:51 +01:00
|
|
|
def __init__(self, address: str, dispatcher: Dispatcher):
|
2022-04-05 20:05:55 +01:00
|
|
|
super().__init__(("", 0), dispatcher)
|
|
|
|
self.xr_address = address
|
|
|
|
|
2022-08-07 23:55:51 +01:00
|
|
|
def send_message(self, address: str, value: str):
|
2022-04-05 20:05:55 +01:00
|
|
|
builder = OscMessageBuilder(address=address)
|
|
|
|
if value is None:
|
|
|
|
values = list()
|
|
|
|
elif isinstance(value, list):
|
|
|
|
values = value
|
|
|
|
else:
|
|
|
|
values = [value]
|
|
|
|
for val in values:
|
|
|
|
builder.add_arg(val)
|
|
|
|
msg = builder.build()
|
|
|
|
self.socket.sendto(msg.dgram, self.xr_address)
|
|
|
|
|
|
|
|
|
2022-08-07 23:55:51 +01:00
|
|
|
class XAirRemote(abc.ABC):
|
2022-08-08 00:13:14 +01:00
|
|
|
"""Handles the communication with the mixer via the OSC protocol"""
|
2022-04-05 20:05:55 +01:00
|
|
|
|
|
|
|
_CONNECT_TIMEOUT = 0.5
|
|
|
|
_WAIT_TIME = 0.025
|
|
|
|
_REFRESH_TIMEOUT = 5
|
|
|
|
|
|
|
|
XAIR_PORT = 10024
|
|
|
|
|
|
|
|
info_response = []
|
|
|
|
|
|
|
|
def __init__(self, **kwargs):
|
|
|
|
dispatcher = Dispatcher()
|
|
|
|
dispatcher.set_default_handler(self.msg_handler)
|
2022-08-07 23:55:51 +01:00
|
|
|
self.xair_ip = kwargs["ip"] or self._ip_from_toml()
|
2022-05-01 03:46:44 +01:00
|
|
|
self.xair_port = kwargs["port"] or self.XAIR_PORT
|
2022-08-07 23:55:51 +01:00
|
|
|
if not (self.xair_ip and self.xair_port):
|
|
|
|
raise XAirRemoteError("No valid ip or password detected")
|
2022-05-01 03:46:44 +01:00
|
|
|
self.server = OSCClientServer((self.xair_ip, self.xair_port), dispatcher)
|
2022-04-05 20:05:55 +01:00
|
|
|
|
2022-09-05 17:06:11 +01:00
|
|
|
def __enter__(self):
|
2022-08-07 23:55:51 +01:00
|
|
|
self.worker = threading.Thread(target=self.run_server, daemon=True)
|
2022-04-05 20:05:55 +01:00
|
|
|
self.worker.start()
|
|
|
|
self.validate_connection()
|
|
|
|
return self
|
|
|
|
|
2022-08-07 23:55:51 +01:00
|
|
|
def _ip_from_toml(self) -> str:
|
|
|
|
filepath = Path.cwd() / "config.toml"
|
|
|
|
with open(filepath, "rb") as f:
|
|
|
|
conn = tomllib.load(f)
|
|
|
|
return conn["connection"].get("ip")
|
2022-04-05 20:05:55 +01:00
|
|
|
|
|
|
|
def validate_connection(self):
|
|
|
|
self.send("/xinfo")
|
|
|
|
time.sleep(self._CONNECT_TIMEOUT)
|
|
|
|
if len(self.info_response) > 0:
|
|
|
|
print(f"Successfully connected to {self.info_response[2]}.")
|
|
|
|
else:
|
|
|
|
print(
|
|
|
|
"Error: Failed to setup OSC connection to mixer. Please check for correct ip address."
|
|
|
|
)
|
|
|
|
|
|
|
|
def run_server(self):
|
|
|
|
self.server.serve_forever()
|
|
|
|
|
|
|
|
def msg_handler(self, addr, *data):
|
|
|
|
self.info_response = data[:]
|
|
|
|
|
2022-08-07 23:55:51 +01:00
|
|
|
def send(self, address: str, param: Optional[str] = None):
|
2022-04-05 20:05:55 +01:00
|
|
|
self.server.send_message(address, param)
|
|
|
|
time.sleep(self._WAIT_TIME)
|
|
|
|
|
|
|
|
def _query(self, address):
|
|
|
|
self.send(address)
|
|
|
|
time.sleep(self._WAIT_TIME)
|
|
|
|
return self.info_response
|
|
|
|
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_tr):
|
|
|
|
self.server.shutdown()
|
|
|
|
|
|
|
|
|
2022-08-07 23:55:51 +01:00
|
|
|
def _make_remote(kind: KindMap) -> XAirRemote:
|
2022-04-05 20:05:55 +01:00
|
|
|
"""
|
2022-08-08 00:13:14 +01:00
|
|
|
Creates a new XAIR remote class.
|
2022-04-05 20:05:55 +01:00
|
|
|
|
|
|
|
The returned class will subclass MAirRemote.
|
|
|
|
"""
|
|
|
|
|
|
|
|
def init(self, *args, **kwargs):
|
2022-05-01 03:46:44 +01:00
|
|
|
defaultkwargs = {"ip": None, "port": None}
|
2022-04-05 20:05:55 +01:00
|
|
|
kwargs = defaultkwargs | kwargs
|
2022-08-07 23:55:51 +01:00
|
|
|
XAirRemote.__init__(self, *args, **kwargs)
|
2022-04-05 20:05:55 +01:00
|
|
|
self.kind = kind
|
|
|
|
self.lr = LR.make(self)
|
|
|
|
self.strip = tuple(Strip.make(self, i) for i in range(kind.num_strip))
|
|
|
|
self.bus = tuple(Bus.make(self, i) for i in range(kind.num_bus))
|
|
|
|
self.dca = tuple(DCA(self, i) for i in range(kind.num_dca))
|
|
|
|
self.fxsend = tuple(FXSend.make(self, i) for i in range(kind.num_fx))
|
|
|
|
self.fxreturn = tuple(FXReturn(self, i) for i in range(kind.num_fx))
|
|
|
|
self.config = Config.make(self)
|
|
|
|
self.aux = Aux.make(self)
|
|
|
|
self.rtn = tuple(Rtn.make(self, i) for i in range(kind.num_rtn))
|
|
|
|
|
|
|
|
return type(
|
2022-08-08 00:13:14 +01:00
|
|
|
f"XAirRemote{kind}",
|
2022-08-07 23:55:51 +01:00
|
|
|
(XAirRemote,),
|
2022-04-05 20:05:55 +01:00
|
|
|
{
|
|
|
|
"__init__": init,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
_remotes = {kind.id_: _make_remote(kind) for kind in kinds.all}
|
|
|
|
|
|
|
|
|
2022-08-07 23:55:51 +01:00
|
|
|
def request_remote_obj(kind_id: str, *args, **kwargs) -> XAirRemote:
|
|
|
|
"""
|
|
|
|
Interface entry point. Wraps factory expression and handles errors
|
|
|
|
|
|
|
|
Returns a reference to an XAirRemote class of a kind
|
|
|
|
"""
|
2022-08-08 00:13:14 +01:00
|
|
|
XAIRREMOTE_cls = None
|
2022-08-07 23:55:51 +01:00
|
|
|
try:
|
2022-08-08 00:13:14 +01:00
|
|
|
XAIRREMOTE_cls = _remotes[kind_id]
|
2022-08-07 23:55:51 +01:00
|
|
|
except ValueError as e:
|
|
|
|
raise SystemExit(e)
|
2022-08-08 00:13:14 +01:00
|
|
|
return XAIRREMOTE_cls(*args, **kwargs)
|