convert filter sub app

This commit is contained in:
onyx-and-iris 2025-07-17 04:29:21 +01:00
parent abbab5c746
commit eb34a1833f

View File

@ -3,44 +3,42 @@
from typing import Annotated, Optional from typing import Annotated, Optional
import obsws_python as obsws import obsws_python as obsws
import typer from cyclopts import App, Argument, CycloptsError, Parameter
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util from . import console, util
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='filter')
@app.callback() @app.command(name=['list', 'ls'])
def main():
"""Control filters in OBS scenes."""
@app.command('list | ls')
def list_( def list_(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
Optional[str], Optional[str],
typer.Argument( Argument(
show_default='The current scene', hint='The source to list filters for',
help='The source to list filters for',
), ),
] = None, ] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""List filters for a source.""" """List filters for a source."""
if not source_name: if not source_name:
source_name = ctx.obj['obsws'].get_current_program_scene().scene_name source_name = ctx.client.get_current_program_scene().scene_name
try: try:
resp = ctx.obj['obsws'].get_source_filter_list(source_name) resp = ctx.client.get_source_filter_list(source_name)
except obsws.error.OBSSDKRequestError as e: except obsws.error.OBSSDKRequestError as e:
if e.code == 600: if e.code == 600:
console.err.print( raise OBSWSCLIError(
f'No source was found by the name of [yellow]{source_name}[/yellow].' f'No source found by the name of [yellow]{source_name}[/yellow].',
code=ExitCode.NOT_FOUND,
) )
raise typer.Exit(1)
else: else:
raise raise
@ -48,25 +46,25 @@ def list_(
console.out.print( console.out.print(
f'No filters found for source {console.highlight(ctx, source_name)}' f'No filters found for source {console.highlight(ctx, source_name)}'
) )
raise typer.Exit() return
table = Table( table = Table(
title=f'Filters for Source: {source_name}', title=f'Filters for Source: {source_name}',
padding=(0, 2), padding=(0, 2),
border_style=ctx.obj['style'].border, border_style=ctx.style.border,
) )
columns = [ columns = [
(Text('Filter Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Filter Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column), (Text('Kind', justify='center'), 'left', ctx.style.column),
(Text('Enabled', justify='center'), 'center', None), (Text('Enabled', justify='center'), 'center', None),
(Text('Settings', justify='center'), 'center', ctx.obj['style'].column), (Text('Settings', justify='center'), 'center', ctx.style.column),
] ]
for heading, justify, style in columns: for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style) table.add_column(heading, justify=justify, style=style)
for filter in resp.filters: for filter in resp.filters:
resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind']) resp = ctx.client.get_source_filter_default_settings(filter['filterKind'])
settings = resp.default_filter_settings | filter['filterSettings'] settings = resp.default_filter_settings | filter['filterSettings']
table.add_row( table.add_row(
@ -84,93 +82,85 @@ def list_(
console.out.print(table) console.out.print(table)
def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str): def _get_filter_enabled(ctx: Context, source_name: str, filter_name: str):
"""Get the status of a filter for a source.""" """Get the status of a filter for a source."""
resp = ctx.obj['obsws'].get_source_filter(source_name, filter_name) resp = ctx.client.get_source_filter(source_name, filter_name)
return resp.filter_enabled return resp.filter_enabled
@app.command('enable | on') @app.command(name=['enable', 'on'])
def enable( def enable(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(hint='The source to enable the filter for'),
..., show_default=False, help='The source to enable the filter for'
),
], ],
filter_name: Annotated[ filter_name: Annotated[
str, str,
typer.Argument( Argument(hint='The name of the filter to enable'),
..., show_default=False, help='The name of the filter to enable'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Enable a filter for a source.""" """Enable a filter for a source."""
if _get_filter_enabled(ctx, source_name, filter_name): if _get_filter_enabled(ctx, source_name, filter_name):
console.err.print( raise CycloptsError(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]' f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]',
console=console.err,
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=True) ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=True)
console.out.print( console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
) )
@app.command('disable | off') @app.command(name=['disable', 'off'])
def disable( def disable(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(hint='The source to disable the filter for'),
..., show_default=False, help='The source to disable the filter for'
),
], ],
filter_name: Annotated[ filter_name: Annotated[
str, str,
typer.Argument( Argument(hint='The name of the filter to disable'),
..., show_default=False, help='The name of the filter to disable'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Disable a filter for a source.""" """Disable a filter for a source."""
if not _get_filter_enabled(ctx, source_name, filter_name): if not _get_filter_enabled(ctx, source_name, filter_name):
console.err.print( raise CycloptsError(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]' f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]',
console=console.err,
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=False) ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=False)
console.out.print( console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
) )
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle( def toggle(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(hint='The source to toggle the filter for'),
..., show_default=False, help='The source to toggle the filter for'
),
], ],
filter_name: Annotated[ filter_name: Annotated[
str, str,
typer.Argument( Argument(hint='The name of the filter to toggle'),
..., show_default=False, help='The name of the filter to toggle'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Toggle a filter for a source.""" """Toggle a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name) is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
new_state = not is_enabled new_state = not is_enabled
ctx.obj['obsws'].set_source_filter_enabled( ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=new_state)
source_name, filter_name, enabled=new_state
)
if new_state: if new_state:
console.out.print( console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
@ -181,21 +171,19 @@ def toggle(
) )
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status( def status(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(hint='The source to get the filter status for'),
..., show_default=False, help='The source to get the filter status for'
),
], ],
filter_name: Annotated[ filter_name: Annotated[
str, str,
typer.Argument( Argument(hint='The name of the filter to get the status for'),
..., show_default=False, help='The name of the filter to get the status for'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Get the status of a filter for a source.""" """Get the status of a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name) is_enabled = _get_filter_enabled(ctx, source_name, filter_name)