From 81fcb4e5048954fe2fa568930badd9928eef9bc1 Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Fri, 9 Jan 2026 19:51:03 +0000 Subject: [PATCH] implement media command group --- obsws_cli/app.py | 3 +- obsws_cli/media.py | 99 ++++++++++++++++++++++++++++++++++++++++++++++ obsws_cli/util.py | 25 ++++++++++++ 3 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 obsws_cli/media.py diff --git a/obsws_cli/app.py b/obsws_cli/app.py index 4ce9b86..ce926b5 100644 --- a/obsws_cli/app.py +++ b/obsws_cli/app.py @@ -18,10 +18,12 @@ for sub_typer in ( 'group', 'hotkey', 'input', + 'media', 'profile', 'projector', 'record', 'replaybuffer', + 'settings', 'scene', 'scenecollection', 'sceneitem', @@ -30,7 +32,6 @@ for sub_typer in ( 'studiomode', 'text', 'virtualcam', - 'settings', ): module = importlib.import_module(f'.{sub_typer}', package=__package__) app.add_typer(module.app, name=sub_typer) diff --git a/obsws_cli/media.py b/obsws_cli/media.py new file mode 100644 index 0000000..656f38c --- /dev/null +++ b/obsws_cli/media.py @@ -0,0 +1,99 @@ +"""module containing commands for media inputs.""" + +from typing import Annotated, Optional + +import typer + +from . import console, util +from .alias import SubTyperAliasGroup + +app = typer.Typer(cls=SubTyperAliasGroup) + + +@app.callback() +def main(): + """Commands for media inputs.""" + + +@app.command('cursor | c') +def cursor( + ctx: typer.Context, + input_name: Annotated[ + str, typer.Argument(..., help='The name of the media input.') + ], + timecode: Annotated[ + Optional[str], + typer.Argument( + ..., help='The timecode to set the cursor to (format: HH:MM:SS).' + ), + ] = None, +): + """Get/set the cursor position of a media input.""" + if timecode is None: + resp = ctx.obj['obsws'].get_media_input_status(input_name) + console.out.print( + f'Cursor for {console.highlight(ctx, input_name)} is at {util.milliseconds_to_timecode(resp.media_cursor)}.' + ) + return + + cursor_position = util.timecode_to_milliseconds(timecode) + ctx.obj['obsws'].set_media_input_cursor(input_name, cursor_position) + console.out.print( + f'Cursor for {console.highlight(ctx, input_name)} set to {timecode}.' + ) + + +@app.command('play | p') +def play( + ctx: typer.Context, + input_name: Annotated[ + str, typer.Argument(..., help='The name of the media input.') + ], +): + """Get/set the playing status of a media input.""" + ctx.obj['obsws'].trigger_media_input_action( + input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PLAY' + ) + console.out.print(f'Playing media input {console.highlight(ctx, input_name)}.') + + +@app.command('pause | pa') +def pause( + ctx: typer.Context, + input_name: Annotated[ + str, typer.Argument(..., help='The name of the media input.') + ], +): + """Pause a media input.""" + ctx.obj['obsws'].trigger_media_input_action( + input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PAUSE' + ) + console.out.print(f'Paused media input {console.highlight(ctx, input_name)}.') + + +@app.command('stop | s') +def stop( + ctx: typer.Context, + input_name: Annotated[ + str, typer.Argument(..., help='The name of the media input.') + ], +): + """Stop a media input.""" + ctx.obj['obsws'].trigger_media_input_action( + input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_STOP' + ) + console.out.print(f'Stopped media input {console.highlight(ctx, input_name)}.') + + +@app.command('restart | r') +def restart( + ctx: typer.Context, + input_name: Annotated[ + str, typer.Argument(..., help='The name of the media input.') + ], +): + """Restart a media input.""" + ctx.obj['obsws'].trigger_media_input_action( + input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART' + ) + console.out.print(f'Restarted media input {console.highlight(ctx, input_name)}.') diff --git a/obsws_cli/util.py b/obsws_cli/util.py index ba9a0d4..ffb1d9d 100644 --- a/obsws_cli/util.py +++ b/obsws_cli/util.py @@ -20,3 +20,28 @@ def check_mark(value: bool, empty_if_false: bool = False) -> str: if os.getenv('NO_COLOR', '') != '': return '✓' if value else '✗' return '✅' if value else '❌' + + +def timecode_to_milliseconds(timecode: str) -> int: + """Convert a timecode string (HH:MM:SS) to total milliseconds.""" + match timecode.split(':'): + case [mm, ss]: + hours = 0 + minutes = int(mm) + seconds = int(ss) + case [hh, mm, ss]: + hours = int(hh) + minutes = int(mm) + seconds = int(ss) + return (hours * 3600 + minutes * 60 + seconds) * 1000 + + +def milliseconds_to_timecode(milliseconds: int) -> str: + """Convert total milliseconds to a timecode string (HH:MM:SS).""" + total_seconds = milliseconds // 1000 + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + seconds = total_seconds % 60 + if hours == 0: + return f'{minutes:02}:{seconds:02}' + return f'{hours:02}:{minutes:02}:{seconds:02}'