convert record commands

This commit is contained in:
onyx-and-iris 2025-07-24 02:26:18 +01:00
parent 75fc18273e
commit 6da9df5ceb
2 changed files with 89 additions and 64 deletions

View File

@ -23,12 +23,13 @@ app = App(
) )
app.meta.group_parameters = Group('Options', sort_key=0) app.meta.group_parameters = Group('Options', sort_key=0)
for sub_app in ( for sub_app in (
'filter',
'group', 'group',
'hotkey', 'hotkey',
'input', 'input',
'profile', 'profile',
'projector', 'projector',
'filter', 'record',
'scene', 'scene',
): ):
module = importlib.import_module(f'.{sub_app}', package=__package__) module = importlib.import_module(f'.{sub_app}', package=__package__)

View File

@ -3,67 +3,75 @@
from pathlib import Path from pathlib import Path
from typing import Annotated, Optional from typing import Annotated, Optional
import typer from cyclopts import App, Argument, Parameter
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='record', help='Commands for controlling OBS recording functionality.')
@app.callback() def _get_recording_status(ctx: Context) -> tuple:
def main():
"""Control OBS recording functionality."""
def _get_recording_status(ctx: typer.Context) -> tuple:
"""Get recording status.""" """Get recording status."""
resp = ctx.obj['obsws'].get_record_status() resp = ctx.client.get_record_status()
return resp.output_active, resp.output_paused return resp.output_active, resp.output_paused
@app.command('start | s') @app.command(name=['start', 's'])
def start(ctx: typer.Context): def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start recording.""" """Start recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if active: if active:
err_msg = 'Recording is already in progress, cannot start.' err_msg = 'Recording is already in progress, cannot start.'
if paused: if paused:
err_msg += ' Try resuming it.' err_msg += ' Try resuming it.'
raise OBSWSCLIError(err_msg, ExitCode.ERROR)
console.err.print(err_msg) ctx.client.start_record()
raise typer.Exit(1)
ctx.obj['obsws'].start_record()
console.out.print('Recording started successfully.') console.out.print('Recording started successfully.')
@app.command('stop | st') @app.command(name=['stop', 'st'])
def stop(ctx: typer.Context): def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop recording.""" """Stop recording."""
active, _ = _get_recording_status(ctx) active, _ = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot stop.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is not in progress, cannot stop.', ExitCode.ERROR
)
resp = ctx.obj['obsws'].stop_record() resp = ctx.client.stop_record()
console.out.print( console.out.print(
f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}' f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}'
) )
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle(ctx: typer.Context): def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle recording.""" """Toggle recording."""
resp = ctx.obj['obsws'].toggle_record() resp = ctx.client.toggle_record()
if resp.output_active: if resp.output_active:
console.out.print('Recording started successfully.') console.out.print('Recording started successfully.')
else: else:
console.out.print('Recording stopped successfully.') console.out.print('Recording stopped successfully.')
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status(ctx: typer.Context): def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get recording status.""" """Get recording status."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if active: if active:
@ -75,98 +83,114 @@ def status(ctx: typer.Context):
console.out.print('Recording is not in progress.') console.out.print('Recording is not in progress.')
@app.command('resume | r') @app.command(name=['resume', 'r'])
def resume(ctx: typer.Context): def resume(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Resume recording.""" """Resume recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot resume.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is not in progress, cannot resume.', ExitCode.ERROR
)
if not paused: if not paused:
console.err.print('Recording is in progress but not paused, cannot resume.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is in progress but not paused, cannot resume.', ExitCode.ERROR
)
ctx.obj['obsws'].resume_record() ctx.client.resume_record()
console.out.print('Recording resumed successfully.') console.out.print('Recording resumed successfully.')
@app.command('pause | p') @app.command(name=['pause', 'p'])
def pause(ctx: typer.Context): def pause(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Pause recording.""" """Pause recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot pause.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is not in progress, cannot pause.', ExitCode.ERROR
)
if paused: if paused:
console.err.print('Recording is in progress but already paused, cannot pause.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is in progress but already paused, cannot pause.', ExitCode.ERROR
)
ctx.obj['obsws'].pause_record() ctx.client.pause_record()
console.out.print('Recording paused successfully.') console.out.print('Recording paused successfully.')
@app.command('directory | d') @app.command(name=['directory', 'd'])
def directory( def directory(
ctx: typer.Context,
record_directory: Annotated[ record_directory: Annotated[
Optional[Path], Optional[Path],
# Since the CLI and OBS may be running on different platforms, # Since the CLI and OBS may be running on different platforms,
# we won't validate the path here. # we won't validate the path here.
typer.Argument( Argument(
file_okay=False, hint='Directory to set for recording.',
dir_okay=True,
help='Directory to set for recording.',
), ),
] = None, ] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Get or set the recording directory.""" """Get or set the recording directory."""
if record_directory is not None: if record_directory is not None:
ctx.obj['obsws'].set_record_directory(str(record_directory)) ctx.client.set_record_directory(str(record_directory))
console.out.print( console.out.print(
f'Recording directory updated to: {console.highlight(ctx, record_directory)}' f'Recording directory updated to: {console.highlight(ctx, record_directory)}'
) )
else: else:
resp = ctx.obj['obsws'].get_record_directory() resp = ctx.client.get_record_directory()
console.out.print( console.out.print(
f'Recording directory: {console.highlight(ctx, resp.record_directory)}' f'Recording directory: {console.highlight(ctx, resp.record_directory)}'
) )
@app.command('split | sp') @app.command(name=['split', 'sp'])
def split(ctx: typer.Context): def split(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Split the current recording.""" """Split the current recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot split.') console.err.print('Recording is not in progress, cannot split.')
raise typer.Exit(1) raise OBSWSCLIError(
'Recording is not in progress, cannot split.', ExitCode.ERROR
)
if paused: if paused:
console.err.print('Recording is paused, cannot split.') raise OBSWSCLIError('Recording is paused, cannot split.', ExitCode.ERROR)
raise typer.Exit(1)
ctx.obj['obsws'].split_record_file() ctx.client.split_record_file()
console.out.print('Recording split successfully.') console.out.print('Recording split successfully.')
@app.command('chapter | ch') @app.command(name=['chapter', 'ch'])
def chapter( def chapter(
ctx: typer.Context,
chapter_name: Annotated[ chapter_name: Annotated[
Optional[str], Optional[str],
typer.Argument( Argument(
help='Name of the chapter to create.', hint='Name of the chapter to create.',
), ),
] = None, ] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Create a chapter in the current recording.""" """Create a chapter in the current recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot create chapter.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is not in progress, cannot create chapter.', ExitCode.ERROR
)
if paused: if paused:
console.err.print('Recording is paused, cannot create chapter.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is paused, cannot create chapter.', ExitCode.ERROR
)
ctx.obj['obsws'].create_record_chapter(chapter_name) ctx.client.create_record_chapter(chapter_name)
console.out.print( console.out.print(
f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.' f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.'
) )