adapter module added

factory method for x32 added

logging module added for tracing OSC requests/reponses.
This commit is contained in:
onyx-and-iris 2022-11-07 11:08:56 +00:00
parent d765c5b027
commit a69734b738
8 changed files with 141 additions and 76 deletions

View File

@ -22,7 +22,6 @@ class Data:
strip: int = kind.num_strip - 1
bus: int = kind.num_bus - 1
fx: int = kind.num_fx - 1
rtn: int = kind.num_rtn - 1
data = Data()

View File

@ -259,7 +259,7 @@ class TestSetAndGetStripAutomixHigher:
"param,value",
[("group", 0), ("group", 2)],
)
def test_it_sets_and_gets_fxsend_int_params(self, param, value):
def test_it_sets_and_gets_strip_int_params(self, param, value):
setattr(self.target, param, value)
assert getattr(self.target, param) == value
@ -267,7 +267,7 @@ class TestSetAndGetStripAutomixHigher:
"param,value",
[("weight", -10.5), ("weight", 3.5)],
)
def test_it_sets_and_gets_fxsend_float_params(self, param, value):
def test_it_sets_and_gets_strip_float_params(self, param, value):
setattr(self.target, param, value)
assert getattr(self.target, param) == value

40
xair_api/adapter.py Normal file
View File

@ -0,0 +1,40 @@
from .bus import Bus as IBus
from .lr import LR as ILR
from .rtn import AuxRtn as IAuxRtn
from .rtn import FxRtn as IFxRtn
class Bus(IBus):
@property
def address(self):
return f"/bus/{str(self.index).zfill(2)}"
class AuxRtn(IAuxRtn):
@property
def address(self):
return f"/auxin/{str(self.index).zfill(2)}"
class FxRtn(IFxRtn):
@property
def address(self):
return f"/fxrtn/{str(self.index).zfill(2)}"
class MainStereo(ILR):
@property
def address(self) -> str:
return f"/main/st"
class MainMono(ILR):
@property
def address(self) -> str:
return f"/main/m"
class Matrix(ILR):
@property
def address(self) -> str:
return f"/mtx/{str(self.index).zfill(2)}"

View File

@ -23,6 +23,24 @@ class IFX(abc.ABC):
pass
class FX(IFX):
"""Concrete class for fx"""
@property
def address(self) -> str:
return f"/fx/{self.index}"
@property
def type(self) -> int:
return self.getter("type")[0]
@type.setter
def type(self, val: int):
if not isinstance(val, int):
raise XAirRemoteError("type is an integer parameter")
self.setter("type", val)
class FXSend(IFX):
"""Concrete class for fxsend"""
@ -52,21 +70,3 @@ class FXSend(IFX):
@property
def address(self) -> str:
return f"/fxsend/{self.index}"
class FXReturn(IFX):
"""Concrete class for fxreturn"""
@property
def address(self) -> str:
return f"/fx/{self.index}"
@property
def type(self) -> int:
return self.getter("type")[0]
@type.setter
def type(self, val: int):
if not isinstance(val, int):
raise XAirRemoteError("type is an integer parameter")
self.setter("type", val)

View File

