From d33c209d7ccbe9f9c46a9a72b0278d834dcdb2fa Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Thu, 12 Jun 2025 05:28:54 +0100 Subject: [PATCH] add custom error class SlobsCliError add exception group handling for all commands that may raise exceptions add comments to for-else blocks --- src/slobs_cli/audio.py | 65 +++++++++++++++++++++-------------- src/slobs_cli/errors.py | 13 +++++++ src/slobs_cli/record.py | 25 +++++++++----- src/slobs_cli/replaybuffer.py | 27 +++++++++------ src/slobs_cli/scene.py | 45 +++++++++++++----------- src/slobs_cli/stream.py | 25 +++++++++----- src/slobs_cli/studiomode.py | 37 +++++++++++++------- 7 files changed, 153 insertions(+), 84 deletions(-) create mode 100644 src/slobs_cli/errors.py diff --git a/src/slobs_cli/audio.py b/src/slobs_cli/audio.py index 3a596c4..7604478 100644 --- a/src/slobs_cli/audio.py +++ b/src/slobs_cli/audio.py @@ -3,6 +3,7 @@ from anyio import create_task_group from pyslobs import AudioService from .cli import cli +from .errors import SlobsCliError @cli.group() @@ -22,18 +23,24 @@ async def list(ctx: click.Context): sources = await as_.get_sources() if not sources: conn.close() - click.Abort(click.style("No audio sources found.", fg="red")) + click.echo("No audio sources found.") + return + click.echo("Available audio sources:") for source in sources: model = await source.get_model() click.echo( - f"Source ID: {source.source_id}, Name: {model.name}, Muted: {model.muted}" + f"- {click.style(model.name, fg='blue')} (ID: {model.source_id}, Muted: {click.style('✅', fg='green') if model.muted else click.style('❌', fg='red')})" ) conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @audio.command() @@ -51,19 +58,21 @@ async def mute(ctx: click.Context, source_name: str): model = await source.get_model() if model.name.lower() == source_name.lower(): break - else: + else: # If no source by the given name was found conn.close() - raise click.Abort( - click.style(f"Source '{source_name}' not found.", fg="red") - ) + raise SlobsCliError(f"Source '{source_name}' not found.") await source.set_muted(True) click.echo(f"Muted audio source: {source_name}") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @audio.command() @@ -81,19 +90,21 @@ async def unmute(ctx: click.Context, source_name: str): model = await source.get_model() if model.name.lower() == source_name.lower(): break - else: + else: # If no source by the given name was found conn.close() - raise click.Abort( - click.style(f"Source '{source_name}' not found.", fg="red") - ) + raise SlobsCliError(f"Source '{source_name}' not found.") await source.set_muted(False) click.echo(f"Unmuted audio source: {source_name}") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @audio.command() @@ -118,12 +129,14 @@ async def toggle(ctx: click.Context, source_name: str): click.echo(f"Muted audio source: {source_name}") conn.close() break - else: + else: # If no source by the given name was found conn.close() - raise click.Abort( - click.style(f"Source '{source_name}' not found.", fg="red") - ) + raise SlobsCliError(f"Source '{source_name}' not found.") - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e diff --git a/src/slobs_cli/errors.py b/src/slobs_cli/errors.py new file mode 100644 index 0000000..5164f0e --- /dev/null +++ b/src/slobs_cli/errors.py @@ -0,0 +1,13 @@ +import asyncclick as click + + +class SlobsCliError(click.ClickException): + """Base class for all Slobs CLI errors.""" + + def __init__(self, message: str): + super().__init__(message) + self.exit_code = 1 + + def show(self): + """Display the error message in red.""" + click.secho(f"Error: {self.message}", fg="red", err=True) diff --git a/src/slobs_cli/record.py b/src/slobs_cli/record.py index 30c7c75..226443a 100644 --- a/src/slobs_cli/record.py +++ b/src/slobs_cli/record.py @@ -3,6 +3,7 @@ from anyio import create_task_group from pyslobs import StreamingService from .cli import cli +from .errors import SlobsCliError @cli.group() @@ -24,16 +25,20 @@ async def start(ctx: click.Context): if active: conn.close() - raise click.Abort(click.style("Recording is already active.", fg="red")) + raise SlobsCliError("Recording is already active.") await ss.toggle_recording() click.echo("Recording started.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @record.command() @@ -50,16 +55,20 @@ async def stop(ctx: click.Context): if not active: conn.close() - raise click.Abort(click.style("Recording is already inactive.", fg="red")) + raise SlobsCliError("Recording is already inactive.") await ss.toggle_recording() click.echo("Recording stopped.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @record.command() diff --git a/src/slobs_cli/replaybuffer.py b/src/slobs_cli/replaybuffer.py index 5da2444..d274487 100644 --- a/src/slobs_cli/replaybuffer.py +++ b/src/slobs_cli/replaybuffer.py @@ -3,6 +3,7 @@ from anyio import create_task_group from pyslobs import StreamingService from .cli import cli +from .errors import SlobsCliError @cli.group() @@ -24,15 +25,19 @@ async def start(ctx: click.Context): if active: conn.close() - raise click.Abort(click.style("Replay buffer is already active.", fg="red")) + raise SlobsCliError("Replay buffer is already active.") await ss.start_replay_buffer() click.echo("Replay buffer started.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @replaybuffer.command() @@ -49,17 +54,19 @@ async def stop(ctx: click.Context): if not active: conn.close() - raise click.Abort( - click.style("Replay buffer is already inactive.", fg="red") - ) + raise SlobsCliError("Replay buffer is already inactive.") await ss.stop_replay_buffer() click.echo("Replay buffer stopped.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @replaybuffer.command() diff --git a/src/slobs_cli/scene.py b/src/slobs_cli/scene.py index 6880955..07116eb 100644 --- a/src/slobs_cli/scene.py +++ b/src/slobs_cli/scene.py @@ -3,6 +3,7 @@ from anyio import create_task_group from pyslobs import ScenesService, TransitionsService from .cli import cli +from .errors import SlobsCliError @cli.group() @@ -22,11 +23,19 @@ async def list(ctx: click.Context): scenes = await ss.get_scenes() if not scenes: click.echo("No scenes found.") + conn.close() return + active_scene = await ss.active_scene() + click.echo("Available scenes:") for scene in scenes: - click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})") + if scene.id == active_scene.id: + click.echo( + f"- {click.style(scene.name, fg='green')} (ID: {scene.id}) [Active]" + ) + else: + click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})") conn.close() @@ -45,12 +54,9 @@ async def current(ctx: click.Context): async def _run(): active_scene = await ss.active_scene() - if active_scene: - click.echo( - f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})" - ) - else: - click.echo("No active scene found.") + click.echo( + f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})" + ) conn.close() async with create_task_group() as tg: @@ -93,25 +99,24 @@ async def switch(ctx: click.Context, scene_name: str, preview: bool = False): else: if preview: conn.close() - raise click.Abort( - click.style( - "Cannot switch to preview scene in non-studio mode.", - fg="red", - ) + raise SlobsCliError( + "Cannot switch to preview scene in non-studio mode." ) await ss.make_scene_active(scene.id) click.echo( f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in active mode." ) + conn.close() break - else: - conn.close() - raise click.ClickException( - click.style(f"Scene '{scene_name}' not found.", fg="red") - ) + else: # If no scene by the given name was found + raise SlobsCliError(f"Scene '{scene_name}' not found.") - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e diff --git a/src/slobs_cli/stream.py b/src/slobs_cli/stream.py index f8b47d5..11752f9 100644 --- a/src/slobs_cli/stream.py +++ b/src/slobs_cli/stream.py @@ -3,6 +3,7 @@ from anyio import create_task_group from pyslobs import StreamingService from .cli import cli +from .errors import SlobsCliError @cli.group() @@ -24,15 +25,19 @@ async def start(ctx: click.Context): if active: conn.close() - raise click.Abort(click.style("Stream is already active.", fg="red")) + raise SlobsCliError("Stream is already active.") await ss.toggle_streaming() click.echo("Stream started.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @stream.command() @@ -49,15 +54,19 @@ async def stop(ctx: click.Context): if not active: conn.close() - raise click.Abort(click.style("Stream is already inactive.", fg="red")) + raise SlobsCliError("Stream is already inactive.") await ss.toggle_streaming() click.echo("Stream stopped.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @stream.command() diff --git a/src/slobs_cli/studiomode.py b/src/slobs_cli/studiomode.py index 6643817..1151dc7 100644 --- a/src/slobs_cli/studiomode.py +++ b/src/slobs_cli/studiomode.py @@ -3,6 +3,7 @@ from anyio import create_task_group from pyslobs import TransitionsService from .cli import cli +from .errors import SlobsCliError @cli.group() @@ -22,15 +23,19 @@ async def enable(ctx: click.Context): model = await ts.get_model() if model.studio_mode: conn.close() - raise click.Abort(click.style("Studio mode is already enabled.", fg="red")) + raise SlobsCliError("Studio mode is already enabled.") await ts.enable_studio_mode() click.echo("Studio mode enabled successfully.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @studiomode.command() @@ -45,15 +50,19 @@ async def disable(ctx: click.Context): model = await ts.get_model() if not model.studio_mode: conn.close() - raise click.Abort(click.style("Studio mode is already disabled.", fg="red")) + raise SlobsCliError("Studio mode is already disabled.") await ts.disable_studio_mode() click.echo("Studio mode disabled successfully.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e @studiomode.command() @@ -112,12 +121,16 @@ async def force_transition(ctx: click.Context): model = await ts.get_model() if not model.studio_mode: conn.close() - raise click.Abort(click.style("Studio mode is not enabled.", fg="red")) + raise SlobsCliError("Studio mode is not enabled.") await ts.execute_studio_mode_transition() click.echo("Forced studio mode transition.") conn.close() - async with create_task_group() as tg: - tg.start_soon(conn.background_processing) - tg.start_soon(_run) + try: + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(_run) + except* SlobsCliError as excgroup: + for e in excgroup.exceptions: + raise e