move lower level logic into engine

This commit is contained in:
onyx-and-iris 2026-01-31 20:48:09 +00:00
parent fc8c8ad69a
commit 19779ae4c1
2 changed files with 103 additions and 95 deletions

View File

@ -3,7 +3,6 @@ package xair
import ( import (
"fmt" "fmt"
"net" "net"
"time"
"github.com/charmbracelet/log" "github.com/charmbracelet/log"
@ -14,18 +13,6 @@ type parser interface {
Parse(data []byte) (*osc.Message, error) Parse(data []byte) (*osc.Message, error)
} }
type engine struct {
Kind MixerKind
conn *net.UDPConn
mixerAddr *net.UDPAddr
parser parser
addressMap map[string]string
done chan bool
respChan chan *osc.Message
}
type Client struct { type Client struct {
engine engine
Strip *Strip Strip *Strip
@ -75,97 +62,21 @@ func NewClient(mixerIP string, mixerPort int, opts ...Option) (*Client, error) {
// Start begins listening for messages in a goroutine // Start begins listening for messages in a goroutine
func (c *Client) StartListening() { func (c *Client) StartListening() {
go c.receiveLoop() go c.engine.receiveLoop()
log.Debugf("Started listening on %s...", c.conn.LocalAddr().String()) log.Debugf("Started listening on %s...", c.engine.conn.LocalAddr().String())
}
// receiveLoop handles incoming OSC messages
func (c *Client) receiveLoop() {
buffer := make([]byte, 4096)
for {
select {
case <-c.done:
return
default:
// Set read timeout to avoid blocking forever
c.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, _, err := c.conn.ReadFromUDP(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
// Timeout is expected, continue loop
continue
}
// Check if we're shutting down to avoid logging expected errors
select {
case <-c.done:
return
default:
log.Errorf("Read error: %v", err)
return
}
}
msg, err := c.parseOSCMessage(buffer[:n])
if err != nil {
log.Errorf("Failed to parse OSC message: %v", err)
continue
}
c.respChan <- msg
}
}
}
// parseOSCMessage parses raw bytes into an OSC message with improved error handling
func (c *Client) parseOSCMessage(data []byte) (*osc.Message, error) {
msg, err := c.parser.Parse(data)
if err != nil {
return nil, err
}
return msg, nil
} }
// Stop stops the client and closes the connection // Stop stops the client and closes the connection
func (c *Client) Stop() { func (c *Client) Stop() {
close(c.done) close(c.engine.done)
if c.conn != nil { if c.engine.conn != nil {
c.conn.Close() c.engine.conn.Close()
} }
} }
// SendMessage sends an OSC message to the mixer using the unified connection // SendMessage sends an OSC message to the mixer using the unified connection
func (c *Client) SendMessage(address string, args ...any) error { func (c *Client) SendMessage(address string, args ...any) error {
return c.SendToAddress(c.mixerAddr, address, args...) return c.engine.sendToAddress(c.mixerAddr, address, args...)
}
// SendToAddress sends an OSC message to a specific address (enables replying to different ports)
func (c *Client) SendToAddress(addr *net.UDPAddr, oscAddress string, args ...any) error {
msg := osc.NewMessage(oscAddress)
for _, arg := range args {
msg.Append(arg)
}
log.Debugf("Sending to %v: %s", addr, msg.String())
if len(args) > 0 {
log.Debug(" - Arguments: ")
for i, arg := range args {
if i > 0 {
log.Debug(", ")
}
log.Debugf("%v", arg)
}
}
log.Debug("")
data, err := msg.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal message: %v", err)
}
_, err = c.conn.WriteToUDP(data, addr)
return err
} }
// RequestInfo requests mixer information // RequestInfo requests mixer information

97
internal/xair/engine.go Normal file
View File

@ -0,0 +1,97 @@
package xair
import (
"fmt"
"net"
"time"
"github.com/charmbracelet/log"
"github.com/hypebeast/go-osc/osc"
)
type engine struct {
Kind MixerKind
conn *net.UDPConn
mixerAddr *net.UDPAddr
parser parser
addressMap map[string]string
done chan bool
respChan chan *osc.Message
}
// receiveLoop handles incoming OSC messages
func (e *engine) receiveLoop() {
buffer := make([]byte, 4096)
for {
select {
case <-e.done:
return
default:
// Set read timeout to avoid blocking forever
e.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, _, err := e.conn.ReadFromUDP(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
// Timeout is expected, continue loop
continue
}
// Check if we're shutting down to avoid logging expected errors
select {
case <-e.done:
return
default:
log.Errorf("Read error: %v", err)
return
}
}
msg, err := e.parseOSCMessage(buffer[:n])
if err != nil {
log.Errorf("Failed to parse OSC message: %v", err)
continue
}
e.respChan <- msg
}
}
}
// parseOSCMessage parses raw bytes into an OSC message with improved error handling
func (e *engine) parseOSCMessage(data []byte) (*osc.Message, error) {
msg, err := e.parser.Parse(data)
if err != nil {
return nil, err
}
return msg, nil
}
// sendToAddress sends an OSC message to a specific address (enables replying to different ports)
func (e *engine) sendToAddress(addr *net.UDPAddr, oscAddress string, args ...any) error {
msg := osc.NewMessage(oscAddress)
for _, arg := range args {
msg.Append(arg)
}
log.Debugf("Sending to %v: %s", addr, msg.String())
if len(args) > 0 {
log.Debug(" - Arguments: ")
for i, arg := range args {
if i > 0 {
log.Debug(", ")
}
log.Debugf("%v", arg)
}
}
log.Debug("")
data, err := msg.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal message: %v", err)
}
_, err = e.conn.WriteToUDP(data, addr)
return err
}