@ -1,18 +1,5 @@
from dataclasses import dataclass
"""
# osc slightly different, interface would need adjusting to support this mixer.
@dataclass
class X32KindMap:
id_: str = "X32"
num_dca: int = 8
num_strip: int = 32
num_bus: int = 16
num_fx: int = 8
num_rtn: int = 6
"""
@dataclass
class KindMap:
@ -20,6 +7,17 @@ class KindMap:
return self.id_
@dataclass
class X32KindMap(KindMap):
id_: str
num_dca: int = 8
num_strip: int = 32
num_bus: int = 16
num_fx: int = 8
num_auxrtn: int = 8
num_matrix: int = 6
@dataclass
class MR18KindMap(KindMap):
# note ch 17-18 defined as aux rtn
@ -28,7 +26,6 @@ class MR18KindMap(KindMap):
num_strip: int = 16
num_bus: int = 6
num_fx: int = 4
num_rtn: int = 4
@dataclass
@ -38,7 +35,6 @@ class XR16KindMap(KindMap):
num_strip: int = 16
num_bus: int = 4
num_fx: int = 4
num_rtn: int = 4
@dataclass
@ -48,10 +44,10 @@ class XR12KindMap(KindMap):
num_strip: int = 12
num_bus: int = 2
num_fx: int = 4
num_rtn: int = 4
_kinds = {
"X32": X32KindMap(id_="X32"),
"XR18": MR18KindMap(id_="XR18"),
"MR18": MR18KindMap(id_="MR18"),
"XR16": XR16KindMap(id_="XR16"),

View File

@ -1,4 +1,5 @@
import abc
from typing import Optional
from .errors import XAirRemoteError
from .shared import EQ, GEQ, Automix, Config, Dyn, Gate, Group, Insert, Mix, Preamp
@ -7,8 +8,10 @@ from .shared import EQ, GEQ, Automix, Config, Dyn, Gate, Group, Insert, Mix, Pre
class ILR(abc.ABC):
"""Abstract Base Class for buses"""
def __init__(self, remote):
def __init__(self, remote, index: Optional[int] = None):
self._remote = remote
if index is not None:
self.index = index + 1
def getter(self, param: str):
self._remote.send(f"{self.address}/{param}")
@ -26,7 +29,7 @@ class LR(ILR):
"""Concrete class for buses"""
@classmethod
def make(cls, remote):
def make(cls, remote, index=None):
"""
Factory function for LR
@ -41,19 +44,19 @@ class LR(ILR):
**{
_cls.__name__.lower(): type(
f"{_cls.__name__}{remote.kind}", (_cls, cls), {}
)(remote)
)(remote, index)
for _cls in (
Config,
Dyn,
Insert,
GEQ.make(),
EQ.make_sixband(cls, remote),
EQ.make_sixband(cls, remote, index),
Mix,
)
},
},
)
return LR_cls(remote)
return LR_cls(remote, index)
@property
def address(self) -> str:

View File

@ -25,26 +25,24 @@ class IRtn(abc.ABC):
pass
class Aux(IRtn):
class AuxRtn(IRtn):
"""Concrete class for aux"""
@classmethod
def make(cls, remote):
def make(cls, remote, index=None):
"""
Factory function for aux
Factory function for auxrtn
Creates a mixin of shared subclasses, sets them as class attributes.
Returns an Aux class of a kind.
Returns an AuxRtn class of a kind.
"""
AUX_cls = type(
f"Aux{remote.kind}",
AUXRTN_cls = type(
f"AuxRtn{remote.kind}",
(cls,),
{
**{
_cls.__name__.lower(): type(
f"{_cls.__name__}{remote.kind}", (_cls, cls), {}
)(remote)
)(remote, index)
for _cls in (
Config,
Preamp,
@ -55,32 +53,30 @@ class Aux(IRtn):
}
},
)
return AUX_cls(remote)
return AUXRTN_cls(remote, index)
@property
def address(self):
return "/rtn/aux"
class Rtn(IRtn):
class FxRtn(IRtn):
"""Concrete class for rtn"""
@classmethod
def make(cls, remote, index):
"""
Factory function for rtn
Factory function for fxrtn
Creates a mixin of shared subclasses, sets them as class attributes.
Returns an Rtn class of a kind.
Returns an FxRtn class of a kind.
"""
RTN_cls = type(
f"Rtn{remote.kind.id_}",
FXRTN_cls = type(
f"FxRtn{remote.kind}",
(cls,),
{
**{
_cls.__name__.lower(): type(
f"{_cls.__name__}{remote.kind.id_}", (_cls, cls), {}
f"{_cls.__name__}{remote.kind}", (_cls, cls), {}
)(remote, index)
for _cls in (
Config,
@ -92,7 +88,7 @@ class Rtn(IRtn):
}
},
)
return RTN_cls(remote, index)
return FXRTN_cls(remote, index)
@property
def address(self):

View File

@ -1,4 +1,5 @@
import abc
import logging
import threading
import time
from pathlib import Path
@ -13,15 +14,15 @@ from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_message_builder import OscMessageBuilder
from pythonosc.osc_server import BlockingOSCUDPServer
from . import kinds
from . import adapter, kinds
from .bus import Bus
from .config import Config
from .dca import DCA
from .errors import XAirRemoteError
from .fx import FXReturn, FXSend
from .fx import FX, FXSend
from .kinds import KindMap
from .lr import LR
from .rtn import Aux, Rtn
from .rtn import AuxRtn, FxRtn
from .strip import Strip
@ -47,19 +48,19 @@ class OSCClientServer(BlockingOSCUDPServer):
class XAirRemote(abc.ABC):
"""Handles the communication with the mixer via the OSC protocol"""
logger = logging.getLogger("xair.xairremote")
_CONNECT_TIMEOUT = 0.5
_WAIT_TIME = 0.025
_REFRESH_TIMEOUT = 5
XAIR_PORT = 10024
info_response = []
def __init__(self, **kwargs):
dispatcher = Dispatcher()
dispatcher.set_default_handler(self.msg_handler)
self.xair_ip = kwargs["ip"] or self._ip_from_toml()
self.xair_port = kwargs["port"] or self.XAIR_PORT
self.xair_port = kwargs["port"]
if not (self.xair_ip and self.xair_port):
raise XAirRemoteError("No valid ip or password detected")
self.server = OSCClientServer((self.xair_ip, self.xair_port), dispatcher)
@ -80,7 +81,9 @@ class XAirRemote(abc.ABC):
self.send("/xinfo")
time.sleep(self._CONNECT_TIMEOUT)
if len(self.info_response) > 0:
print(f"Successfully connected to {self.info_response[2]}.")
print(
f"Successfully connected to {self.info_response[2]} at {self.info_response[0]}."
)
else:
print(
"Error: Failed to setup OSC connection to mixer. Please check for correct ip address."
@ -90,10 +93,12 @@ class XAirRemote(abc.ABC):
self.server.serve_forever()
def msg_handler(self, addr, *data):
self.logger.debug(f"received: {addr} {data if data else ''}")
self.info_response = data[:]
def send(self, address: str, param: Optional[str] = None):
self.server.send_message(address, param)
def send(self, addr: str, param: Optional[str] = None):
self.logger.debug(f"sending: {addr} {param if param else ''}")
self.server.send_message(addr, param)
time.sleep(self._WAIT_TIME)
def _query(self, address):
@ -112,8 +117,26 @@ def _make_remote(kind: KindMap) -> XAirRemote:
The returned class will subclass XAirRemote.
"""
def init(self, *args, **kwargs):
defaultkwargs = {"ip": None, "port": None}
def init_x32(self, *args, **kwargs):
defaultkwargs = {"ip": None, "port": 10023}
kwargs = defaultkwargs | kwargs
XAirRemote.__init__(self, *args, **kwargs)
self.kind = kind
self.mainst = adapter.MainStereo.make(self)
self.mainmono = adapter.MainMono.make(self)
self.matrix = tuple(
adapter.Matrix.make(self, i) for i in range(kind.num_matrix)
)
self.strip = tuple(Strip.make(self, i) for i in range(kind.num_strip))
self.bus = tuple(adapter.Bus.make(self, i) for i in range(kind.num_bus))
self.dca = tuple(DCA(self, i) for i in range(kind.num_dca))
self.fx = tuple(FX(self, i) for i in range(kind.num_fx))
self.fxreturn = tuple(adapter.FxRtn.make(self, i) for i in range(kind.num_fx))
self.config = Config.make(self)
self.auxin = tuple(adapter.AuxRtn.make(self, i) for i in range(kind.num_auxrtn))
def init_xair(self, *args, **kwargs):
defaultkwargs = {"ip": None, "port": 10024}
kwargs = defaultkwargs | kwargs
XAirRemote.__init__(self, *args, **kwargs)
self.kind = kind
@ -121,17 +144,25 @@ def _make_remote(kind: KindMap) -> XAirRemote:
self.strip = tuple(Strip.make(self, i) for i in range(kind.num_strip))
self.bus = tuple(Bus.make(self, i) for i in range(kind.num_bus))
self.dca = tuple(DCA(self, i) for i in range(kind.num_dca))
self.fx = tuple(FX(self, i) for i in range(kind.num_fx))
self.fxsend = tuple(FXSend.make(self, i) for i in range(kind.num_fx))
self.fxreturn = tuple(FXReturn(self, i) for i in range(kind.num_fx))
self.fxreturn = tuple(FxRtn.make(self, i) for i in range(kind.num_fx))
self.config = Config.make(self)
self.aux = Aux.make(self)
self.rtn = tuple(Rtn.make(self, i) for i in range(kind.num_rtn))
self.auxreturn = AuxRtn.make(self)
if kind.id_ == "X32":
return type(
f"XAirRemote{kind}",
(XAirRemote,),
{
"__init__": init_x32,
},
)
return type(
f"XAirRemote{kind}",
(XAirRemote,),
{
"__init__": init,
"__init__": init_xair,
},
)