mirror of
https://github.com/onyx-and-iris/slobs-cli.git
synced 2025-08-06 20:11:44 +00:00
Compare commits
5 Commits
a8bed0f4d9
...
0c5bbc114f
Author | SHA1 | Date | |
---|---|---|---|
0c5bbc114f | |||
564f4116d1 | |||
48201e4bbb | |||
00273ff461 | |||
d33c209d7c |
14
CHANGELOG.md
14
CHANGELOG.md
@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
# [0.8.0] - 2025-06-12
|
||||
|
||||
### Added
|
||||
|
||||
- custom error class SlobsCliError.
|
||||
|
||||
### Changed
|
||||
|
||||
- scene list now shows which scene is the current active scene.
|
||||
- audio list now shows mute states.
|
||||
|
||||
- erroneous paths now print error messages and return 1 instead of raising exceptions.
|
||||
- unit tests updated to reflect the changes.
|
||||
|
||||
# [0.7.6] - 2025-06-11
|
||||
|
||||
### Added
|
||||
|
@ -1 +1 @@
|
||||
__version__ = "0.7.8"
|
||||
__version__ = "0.8.2"
|
||||
|
@ -3,6 +3,7 @@ from anyio import create_task_group
|
||||
from pyslobs import AudioService
|
||||
|
||||
from .cli import cli
|
||||
from .errors import SlobsCliError
|
||||
|
||||
|
||||
@cli.group()
|
||||
@ -21,13 +22,16 @@ async def list(ctx: click.Context):
|
||||
async def _run():
|
||||
sources = await as_.get_sources()
|
||||
if not sources:
|
||||
click.echo("No audio sources found.")
|
||||
conn.close()
|
||||
click.Abort(click.style("No audio sources found.", fg="red"))
|
||||
return
|
||||
|
||||
click.echo("Available audio sources:")
|
||||
for source in sources:
|
||||
model = await source.get_model()
|
||||
click.echo(
|
||||
f"Source ID: {source.source_id}, Name: {model.name}, Muted: {model.muted}"
|
||||
f"- {click.style(model.name, fg='blue')} (ID: {model.source_id}, "
|
||||
f"Muted: {click.style('✅', fg='green') if model.muted else click.style('❌', fg='red')})"
|
||||
)
|
||||
conn.close()
|
||||
|
||||
@ -51,19 +55,21 @@ async def mute(ctx: click.Context, source_name: str):
|
||||
model = await source.get_model()
|
||||
if model.name.lower() == source_name.lower():
|
||||
break
|
||||
else:
|
||||
else: # If no source by the given name was found
|
||||
conn.close()
|
||||
raise click.Abort(
|
||||
click.style(f"Source '{source_name}' not found.", fg="red")
|
||||
)
|
||||
raise SlobsCliError(f"Source '{source_name}' not found.")
|
||||
|
||||
await source.set_muted(True)
|
||||
click.echo(f"Muted audio source: {source_name}")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@audio.command()
|
||||
@ -81,19 +87,21 @@ async def unmute(ctx: click.Context, source_name: str):
|
||||
model = await source.get_model()
|
||||
if model.name.lower() == source_name.lower():
|
||||
break
|
||||
else:
|
||||
else: # If no source by the given name was found
|
||||
conn.close()
|
||||
raise click.Abort(
|
||||
click.style(f"Source '{source_name}' not found.", fg="red")
|
||||
)
|
||||
raise SlobsCliError(f"Source '{source_name}' not found.")
|
||||
|
||||
await source.set_muted(False)
|
||||
click.echo(f"Unmuted audio source: {source_name}")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@audio.command()
|
||||
@ -118,12 +126,14 @@ async def toggle(ctx: click.Context, source_name: str):
|
||||
click.echo(f"Muted audio source: {source_name}")
|
||||
conn.close()
|
||||
break
|
||||
else:
|
||||
else: # If no source by the given name was found
|
||||
conn.close()
|
||||
raise click.Abort(
|
||||
click.style(f"Source '{source_name}' not found.", fg="red")
|
||||
)
|
||||
raise SlobsCliError(f"Source '{source_name}' not found.")
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
13
src/slobs_cli/errors.py
Normal file
13
src/slobs_cli/errors.py
Normal file
@ -0,0 +1,13 @@
|
||||
import asyncclick as click
|
||||
|
||||
|
||||
class SlobsCliError(click.ClickException):
|
||||
"""Base class for all Slobs CLI errors."""
|
||||
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
self.exit_code = 1
|
||||
|
||||
def show(self):
|
||||
"""Display the error message in red."""
|
||||
click.secho(f"Error: {self.message}", fg="red", err=True)
|
@ -3,6 +3,7 @@ from anyio import create_task_group
|
||||
from pyslobs import StreamingService
|
||||
|
||||
from .cli import cli
|
||||
from .errors import SlobsCliError
|
||||
|
||||
|
||||
@cli.group()
|
||||
@ -24,16 +25,20 @@ async def start(ctx: click.Context):
|
||||
|
||||
if active:
|
||||
conn.close()
|
||||
raise click.Abort(click.style("Recording is already active.", fg="red"))
|
||||
raise SlobsCliError("Recording is already active.")
|
||||
|
||||
await ss.toggle_recording()
|
||||
click.echo("Recording started.")
|
||||
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@record.command()
|
||||
@ -50,16 +55,20 @@ async def stop(ctx: click.Context):
|
||||
|
||||
if not active:
|
||||
conn.close()
|
||||
raise click.Abort(click.style("Recording is already inactive.", fg="red"))
|
||||
raise SlobsCliError("Recording is already inactive.")
|
||||
|
||||
await ss.toggle_recording()
|
||||
click.echo("Recording stopped.")
|
||||
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@record.command()
|
||||
|
@ -3,6 +3,7 @@ from anyio import create_task_group
|
||||
from pyslobs import StreamingService
|
||||
|
||||
from .cli import cli
|
||||
from .errors import SlobsCliError
|
||||
|
||||
|
||||
@cli.group()
|
||||
@ -24,15 +25,19 @@ async def start(ctx: click.Context):
|
||||
|
||||
if active:
|
||||
conn.close()
|
||||
raise click.Abort(click.style("Replay buffer is already active.", fg="red"))
|
||||
raise SlobsCliError("Replay buffer is already active.")
|
||||
|
||||
await ss.start_replay_buffer()
|
||||
click.echo("Replay buffer started.")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@replaybuffer.command()
|
||||
@ -49,17 +54,19 @@ async def stop(ctx: click.Context):
|
||||
|
||||
if not active:
|
||||
conn.close()
|
||||
raise click.Abort(
|
||||
click.style("Replay buffer is already inactive.", fg="red")
|
||||
)
|
||||
raise SlobsCliError("Replay buffer is already inactive.")
|
||||
|
||||
await ss.stop_replay_buffer()
|
||||
click.echo("Replay buffer stopped.")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@replaybuffer.command()
|
||||
|
@ -3,6 +3,7 @@ from anyio import create_task_group
|
||||
from pyslobs import ScenesService, TransitionsService
|
||||
|
||||
from .cli import cli
|
||||
from .errors import SlobsCliError
|
||||
|
||||
|
||||
@cli.group()
|
||||
@ -22,11 +23,19 @@ async def list(ctx: click.Context):
|
||||
scenes = await ss.get_scenes()
|
||||
if not scenes:
|
||||
click.echo("No scenes found.")
|
||||
conn.close()
|
||||
return
|
||||
|
||||
active_scene = await ss.active_scene()
|
||||
|
||||
click.echo("Available scenes:")
|
||||
for scene in scenes:
|
||||
click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})")
|
||||
if scene.id == active_scene.id:
|
||||
click.echo(
|
||||
f"- {click.style(scene.name, fg='green')} (ID: {scene.id}) [Active]"
|
||||
)
|
||||
else:
|
||||
click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})")
|
||||
|
||||
conn.close()
|
||||
|
||||
@ -45,12 +54,9 @@ async def current(ctx: click.Context):
|
||||
|
||||
async def _run():
|
||||
active_scene = await ss.active_scene()
|
||||
if active_scene:
|
||||
click.echo(
|
||||
f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})"
|
||||
)
|
||||
else:
|
||||
click.echo("No active scene found.")
|
||||
click.echo(
|
||||
f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})"
|
||||
)
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
@ -93,25 +99,25 @@ async def switch(ctx: click.Context, scene_name: str, preview: bool = False):
|
||||
else:
|
||||
if preview:
|
||||
conn.close()
|
||||
raise click.Abort(
|
||||
click.style(
|
||||
"Cannot switch to preview scene in non-studio mode.",
|
||||
fg="red",
|
||||
)
|
||||
raise SlobsCliError(
|
||||
"Cannot switch to preview scene in non-studio mode."
|
||||
)
|
||||
|
||||
await ss.make_scene_active(scene.id)
|
||||
click.echo(
|
||||
f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in active mode."
|
||||
)
|
||||
|
||||
conn.close()
|
||||
break
|
||||
else:
|
||||
else: # If no scene by the given name was found
|
||||
conn.close()
|
||||
raise click.ClickException(
|
||||
click.style(f"Scene '{scene_name}' not found.", fg="red")
|
||||
)
|
||||
raise SlobsCliError(f"Scene '{scene_name}' not found.")
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
@ -3,6 +3,7 @@ from anyio import create_task_group
|
||||
from pyslobs import StreamingService
|
||||
|
||||
from .cli import cli
|
||||
from .errors import SlobsCliError
|
||||
|
||||
|
||||
@cli.group()
|
||||
@ -24,15 +25,19 @@ async def start(ctx: click.Context):
|
||||
|
||||
if active:
|
||||
conn.close()
|
||||
raise click.Abort(click.style("Stream is already active.", fg="red"))
|
||||
raise SlobsCliError("Stream is already active.")
|
||||
|
||||
await ss.toggle_streaming()
|
||||
click.echo("Stream started.")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@stream.command()
|
||||
@ -49,15 +54,19 @@ async def stop(ctx: click.Context):
|
||||
|
||||
if not active:
|
||||
conn.close()
|
||||
raise click.Abort(click.style("Stream is already inactive.", fg="red"))
|
||||
raise SlobsCliError("Stream is already inactive.")
|
||||
|
||||
await ss.toggle_streaming()
|
||||
click.echo("Stream stopped.")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@stream.command()
|
||||
|
@ -3,6 +3,7 @@ from anyio import create_task_group
|
||||
from pyslobs import TransitionsService
|
||||
|
||||
from .cli import cli
|
||||
from .errors import SlobsCliError
|
||||
|
||||
|
||||
@cli.group()
|
||||
@ -22,15 +23,19 @@ async def enable(ctx: click.Context):
|
||||
model = await ts.get_model()
|
||||
if model.studio_mode:
|
||||
conn.close()
|
||||
raise click.Abort(click.style("Studio mode is already enabled.", fg="red"))
|
||||
raise SlobsCliError("Studio mode is already enabled.")
|
||||
|
||||
await ts.enable_studio_mode()
|
||||
click.echo("Studio mode enabled successfully.")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@studiomode.command()
|
||||
@ -45,15 +50,19 @@ async def disable(ctx: click.Context):
|
||||
model = await ts.get_model()
|
||||
if not model.studio_mode:
|
||||
conn.close()
|
||||
raise click.Abort(click.style("Studio mode is already disabled.", fg="red"))
|
||||
raise SlobsCliError("Studio mode is already disabled.")
|
||||
|
||||
await ts.disable_studio_mode()
|
||||
click.echo("Studio mode disabled successfully.")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@studiomode.command()
|
||||
@ -112,12 +121,16 @@ async def force_transition(ctx: click.Context):
|
||||
model = await ts.get_model()
|
||||
if not model.studio_mode:
|
||||
conn.close()
|
||||
raise click.Abort(click.style("Studio mode is not enabled.", fg="red"))
|
||||
raise SlobsCliError("Studio mode is not enabled.")
|
||||
|
||||
await ts.execute_studio_mode_transition()
|
||||
click.echo("Forced studio mode transition.")
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
@ -1,5 +1,4 @@
|
||||
import anyio
|
||||
import asyncclick as click
|
||||
import pytest
|
||||
from asyncclick.testing import CliRunner
|
||||
|
||||
@ -13,19 +12,14 @@ async def test_record_start():
|
||||
assert result.exit_code == 0
|
||||
active = "Recording is currently active." in result.output
|
||||
|
||||
result = await runner.invoke(cli, ["record", "start"])
|
||||
if not active:
|
||||
result = await runner.invoke(cli, ["record", "start"])
|
||||
assert result.exit_code == 0
|
||||
assert "Recording started" in result.output
|
||||
await anyio.sleep(1) # Allow some time for the recording to start
|
||||
await anyio.sleep(0.2) # Allow some time for the recording to start
|
||||
else:
|
||||
with pytest.raises(ExceptionGroup) as exc_info:
|
||||
result = await runner.invoke(
|
||||
cli, ["record", "start"], catch_exceptions=False
|
||||
)
|
||||
assert exc_info.group_contains(
|
||||
click.Abort, match="Recording is already active."
|
||||
)
|
||||
assert result.exit_code != 0
|
||||
assert "Recording is already active." in result.output
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@ -35,16 +29,11 @@ async def test_record_stop():
|
||||
assert result.exit_code == 0
|
||||
active = "Recording is currently active." in result.output
|
||||
|
||||
result = await runner.invoke(cli, ["record", "stop"])
|
||||
if active:
|
||||
result = await runner.invoke(cli, ["record", "stop"])
|
||||
assert result.exit_code == 0
|
||||
assert "Recording stopped" in result.output
|
||||
await anyio.sleep(1) # Allow some time for the recording to stop
|
||||
await anyio.sleep(0.2) # Allow some time for the recording to stop
|
||||
else:
|
||||
with pytest.raises(ExceptionGroup) as exc_info:
|
||||
result = await runner.invoke(
|
||||
cli, ["record", "stop"], catch_exceptions=False
|
||||
)
|
||||
assert exc_info.group_contains(
|
||||
click.Abort, match="Recording is already inactive."
|
||||
)
|
||||
assert result.exit_code != 0
|
||||
assert "Recording is already inactive." in result.output
|
||||
|
@ -1,5 +1,4 @@
|
||||
import anyio
|
||||
import asyncclick as click
|
||||
import pytest
|
||||
from asyncclick.testing import CliRunner
|
||||
|
||||
@ -13,19 +12,14 @@ async def test_replaybuffer_start():
|
||||
assert result.exit_code == 0
|
||||
active = "Replay buffer is currently active." in result.output
|
||||
|
||||
result = await runner.invoke(cli, ["replaybuffer", "start"])
|
||||
if not active:
|
||||
result = await runner.invoke(cli, ["replaybuffer", "start"])
|
||||
assert result.exit_code == 0
|
||||
assert "Replay buffer started" in result.output
|
||||
await anyio.sleep(1)
|
||||
await anyio.sleep(0.2) # Allow some time for the replay buffer to start
|
||||
else:
|
||||
with pytest.raises(ExceptionGroup) as exc_info:
|
||||
result = await runner.invoke(
|
||||
cli, ["replaybuffer", "start"], catch_exceptions=False
|
||||
)
|
||||
assert exc_info.group_contains(
|
||||
click.Abort, match="Replay buffer is already active."
|
||||
)
|
||||
assert result.exit_code != 0
|
||||
assert "Replay buffer is already active." in result.output
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@ -35,16 +29,11 @@ async def test_replaybuffer_stop():
|
||||
assert result.exit_code == 0
|
||||
active = "Replay buffer is currently active." in result.output
|
||||
|
||||
result = await runner.invoke(cli, ["replaybuffer", "stop"])
|
||||
if active:
|
||||
result = await runner.invoke(cli, ["replaybuffer", "stop"])
|
||||
assert result.exit_code == 0
|
||||
assert "Replay buffer stopped" in result.output
|
||||
await anyio.sleep(1)
|
||||
await anyio.sleep(0.2) # Allow some time for the replay buffer to stop
|
||||
else:
|
||||
with pytest.raises(ExceptionGroup) as exc_info:
|
||||
result = await runner.invoke(
|
||||
cli, ["replaybuffer", "stop"], catch_exceptions=False
|
||||
)
|
||||
assert exc_info.group_contains(
|
||||
click.Abort, match="Replay buffer is already inactive."
|
||||
)
|
||||
assert result.exit_code != 0
|
||||
assert "Replay buffer is already inactive." in result.output
|
||||
|
@ -1,5 +1,4 @@
|
||||
import anyio
|
||||
import asyncclick as click
|
||||
import pytest
|
||||
from asyncclick.testing import CliRunner
|
||||
|
||||
@ -13,17 +12,14 @@ async def test_stream_start():
|
||||
assert result.exit_code == 0
|
||||
active = "Stream is currently active." in result.output
|
||||
|
||||
result = await runner.invoke(cli, ["stream", "start"])
|
||||
if not active:
|
||||
result = await runner.invoke(cli, ["stream", "start"])
|
||||
assert result.exit_code == 0
|
||||
assert "Stream started" in result.output
|
||||
await anyio.sleep(1) # Allow some time for the stream to start
|
||||
await anyio.sleep(0.2) # Allow some time for the stream to start
|
||||
else:
|
||||
with pytest.raises(ExceptionGroup) as exc_info:
|
||||
result = await runner.invoke(
|
||||
cli, ["stream", "start"], catch_exceptions=False
|
||||
)
|
||||
assert exc_info.group_contains(click.Abort, match="Stream is already active.")
|
||||
assert result.exit_code != 0
|
||||
assert "Stream is already active." in result.output
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@ -33,14 +29,11 @@ async def test_stream_stop():
|
||||
assert result.exit_code == 0
|
||||
active = "Stream is currently active." in result.output
|
||||
|
||||
result = await runner.invoke(cli, ["stream", "stop"])
|
||||
if active:
|
||||
result = await runner.invoke(cli, ["stream", "stop"])
|
||||
assert result.exit_code == 0
|
||||
assert "Stream stopped" in result.output
|
||||
await anyio.sleep(1) # Allow some time for the stream to stop
|
||||
await anyio.sleep(0.2) # Allow some time for the stream to stop
|
||||
else:
|
||||
with pytest.raises(ExceptionGroup) as exc_info:
|
||||
result = await runner.invoke(
|
||||
cli, ["stream", "stop"], catch_exceptions=False
|
||||
)
|
||||
assert exc_info.group_contains(click.Abort, match="Stream is already inactive.")
|
||||
assert result.exit_code != 0
|
||||
assert "Stream is already inactive." in result.output
|
||||
|
Loading…
x
Reference in New Issue
Block a user