convert replaybuffer commands

This commit is contained in:
onyx-and-iris 2025-07-24 02:29:58 +01:00
parent 6da9df5ceb
commit a3dff0f739
2 changed files with 44 additions and 29 deletions

View File

@ -30,6 +30,7 @@ for sub_app in (
'profile',
'projector',
'record',
'replaybuffer',
'scene',
):
module = importlib.import_module(f'.{sub_app}', package=__package__)

View File

@ -1,64 +1,78 @@
"""module containing commands for manipulating the replay buffer in OBS."""
import typer
from typing import Annotated
from cyclopts import App, Parameter
from . import console
from .alias import SubTyperAliasGroup
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup)
app = App(
name='replaybuffer', help='Commands for controlling the replay buffer in OBS.'
)
@app.callback()
def main():
"""Control profiles in OBS."""
@app.command('start | s')
def start(ctx: typer.Context):
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
resp = ctx.client.get_replay_buffer_status()
if resp.output_active:
console.err.print('Replay buffer is already active.')
raise typer.Exit(1)
raise OBSWSCLIError('Replay buffer is already active.', ExitCode.ERROR)
ctx.obj['obsws'].start_replay_buffer()
ctx.client.start_replay_buffer()
console.out.print('Replay buffer started.')
@app.command('stop | st')
def stop(ctx: typer.Context):
@app.command(name=['stop', 'st'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
resp = ctx.client.get_replay_buffer_status()
if not resp.output_active:
console.err.print('Replay buffer is not active.')
raise typer.Exit(1)
raise OBSWSCLIError('Replay buffer is not active.', ExitCode.ERROR)
ctx.obj['obsws'].stop_replay_buffer()
ctx.client.stop_replay_buffer()
console.out.print('Replay buffer stopped.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the replay buffer."""
resp = ctx.obj['obsws'].toggle_replay_buffer()
resp = ctx.client.toggle_replay_buffer()
if resp.output_active:
console.out.print('Replay buffer is active.')
else:
console.out.print('Replay buffer is not active.')
@app.command('status | ss')
def status(ctx: typer.Context):
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
resp = ctx.client.get_replay_buffer_status()
if resp.output_active:
console.out.print('Replay buffer is active.')
else:
console.out.print('Replay buffer is not active.')
@app.command('save | sv')
def save(ctx: typer.Context):
@app.command(name=['save', 'sv'])
def save(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Save the replay buffer."""
ctx.obj['obsws'].save_replay_buffer()
ctx.client.save_replay_buffer()
console.out.print('Replay buffer saved.')