Compare commits

...

5 Commits

Author SHA1 Message Date
0c5bbc114f fix regression
patch bump
2025-06-12 05:42:30 +01:00
564f4116d1 remove exception handling, we no longer raise exception on empty sources list.
split long line

patch bump
2025-06-12 05:34:36 +01:00
48201e4bbb add 0.8.0 to CHANGELOG 2025-06-12 05:29:48 +01:00
00273ff461 upd tests to check exit_code and err output instead of exceptions 2025-06-12 05:29:22 +01:00
d33c209d7c add custom error class SlobsCliError
add exception group handling for all commands that may raise exceptions

add comments to for-else blocks
2025-06-12 05:28:54 +01:00
12 changed files with 186 additions and 134 deletions

View File

@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.8.0] - 2025-06-12
### Added
- custom error class SlobsCliError.
### Changed
- scene list now shows which scene is the current active scene.
- audio list now shows mute states.
- erroneous paths now print error messages and return 1 instead of raising exceptions.
- unit tests updated to reflect the changes.
# [0.7.6] - 2025-06-11 # [0.7.6] - 2025-06-11
### Added ### Added

View File

@ -1 +1 @@
__version__ = "0.7.8" __version__ = "0.8.2"

View File

@ -3,6 +3,7 @@ from anyio import create_task_group
from pyslobs import AudioService from pyslobs import AudioService
from .cli import cli from .cli import cli
from .errors import SlobsCliError
@cli.group() @cli.group()
@ -21,13 +22,16 @@ async def list(ctx: click.Context):
async def _run(): async def _run():
sources = await as_.get_sources() sources = await as_.get_sources()
if not sources: if not sources:
click.echo("No audio sources found.")
conn.close() conn.close()
click.Abort(click.style("No audio sources found.", fg="red")) return
click.echo("Available audio sources:")
for source in sources: for source in sources:
model = await source.get_model() model = await source.get_model()
click.echo( click.echo(
f"Source ID: {source.source_id}, Name: {model.name}, Muted: {model.muted}" f"- {click.style(model.name, fg='blue')} (ID: {model.source_id}, "
f"Muted: {click.style('', fg='green') if model.muted else click.style('', fg='red')})"
) )
conn.close() conn.close()
@ -51,19 +55,21 @@ async def mute(ctx: click.Context, source_name: str):
model = await source.get_model() model = await source.get_model()
if model.name.lower() == source_name.lower(): if model.name.lower() == source_name.lower():
break break
else: else: # If no source by the given name was found
conn.close() conn.close()
raise click.Abort( raise SlobsCliError(f"Source '{source_name}' not found.")
click.style(f"Source '{source_name}' not found.", fg="red")
)
await source.set_muted(True) await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}") click.echo(f"Muted audio source: {source_name}")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@audio.command() @audio.command()
@ -81,19 +87,21 @@ async def unmute(ctx: click.Context, source_name: str):
model = await source.get_model() model = await source.get_model()
if model.name.lower() == source_name.lower(): if model.name.lower() == source_name.lower():
break break
else: else: # If no source by the given name was found
conn.close() conn.close()
raise click.Abort( raise SlobsCliError(f"Source '{source_name}' not found.")
click.style(f"Source '{source_name}' not found.", fg="red")
)
await source.set_muted(False) await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}") click.echo(f"Unmuted audio source: {source_name}")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@audio.command() @audio.command()
@ -118,12 +126,14 @@ async def toggle(ctx: click.Context, source_name: str):
click.echo(f"Muted audio source: {source_name}") click.echo(f"Muted audio source: {source_name}")
conn.close() conn.close()
break break
else: else: # If no source by the given name was found
conn.close() conn.close()
raise click.Abort( raise SlobsCliError(f"Source '{source_name}' not found.")
click.style(f"Source '{source_name}' not found.", fg="red")
)
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e

13
src/slobs_cli/errors.py Normal file
View File

@ -0,0 +1,13 @@
import asyncclick as click
class SlobsCliError(click.ClickException):
"""Base class for all Slobs CLI errors."""
def __init__(self, message: str):
super().__init__(message)
self.exit_code = 1
def show(self):
"""Display the error message in red."""
click.secho(f"Error: {self.message}", fg="red", err=True)

