assign request client directly to context object.

This commit is contained in:
onyx-and-iris 2025-04-23 15:29:02 +01:00
parent 3852ba53f5
commit 6a8e7afc1d
9 changed files with 47 additions and 48 deletions

View File

@ -43,7 +43,7 @@ app.add_typer(scenecollection.app, name='scene-collection')
@app.command() @app.command()
def version(ctx: typer.Context): def version(ctx: typer.Context):
"""Get the OBS Client and WebSocket versions.""" """Get the OBS Client and WebSocket versions."""
resp = ctx.obj['obsws'].get_version() resp = ctx.obj.get_version()
typer.echo( typer.echo(
f'OBS Client version: {resp.obs_version} with WebSocket version: {resp.obs_web_socket_version}' f'OBS Client version: {resp.obs_version} with WebSocket version: {resp.obs_web_socket_version}'
) )
@ -69,8 +69,7 @@ def main(
if timeout: if timeout:
settings.TIMEOUT = timeout settings.TIMEOUT = timeout
ctx.obj = ctx.ensure_object(dict) ctx.obj = ctx.with_resource(
ctx.obj['obsws'] = ctx.with_resource(
obsws.ReqClient( obsws.ReqClient(
host=settings.HOST, host=settings.HOST,
port=settings.PORT, port=settings.PORT,

View File

@ -24,7 +24,7 @@ def list(ctx: typer.Context, scene_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
groups = ( groups = (
item.get('sourceName') for item in resp.scene_items if item.get('isGroup') item.get('sourceName') for item in resp.scene_items if item.get('isGroup')
) )
@ -54,7 +54,7 @@ def show(ctx: typer.Context, scene_name: str, group_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
typer.echo( typer.echo(
f"Group '{group_name}' not found in scene {scene_name}.", f"Group '{group_name}' not found in scene {scene_name}.",
@ -62,7 +62,7 @@ def show(ctx: typer.Context, scene_name: str, group_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
ctx.obj['obsws'].set_scene_item_enabled( ctx.obj.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=True, enabled=True,
@ -81,7 +81,7 @@ def hide(ctx: typer.Context, scene_name: str, group_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
typer.echo( typer.echo(
f"Group '{group_name}' not found in scene {scene_name}.", f"Group '{group_name}' not found in scene {scene_name}.",
@ -89,7 +89,7 @@ def hide(ctx: typer.Context, scene_name: str, group_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
ctx.obj['obsws'].set_scene_item_enabled( ctx.obj.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=False, enabled=False,
@ -108,7 +108,7 @@ def toggle(ctx: typer.Context, scene_name: str, group_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
typer.echo( typer.echo(
f"Group '{group_name}' not found in scene {scene_name}.", f"Group '{group_name}' not found in scene {scene_name}.",
@ -117,7 +117,7 @@ def toggle(ctx: typer.Context, scene_name: str, group_name: str):
raise typer.Exit(code=1) raise typer.Exit(code=1)
new_state = not group.get('sceneItemEnabled') new_state = not group.get('sceneItemEnabled')
ctx.obj['obsws'].set_scene_item_enabled( ctx.obj.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=new_state, enabled=new_state,
@ -139,7 +139,7 @@ def status(ctx: typer.Context, scene_name: str, group_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
typer.echo( typer.echo(
f"Group '{group_name}' not found in scene {scene_name}.", f"Group '{group_name}' not found in scene {scene_name}.",
@ -147,7 +147,7 @@ def status(ctx: typer.Context, scene_name: str, group_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
enabled = ctx.obj['obsws'].get_scene_item_enabled( enabled = ctx.obj.get_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
) )

View File

@ -23,7 +23,7 @@ def list(
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False, colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
): ):
"""List all inputs.""" """List all inputs."""
resp = ctx.obj['obsws'].get_input_list() resp = ctx.obj.get_input_list()
kinds = [] kinds = []
if input: if input:
@ -52,7 +52,7 @@ def mute(ctx: typer.Context, input_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
ctx.obj['obsws'].set_input_mute( ctx.obj.set_input_mute(
name=input_name, name=input_name,
muted=True, muted=True,
) )
@ -70,7 +70,7 @@ def unmute(ctx: typer.Context, input_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
ctx.obj['obsws'].set_input_mute( ctx.obj.set_input_mute(
name=input_name, name=input_name,
muted=False, muted=False,
) )
@ -89,10 +89,10 @@ def toggle(ctx: typer.Context, input_name: str):
raise typer.Exit(code=1) raise typer.Exit(code=1)
# Get the current mute state # Get the current mute state
resp = ctx.obj['obsws'].get_input_mute(name=input_name) resp = ctx.obj.get_input_mute(name=input_name)
new_state = not resp.input_muted new_state = not resp.input_muted
ctx.obj['obsws'].set_input_mute( ctx.obj.set_input_mute(
name=input_name, name=input_name,
muted=new_state, muted=new_state,
) )

View File

@ -14,7 +14,7 @@ def main():
def _get_recording_status(ctx: typer.Context) -> tuple: def _get_recording_status(ctx: typer.Context) -> tuple:
"""Get recording status.""" """Get recording status."""
resp = ctx.obj['obsws'].get_record_status() resp = ctx.obj.get_record_status()
return resp.output_active, resp.output_paused return resp.output_active, resp.output_paused
@ -30,7 +30,7 @@ def start(ctx: typer.Context):
typer.echo(err_msg) typer.echo(err_msg)
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj['obsws'].start_record() ctx.obj.start_record()
typer.echo('Recording started successfully.') typer.echo('Recording started successfully.')
@ -42,7 +42,7 @@ def stop(ctx: typer.Context):
typer.echo('Recording is not in progress, cannot stop.') typer.echo('Recording is not in progress, cannot stop.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj['obsws'].stop_record() ctx.obj.stop_record()
typer.echo('Recording stopped successfully.') typer.echo('Recording stopped successfully.')
@ -80,7 +80,7 @@ def resume(ctx: typer.Context):
typer.echo('Recording is in progress but not paused, cannot resume.') typer.echo('Recording is in progress but not paused, cannot resume.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj['obsws'].resume_record() ctx.obj.resume_record()
typer.echo('Recording resumed successfully.') typer.echo('Recording resumed successfully.')
@ -95,5 +95,5 @@ def pause(ctx: typer.Context):
typer.echo('Recording is in progress but already paused, cannot pause.') typer.echo('Recording is in progress but already paused, cannot pause.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj['obsws'].pause_record() ctx.obj.pause_record()
typer.echo('Recording paused successfully.') typer.echo('Recording paused successfully.')

View File

@ -18,7 +18,7 @@ def main():
@app.command('list | ls') @app.command('list | ls')
def list(ctx: typer.Context): def list(ctx: typer.Context):
"""List all scenes.""" """List all scenes."""
resp = ctx.obj['obsws'].get_scene_list() resp = ctx.obj.get_scene_list()
scenes = (scene.get('sceneName') for scene in reversed(resp.scenes)) scenes = (scene.get('sceneName') for scene in reversed(resp.scenes))
typer.echo('\n'.join(scenes)) typer.echo('\n'.join(scenes))
@ -36,10 +36,10 @@ def current(
raise typer.Exit(1) raise typer.Exit(1)
if preview: if preview:
resp = ctx.obj['obsws'].get_current_preview_scene() resp = ctx.obj.get_current_preview_scene()
typer.echo(resp.current_preview_scene_name) typer.echo(resp.current_preview_scene_name)
else: else:
resp = ctx.obj['obsws'].get_current_program_scene() resp = ctx.obj.get_current_program_scene()
typer.echo(resp.current_program_scene_name) typer.echo(resp.current_program_scene_name)
@ -65,6 +65,6 @@ def switch(
raise typer.Exit(code=1) raise typer.Exit(code=1)
if preview: if preview:
ctx.obj['obsws'].set_current_preview_scene(scene_name) ctx.obj.set_current_preview_scene(scene_name)
else: else:
ctx.obj['obsws'].set_current_program_scene(scene_name) ctx.obj.set_current_program_scene(scene_name)

View File

@ -16,14 +16,14 @@ def main():
@app.command('list | ls') @app.command('list | ls')
def list(ctx: typer.Context): def list(ctx: typer.Context):
"""List all scene collections.""" """List all scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.obj.get_scene_collection_list()
typer.echo('\n'.join(resp.scene_collections)) typer.echo('\n'.join(resp.scene_collections))
@app.command('current | get') @app.command('current | get')
def current(ctx: typer.Context): def current(ctx: typer.Context):
"""Get the current scene collection.""" """Get the current scene collection."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.obj.get_scene_collection_list()
typer.echo(resp.current_scene_collection_name) typer.echo(resp.current_scene_collection_name)
@ -35,7 +35,7 @@ def switch(ctx: typer.Context, scene_collection_name: str):
raise typer.Exit(code=1) raise typer.Exit(code=1)
current_scene_collection = ( current_scene_collection = (
ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name ctx.obj.get_scene_collection_list().current_scene_collection_name
) )
if scene_collection_name == current_scene_collection: if scene_collection_name == current_scene_collection:
typer.echo( typer.echo(
@ -43,7 +43,7 @@ def switch(ctx: typer.Context, scene_collection_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
ctx.obj['obsws'].set_current_scene_collection(scene_collection_name) ctx.obj.set_current_scene_collection(scene_collection_name)
typer.echo(f"Switched to scene collection '{scene_collection_name}'") typer.echo(f"Switched to scene collection '{scene_collection_name}'")
@ -56,5 +56,5 @@ def create(ctx: typer.Context, scene_collection_name: str):
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
ctx.obj['obsws'].create_scene_collection(scene_collection_name) ctx.obj.create_scene_collection(scene_collection_name)
typer.echo(f'Created scene collection {scene_collection_name}') typer.echo(f'Created scene collection {scene_collection_name}')

View File

@ -23,7 +23,7 @@ def list(ctx: typer.Context, scene_name: str):
typer.echo(f"Scene '{scene_name}' not found.") typer.echo(f"Scene '{scene_name}' not found.")
typer.Exit(code=1) typer.Exit(code=1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
items = (item.get('sourceName') for item in resp.scene_items) items = (item.get('sourceName') for item in resp.scene_items)
typer.echo('\n'.join(items)) typer.echo('\n'.join(items))
@ -60,7 +60,7 @@ def _get_scene_name_and_item_id(
ctx: typer.Context, scene_name: str, item_name: str, parent: str ctx: typer.Context, scene_name: str, item_name: str, parent: str
): ):
if parent: if parent:
resp = ctx.obj['obsws'].get_group_scene_item_list(parent) resp = ctx.obj.get_group_scene_item_list(parent)
for item in resp.scene_items: for item in resp.scene_items:
if item.get('sourceName') == item_name: if item.get('sourceName') == item_name:
scene_name = parent scene_name = parent
@ -70,7 +70,7 @@ def _get_scene_name_and_item_id(
typer.echo(f"Item '{item_name}' not found in group '{parent}'.") typer.echo(f"Item '{item_name}' not found in group '{parent}'.")
raise typer.Exit(code=1) raise typer.Exit(code=1)
else: else:
resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name) resp = ctx.obj.get_scene_item_id(scene_name, item_name)
scene_item_id = resp.scene_item_id scene_item_id = resp.scene_item_id
return scene_name, scene_item_id return scene_name, scene_item_id
@ -89,7 +89,7 @@ def show(
ctx, scene_name, item_name, parent ctx, scene_name, item_name, parent
) )
ctx.obj['obsws'].set_scene_item_enabled( ctx.obj.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
enabled=True, enabled=True,
@ -111,7 +111,7 @@ def hide(
ctx, scene_name, item_name, parent ctx, scene_name, item_name, parent
) )
ctx.obj['obsws'].set_scene_item_enabled( ctx.obj.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
enabled=False, enabled=False,
@ -146,13 +146,13 @@ def toggle(
ctx, scene_name, item_name, parent ctx, scene_name, item_name, parent
) )
enabled = ctx.obj['obsws'].get_scene_item_enabled( enabled = ctx.obj.get_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
) )
new_state = not enabled.scene_item_enabled new_state = not enabled.scene_item_enabled
ctx.obj['obsws'].set_scene_item_enabled( ctx.obj.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
enabled=new_state, enabled=new_state,
@ -186,7 +186,7 @@ def visible(
ctx, scene_name, item_name, parent ctx, scene_name, item_name, parent
) )
enabled = ctx.obj['obsws'].get_scene_item_enabled( enabled = ctx.obj.get_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
) )

View File

@ -14,7 +14,7 @@ def main():
def _get_streaming_status(ctx: typer.Context) -> tuple: def _get_streaming_status(ctx: typer.Context) -> tuple:
"""Get streaming status.""" """Get streaming status."""
resp = ctx.obj['obsws'].get_stream_status() resp = ctx.obj.get_stream_status()
return resp.output_active, resp.output_duration return resp.output_active, resp.output_duration
@ -26,7 +26,7 @@ def start(ctx: typer.Context):
typer.echo('Streaming is already in progress, cannot start.') typer.echo('Streaming is already in progress, cannot start.')
raise typer.Exit(code=1) raise typer.Exit(code=1)
ctx.obj['obsws'].start_stream() ctx.obj.start_stream()
typer.echo('Streaming started successfully.') typer.echo('Streaming started successfully.')
@ -38,7 +38,7 @@ def stop(ctx: typer.Context):
typer.echo('Streaming is not in progress, cannot stop.') typer.echo('Streaming is not in progress, cannot stop.')
raise typer.Exit(code=1) raise typer.Exit(code=1)
ctx.obj['obsws'].stop_stream() ctx.obj.stop_stream()
typer.echo('Streaming stopped successfully.') typer.echo('Streaming stopped successfully.')

View File

@ -8,19 +8,19 @@ skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=Fals
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool: def input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
"""Check if an input is in the input list.""" """Check if an input is in the input list."""
inputs = ctx.obj['obsws'].get_input_list().inputs inputs = ctx.obj.get_input_list().inputs
return any(input_.get('inputName') == input_name for input_ in inputs) return any(input_.get('inputName') == input_name for input_ in inputs)
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool: def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool:
"""Check if a scene exists in the list of scenes.""" """Check if a scene exists in the list of scenes."""
resp = ctx.obj['obsws'].get_scene_list() resp = ctx.obj.get_scene_list()
return any(scene.get('sceneName') == scene_name for scene in resp.scenes) return any(scene.get('sceneName') == scene_name for scene in resp.scenes)
def studio_mode_enabled(ctx: typer.Context) -> bool: def studio_mode_enabled(ctx: typer.Context) -> bool:
"""Check if studio mode is enabled.""" """Check if studio mode is enabled."""
resp = ctx.obj['obsws'].get_studio_mode_enabled() resp = ctx.obj.get_studio_mode_enabled()
return resp.studio_mode_enabled return resp.studio_mode_enabled
@ -28,7 +28,7 @@ def scene_collection_in_scene_collections(
ctx: typer.Context, scene_collection_name: str ctx: typer.Context, scene_collection_name: str
) -> bool: ) -> bool:
"""Check if a scene collection exists.""" """Check if a scene collection exists."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.obj.get_scene_collection_list()
return any( return any(
collection == scene_collection_name for collection in resp.scene_collections collection == scene_collection_name for collection in resp.scene_collections
) )
@ -38,5 +38,5 @@ def item_in_scene_item_list(
ctx: typer.Context, scene_name: str, item_name: str ctx: typer.Context, scene_name: str, item_name: str
) -> bool: ) -> bool:
"""Check if an item exists in a scene.""" """Check if an item exists in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
return any(item.get('sourceName') == item_name for item in resp.scene_items) return any(item.get('sourceName') == item_name for item in resp.scene_items)