mirror of
https://github.com/onyx-and-iris/slobs-cli.git
synced 2025-08-07 20:21:55 +00:00
Compare commits
No commits in common. "0c5bbc114fe653db8f9e80fb908f43b22dc34402" and "a8bed0f4d92f3fb63c504a4aa3a662faa51e14c6" have entirely different histories.
0c5bbc114f
...
a8bed0f4d9
14
CHANGELOG.md
14
CHANGELOG.md
@ -5,20 +5,6 @@ All notable changes to this project will be documented in this file.
|
|||||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
# [0.8.0] - 2025-06-12
|
|
||||||
|
|
||||||
### Added
|
|
||||||
|
|
||||||
- custom error class SlobsCliError.
|
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
- scene list now shows which scene is the current active scene.
|
|
||||||
- audio list now shows mute states.
|
|
||||||
|
|
||||||
- erroneous paths now print error messages and return 1 instead of raising exceptions.
|
|
||||||
- unit tests updated to reflect the changes.
|
|
||||||
|
|
||||||
# [0.7.6] - 2025-06-11
|
# [0.7.6] - 2025-06-11
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@ -1 +1 @@
|
|||||||
__version__ = "0.8.2"
|
__version__ = "0.7.8"
|
||||||
|
@ -3,7 +3,6 @@ from anyio import create_task_group
|
|||||||
from pyslobs import AudioService
|
from pyslobs import AudioService
|
||||||
|
|
||||||
from .cli import cli
|
from .cli import cli
|
||||||
from .errors import SlobsCliError
|
|
||||||
|
|
||||||
|
|
||||||
@cli.group()
|
@cli.group()
|
||||||
@ -22,16 +21,13 @@ async def list(ctx: click.Context):
|
|||||||
async def _run():
|
async def _run():
|
||||||
sources = await as_.get_sources()
|
sources = await as_.get_sources()
|
||||||
if not sources:
|
if not sources:
|
||||||
click.echo("No audio sources found.")
|
|
||||||
conn.close()
|
conn.close()
|
||||||
return
|
click.Abort(click.style("No audio sources found.", fg="red"))
|
||||||
|
|
||||||
click.echo("Available audio sources:")
|
|
||||||
for source in sources:
|
for source in sources:
|
||||||
model = await source.get_model()
|
model = await source.get_model()
|
||||||
click.echo(
|
click.echo(
|
||||||
f"- {click.style(model.name, fg='blue')} (ID: {model.source_id}, "
|
f"Source ID: {source.source_id}, Name: {model.name}, Muted: {model.muted}"
|
||||||
f"Muted: {click.style('✅', fg='green') if model.muted else click.style('❌', fg='red')})"
|
|
||||||
)
|
)
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
@ -55,21 +51,19 @@ async def mute(ctx: click.Context, source_name: str):
|
|||||||
model = await source.get_model()
|
model = await source.get_model()
|
||||||
if model.name.lower() == source_name.lower():
|
if model.name.lower() == source_name.lower():
|
||||||
break
|
break
|
||||||
else: # If no source by the given name was found
|
else:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError(f"Source '{source_name}' not found.")
|
raise click.Abort(
|
||||||
|
click.style(f"Source '{source_name}' not found.", fg="red")
|
||||||
|
)
|
||||||
|
|
||||||
await source.set_muted(True)
|
await source.set_muted(True)
|
||||||
click.echo(f"Muted audio source: {source_name}")
|
click.echo(f"Muted audio source: {source_name}")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@audio.command()
|
@audio.command()
|
||||||
@ -87,21 +81,19 @@ async def unmute(ctx: click.Context, source_name: str):
|
|||||||
model = await source.get_model()
|
model = await source.get_model()
|
||||||
if model.name.lower() == source_name.lower():
|
if model.name.lower() == source_name.lower():
|
||||||
break
|
break
|
||||||
else: # If no source by the given name was found
|
else:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError(f"Source '{source_name}' not found.")
|
raise click.Abort(
|
||||||
|
click.style(f"Source '{source_name}' not found.", fg="red")
|
||||||
|
)
|
||||||
|
|
||||||
await source.set_muted(False)
|
await source.set_muted(False)
|
||||||
click.echo(f"Unmuted audio source: {source_name}")
|
click.echo(f"Unmuted audio source: {source_name}")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@audio.command()
|
@audio.command()
|
||||||
@ -126,14 +118,12 @@ async def toggle(ctx: click.Context, source_name: str):
|
|||||||
click.echo(f"Muted audio source: {source_name}")
|
click.echo(f"Muted audio source: {source_name}")
|
||||||
conn.close()
|
conn.close()
|
||||||
break
|
break
|
||||||
else: # If no source by the given name was found
|
else:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError(f"Source '{source_name}' not found.")
|
raise click.Abort(
|
||||||
|
click.style(f"Source '{source_name}' not found.", fg="red")
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
@ -1,13 +0,0 @@
|
|||||||
import asyncclick as click
|
|
||||||
|
|
||||||
|
|
||||||
class SlobsCliError(click.ClickException):
|
|
||||||
"""Base class for all Slobs CLI errors."""
|
|
||||||
|
|
||||||
def __init__(self, message: str):
|
|
||||||
super().__init__(message)
|
|
||||||
self.exit_code = 1
|
|
||||||
|
|
||||||
def show(self):
|
|
||||||
"""Display the error message in red."""
|
|
||||||
click.secho(f"Error: {self.message}", fg="red", err=True)
|
|
@ -3,7 +3,6 @@ from anyio import create_task_group
|
|||||||
from pyslobs import StreamingService
|
from pyslobs import StreamingService
|
||||||
|
|
||||||
from .cli import cli
|
from .cli import cli
|
||||||
from .errors import SlobsCliError
|
|
||||||
|
|
||||||
|
|
||||||
@cli.group()
|
@cli.group()
|
||||||
@ -25,20 +24,16 @@ async def start(ctx: click.Context):
|
|||||||
|
|
||||||
if active:
|
if active:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Recording is already active.")
|
raise click.Abort(click.style("Recording is already active.", fg="red"))
|
||||||
|
|
||||||
await ss.toggle_recording()
|
await ss.toggle_recording()
|
||||||
click.echo("Recording started.")
|
click.echo("Recording started.")
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@record.command()
|
@record.command()
|
||||||
@ -55,20 +50,16 @@ async def stop(ctx: click.Context):
|
|||||||
|
|
||||||
if not active:
|
if not active:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Recording is already inactive.")
|
raise click.Abort(click.style("Recording is already inactive.", fg="red"))
|
||||||
|
|
||||||
await ss.toggle_recording()
|
await ss.toggle_recording()
|
||||||
click.echo("Recording stopped.")
|
click.echo("Recording stopped.")
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@record.command()
|
@record.command()
|
||||||
|
@ -3,7 +3,6 @@ from anyio import create_task_group
|
|||||||
from pyslobs import StreamingService
|
from pyslobs import StreamingService
|
||||||
|
|
||||||
from .cli import cli
|
from .cli import cli
|
||||||
from .errors import SlobsCliError
|
|
||||||
|
|
||||||
|
|
||||||
@cli.group()
|
@cli.group()
|
||||||
@ -25,19 +24,15 @@ async def start(ctx: click.Context):
|
|||||||
|
|
||||||
if active:
|
if active:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Replay buffer is already active.")
|
raise click.Abort(click.style("Replay buffer is already active.", fg="red"))
|
||||||
|
|
||||||
await ss.start_replay_buffer()
|
await ss.start_replay_buffer()
|
||||||
click.echo("Replay buffer started.")
|
click.echo("Replay buffer started.")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@replaybuffer.command()
|
@replaybuffer.command()
|
||||||
@ -54,19 +49,17 @@ async def stop(ctx: click.Context):
|
|||||||
|
|
||||||
if not active:
|
if not active:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Replay buffer is already inactive.")
|
raise click.Abort(
|
||||||
|
click.style("Replay buffer is already inactive.", fg="red")
|
||||||
|
)
|
||||||
|
|
||||||
await ss.stop_replay_buffer()
|
await ss.stop_replay_buffer()
|
||||||
click.echo("Replay buffer stopped.")
|
click.echo("Replay buffer stopped.")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@replaybuffer.command()
|
@replaybuffer.command()
|
||||||
|
@ -3,7 +3,6 @@ from anyio import create_task_group
|
|||||||
from pyslobs import ScenesService, TransitionsService
|
from pyslobs import ScenesService, TransitionsService
|
||||||
|
|
||||||
from .cli import cli
|
from .cli import cli
|
||||||
from .errors import SlobsCliError
|
|
||||||
|
|
||||||
|
|
||||||
@cli.group()
|
@cli.group()
|
||||||
@ -23,19 +22,11 @@ async def list(ctx: click.Context):
|
|||||||
scenes = await ss.get_scenes()
|
scenes = await ss.get_scenes()
|
||||||
if not scenes:
|
if not scenes:
|
||||||
click.echo("No scenes found.")
|
click.echo("No scenes found.")
|
||||||
conn.close()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
active_scene = await ss.active_scene()
|
|
||||||
|
|
||||||
click.echo("Available scenes:")
|
click.echo("Available scenes:")
|
||||||
for scene in scenes:
|
for scene in scenes:
|
||||||
if scene.id == active_scene.id:
|
click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})")
|
||||||
click.echo(
|
|
||||||
f"- {click.style(scene.name, fg='green')} (ID: {scene.id}) [Active]"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})")
|
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
@ -54,9 +45,12 @@ async def current(ctx: click.Context):
|
|||||||
|
|
||||||
async def _run():
|
async def _run():
|
||||||
active_scene = await ss.active_scene()
|
active_scene = await ss.active_scene()
|
||||||
click.echo(
|
if active_scene:
|
||||||
f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})"
|
click.echo(
|
||||||
)
|
f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
click.echo("No active scene found.")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
async with create_task_group() as tg:
|
async with create_task_group() as tg:
|
||||||
@ -99,25 +93,25 @@ async def switch(ctx: click.Context, scene_name: str, preview: bool = False):
|
|||||||
else:
|
else:
|
||||||
if preview:
|
if preview:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError(
|
raise click.Abort(
|
||||||
"Cannot switch to preview scene in non-studio mode."
|
click.style(
|
||||||
|
"Cannot switch to preview scene in non-studio mode.",
|
||||||
|
fg="red",
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
await ss.make_scene_active(scene.id)
|
await ss.make_scene_active(scene.id)
|
||||||
click.echo(
|
click.echo(
|
||||||
f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in active mode."
|
f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in active mode."
|
||||||
)
|
)
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
break
|
break
|
||||||
else: # If no scene by the given name was found
|
else:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError(f"Scene '{scene_name}' not found.")
|
raise click.ClickException(
|
||||||
|
click.style(f"Scene '{scene_name}' not found.", fg="red")
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
@ -3,7 +3,6 @@ from anyio import create_task_group
|
|||||||
from pyslobs import StreamingService
|
from pyslobs import StreamingService
|
||||||
|
|
||||||
from .cli import cli
|
from .cli import cli
|
||||||
from .errors import SlobsCliError
|
|
||||||
|
|
||||||
|
|
||||||
@cli.group()
|
@cli.group()
|
||||||
@ -25,19 +24,15 @@ async def start(ctx: click.Context):
|
|||||||
|
|
||||||
if active:
|
if active:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Stream is already active.")
|
raise click.Abort(click.style("Stream is already active.", fg="red"))
|
||||||
|
|
||||||
await ss.toggle_streaming()
|
await ss.toggle_streaming()
|
||||||
click.echo("Stream started.")
|
click.echo("Stream started.")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@stream.command()
|
@stream.command()
|
||||||
@ -54,19 +49,15 @@ async def stop(ctx: click.Context):
|
|||||||
|
|
||||||
if not active:
|
if not active:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Stream is already inactive.")
|
raise click.Abort(click.style("Stream is already inactive.", fg="red"))
|
||||||
|
|
||||||
await ss.toggle_streaming()
|
await ss.toggle_streaming()
|
||||||
click.echo("Stream stopped.")
|
click.echo("Stream stopped.")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@stream.command()
|
@stream.command()
|
||||||
|
@ -3,7 +3,6 @@ from anyio import create_task_group
|
|||||||
from pyslobs import TransitionsService
|
from pyslobs import TransitionsService
|
||||||
|
|
||||||
from .cli import cli
|
from .cli import cli
|
||||||
from .errors import SlobsCliError
|
|
||||||
|
|
||||||
|
|
||||||
@cli.group()
|
@cli.group()
|
||||||
@ -23,19 +22,15 @@ async def enable(ctx: click.Context):
|
|||||||
model = await ts.get_model()
|
model = await ts.get_model()
|
||||||
if model.studio_mode:
|
if model.studio_mode:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Studio mode is already enabled.")
|
raise click.Abort(click.style("Studio mode is already enabled.", fg="red"))
|
||||||
|
|
||||||
await ts.enable_studio_mode()
|
await ts.enable_studio_mode()
|
||||||
click.echo("Studio mode enabled successfully.")
|
click.echo("Studio mode enabled successfully.")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@studiomode.command()
|
@studiomode.command()
|
||||||
@ -50,19 +45,15 @@ async def disable(ctx: click.Context):
|
|||||||
model = await ts.get_model()
|
model = await ts.get_model()
|
||||||
if not model.studio_mode:
|
if not model.studio_mode:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Studio mode is already disabled.")
|
raise click.Abort(click.style("Studio mode is already disabled.", fg="red"))
|
||||||
|
|
||||||
await ts.disable_studio_mode()
|
await ts.disable_studio_mode()
|
||||||
click.echo("Studio mode disabled successfully.")
|
click.echo("Studio mode disabled successfully.")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
@studiomode.command()
|
@studiomode.command()
|
||||||
@ -121,16 +112,12 @@ async def force_transition(ctx: click.Context):
|
|||||||
model = await ts.get_model()
|
model = await ts.get_model()
|
||||||
if not model.studio_mode:
|
if not model.studio_mode:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise SlobsCliError("Studio mode is not enabled.")
|
raise click.Abort(click.style("Studio mode is not enabled.", fg="red"))
|
||||||
|
|
||||||
await ts.execute_studio_mode_transition()
|
await ts.execute_studio_mode_transition()
|
||||||
click.echo("Forced studio mode transition.")
|
click.echo("Forced studio mode transition.")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
try:
|
async with create_task_group() as tg:
|
||||||
async with create_task_group() as tg:
|
tg.start_soon(conn.background_processing)
|
||||||
tg.start_soon(conn.background_processing)
|
tg.start_soon(_run)
|
||||||
tg.start_soon(_run)
|
|
||||||
except* SlobsCliError as excgroup:
|
|
||||||
for e in excgroup.exceptions:
|
|
||||||
raise e
|
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import anyio
|
import anyio
|
||||||
|
import asyncclick as click
|
||||||
import pytest
|
import pytest
|
||||||
from asyncclick.testing import CliRunner
|
from asyncclick.testing import CliRunner
|
||||||
|
|
||||||
@ -12,14 +13,19 @@ async def test_record_start():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Recording is currently active." in result.output
|
active = "Recording is currently active." in result.output
|
||||||
|
|
||||||
result = await runner.invoke(cli, ["record", "start"])
|
|
||||||
if not active:
|
if not active:
|
||||||
|
result = await runner.invoke(cli, ["record", "start"])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Recording started" in result.output
|
assert "Recording started" in result.output
|
||||||
await anyio.sleep(0.2) # Allow some time for the recording to start
|
await anyio.sleep(1) # Allow some time for the recording to start
|
||||||
else:
|
else:
|
||||||
assert result.exit_code != 0
|
with pytest.raises(ExceptionGroup) as exc_info:
|
||||||
assert "Recording is already active." in result.output
|
result = await runner.invoke(
|
||||||
|
cli, ["record", "start"], catch_exceptions=False
|
||||||
|
)
|
||||||
|
assert exc_info.group_contains(
|
||||||
|
click.Abort, match="Recording is already active."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
@ -29,11 +35,16 @@ async def test_record_stop():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Recording is currently active." in result.output
|
active = "Recording is currently active." in result.output
|
||||||
|
|
||||||
result = await runner.invoke(cli, ["record", "stop"])
|
|
||||||
if active:
|
if active:
|
||||||
|
result = await runner.invoke(cli, ["record", "stop"])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Recording stopped" in result.output
|
assert "Recording stopped" in result.output
|
||||||
await anyio.sleep(0.2) # Allow some time for the recording to stop
|
await anyio.sleep(1) # Allow some time for the recording to stop
|
||||||
else:
|
else:
|
||||||
assert result.exit_code != 0
|
with pytest.raises(ExceptionGroup) as exc_info:
|
||||||
assert "Recording is already inactive." in result.output
|
result = await runner.invoke(
|
||||||
|
cli, ["record", "stop"], catch_exceptions=False
|
||||||
|
)
|
||||||
|
assert exc_info.group_contains(
|
||||||
|
click.Abort, match="Recording is already inactive."
|
||||||
|
)
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import anyio
|
import anyio
|
||||||
|
import asyncclick as click
|
||||||
import pytest
|
import pytest
|
||||||
from asyncclick.testing import CliRunner
|
from asyncclick.testing import CliRunner
|
||||||
|
|
||||||
@ -12,14 +13,19 @@ async def test_replaybuffer_start():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Replay buffer is currently active." in result.output
|
active = "Replay buffer is currently active." in result.output
|
||||||
|
|
||||||
result = await runner.invoke(cli, ["replaybuffer", "start"])
|
|
||||||
if not active:
|
if not active:
|
||||||
|
result = await runner.invoke(cli, ["replaybuffer", "start"])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Replay buffer started" in result.output
|
assert "Replay buffer started" in result.output
|
||||||
await anyio.sleep(0.2) # Allow some time for the replay buffer to start
|
await anyio.sleep(1)
|
||||||
else:
|
else:
|
||||||
assert result.exit_code != 0
|
with pytest.raises(ExceptionGroup) as exc_info:
|
||||||
assert "Replay buffer is already active." in result.output
|
result = await runner.invoke(
|
||||||
|
cli, ["replaybuffer", "start"], catch_exceptions=False
|
||||||
|
)
|
||||||
|
assert exc_info.group_contains(
|
||||||
|
click.Abort, match="Replay buffer is already active."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
@ -29,11 +35,16 @@ async def test_replaybuffer_stop():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Replay buffer is currently active." in result.output
|
active = "Replay buffer is currently active." in result.output
|
||||||
|
|
||||||
result = await runner.invoke(cli, ["replaybuffer", "stop"])
|
|
||||||
if active:
|
if active:
|
||||||
|
result = await runner.invoke(cli, ["replaybuffer", "stop"])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Replay buffer stopped" in result.output
|
assert "Replay buffer stopped" in result.output
|
||||||
await anyio.sleep(0.2) # Allow some time for the replay buffer to stop
|
await anyio.sleep(1)
|
||||||
else:
|
else:
|
||||||
assert result.exit_code != 0
|
with pytest.raises(ExceptionGroup) as exc_info:
|
||||||
assert "Replay buffer is already inactive." in result.output
|
result = await runner.invoke(
|
||||||
|
cli, ["replaybuffer", "stop"], catch_exceptions=False
|
||||||
|
)
|
||||||
|
assert exc_info.group_contains(
|
||||||
|
click.Abort, match="Replay buffer is already inactive."
|
||||||
|
)
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import anyio
|
import anyio
|
||||||
|
import asyncclick as click
|
||||||
import pytest
|
import pytest
|
||||||
from asyncclick.testing import CliRunner
|
from asyncclick.testing import CliRunner
|
||||||
|
|
||||||
@ -12,14 +13,17 @@ async def test_stream_start():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Stream is currently active." in result.output
|
active = "Stream is currently active." in result.output
|
||||||
|
|
||||||
result = await runner.invoke(cli, ["stream", "start"])
|
|
||||||
if not active:
|
if not active:
|
||||||
|
result = await runner.invoke(cli, ["stream", "start"])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Stream started" in result.output
|
assert "Stream started" in result.output
|
||||||
await anyio.sleep(0.2) # Allow some time for the stream to start
|
await anyio.sleep(1) # Allow some time for the stream to start
|
||||||
else:
|
else:
|
||||||
assert result.exit_code != 0
|
with pytest.raises(ExceptionGroup) as exc_info:
|
||||||
assert "Stream is already active." in result.output
|
result = await runner.invoke(
|
||||||
|
cli, ["stream", "start"], catch_exceptions=False
|
||||||
|
)
|
||||||
|
assert exc_info.group_contains(click.Abort, match="Stream is already active.")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
@ -29,11 +33,14 @@ async def test_stream_stop():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Stream is currently active." in result.output
|
active = "Stream is currently active." in result.output
|
||||||
|
|
||||||
result = await runner.invoke(cli, ["stream", "stop"])
|
|
||||||
if active:
|
if active:
|
||||||
|
result = await runner.invoke(cli, ["stream", "stop"])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Stream stopped" in result.output
|
assert "Stream stopped" in result.output
|
||||||
await anyio.sleep(0.2) # Allow some time for the stream to stop
|
await anyio.sleep(1) # Allow some time for the stream to stop
|
||||||
else:
|
else:
|
||||||
assert result.exit_code != 0
|
with pytest.raises(ExceptionGroup) as exc_info:
|
||||||
assert "Stream is already inactive." in result.output
|
result = await runner.invoke(
|
||||||
|
cli, ["stream", "stop"], catch_exceptions=False
|
||||||
|
)
|
||||||
|
assert exc_info.group_contains(click.Abort, match="Stream is already inactive.")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user