View File

@ -3,6 +3,7 @@ from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
from .cli import cli from .cli import cli
from .errors import SlobsCliError
@cli.group() @cli.group()
@ -24,16 +25,20 @@ async def start(ctx: click.Context):
if active: if active:
conn.close() conn.close()
raise click.Abort(click.style("Recording is already active.", fg="red")) raise SlobsCliError("Recording is already active.")
await ss.toggle_recording() await ss.toggle_recording()
click.echo("Recording started.") click.echo("Recording started.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@record.command() @record.command()
@ -50,16 +55,20 @@ async def stop(ctx: click.Context):
if not active: if not active:
conn.close() conn.close()
raise click.Abort(click.style("Recording is already inactive.", fg="red")) raise SlobsCliError("Recording is already inactive.")
await ss.toggle_recording() await ss.toggle_recording()
click.echo("Recording stopped.") click.echo("Recording stopped.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@record.command() @record.command()

View File

@ -3,6 +3,7 @@ from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
from .cli import cli from .cli import cli
from .errors import SlobsCliError
@cli.group() @cli.group()
@ -24,15 +25,19 @@ async def start(ctx: click.Context):
if active: if active:
conn.close() conn.close()
raise click.Abort(click.style("Replay buffer is already active.", fg="red")) raise SlobsCliError("Replay buffer is already active.")
await ss.start_replay_buffer() await ss.start_replay_buffer()
click.echo("Replay buffer started.") click.echo("Replay buffer started.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@replaybuffer.command() @replaybuffer.command()
@ -49,17 +54,19 @@ async def stop(ctx: click.Context):
if not active: if not active:
conn.close() conn.close()
raise click.Abort( raise SlobsCliError("Replay buffer is already inactive.")
click.style("Replay buffer is already inactive.", fg="red")
)
await ss.stop_replay_buffer() await ss.stop_replay_buffer()
click.echo("Replay buffer stopped.") click.echo("Replay buffer stopped.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@replaybuffer.command() @replaybuffer.command()

View File

@ -3,6 +3,7 @@ from anyio import create_task_group
from pyslobs import ScenesService, TransitionsService from pyslobs import ScenesService, TransitionsService
from .cli import cli from .cli import cli
from .errors import SlobsCliError
@cli.group() @cli.group()
@ -22,11 +23,19 @@ async def list(ctx: click.Context):
scenes = await ss.get_scenes() scenes = await ss.get_scenes()
if not scenes: if not scenes:
click.echo("No scenes found.") click.echo("No scenes found.")
conn.close()
return return
active_scene = await ss.active_scene()
click.echo("Available scenes:") click.echo("Available scenes:")
for scene in scenes: for scene in scenes:
click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})") if scene.id == active_scene.id:
click.echo(
f"- {click.style(scene.name, fg='green')} (ID: {scene.id}) [Active]"
)
else:
click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})")
conn.close() conn.close()
@ -45,12 +54,9 @@ async def current(ctx: click.Context):
async def _run(): async def _run():
active_scene = await ss.active_scene() active_scene = await ss.active_scene()
if active_scene: click.echo(
click.echo( f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})"
f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})" )
)
else:
click.echo("No active scene found.")
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -93,25 +99,25 @@ async def switch(ctx: click.Context, scene_name: str, preview: bool = False):
else: else:
if preview: if preview:
conn.close() conn.close()
raise click.Abort( raise SlobsCliError(
click.style( "Cannot switch to preview scene in non-studio mode."
"Cannot switch to preview scene in non-studio mode.",
fg="red",
)
) )
await ss.make_scene_active(scene.id) await ss.make_scene_active(scene.id)
click.echo( click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in active mode." f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in active mode."
) )
conn.close() conn.close()
break break
else: else: # If no scene by the given name was found
conn.close() conn.close()
raise click.ClickException( raise SlobsCliError(f"Scene '{scene_name}' not found.")
click.style(f"Scene '{scene_name}' not found.", fg="red")
)
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e

