import logging import subprocess as sp import time import winreg from asyncio.subprocess import DEVNULL from functools import cached_property from pathlib import Path import slobs_websocket from slobs_websocket import StreamlabsOBS from . import configuration from .util import ensure_sl logger = logging.getLogger(__name__) class StreamlabsController: def __init__(self, duckypad, **kwargs): self.logger = logger.getChild(__class__.__name__) self._duckypad = duckypad for attr, val in kwargs.items(): setattr(self, attr, val) self.conn = StreamlabsOBS() self.proc = None #################################################################################### # CONNECT/DISCONNECT from the API #################################################################################### def connect(self): try: conn = configuration.get('streamlabs') assert conn is not None, 'expected configuration for streamlabs' self.conn.connect(**conn) except slobs_websocket.exceptions.ConnectionFailure as e: self.logger.error(f'{type(e).__name__}: {e}') raise self._duckypad.scene.scenes = {scene.name: scene.id for scene in self.conn.ScenesService.getScenes()} self.logger.debug(f'registered scenes: {self._duckypad.scene.scenes}') self.conn.ScenesService.sceneSwitched += self.on_scene_switched self.conn.StreamingService.streamingStatusChange += self.on_streaming_status_change def disconnect(self): self.conn.disconnect() #################################################################################### # EVENTS #################################################################################### def on_streaming_status_change(self, data): self.logger.debug(f'streaming status changed, now: {data}') if data in ('live', 'starting'): self._duckypad.stream.is_live = True else: self._duckypad.stream.is_live = False def on_scene_switched(self, data): self._duckypad.stream.current_scene = data.name self.logger.debug(f'stream.current_scene updated to {self._duckypad.stream.current_scene}') #################################################################################### # START/STOP the stream #################################################################################### @ensure_sl def begin_stream(self): if self._duckypad.stream.is_live: self.logger.info('Stream is already online') return self.conn.StreamingService.toggleStreaming() @ensure_sl def end_stream(self): if not self._duckypad.stream.is_live: self.logger.info('Stream is already offline') return self.conn.StreamingService.toggleStreaming() #################################################################################### # CONTROL the stream #################################################################################### @ensure_sl def switch_scene(self, name): return self.conn.ScenesService.makeSceneActive(self._duckypad.scene.scenes[name.upper()]) #################################################################################### # LAUNCH/SHUTDOWN the streamlabs process #################################################################################### @cached_property def sl_fullpath(self) -> Path: try: self.logger.debug('fetching sl_fullpath from the registry') SL_KEY = '029c4619-0385-5543-9426-46f9987161d9' with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r'{}'.format('SOFTWARE' + '\\' + SL_KEY)) as regpath: slpath = winreg.QueryValueEx(regpath, r'InstallLocation')[0] return Path(slpath) / 'Streamlabs OBS.exe' except FileNotFoundError as e: self.logger.exception(f'{type(e).__name__}: {e}') raise def launch(self, delay=5): if self.proc is None: self.proc = sp.Popen(str(self.sl_fullpath), shell=False, stdout=DEVNULL) time.sleep(delay) self.connect() def shutdown(self): self.disconnect() time.sleep(1) if self.proc is not None: self.proc.terminate() self.proc = None self._duckypad.stream.current_scene = ''