# obsws-python/obsws_python/events.py
# (92 lines, 2.9 KiB, Python)

import json
import logging
import threading
from websocket import WebSocketConnectionClosedException, WebSocketTimeoutException
from .baseclient import ObsClient
from .callback import Callback
# (blame timestamp: 2023-06-28 17:56:56 +01:00)
from .error import OBSSDKError, OBSSDKTimeoutError
from .subs import Subs
"""
A class to interact with obs-websocket events
defined in official github repo
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
"""
# Module-level logger; EventClient derives a child logger from it, named after the class.
logger = logging.getLogger(__name__)
class EventClient:
    """Listen for obs-websocket events and dispatch them to callbacks.

    On construction the client authenticates through an ``ObsClient`` and
    starts a daemon thread that reads messages from the websocket and hands
    each one to ``self.callback``. It can be used as a context manager;
    leaving the ``with`` block calls :meth:`disconnect`.
    """

    def __init__(self, **kwargs):
        """Authenticate with the server and start listening for events.

        Args:
            **kwargs: Forwarded to ``ObsClient``. ``subs`` defaults to
                ``Subs.LOW_VOLUME`` unless the caller supplies it.

        Raises:
            OBSSDKError: Logged and re-raised when raised during
                authentication.
        """
        self.logger = logger.getChild(self.__class__.__name__)
        defaultkwargs = {"subs": Subs.LOW_VOLUME}
        # Caller-supplied values take precedence over the defaults.
        kwargs = defaultkwargs | kwargs
        self.base_client = ObsClient(**kwargs)
        try:
            success = self.base_client.authenticate()
            self.logger.info(
                f"Successfully identified {self} with the server using RPC version:{success['negotiatedRpcVersion']}"
            )
        except OBSSDKError as e:
            self.logger.error(f"{type(e).__name__}: {e}")
            raise
        self.callback = Callback()
        self.subscribe()

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Disconnect when leaving the ``with`` block."""
        self.disconnect()

    def __repr__(self):
        """Return the class name followed by the connection settings.

        The values are read from the attributes of ``self.base_client``.
        """
        # NOTE(review): this includes the password in clear text; avoid
        # writing repr() of a client to logs.
        return (
            type(self).__name__
            + "(host='{host}', port={port}, password='{password}', subs={subs}, timeout={timeout})".format(
                **self.base_client.__dict__,
            )
        )

    def __str__(self):
        """Return only the class name (no connection details)."""
        return type(self).__name__

    def subscribe(self):
        """Start the daemon thread that listens for events."""
        # No timeout: recv() should block until a message arrives.
        self.base_client.ws.settimeout(None)
        # Kept on the instance so disconnect() can ask the loop to stop.
        self._stop_event = threading.Event()
        self.worker = threading.Thread(
            target=self.trigger, daemon=True, args=(self._stop_event,)
        )
        self.worker.start()

    def trigger(self, stop_event):
        """
        Continuously listen for events.

        Triggers a callback on event received.

        Args:
            stop_event: ``threading.Event``; the loop ends once it is set.

        Raises:
            OBSSDKTimeoutError: If the websocket read times out. Raised in
                the thread running this method, which ends the loop.
        """
        while not stop_event.is_set():
            try:
                # Empty reads are skipped; only non-empty payloads are parsed.
                if r := self.base_client.ws.recv():
                    event = json.loads(r)
                    self.logger.debug(f"Event received {event}")
                    type_, data = (
                        event["d"].get("eventType"),
                        event["d"].get("eventData"),
                    )
                    # Callbacks always receive a dict, even without eventData.
                    self.callback.trigger(type_, data if data else {})
            except WebSocketTimeoutException as e:
                self.logger.exception(f"{type(e).__name__}: {e}")
                raise OBSSDKTimeoutError("Timeout while waiting for event") from e
            except (WebSocketConnectionClosedException, OSError) as e:
                # The connection is gone (e.g. after disconnect()); stop quietly.
                self.logger.debug(f"{type(e).__name__} terminating the event thread")
                stop_event.set()

    def disconnect(self):
        """Stop listening for events and close the websocket.

        Safe to call from an event callback: the worker thread is only
        joined when the caller is a different thread, because a thread
        cannot join itself.
        """
        # Signal the loop first so it exits even if recv() returns normally.
        stop_event = getattr(self, "_stop_event", None)
        if stop_event is not None:
            stop_event.set()
        self.base_client.ws.close()
        if self.worker is not threading.current_thread():
            self.worker.join()

    unsubscribe = disconnect