From dceafba0652ae34831d931f49a2e44cdede01d5f Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Fri, 9 Jan 2026 09:20:45 +0000 Subject: [PATCH] extend input command group --- obsws_cli/input.py | 309 ++++++++++++++++++++++++++++++++++++++++-- obsws_cli/text.py | 20 +-- obsws_cli/validate.py | 29 +++- 3 files changed, 330 insertions(+), 28 deletions(-) diff --git a/obsws_cli/input.py b/obsws_cli/input.py index 5d13b97..63a63ba 100644 --- a/obsws_cli/input.py +++ b/obsws_cli/input.py @@ -18,6 +18,69 @@ def main(): """Control inputs in OBS.""" +@app.command('create | add') +def create( + ctx: typer.Context, + input_name: Annotated[ + str, + typer.Argument( + ..., + show_default=False, + help='Name of the input to create.', + callback=validate.input_not_in_inputs, + ), + ], + input_kind: Annotated[ + str, + typer.Argument( + ..., + show_default=False, + help='Kind of the input to create.', + callback=validate.kind_in_input_kinds, + ), + ], +): + """Create a new input.""" + current_scene = ( + ctx.obj['obsws'].get_current_program_scene().current_program_scene_name + ) + try: + ctx.obj['obsws'].create_input( + inputName=input_name, + inputKind=input_kind, + sceneItemEnabled=True, + sceneName=current_scene, + inputSettings={}, + ) + except obsws.error.OBSSDKRequestError as e: + console.err.print(f'Failed to create input: [yellow]{e}[/yellow]') + raise typer.Exit(1) + + console.out.print( + f'Input {console.highlight(ctx, input_name)} of kind ' + f'{console.highlight(ctx, input_kind)} created.', + ) + + +@app.command('remove | rm') +def remove( + ctx: typer.Context, + input_name: Annotated[ + str, + typer.Argument( + ..., + show_default=False, + help='Name of the input to remove.', + callback=validate.input_in_inputs, + ), + ], +): + """Remove an input.""" + ctx.obj['obsws'].remove_input(name=input_name) + + console.out.print(f'Input {console.highlight(ctx, input_name)} removed.') + + @app.command('list | ls') def list_( ctx: typer.Context, @@ -105,18 +168,47 @@ def list_( console.out.print(table) +@app.command('list-kinds | ls-k') +def list_kinds( + ctx: typer.Context, +): + """List all input kinds.""" + resp = ctx.obj['obsws'].get_input_kind_list(False) + kinds = sorted(resp.input_kinds) + + if not kinds: + console.out.print('No input kinds found.') + raise typer.Exit() + + table = Table( + title='Input Kinds', padding=(0, 2), border_style=ctx.obj['style'].border + ) + table.add_column( + Text('Input Kind', justify='center'), + justify='left', + style=ctx.obj['style'].column, + ) + + for kind in kinds: + table.add_row(util.snakecase_to_titlecase(kind)) + + console.out.print(table) + + @app.command('mute | m') def mute( ctx: typer.Context, input_name: Annotated[ - str, typer.Argument(..., show_default=False, help='Name of the input to mute.') + str, + typer.Argument( + ..., + show_default=False, + help='Name of the input to mute.', + callback=validate.input_in_inputs, + ), ], ): """Mute an input.""" - if not validate.input_in_inputs(ctx, input_name): - console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') - raise typer.Exit(1) - ctx.obj['obsws'].set_input_mute( name=input_name, muted=True, @@ -130,14 +222,15 @@ def unmute( ctx: typer.Context, input_name: Annotated[ str, - typer.Argument(..., show_default=False, help='Name of the input to unmute.'), + typer.Argument( + ..., + show_default=False, + help='Name of the input to unmute.', + callback=validate.input_in_inputs, + ), ], ): """Unmute an input.""" - if not validate.input_in_inputs(ctx, input_name): - console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') - raise typer.Exit(1) - ctx.obj['obsws'].set_input_mute( name=input_name, muted=False, @@ -151,14 +244,15 @@ def toggle( ctx: typer.Context, input_name: Annotated[ str, - typer.Argument(..., show_default=False, help='Name of the input to toggle.'), + typer.Argument( + ..., + show_default=False, + help='Name of the input to toggle.', + callback=validate.input_in_inputs, + ), ], ): """Toggle an input.""" - if not validate.input_in_inputs(ctx, input_name): - console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') - raise typer.Exit(1) - resp = ctx.obj['obsws'].get_input_mute(name=input_name) new_state = not resp.input_muted @@ -175,3 +269,188 @@ def toggle( console.out.print( f'Input {console.highlight(ctx, input_name)} unmuted.', ) + + +@app.command('volume | vol') +def volume( + ctx: typer.Context, + input_name: Annotated[ + str, + typer.Argument( + ..., + show_default=False, + help='Name of the input to set volume for.', + callback=validate.input_in_inputs, + ), + ], + volume: Annotated[ + float, + typer.Argument( + ..., + show_default=False, + help='Volume level to set (-90 to 0).', + min=-90, + max=0, + ), + ], +): + """Set the volume of an input.""" + ctx.obj['obsws'].set_input_volume( + name=input_name, + vol_db=volume, + ) + + console.out.print( + f'Input {console.highlight(ctx, input_name)} volume set to {console.highlight(ctx, volume)}.', + ) + + +@app.command('show | s') +def show( + ctx: typer.Context, + input_name: Annotated[ + str, + typer.Argument( + ..., + show_default=False, + help='Name of the input to show.', + callback=validate.input_in_inputs, + ), + ], + verbose: Annotated[ + bool, typer.Option(help='List all available input devices.') + ] = False, +): + """Show information for an input in the current scene.""" + input_list = ctx.obj['obsws'].get_input_list() + for input_ in input_list.inputs: + if input_['inputName'] == input_name: + input_kind = input_['inputKind'] + break + + for prop in ['device', 'device_id']: + try: + device_id = ( + ctx.obj['obsws'] + .get_input_settings( + name=input_name, + ) + .input_settings.get(prop) + ) + if device_id: + break + except obsws.error.OBSSDKRequestError: + continue + else: + device_id = '(N/A)' + + for device in ( + ctx.obj['obsws'] + .get_input_properties_list_property_items( + input_name=input_name, + prop_name=prop, + ) + .property_items + ): + if device.get('itemValue') == device_id: + device_id = device.get('itemName') + break + + table = Table( + title='Input Information', padding=(0, 2), border_style=ctx.obj['style'].border + ) + columns = [ + (Text('Input Name', justify='center'), 'left', ctx.obj['style'].column), + (Text('Kind', justify='center'), 'left', ctx.obj['style'].column), + (Text('Device', justify='center'), 'left', ctx.obj['style'].column), + ] + for heading, justify, style in columns: + table.add_column(heading, justify=justify, style=style) + table.add_row( + input_name, + util.snakecase_to_titlecase(input_kind), + device_id, + ) + + console.out.print(table) + + if verbose: + resp = ctx.obj['obsws'].get_input_properties_list_property_items( + input_name=input_name, + prop_name=prop, + ) + table = Table( + title='Devices', + padding=(0, 2), + border_style=ctx.obj['style'].border, + ) + columns = [ + (Text('Name', justify='center'), 'left', ctx.obj['style'].column), + ] + for heading, justify, style in columns: + table.add_column(heading, justify=justify, style=style) + for i, item in enumerate(resp.property_items): + table.add_row( + item.get('itemName'), + style='' if i % 2 == 0 else 'dim', + ) + + console.out.print(table) + + +@app.command('update | upd') +def update( + ctx: typer.Context, + input_name: Annotated[ + str, + typer.Argument( + ..., + show_default=False, + help='Name of the input to update.', + callback=validate.input_in_inputs, + ), + ], + device_name: Annotated[ + str, + typer.Argument( + ..., + show_default=False, + help='Name of the device to set for the input.', + ), + ], +): + """Update a setting for an input.""" + device_id = None + for prop in ['device', 'device_id']: + try: + for device in ( + ctx.obj['obsws'] + .get_input_properties_list_property_items( + input_name=input_name, + prop_name=prop, + ) + .property_items + ): + if device.get('itemName') == device_name: + device_id = device.get('itemValue') + break + except obsws.error.OBSSDKRequestError: + continue + if device_id: + break + + if not device_id: + console.err.print( + f'Failed to find device ID for device ' + f'{console.highlight(ctx, device_name)}.', + ) + raise typer.Exit(1) + + ctx.obj['obsws'].set_input_settings( + name=input_name, settings={prop: device_id}, overlay=True + ) + + console.out.print( + f'Input {console.highlight(ctx, input_name)} updated to use device ' + f'{console.highlight(ctx, device_name)}.', + ) diff --git a/obsws_cli/text.py b/obsws_cli/text.py index a6bd0f4..9484a0d 100644 --- a/obsws_cli/text.py +++ b/obsws_cli/text.py @@ -18,13 +18,14 @@ def main(): @app.command('current | get') def current( ctx: typer.Context, - input_name: Annotated[str, typer.Argument(help='Name of the text input to get.')], + input_name: Annotated[ + str, + typer.Argument( + help='Name of the text input to get.', callback=validate.input_in_inputs + ), + ], ): """Get the current text for a text input.""" - if not validate.input_in_inputs(ctx, input_name): - console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') - raise typer.Exit(1) - resp = ctx.obj['obsws'].get_input_settings(name=input_name) if not resp.input_kind.startswith('text_'): console.err.print( @@ -44,7 +45,10 @@ def current( def update( ctx: typer.Context, input_name: Annotated[ - str, typer.Argument(help='Name of the text input to update.') + str, + typer.Argument( + help='Name of the text input to update.', callback=validate.input_in_inputs + ), ], new_text: Annotated[ Optional[str], @@ -54,10 +58,6 @@ def update( ] = None, ): """Update the text of a text input.""" - if not validate.input_in_inputs(ctx, input_name): - console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') - raise typer.Exit(1) - resp = ctx.obj['obsws'].get_input_settings(name=input_name) if not resp.input_kind.startswith('text_'): console.err.print( diff --git a/obsws_cli/validate.py b/obsws_cli/validate.py index ebfec1b..02c2a20 100644 --- a/obsws_cli/validate.py +++ b/obsws_cli/validate.py @@ -2,14 +2,28 @@ import typer +from . import console + # type alias for an option that is skipped when the command is run skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False) def input_in_inputs(ctx: typer.Context, input_name: str) -> bool: - """Check if an input is in the input list.""" - inputs = ctx.obj['obsws'].get_input_list().inputs - return any(input_.get('inputName') == input_name for input_ in inputs) + """Ensure the given input exists in the list of inputs.""" + resp = ctx.obj['obsws'].get_input_list() + if not any(input.get('inputName') == input_name for input in resp.inputs): + console.err.print(f'Input [yellow]{input_name}[/yellow] does not exist.') + raise typer.Exit(1) + return input_name + + +def input_not_in_inputs(ctx: typer.Context, input_name: str) -> bool: + """Ensure an input does not already exist in the list of inputs.""" + resp = ctx.obj['obsws'].get_input_list() + if any(input.get('inputName') == input_name for input in resp.inputs): + console.err.print(f'Input [yellow]{input_name}[/yellow] already exists.') + raise typer.Exit(1) + return input_name def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool: @@ -52,3 +66,12 @@ def monitor_exists(ctx: typer.Context, monitor_index: int) -> bool: """Check if a monitor exists.""" resp = ctx.obj['obsws'].get_monitor_list() return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors) + + +def kind_in_input_kinds(ctx: typer.Context, input_kind: str) -> str: + """Check if an input kind is valid.""" + resp = ctx.obj['obsws'].get_input_kind_list(False) + if not any(kind == input_kind for kind in resp.input_kinds): + console.err.print(f'Input kind [yellow]{input_kind}[/yellow] not found.') + raise typer.Exit(1) + return input_kind