diff --git a/obsws_cli/app.py b/obsws_cli/app.py index 8c15555..04e8f91 100644 --- a/obsws_cli/app.py +++ b/obsws_cli/app.py @@ -23,12 +23,13 @@ app = App( ) app.meta.group_parameters = Group('Options', sort_key=0) for sub_app in ( + 'filter', 'group', 'hotkey', 'input', 'profile', 'projector', - 'filter', + 'record', 'scene', ): module = importlib.import_module(f'.{sub_app}', package=__package__) diff --git a/obsws_cli/record.py b/obsws_cli/record.py index 8915d10..c06be7b 100644 --- a/obsws_cli/record.py +++ b/obsws_cli/record.py @@ -3,67 +3,75 @@ from pathlib import Path from typing import Annotated, Optional -import typer +from cyclopts import App, Argument, Parameter from . import console -from .alias import SubTyperAliasGroup +from .context import Context +from .enum import ExitCode +from .error import OBSWSCLIError -app = typer.Typer(cls=SubTyperAliasGroup) +app = App(name='record', help='Commands for controlling OBS recording functionality.') -@app.callback() -def main(): - """Control OBS recording functionality.""" - - -def _get_recording_status(ctx: typer.Context) -> tuple: +def _get_recording_status(ctx: Context) -> tuple: """Get recording status.""" - resp = ctx.obj['obsws'].get_record_status() + resp = ctx.client.get_record_status() return resp.output_active, resp.output_paused -@app.command('start | s') -def start(ctx: typer.Context): +@app.command(name=['start', 's']) +def start( + *, + ctx: Annotated[Context, Parameter(parse=False)], +): """Start recording.""" active, paused = _get_recording_status(ctx) if active: err_msg = 'Recording is already in progress, cannot start.' if paused: err_msg += ' Try resuming it.' + raise OBSWSCLIError(err_msg, ExitCode.ERROR) - console.err.print(err_msg) - raise typer.Exit(1) - - ctx.obj['obsws'].start_record() + ctx.client.start_record() console.out.print('Recording started successfully.') -@app.command('stop | st') -def stop(ctx: typer.Context): +@app.command(name=['stop', 'st']) +def stop( + *, + ctx: Annotated[Context, Parameter(parse=False)], +): """Stop recording.""" active, _ = _get_recording_status(ctx) if not active: - console.err.print('Recording is not in progress, cannot stop.') - raise typer.Exit(1) + raise OBSWSCLIError( + 'Recording is not in progress, cannot stop.', ExitCode.ERROR + ) - resp = ctx.obj['obsws'].stop_record() + resp = ctx.client.stop_record() console.out.print( f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}' ) -@app.command('toggle | tg') -def toggle(ctx: typer.Context): +@app.command(name=['toggle', 'tg']) +def toggle( + *, + ctx: Annotated[Context, Parameter(parse=False)], +): """Toggle recording.""" - resp = ctx.obj['obsws'].toggle_record() + resp = ctx.client.toggle_record() if resp.output_active: console.out.print('Recording started successfully.') else: console.out.print('Recording stopped successfully.') -@app.command('status | ss') -def status(ctx: typer.Context): +@app.command(name=['status', 'ss']) +def status( + *, + ctx: Annotated[Context, Parameter(parse=False)], +): """Get recording status.""" active, paused = _get_recording_status(ctx) if active: @@ -75,98 +83,114 @@ def status(ctx: typer.Context): console.out.print('Recording is not in progress.') -@app.command('resume | r') -def resume(ctx: typer.Context): +@app.command(name=['resume', 'r']) +def resume( + *, + ctx: Annotated[Context, Parameter(parse=False)], +): """Resume recording.""" active, paused = _get_recording_status(ctx) if not active: - console.err.print('Recording is not in progress, cannot resume.') - raise typer.Exit(1) + raise OBSWSCLIError( + 'Recording is not in progress, cannot resume.', ExitCode.ERROR + ) if not paused: - console.err.print('Recording is in progress but not paused, cannot resume.') - raise typer.Exit(1) + raise OBSWSCLIError( + 'Recording is in progress but not paused, cannot resume.', ExitCode.ERROR + ) - ctx.obj['obsws'].resume_record() + ctx.client.resume_record() console.out.print('Recording resumed successfully.') -@app.command('pause | p') -def pause(ctx: typer.Context): +@app.command(name=['pause', 'p']) +def pause( + *, + ctx: Annotated[Context, Parameter(parse=False)], +): """Pause recording.""" active, paused = _get_recording_status(ctx) if not active: - console.err.print('Recording is not in progress, cannot pause.') - raise typer.Exit(1) + raise OBSWSCLIError( + 'Recording is not in progress, cannot pause.', ExitCode.ERROR + ) if paused: - console.err.print('Recording is in progress but already paused, cannot pause.') - raise typer.Exit(1) + raise OBSWSCLIError( + 'Recording is in progress but already paused, cannot pause.', ExitCode.ERROR + ) - ctx.obj['obsws'].pause_record() + ctx.client.pause_record() console.out.print('Recording paused successfully.') -@app.command('directory | d') +@app.command(name=['directory', 'd']) def directory( - ctx: typer.Context, record_directory: Annotated[ Optional[Path], # Since the CLI and OBS may be running on different platforms, # we won't validate the path here. - typer.Argument( - file_okay=False, - dir_okay=True, - help='Directory to set for recording.', + Argument( + hint='Directory to set for recording.', ), ] = None, + *, + ctx: Annotated[Context, Parameter(parse=False)], ): """Get or set the recording directory.""" if record_directory is not None: - ctx.obj['obsws'].set_record_directory(str(record_directory)) + ctx.client.set_record_directory(str(record_directory)) console.out.print( f'Recording directory updated to: {console.highlight(ctx, record_directory)}' ) else: - resp = ctx.obj['obsws'].get_record_directory() + resp = ctx.client.get_record_directory() console.out.print( f'Recording directory: {console.highlight(ctx, resp.record_directory)}' ) -@app.command('split | sp') -def split(ctx: typer.Context): +@app.command(name=['split', 'sp']) +def split( + *, + ctx: Annotated[Context, Parameter(parse=False)], +): """Split the current recording.""" active, paused = _get_recording_status(ctx) if not active: console.err.print('Recording is not in progress, cannot split.') - raise typer.Exit(1) + raise OBSWSCLIError( + 'Recording is not in progress, cannot split.', ExitCode.ERROR + ) if paused: - console.err.print('Recording is paused, cannot split.') - raise typer.Exit(1) + raise OBSWSCLIError('Recording is paused, cannot split.', ExitCode.ERROR) - ctx.obj['obsws'].split_record_file() + ctx.client.split_record_file() console.out.print('Recording split successfully.') -@app.command('chapter | ch') +@app.command(name=['chapter', 'ch']) def chapter( - ctx: typer.Context, chapter_name: Annotated[ Optional[str], - typer.Argument( - help='Name of the chapter to create.', + Argument( + hint='Name of the chapter to create.', ), ] = None, + *, + ctx: Annotated[Context, Parameter(parse=False)], ): """Create a chapter in the current recording.""" active, paused = _get_recording_status(ctx) if not active: - console.err.print('Recording is not in progress, cannot create chapter.') - raise typer.Exit(1) + raise OBSWSCLIError( + 'Recording is not in progress, cannot create chapter.', ExitCode.ERROR + ) if paused: - console.err.print('Recording is paused, cannot create chapter.') - raise typer.Exit(1) + raise OBSWSCLIError( + 'Recording is paused, cannot create chapter.', ExitCode.ERROR + ) - ctx.obj['obsws'].create_record_chapter(chapter_name) + ctx.client.create_record_chapter(chapter_name) console.out.print( f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.' )