From 6a8e7afc1dc39a3f49656e64da1c4fff9503a01e Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Wed, 23 Apr 2025 15:29:02 +0100 Subject: [PATCH] assign request client directly to context object. --- obsws_cli/app.py | 5 ++--- obsws_cli/group.py | 18 +++++++++--------- obsws_cli/input.py | 10 +++++----- obsws_cli/record.py | 10 +++++----- obsws_cli/scene.py | 10 +++++----- obsws_cli/scenecollection.py | 10 +++++----- obsws_cli/sceneitem.py | 16 ++++++++-------- obsws_cli/stream.py | 6 +++--- obsws_cli/validate.py | 10 +++++----- 9 files changed, 47 insertions(+), 48 deletions(-) diff --git a/obsws_cli/app.py b/obsws_cli/app.py index b01efc7..aff8caa 100644 --- a/obsws_cli/app.py +++ b/obsws_cli/app.py @@ -43,7 +43,7 @@ app.add_typer(scenecollection.app, name='scene-collection') @app.command() def version(ctx: typer.Context): """Get the OBS Client and WebSocket versions.""" - resp = ctx.obj['obsws'].get_version() + resp = ctx.obj.get_version() typer.echo( f'OBS Client version: {resp.obs_version} with WebSocket version: {resp.obs_web_socket_version}' ) @@ -69,8 +69,7 @@ def main( if timeout: settings.TIMEOUT = timeout - ctx.obj = ctx.ensure_object(dict) - ctx.obj['obsws'] = ctx.with_resource( + ctx.obj = ctx.with_resource( obsws.ReqClient( host=settings.HOST, port=settings.PORT, diff --git a/obsws_cli/group.py b/obsws_cli/group.py index 77d1aad..7a9b78e 100644 --- a/obsws_cli/group.py +++ b/obsws_cli/group.py @@ -24,7 +24,7 @@ def list(ctx: typer.Context, scene_name: str): ) raise typer.Exit(code=1) - resp = ctx.obj['obsws'].get_scene_item_list(scene_name) + resp = ctx.obj.get_scene_item_list(scene_name) groups = ( item.get('sourceName') for item in resp.scene_items if item.get('isGroup') ) @@ -54,7 +54,7 @@ def show(ctx: typer.Context, scene_name: str, group_name: str): ) raise typer.Exit(code=1) - resp = ctx.obj['obsws'].get_scene_item_list(scene_name) + resp = ctx.obj.get_scene_item_list(scene_name) if (group := _get_group(group_name, resp)) is None: typer.echo( f"Group '{group_name}' not found in scene {scene_name}.", @@ -62,7 +62,7 @@ def show(ctx: typer.Context, scene_name: str, group_name: str): ) raise typer.Exit(code=1) - ctx.obj['obsws'].set_scene_item_enabled( + ctx.obj.set_scene_item_enabled( scene_name=scene_name, item_id=int(group.get('sceneItemId')), enabled=True, @@ -81,7 +81,7 @@ def hide(ctx: typer.Context, scene_name: str, group_name: str): ) raise typer.Exit(code=1) - resp = ctx.obj['obsws'].get_scene_item_list(scene_name) + resp = ctx.obj.get_scene_item_list(scene_name) if (group := _get_group(group_name, resp)) is None: typer.echo( f"Group '{group_name}' not found in scene {scene_name}.", @@ -89,7 +89,7 @@ def hide(ctx: typer.Context, scene_name: str, group_name: str): ) raise typer.Exit(code=1) - ctx.obj['obsws'].set_scene_item_enabled( + ctx.obj.set_scene_item_enabled( scene_name=scene_name, item_id=int(group.get('sceneItemId')), enabled=False, @@ -108,7 +108,7 @@ def toggle(ctx: typer.Context, scene_name: str, group_name: str): ) raise typer.Exit(code=1) - resp = ctx.obj['obsws'].get_scene_item_list(scene_name) + resp = ctx.obj.get_scene_item_list(scene_name) if (group := _get_group(group_name, resp)) is None: typer.echo( f"Group '{group_name}' not found in scene {scene_name}.", @@ -117,7 +117,7 @@ def toggle(ctx: typer.Context, scene_name: str, group_name: str): raise typer.Exit(code=1) new_state = not group.get('sceneItemEnabled') - ctx.obj['obsws'].set_scene_item_enabled( + ctx.obj.set_scene_item_enabled( scene_name=scene_name, item_id=int(group.get('sceneItemId')), enabled=new_state, @@ -139,7 +139,7 @@ def status(ctx: typer.Context, scene_name: str, group_name: str): ) raise typer.Exit(code=1) - resp = ctx.obj['obsws'].get_scene_item_list(scene_name) + resp = ctx.obj.get_scene_item_list(scene_name) if (group := _get_group(group_name, resp)) is None: typer.echo( f"Group '{group_name}' not found in scene {scene_name}.", @@ -147,7 +147,7 @@ def status(ctx: typer.Context, scene_name: str, group_name: str): ) raise typer.Exit(code=1) - enabled = ctx.obj['obsws'].get_scene_item_enabled( + enabled = ctx.obj.get_scene_item_enabled( scene_name=scene_name, item_id=int(group.get('sceneItemId')), ) diff --git a/obsws_cli/input.py b/obsws_cli/input.py index ae7e3b8..281468a 100644 --- a/obsws_cli/input.py +++ b/obsws_cli/input.py @@ -23,7 +23,7 @@ def list( colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False, ): """List all inputs.""" - resp = ctx.obj['obsws'].get_input_list() + resp = ctx.obj.get_input_list() kinds = [] if input: @@ -52,7 +52,7 @@ def mute(ctx: typer.Context, input_name: str): ) raise typer.Exit(code=1) - ctx.obj['obsws'].set_input_mute( + ctx.obj.set_input_mute( name=input_name, muted=True, ) @@ -70,7 +70,7 @@ def unmute(ctx: typer.Context, input_name: str): ) raise typer.Exit(code=1) - ctx.obj['obsws'].set_input_mute( + ctx.obj.set_input_mute( name=input_name, muted=False, ) @@ -89,10 +89,10 @@ def toggle(ctx: typer.Context, input_name: str): raise typer.Exit(code=1) # Get the current mute state - resp = ctx.obj['obsws'].get_input_mute(name=input_name) + resp = ctx.obj.get_input_mute(name=input_name) new_state = not resp.input_muted - ctx.obj['obsws'].set_input_mute( + ctx.obj.set_input_mute( name=input_name, muted=new_state, ) diff --git a/obsws_cli/record.py b/obsws_cli/record.py index f6a7eb1..5ed2844 100644 --- a/obsws_cli/record.py +++ b/obsws_cli/record.py @@ -14,7 +14,7 @@ def main(): def _get_recording_status(ctx: typer.Context) -> tuple: """Get recording status.""" - resp = ctx.obj['obsws'].get_record_status() + resp = ctx.obj.get_record_status() return resp.output_active, resp.output_paused @@ -30,7 +30,7 @@ def start(ctx: typer.Context): typer.echo(err_msg) raise typer.Exit(1) - ctx.obj['obsws'].start_record() + ctx.obj.start_record() typer.echo('Recording started successfully.') @@ -42,7 +42,7 @@ def stop(ctx: typer.Context): typer.echo('Recording is not in progress, cannot stop.') raise typer.Exit(1) - ctx.obj['obsws'].stop_record() + ctx.obj.stop_record() typer.echo('Recording stopped successfully.') @@ -80,7 +80,7 @@ def resume(ctx: typer.Context): typer.echo('Recording is in progress but not paused, cannot resume.') raise typer.Exit(1) - ctx.obj['obsws'].resume_record() + ctx.obj.resume_record() typer.echo('Recording resumed successfully.') @@ -95,5 +95,5 @@ def pause(ctx: typer.Context): typer.echo('Recording is in progress but already paused, cannot pause.') raise typer.Exit(1) - ctx.obj['obsws'].pause_record() + ctx.obj.pause_record() typer.echo('Recording paused successfully.') diff --git a/obsws_cli/scene.py b/obsws_cli/scene.py index a5b469e..1b68eb3 100644 --- a/obsws_cli/scene.py +++ b/obsws_cli/scene.py @@ -18,7 +18,7 @@ def main(): @app.command('list | ls') def list(ctx: typer.Context): """List all scenes.""" - resp = ctx.obj['obsws'].get_scene_list() + resp = ctx.obj.get_scene_list() scenes = (scene.get('sceneName') for scene in reversed(resp.scenes)) typer.echo('\n'.join(scenes)) @@ -36,10 +36,10 @@ def current( raise typer.Exit(1) if preview: - resp = ctx.obj['obsws'].get_current_preview_scene() + resp = ctx.obj.get_current_preview_scene() typer.echo(resp.current_preview_scene_name) else: - resp = ctx.obj['obsws'].get_current_program_scene() + resp = ctx.obj.get_current_program_scene() typer.echo(resp.current_program_scene_name) @@ -65,6 +65,6 @@ def switch( raise typer.Exit(code=1) if preview: - ctx.obj['obsws'].set_current_preview_scene(scene_name) + ctx.obj.set_current_preview_scene(scene_name) else: - ctx.obj['obsws'].set_current_program_scene(scene_name) + ctx.obj.set_current_program_scene(scene_name) diff --git a/obsws_cli/scenecollection.py b/obsws_cli/scenecollection.py index 71822ce..a0a9bea 100644 --- a/obsws_cli/scenecollection.py +++ b/obsws_cli/scenecollection.py @@ -16,14 +16,14 @@ def main(): @app.command('list | ls') def list(ctx: typer.Context): """List all scene collections.""" - resp = ctx.obj['obsws'].get_scene_collection_list() + resp = ctx.obj.get_scene_collection_list() typer.echo('\n'.join(resp.scene_collections)) @app.command('current | get') def current(ctx: typer.Context): """Get the current scene collection.""" - resp = ctx.obj['obsws'].get_scene_collection_list() + resp = ctx.obj.get_scene_collection_list() typer.echo(resp.current_scene_collection_name) @@ -35,7 +35,7 @@ def switch(ctx: typer.Context, scene_collection_name: str): raise typer.Exit(code=1) current_scene_collection = ( - ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name + ctx.obj.get_scene_collection_list().current_scene_collection_name ) if scene_collection_name == current_scene_collection: typer.echo( @@ -43,7 +43,7 @@ def switch(ctx: typer.Context, scene_collection_name: str): ) raise typer.Exit(code=1) - ctx.obj['obsws'].set_current_scene_collection(scene_collection_name) + ctx.obj.set_current_scene_collection(scene_collection_name) typer.echo(f"Switched to scene collection '{scene_collection_name}'") @@ -56,5 +56,5 @@ def create(ctx: typer.Context, scene_collection_name: str): ) raise typer.Exit(code=1) - ctx.obj['obsws'].create_scene_collection(scene_collection_name) + ctx.obj.create_scene_collection(scene_collection_name) typer.echo(f'Created scene collection {scene_collection_name}') diff --git a/obsws_cli/sceneitem.py b/obsws_cli/sceneitem.py index 214e882..5147e6a 100644 --- a/obsws_cli/sceneitem.py +++ b/obsws_cli/sceneitem.py @@ -23,7 +23,7 @@ def list(ctx: typer.Context, scene_name: str): typer.echo(f"Scene '{scene_name}' not found.") typer.Exit(code=1) - resp = ctx.obj['obsws'].get_scene_item_list(scene_name) + resp = ctx.obj.get_scene_item_list(scene_name) items = (item.get('sourceName') for item in resp.scene_items) typer.echo('\n'.join(items)) @@ -60,7 +60,7 @@ def _get_scene_name_and_item_id( ctx: typer.Context, scene_name: str, item_name: str, parent: str ): if parent: - resp = ctx.obj['obsws'].get_group_scene_item_list(parent) + resp = ctx.obj.get_group_scene_item_list(parent) for item in resp.scene_items: if item.get('sourceName') == item_name: scene_name = parent @@ -70,7 +70,7 @@ def _get_scene_name_and_item_id( typer.echo(f"Item '{item_name}' not found in group '{parent}'.") raise typer.Exit(code=1) else: - resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name) + resp = ctx.obj.get_scene_item_id(scene_name, item_name) scene_item_id = resp.scene_item_id return scene_name, scene_item_id @@ -89,7 +89,7 @@ def show( ctx, scene_name, item_name, parent ) - ctx.obj['obsws'].set_scene_item_enabled( + ctx.obj.set_scene_item_enabled( scene_name=scene_name, item_id=int(scene_item_id), enabled=True, @@ -111,7 +111,7 @@ def hide( ctx, scene_name, item_name, parent ) - ctx.obj['obsws'].set_scene_item_enabled( + ctx.obj.set_scene_item_enabled( scene_name=scene_name, item_id=int(scene_item_id), enabled=False, @@ -146,13 +146,13 @@ def toggle( ctx, scene_name, item_name, parent ) - enabled = ctx.obj['obsws'].get_scene_item_enabled( + enabled = ctx.obj.get_scene_item_enabled( scene_name=scene_name, item_id=int(scene_item_id), ) new_state = not enabled.scene_item_enabled - ctx.obj['obsws'].set_scene_item_enabled( + ctx.obj.set_scene_item_enabled( scene_name=scene_name, item_id=int(scene_item_id), enabled=new_state, @@ -186,7 +186,7 @@ def visible( ctx, scene_name, item_name, parent ) - enabled = ctx.obj['obsws'].get_scene_item_enabled( + enabled = ctx.obj.get_scene_item_enabled( scene_name=scene_name, item_id=int(scene_item_id), ) diff --git a/obsws_cli/stream.py b/obsws_cli/stream.py index 8a60ad6..bf95888 100644 --- a/obsws_cli/stream.py +++ b/obsws_cli/stream.py @@ -14,7 +14,7 @@ def main(): def _get_streaming_status(ctx: typer.Context) -> tuple: """Get streaming status.""" - resp = ctx.obj['obsws'].get_stream_status() + resp = ctx.obj.get_stream_status() return resp.output_active, resp.output_duration @@ -26,7 +26,7 @@ def start(ctx: typer.Context): typer.echo('Streaming is already in progress, cannot start.') raise typer.Exit(code=1) - ctx.obj['obsws'].start_stream() + ctx.obj.start_stream() typer.echo('Streaming started successfully.') @@ -38,7 +38,7 @@ def stop(ctx: typer.Context): typer.echo('Streaming is not in progress, cannot stop.') raise typer.Exit(code=1) - ctx.obj['obsws'].stop_stream() + ctx.obj.stop_stream() typer.echo('Streaming stopped successfully.') diff --git a/obsws_cli/validate.py b/obsws_cli/validate.py index 873e0eb..ec609bd 100644 --- a/obsws_cli/validate.py +++ b/obsws_cli/validate.py @@ -8,19 +8,19 @@ skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=Fals def input_in_inputs(ctx: typer.Context, input_name: str) -> bool: """Check if an input is in the input list.""" - inputs = ctx.obj['obsws'].get_input_list().inputs + inputs = ctx.obj.get_input_list().inputs return any(input_.get('inputName') == input_name for input_ in inputs) def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool: """Check if a scene exists in the list of scenes.""" - resp = ctx.obj['obsws'].get_scene_list() + resp = ctx.obj.get_scene_list() return any(scene.get('sceneName') == scene_name for scene in resp.scenes) def studio_mode_enabled(ctx: typer.Context) -> bool: """Check if studio mode is enabled.""" - resp = ctx.obj['obsws'].get_studio_mode_enabled() + resp = ctx.obj.get_studio_mode_enabled() return resp.studio_mode_enabled @@ -28,7 +28,7 @@ def scene_collection_in_scene_collections( ctx: typer.Context, scene_collection_name: str ) -> bool: """Check if a scene collection exists.""" - resp = ctx.obj['obsws'].get_scene_collection_list() + resp = ctx.obj.get_scene_collection_list() return any( collection == scene_collection_name for collection in resp.scene_collections ) @@ -38,5 +38,5 @@ def item_in_scene_item_list( ctx: typer.Context, scene_name: str, item_name: str ) -> bool: """Check if an item exists in a scene.""" - resp = ctx.obj['obsws'].get_scene_item_list(scene_name) + resp = ctx.obj.get_scene_item_list(scene_name) return any(item.get('sourceName') == item_name for item in resp.scene_items)