From eb34a1833f449df7dd129897dd73d9b4d7c502e0 Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Thu, 17 Jul 2025 04:29:21 +0100 Subject: [PATCH] convert filter sub app --- obsws_cli/filter.py | 126 ++++++++++++++++++++------------------------ 1 file changed, 57 insertions(+), 69 deletions(-) diff --git a/obsws_cli/filter.py b/obsws_cli/filter.py index 470be77..0d86e4f 100644 --- a/obsws_cli/filter.py +++ b/obsws_cli/filter.py @@ -3,44 +3,42 @@ from typing import Annotated, Optional import obsws_python as obsws -import typer +from cyclopts import App, Argument, CycloptsError, Parameter from rich.table import Table from rich.text import Text from . import console, util -from .alias import SubTyperAliasGroup +from .context import Context +from .enum import ExitCode +from .error import OBSWSCLIError -app = typer.Typer(cls=SubTyperAliasGroup) +app = App(name='filter') -@app.callback() -def main(): - """Control filters in OBS scenes.""" - - -@app.command('list | ls') +@app.command(name=['list', 'ls']) def list_( - ctx: typer.Context, source_name: Annotated[ Optional[str], - typer.Argument( - show_default='The current scene', - help='The source to list filters for', + Argument( + hint='The source to list filters for', ), ] = None, + /, + *, + ctx: Annotated[Context, Parameter(parse=False)], ): """List filters for a source.""" if not source_name: - source_name = ctx.obj['obsws'].get_current_program_scene().scene_name + source_name = ctx.client.get_current_program_scene().scene_name try: - resp = ctx.obj['obsws'].get_source_filter_list(source_name) + resp = ctx.client.get_source_filter_list(source_name) except obsws.error.OBSSDKRequestError as e: if e.code == 600: - console.err.print( - f'No source was found by the name of [yellow]{source_name}[/yellow].' + raise OBSWSCLIError( + f'No source found by the name of [yellow]{source_name}[/yellow].', + code=ExitCode.NOT_FOUND, ) - raise typer.Exit(1) else: raise @@ -48,25 +46,25 @@ def list_( console.out.print( f'No filters found for source {console.highlight(ctx, source_name)}' ) - raise typer.Exit() + return table = Table( title=f'Filters for Source: {source_name}', padding=(0, 2), - border_style=ctx.obj['style'].border, + border_style=ctx.style.border, ) columns = [ - (Text('Filter Name', justify='center'), 'left', ctx.obj['style'].column), - (Text('Kind', justify='center'), 'left', ctx.obj['style'].column), + (Text('Filter Name', justify='center'), 'left', ctx.style.column), + (Text('Kind', justify='center'), 'left', ctx.style.column), (Text('Enabled', justify='center'), 'center', None), - (Text('Settings', justify='center'), 'center', ctx.obj['style'].column), + (Text('Settings', justify='center'), 'center', ctx.style.column), ] for heading, justify, style in columns: table.add_column(heading, justify=justify, style=style) for filter in resp.filters: - resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind']) + resp = ctx.client.get_source_filter_default_settings(filter['filterKind']) settings = resp.default_filter_settings | filter['filterSettings'] table.add_row( @@ -84,93 +82,85 @@ def list_( console.out.print(table) -def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str): +def _get_filter_enabled(ctx: Context, source_name: str, filter_name: str): """Get the status of a filter for a source.""" - resp = ctx.obj['obsws'].get_source_filter(source_name, filter_name) + resp = ctx.client.get_source_filter(source_name, filter_name) return resp.filter_enabled -@app.command('enable | on') +@app.command(name=['enable', 'on']) def enable( - ctx: typer.Context, source_name: Annotated[ str, - typer.Argument( - ..., show_default=False, help='The source to enable the filter for' - ), + Argument(hint='The source to enable the filter for'), ], filter_name: Annotated[ str, - typer.Argument( - ..., show_default=False, help='The name of the filter to enable' - ), + Argument(hint='The name of the filter to enable'), ], + /, + *, + ctx: Annotated[Context, Parameter(parse=False)], ): """Enable a filter for a source.""" if _get_filter_enabled(ctx, source_name, filter_name): - console.err.print( - f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]' + raise CycloptsError( + f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]', + console=console.err, ) - raise typer.Exit(1) - ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=True) + ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=True) console.out.print( f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' ) -@app.command('disable | off') +@app.command(name=['disable', 'off']) def disable( - ctx: typer.Context, source_name: Annotated[ str, - typer.Argument( - ..., show_default=False, help='The source to disable the filter for' - ), + Argument(hint='The source to disable the filter for'), ], filter_name: Annotated[ str, - typer.Argument( - ..., show_default=False, help='The name of the filter to disable' - ), + Argument(hint='The name of the filter to disable'), ], + /, + *, + ctx: Annotated[Context, Parameter(parse=False)], ): """Disable a filter for a source.""" if not _get_filter_enabled(ctx, source_name, filter_name): - console.err.print( - f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]' + raise CycloptsError( + f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]', + console=console.err, ) - raise typer.Exit(1) - ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=False) + ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=False) console.out.print( f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' ) -@app.command('toggle | tg') +@app.command(name=['toggle', 'tg']) def toggle( - ctx: typer.Context, source_name: Annotated[ str, - typer.Argument( - ..., show_default=False, help='The source to toggle the filter for' - ), + Argument(hint='The source to toggle the filter for'), ], filter_name: Annotated[ str, - typer.Argument( - ..., show_default=False, help='The name of the filter to toggle' - ), + Argument(hint='The name of the filter to toggle'), ], + /, + *, + ctx: Annotated[Context, Parameter(parse=False)], ): """Toggle a filter for a source.""" is_enabled = _get_filter_enabled(ctx, source_name, filter_name) new_state = not is_enabled - ctx.obj['obsws'].set_source_filter_enabled( - source_name, filter_name, enabled=new_state - ) + ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=new_state) if new_state: console.out.print( f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' @@ -181,21 +171,19 @@ def toggle( ) -@app.command('status | ss') +@app.command(name=['status', 'ss']) def status( - ctx: typer.Context, source_name: Annotated[ str, - typer.Argument( - ..., show_default=False, help='The source to get the filter status for' - ), + Argument(hint='The source to get the filter status for'), ], filter_name: Annotated[ str, - typer.Argument( - ..., show_default=False, help='The name of the filter to get the status for' - ), + Argument(hint='The name of the filter to get the status for'), ], + /, + *, + ctx: Annotated[Context, Parameter(parse=False)], ): """Get the status of a filter for a source.""" is_enabled = _get_filter_enabled(ctx, source_name, filter_name)