View File

@ -3,6 +3,7 @@ from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
from .cli import cli from .cli import cli
from .errors import SlobsCliError
@cli.group() @cli.group()
@ -24,15 +25,19 @@ async def start(ctx: click.Context):
if active: if active:
conn.close() conn.close()
raise click.Abort(click.style("Stream is already active.", fg="red")) raise SlobsCliError("Stream is already active.")
await ss.toggle_streaming() await ss.toggle_streaming()
click.echo("Stream started.") click.echo("Stream started.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@stream.command() @stream.command()
@ -49,15 +54,19 @@ async def stop(ctx: click.Context):
if not active: if not active:
conn.close() conn.close()
raise click.Abort(click.style("Stream is already inactive.", fg="red")) raise SlobsCliError("Stream is already inactive.")
await ss.toggle_streaming() await ss.toggle_streaming()
click.echo("Stream stopped.") click.echo("Stream stopped.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@stream.command() @stream.command()

View File

@ -3,6 +3,7 @@ from anyio import create_task_group
from pyslobs import TransitionsService from pyslobs import TransitionsService
from .cli import cli from .cli import cli
from .errors import SlobsCliError
@cli.group() @cli.group()
@ -22,15 +23,19 @@ async def enable(ctx: click.Context):
model = await ts.get_model() model = await ts.get_model()
if model.studio_mode: if model.studio_mode:
conn.close() conn.close()
raise click.Abort(click.style("Studio mode is already enabled.", fg="red")) raise SlobsCliError("Studio mode is already enabled.")
await ts.enable_studio_mode() await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.") click.echo("Studio mode enabled successfully.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@studiomode.command() @studiomode.command()
@ -45,15 +50,19 @@ async def disable(ctx: click.Context):
model = await ts.get_model() model = await ts.get_model()
if not model.studio_mode: if not model.studio_mode:
conn.close() conn.close()
raise click.Abort(click.style("Studio mode is already disabled.", fg="red")) raise SlobsCliError("Studio mode is already disabled.")
await ts.disable_studio_mode() await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.") click.echo("Studio mode disabled successfully.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@studiomode.command() @studiomode.command()
@ -112,12 +121,16 @@ async def force_transition(ctx: click.Context):
model = await ts.get_model() model = await ts.get_model()
if not model.studio_mode: if not model.studio_mode:
conn.close() conn.close()
raise click.Abort(click.style("Studio mode is not enabled.", fg="red")) raise SlobsCliError("Studio mode is not enabled.")
await ts.execute_studio_mode_transition() await ts.execute_studio_mode_transition()
click.echo("Forced studio mode transition.") click.echo("Forced studio mode transition.")
conn.close() conn.close()
async with create_task_group() as tg: try:
tg.start_soon(conn.background_processing) async with create_task_group() as tg:
tg.start_soon(_run) tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e

View File

@ -1,5 +1,4 @@
import anyio import anyio
import asyncclick as click
import pytest import pytest
from asyncclick.testing import CliRunner from asyncclick.testing import CliRunner
@ -13,19 +12,14 @@ async def test_record_start():
assert result.exit_code == 0 assert result.exit_code == 0
active = "Recording is currently active." in result.output active = "Recording is currently active." in result.output
result = await runner.invoke(cli, ["record", "start"])
if not active: if not active:
result = await runner.invoke(cli, ["record", "start"])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Recording started" in result.output assert "Recording started" in result.output
await anyio.sleep(1) # Allow some time for the recording to start await anyio.sleep(0.2) # Allow some time for the recording to start
else: else:
with pytest.raises(ExceptionGroup) as exc_info: assert result.exit_code != 0
result = await runner.invoke( assert "Recording is already active." in result.output
cli, ["record", "start"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Recording is already active."
)
@pytest.mark.anyio @pytest.mark.anyio
@ -35,16 +29,11 @@ async def test_record_stop():
assert result.exit_code == 0 assert result.exit_code == 0
active = "Recording is currently active." in result.output active = "Recording is currently active." in result.output
result = await runner.invoke(cli, ["record", "stop"])
if active: if active:
result = await runner.invoke(cli, ["record", "stop"])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Recording stopped" in result.output assert "Recording stopped" in result.output
await anyio.sleep(1) # Allow some time for the recording to stop await anyio.sleep(0.2) # Allow some time for the recording to stop
else: else:
with pytest.raises(ExceptionGroup) as exc_info: assert result.exit_code != 0
result = await runner.invoke( assert "Recording is already inactive." in result.output
cli, ["record", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Recording is already inactive."
)

View File

@ -1,5 +1,4 @@
import anyio import anyio
import asyncclick as click
import pytest import pytest
from asyncclick.testing import CliRunner from asyncclick.testing import CliRunner
@ -13,19 +12,14 @@ async def test_replaybuffer_start():
assert result.exit_code == 0 assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output active = "Replay buffer is currently active." in result.output
result = await runner.invoke(cli, ["replaybuffer", "start"])
if not active: if not active:
result = await runner.invoke(cli, ["replaybuffer", "start"])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Replay buffer started" in result.output assert "Replay buffer started" in result.output
await anyio.sleep(1) await anyio.sleep(0.2) # Allow some time for the replay buffer to start
else: else:
with pytest.raises(ExceptionGroup) as exc_info: assert result.exit_code != 0
result = await runner.invoke( assert "Replay buffer is already active." in result.output
cli, ["replaybuffer", "start"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Replay buffer is already active."
)
@pytest.mark.anyio @pytest.mark.anyio
@ -35,16 +29,11 @@ async def test_replaybuffer_stop():
assert result.exit_code == 0 assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output active = "Replay buffer is currently active." in result.output
result = await runner.invoke(cli, ["replaybuffer", "stop"])
if active: if active:
result = await runner.invoke(cli, ["replaybuffer", "stop"])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Replay buffer stopped" in result.output assert "Replay buffer stopped" in result.output
await anyio.sleep(1) await anyio.sleep(0.2) # Allow some time for the replay buffer to stop
else: else:
with pytest.raises(ExceptionGroup) as exc_info: assert result.exit_code != 0
result = await runner.invoke( assert "Replay buffer is already inactive." in result.output
cli, ["replaybuffer", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Replay buffer is already inactive."
)

View File

@ -1,5 +1,4 @@
import anyio import anyio
import asyncclick as click
import pytest import pytest
from asyncclick.testing import CliRunner from asyncclick.testing import CliRunner
@ -13,17 +12,14 @@ async def test_stream_start():
assert result.exit_code == 0 assert result.exit_code == 0
active = "Stream is currently active." in result.output active = "Stream is currently active." in result.output
result = await runner.invoke(cli, ["stream", "start"])
if not active: if not active:
result = await runner.invoke(cli, ["stream", "start"])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Stream started" in result.output assert "Stream started" in result.output
await anyio.sleep(1) # Allow some time for the stream to start await anyio.sleep(0.2) # Allow some time for the stream to start
else: else:
with pytest.raises(ExceptionGroup) as exc_info: assert result.exit_code != 0
result = await runner.invoke( assert "Stream is already active." in result.output
cli, ["stream", "start"], catch_exceptions=False
)
assert exc_info.group_contains(click.Abort, match="Stream is already active.")
@pytest.mark.anyio @pytest.mark.anyio
@ -33,14 +29,11 @@ async def test_stream_stop():
assert result.exit_code == 0 assert result.exit_code == 0
active = "Stream is currently active." in result.output active = "Stream is currently active." in result.output
result = await runner.invoke(cli, ["stream", "stop"])
if active: if active:
result = await runner.invoke(cli, ["stream", "stop"])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Stream stopped" in result.output assert "Stream stopped" in result.output
await anyio.sleep(1) # Allow some time for the stream to stop await anyio.sleep(0.2) # Allow some time for the stream to stop
else: else:
with pytest.raises(ExceptionGroup) as exc_info: assert result.exit_code != 0
result = await runner.invoke( assert "Stream is already inactive." in result.output
cli, ["stream", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(click.Abort, match="Stream is already inactive.")