add filter flags to input list command

add validation logic to input commands

minor version bump
This commit is contained in:
onyx-and-iris 2025-04-20 20:16:02 +01:00
parent 50b566bb71
commit 2b4c3add3c
2 changed files with 60 additions and 52 deletions

View File

@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online> # SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
# #
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
__version__ = "0.5.0" __version__ = "0.6.0"

View File

@ -1,11 +1,10 @@
"""module containing commands for manipulating inputs.""" """module containing commands for manipulating inputs."""
import obsws_python as obsws from typing import Annotated
import typer import typer
from .alias import AliasGroup from .alias import AliasGroup
from .errors import ObswsCliBadParameter
from .protocols import DataclassProtocol
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
@ -15,75 +14,84 @@ def main():
"""Control inputs in OBS.""" """Control inputs in OBS."""
@app.command('ls') @app.command('list | ls')
def list(ctx: typer.Context): def list(
ctx: typer.Context,
input: Annotated[bool, typer.Option(help='Filter by input type.')] = False,
output: Annotated[bool, typer.Option(help='Filter by output type.')] = False,
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
):
"""List all inputs.""" """List all inputs."""
resp = ctx.obj['obsws'].get_input_list() resp = ctx.obj['obsws'].get_input_list()
inputs = (input.get('inputName') for input in resp.inputs)
typer.echo('\n'.join(inputs))
filters = []
if input:
filters.append('input')
if output:
filters.append('output')
if colour:
filters.append('color')
def _get_input(input_name: str, resp: DataclassProtocol) -> dict | None: inputs = filter(
"""Get an input from the input list response.""" lambda input_: any(kind in input_.get('inputKind') for kind in filters),
input_ = next( resp.inputs,
(input_ for input_ in resp.inputs if input_.get('inputName') == input_name),
None,
) )
typer.echo('\n'.join(input_.get('inputName') for input_ in inputs))
return input_
def _input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
"""Check if an input is in the input list."""
inputs = ctx.obj['obsws'].get_input_list().inputs
return any(input_.get('inputName') == input_name for input_ in inputs)
@app.command() @app.command()
def mute(ctx: typer.Context, input_name: str): def mute(ctx: typer.Context, input_name: str):
"""Mute an input.""" """Mute an input."""
try: if not _input_in_inputs(ctx, input_name):
resp = ctx.obj['obsws'].get_input_list() typer.echo(
if (input_ := _get_input(input_name, resp)) is None: f"Input '{input_name}' not found.",
raise ObswsCliBadParameter(f"Input '{input_name}' not found.") err=True,
)
raise typer.Exit(code=1)
ctx.obj['obsws'].set_input_mute( ctx.obj['obsws'].set_input_mute(
name=input_.get('inputName'), name=input_name,
muted=True, muted=True,
) )
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise
@app.command() @app.command()
def unmute(ctx: typer.Context, input_name: str): def unmute(ctx: typer.Context, input_name: str):
"""Unmute an input.""" """Unmute an input."""
try: if not _input_in_inputs(ctx, input_name):
resp = ctx.obj['obsws'].get_input_list() typer.echo(
if (input_ := _get_input(input_name, resp)) is None: f"Input '{input_name}' not found.",
raise ObswsCliBadParameter(f"Input '{input_name}' not found.") err=True,
)
raise typer.Exit(code=1)
ctx.obj['obsws'].set_input_mute( ctx.obj['obsws'].set_input_mute(
name=input_.get('inputName'), name=input_name,
muted=False, muted=False,
) )
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise
@app.command() @app.command()
def toggle(ctx: typer.Context, input_name: str): def toggle(ctx: typer.Context, input_name: str):
"""Toggle an input.""" """Toggle an input."""
try: if not _input_in_inputs(ctx, input_name):
resp = ctx.obj['obsws'].get_input_list() typer.echo(
if (input_ := _get_input(input_name, resp)) is None: f"Input '{input_name}' not found.",
raise ObswsCliBadParameter(f"Input '{input_name}' not found.") err=True,
)
raise typer.Exit(code=1)
resp = ctx.obj['obsws'].get_input_mute(name=input_.get('inputName')) # Get the current mute state
resp = ctx.obj['obsws'].get_input_mute(name=input_name)
new_state = not resp.input_muted
ctx.obj['obsws'].set_input_mute( ctx.obj['obsws'].set_input_mute(
name=input_.get('inputName'), name=input_name,
muted=not resp.input_muted, muted=new_state,
) )
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise