add replaybuffer subcommand group

minor bump
This commit is contained in:
onyx-and-iris 2025-06-10 14:09:36 +01:00
parent 4fc3dc015a
commit 7a88b2b6ed
3 changed files with 102 additions and 2 deletions

View File

@ -1,6 +1,6 @@
[project] [project]
name = "slobs-cli" name = "slobs-cli"
version = "0.5.0" version = "0.6.0"
description = "A command line application for Streamlabs Desktop" description = "A command line application for Streamlabs Desktop"
authors = [{ name = "onyx-and-iris", email = "code@onyxandiris.online" }] authors = [{ name = "onyx-and-iris", email = "code@onyxandiris.online" }]
dependencies = ["pyslobs>=2.0.4", "asyncclick>=8.1.8"] dependencies = ["pyslobs>=2.0.4", "asyncclick>=8.1.8"]

View File

@ -1,7 +1,8 @@
from .audio import audio from .audio import audio
from .cli import cli from .cli import cli
from .record import record from .record import record
from .replaybuffer import replaybuffer
from .scene import scene from .scene import scene
from .stream import stream from .stream import stream
__all__ = ["cli", "scene", "stream", "record", "audio"] __all__ = ["cli", "scene", "stream", "record", "audio", "replaybuffer"]

View File

@ -0,0 +1,99 @@
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
from .cli import cli
@cli.group()
def replaybuffer():
"""Replay buffer management commands."""
@replaybuffer.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the replay buffer."""
conn = ctx.obj["connection"]
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.replay_buffer_status != "offline"
if active:
conn.close()
raise click.Abort(click.style("Replay buffer is already active.", fg="red"))
await ss.start_replay_buffer()
click.echo("Replay buffer started.")
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@replaybuffer.command()
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the replay buffer."""
conn = ctx.obj["connection"]
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.replay_buffer_status != "offline"
if not active:
conn.close()
raise click.Abort(
click.style("Replay buffer is already inactive.", fg="red")
)
await ss.stop_replay_buffer()
click.echo("Replay buffer stopped.")
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@replaybuffer.command()
@click.pass_context
async def status(ctx: click.Context):
"""Get the current status of the replay buffer."""
conn = ctx.obj["connection"]
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
status = current_state.replay_buffer_status
click.echo(f"Replay buffer status: {status}")
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@replaybuffer.command()
@click.pass_context
async def save(ctx: click.Context):
"""Save the current replay buffer."""
conn = ctx.obj["connection"]
ss = StreamingService(conn)
async def _run():
await ss.save_replay()
click.echo("Replay buffer saved.")
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)