add docstrings containing help output

This commit is contained in:
onyx-and-iris 2025-07-28 11:32:23 +01:00
parent 3b184c6531
commit 417ad54a0c
17 changed files with 843 additions and 355 deletions

View File

@ -47,7 +47,14 @@ for sub_app in (
@Parameter(name='*')
@dataclass
class OBSConfig:
"""Dataclass to hold OBS connection parameters."""
"""Dataclass to hold OBS connection parameters.
Attributes:
host (str): The hostname or IP address of the OBS WebSocket server.
port (int): The port number of the OBS WebSocket server.
password (str): The password for the OBS WebSocket server, if required.
"""
host: str = 'localhost'
port: int = 4455
@ -56,7 +63,13 @@ class OBSConfig:
@dataclass
class StyleConfig:
"""Dataclass to hold style parameters."""
"""Dataclass to hold style parameters.
Attributes:
name (str): The name of the style to use for console output.
no_border (bool): Whether to style the borders in the console output.
"""
name: str = 'disabled'
no_border: bool = False
@ -74,19 +87,11 @@ def setup_logging(type_, value: Any):
@app.meta.default
def launcher(
*tokens: Annotated[str, Parameter(show=False, allow_leading_hyphen=True)],
obs_config: OBSConfig = Annotated[
OBSConfig,
Parameter(
show=False, allow_leading_hyphen=True, help='OBS connection parameters'
),
],
style_config: StyleConfig = Annotated[
StyleConfig,
Parameter(show=False, allow_leading_hyphen=True, help='Style parameters'),
],
obs_config: OBSConfig,
style_config: StyleConfig,
debug: Annotated[
bool,
Parameter(validator=setup_logging),
Parameter(validator=setup_logging, help='Enable debug logging'),
] = False,
):
"""Command line interface for the OBS WebSocket API."""

View File

@ -3,7 +3,7 @@
from typing import Annotated, Optional
import obsws_python as obsws
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
@ -17,17 +17,21 @@ app = App(name='filter', help='Commands for managing filters in OBS sources')
@app.command(name=['list', 'ls'])
def list_(
source_name: Annotated[
Optional[str],
Argument(
hint='The source to list filters for',
),
] = None,
source_name: Optional[str] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List filters for a source."""
"""List filters for a source.
Parameters
----------
source_name : str, optional
The name of the source to list filters for. If not provided, the current program scene's source will be used.
ctx : Context
The context containing the OBS client and other settings.
"""
if not source_name:
source_name = ctx.client.get_current_program_scene().scene_name
@ -90,19 +94,24 @@ def _get_filter_enabled(ctx: Context, source_name: str, filter_name: str):
@app.command(name=['enable', 'on'])
def enable(
source_name: Annotated[
str,
Argument(hint='The source to enable the filter for'),
],
filter_name: Annotated[
str,
Argument(hint='The name of the filter to enable'),
],
source_name: str,
filter_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Enable a filter for a source."""
"""Enable a filter for a source.
Parameters
----------
source_name : str
The name of the source to enable the filter for.
filter_name : str
The name of the filter to enable.
ctx : Context
The context containing the OBS client and other settings.
"""
if _get_filter_enabled(ctx, source_name, filter_name):
raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]',
@ -117,19 +126,24 @@ def enable(
@app.command(name=['disable', 'off'])
def disable(
source_name: Annotated[
str,
Argument(hint='The source to disable the filter for'),
],
filter_name: Annotated[
str,
Argument(hint='The name of the filter to disable'),
],
source_name: str,
filter_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Disable a filter for a source."""
"""Disable a filter for a source.
Parameters
----------
source_name : str
The name of the source to disable the filter for.
filter_name : str
The name of the filter to disable.
ctx : Context
The context containing the OBS client and other settings.
"""
if not _get_filter_enabled(ctx, source_name, filter_name):
raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]',
@ -144,19 +158,24 @@ def disable(
@app.command(name=['toggle', 'tg'])
def toggle(
source_name: Annotated[
str,
Argument(hint='The source to toggle the filter for'),
],
filter_name: Annotated[
str,
Argument(hint='The name of the filter to toggle'),
],
source_name: str,
filter_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle a filter for a source."""
"""Toggle a filter for a source.
Parameters
----------
source_name : str
The name of the source to toggle the filter for.
filter_name : str
The name of the filter to toggle.
ctx : Context
The context containing the OBS client and other settings.
"""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
new_state = not is_enabled
@ -173,19 +192,24 @@ def toggle(
@app.command(name=['status', 'ss'])
def status(
source_name: Annotated[
str,
Argument(hint='The source to get the filter status for'),
],
filter_name: Annotated[
str,
Argument(hint='The name of the filter to get the status for'),
],
source_name: str,
filter_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of a filter for a source."""
"""Get the status of a filter for a source.
Parameters
----------
source_name : str
The name of the source to check the filter status for.
filter_name : str
The name of the filter to check the status for.
ctx : Context
The context containing the OBS client and other settings.
"""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
if is_enabled:
console.out.print(

View File

@ -2,7 +2,7 @@
from typing import Annotated, Optional
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
@ -17,17 +17,22 @@ app = App(name='group', help='Commands for managing groups in OBS scenes')
@app.command(name=['list', 'ls'])
def list_(
scene_name: Annotated[
Optional[str],
Argument(
hint='Scene name to list groups for',
),
] = None,
scene_name: Optional[str] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List groups in a scene."""
"""List groups in a scene.
Parameters
----------
scene_name : str, optional
The name of the scene to list groups for. If not provided, the current program scene
will be used.
ctx : Context
The context containing the OBS client and other settings.
"""
if not scene_name:
scene_name = ctx.client.get_current_program_scene().scene_name
@ -89,16 +94,24 @@ def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
@app.command(name=['show', 'sh'])
def show(
scene_name: Annotated[
str,
Argument(hint='Scene name the group is in'),
],
group_name: Annotated[str, Argument(hint='Group name to show')],
scene_name: str,
group_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Show a group in a scene."""
"""Show a group in a scene.
Parameters
----------
scene_name : str
The name of the scene where the group is located.
group_name : str
The name of the group to show.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
@ -123,13 +136,24 @@ def show(
@app.command(name=['hide', 'h'])
def hide(
scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
group_name: Annotated[str, Argument(hint='Group name to hide')],
scene_name: str,
group_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Hide a group in a scene."""
"""Hide a group in a scene.
Parameters
----------
scene_name : str
The name of the scene where the group is located.
group_name : str
The name of the group to hide.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
@ -154,13 +178,24 @@ def hide(
@app.command(name=['toggle', 'tg'])
def toggle(
scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
group_name: Annotated[str, Argument(hint='Group name to toggle')],
scene_name: str,
group_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle a group in a scene."""
"""Toggle a group in a scene.
Parameters
----------
scene_name : str
The name of the scene where the group is located.
group_name : str
The name of the group to toggle.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
@ -189,13 +224,24 @@ def toggle(
@app.command(name=['status', 'ss'])
def status(
scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
group_name: Annotated[str, Argument(hint='Group name to check status')],
scene_name: str,
group_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of a group in a scene."""
"""Get the status of a group in a scene.
Parameters
----------
scene_name : str
The name of the scene where the group is located.
group_name : str
The name of the group to check.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',

View File

@ -2,7 +2,7 @@
from typing import Annotated
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
@ -17,7 +17,14 @@ def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all hotkeys."""
"""List all hotkeys.
Parameters
----------
ctx : Context
The context containing the OBS client to interact with.
"""
resp = ctx.client.get_hotkey_list()
table = Table(
@ -39,38 +46,51 @@ def list_(
@app.command(name=['trigger', 'tr'])
def trigger(
hotkey: Annotated[str, Argument(hint='The hotkey to trigger')],
hotkey: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Trigger a hotkey by name."""
"""Trigger a hotkey by name.
Parameters
----------
hotkey : str
The name of the hotkey to trigger.
ctx : Context
The context containing the OBS client to interact with.
"""
ctx.client.trigger_hotkey_by_name(hotkey)
@app.command(name=['trigger-sequence', 'trs'])
def trigger_sequence(
key_id: Annotated[
str,
Argument(
hint='The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info',
),
],
key_id: str,
/,
shift: Annotated[
bool, Parameter(help='Press shift when triggering the hotkey')
] = False,
ctrl: Annotated[
bool, Parameter(help='Press control when triggering the hotkey')
] = False,
alt: Annotated[
bool, Parameter(help='Press alt when triggering the hotkey')
] = False,
cmd: Annotated[
bool, Parameter(help='Press cmd when triggering the hotkey')
] = False,
shift: bool = False,
ctrl: bool = False,
alt: bool = False,
cmd: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Trigger a hotkey by sequence."""
"""Trigger a hotkey by sequence.
Parameters
----------
key_id : str
The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info
shift : bool, optional
Press shift when triggering the hotkey (default is False)
ctrl : bool, optional
Press control when triggering the hotkey (default is False)
alt : bool, optional
Press alt when triggering the hotkey (default is False)
cmd : bool, optional
Press cmd when triggering the hotkey (default is False)
ctx : Context
The context containing the OBS client to interact with.
"""
ctx.client.trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd)

View File

@ -3,7 +3,7 @@
from typing import Annotated
import obsws_python as obsws
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
@ -17,16 +17,35 @@ app = App(name='input', help='Commands for managing inputs in OBS')
@app.command(name=['list', 'ls'])
def list_(
input: Annotated[bool, Parameter(help='Filter by input type.')] = False,
output: Annotated[bool, Parameter(help='Filter by output type.')] = False,
colour: Annotated[bool, Parameter(help='Filter by colour source type.')] = False,
ffmpeg: Annotated[bool, Parameter(help='Filter by ffmpeg source type.')] = False,
vlc: Annotated[bool, Parameter(help='Filter by VLC source type.')] = False,
uuid: Annotated[bool, Parameter(help='Show UUIDs of inputs.')] = False,
input: bool = False,
output: bool = False,
colour: bool = False,
ffmpeg: bool = False,
vlc: bool = False,
uuid: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all inputs."""
"""List all inputs.
Parameters
----------
input:
Filter by input type.
output:
Filter by output type.
colour:
Filter by colour source type.
ffmpeg:
Filter by ffmpeg source type.
vlc:
Filter by VLC source type.
uuid:
Show UUIDs of inputs.
ctx:
The context containing the client and style.
"""
resp = ctx.client.get_input_list()
kinds = []
@ -104,12 +123,21 @@ def list_(
@app.command(name=['mute', 'm'])
def mute(
input_name: Annotated[str, Argument(hint='Name of the input to mute.')],
input_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Mute an input."""
"""Mute an input.
Parameters
----------
input_name: str
Name of the input to mute.
ctx: Context
The context containing the client and style.
"""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',
@ -126,12 +154,21 @@ def mute(
@app.command(name=['unmute', 'um'])
def unmute(
input_name: Annotated[str, Argument(hint='Name of the input to unmute.')],
input_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Unmute an input."""
"""Unmute an input.
Parameters
----------
input_name: str
Name of the input to unmute.
ctx: Context
The context containing the client and style.
"""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',
@ -148,15 +185,21 @@ def unmute(
@app.command(name=['toggle', 'tg'])
def toggle(
input_name: Annotated[
str,
Argument(hint='Name of the input to toggle.'),
],
input_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle an input."""
"""Toggle an input.
Parameters
----------
input_name: str
Name of the input to toggle.
ctx: Context
The context containing the client and style.
"""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',

View File

@ -2,7 +2,7 @@
from typing import Annotated
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
@ -19,7 +19,14 @@ def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List profiles."""
"""List profiles.
Parameters
----------
ctx: Context
The context containing the client and style.
"""
resp = ctx.client.get_profile_list()
table = Table(title='Profiles', padding=(0, 2), border_style=ctx.style.border)
@ -46,7 +53,14 @@ def current(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current profile."""
"""Get the current profile.
Parameters
----------
ctx: Context
The context containing the client and style.
"""
resp = ctx.client.get_profile_list()
console.out.print(
f'Current profile: {console.highlight(ctx, resp.current_profile_name)}'
@ -55,15 +69,21 @@ def current(
@app.command(name=['switch', 'set'])
def switch(
profile_name: Annotated[
str,
Argument(hint='Name of the profile to switch to'),
],
profile_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a profile."""
"""Switch to a profile.
Parameters
----------
profile_name: str
Name of the profile to switch to.
ctx: Context
The context containing the client and style.
"""
if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise OBSWSCLIError(
@ -84,15 +104,21 @@ def switch(
@app.command(name=['create', 'new'])
def create(
profile_name: Annotated[
str,
Argument(hint='Name of the profile to create.'),
],
profile_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a new profile."""
"""Create a new profile.
Parameters
----------
profile_name: str
Name of the profile to create.
ctx: Context
The context containing the client and style.
"""
if validate.profile_exists(ctx, profile_name):
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] already exists.',
@ -105,15 +131,21 @@ def create(
@app.command(name=['remove', 'rm'])
def remove(
profile_name: Annotated[
str,
Argument(hint='Name of the profile to remove.'),
],
profile_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Remove a profile."""
"""Remove a profile.
Parameters
----------
profile_name: str
Name of the profile to remove.
ctx: Context
The context containing the client and style.
"""
if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise OBSWSCLIError(

View File

@ -1,8 +1,8 @@
"""module containing commands for manipulating projectors in OBS."""
from typing import Annotated
from typing import Annotated, Optional
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter, validators
from rich.table import Table
from rich.text import Text
@ -19,7 +19,14 @@ def list_monitors(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List available monitors."""
"""List available monitors.
Parameters
----------
ctx : Context
The context containing the OBS client and configuration.
"""
resp = ctx.client.get_monitor_list()
if not resp.monitors:
@ -51,21 +58,24 @@ def list_monitors(
@app.command(name=['open', 'o'])
def open(
source_name: Annotated[
str,
Argument(
hint='Name of the source to project.',
),
] = '',
source_name: Optional[str] = None,
/,
monitor_index: Annotated[
int,
Parameter(help='Index of the monitor to open the projector on.'),
] = 0,
monitor_index: Annotated[int, Parameter(validator=validators.Number(gte=0))] = 0,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Open a fullscreen projector for a source on a specific monitor."""
"""Open a fullscreen projector for a source on a specific monitor.
Parameters
----------
source_name : str, optional
The name of the source to project. If not provided, the current program scene will be used.
monitor_index : int, optional
The index of the monitor to open the projector on. Defaults to 0 (the primary monitor).
ctx : Context
The context containing the OBS client and configuration.
"""
if not source_name:
source_name = ctx.client.get_current_program_scene().scene_name

View File

@ -3,7 +3,7 @@
from pathlib import Path
from typing import Annotated, Optional
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from . import console
from .context import Context
@ -24,7 +24,14 @@ def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start recording."""
"""Start recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if active:
err_msg = 'Recording is already in progress, cannot start.'
@ -41,7 +48,14 @@ def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop recording."""
"""Stop recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, _ = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
@ -59,7 +73,14 @@ def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle recording."""
"""Toggle recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.toggle_record()
if resp.output_active:
console.out.print('Recording started successfully.')
@ -72,7 +93,14 @@ def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get recording status."""
"""Get recording status.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if active:
if paused:
@ -88,7 +116,14 @@ def resume(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Resume recording."""
"""Resume recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
@ -108,7 +143,14 @@ def pause(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Pause recording."""
"""Pause recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
@ -125,18 +167,22 @@ def pause(
@app.command(name=['directory', 'd'])
def directory(
record_directory: Annotated[
Optional[Path],
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
Argument(
hint='Directory to set for recording.',
),
] = None,
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
record_directory: Optional[Path] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get or set the recording directory."""
"""Get or set the recording directory.
Parameters
----------
record_directory: Optional[Path]
The directory to set for recording. If not provided, the current recording directory is displayed.
ctx: Context
The context containing the OBS client and other settings.
"""
if record_directory is not None:
ctx.client.set_record_directory(str(record_directory))
console.out.print(
@ -154,7 +200,14 @@ def split(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Split the current recording."""
"""Split the current recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot split.')
@ -170,16 +223,20 @@ def split(
@app.command(name=['chapter', 'ch'])
def chapter(
chapter_name: Annotated[
Optional[str],
Argument(
hint='Name of the chapter to create.',
),
] = None,
chapter_name: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a chapter in the current recording."""
"""Create a chapter in the current recording.
Parameters
----------
chapter_name: Optional[str]
The name of the chapter to create. If not provided, an unnamed chapter is created.
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(

View File

@ -19,7 +19,14 @@ def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start the replay buffer."""
"""Start the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.get_replay_buffer_status()
if resp.output_active:
raise OBSWSCLIError('Replay buffer is already active.', ExitCode.ERROR)
@ -33,7 +40,14 @@ def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the replay buffer."""
"""Stop the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.get_replay_buffer_status()
if not resp.output_active:
raise OBSWSCLIError('Replay buffer is not active.', ExitCode.ERROR)
@ -47,7 +61,14 @@ def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the replay buffer."""
"""Toggle the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.toggle_replay_buffer()
if resp.output_active:
console.out.print('Replay buffer is active.')
@ -60,7 +81,14 @@ def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the replay buffer."""
"""Get the status of the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.get_replay_buffer_status()
if resp.output_active:
console.out.print('Replay buffer is active.')
@ -73,6 +101,13 @@ def save(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Save the replay buffer."""
"""Save the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
ctx.client.save_replay_buffer()
console.out.print('Replay buffer saved.')

View File

@ -2,7 +2,7 @@
from typing import Annotated
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
@ -16,11 +16,20 @@ app = App(name='scene', help='Commands for managing OBS scenes')
@app.command(name=['list', 'ls'])
def list_(
uuid: Annotated[bool, Parameter(help='Show UUIDs of scenes')] = False,
uuid: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all scenes."""
"""List all scenes.
Parameters
----------
uuid : bool
Show UUIDs of scenes.
ctx : Context
The context containing the OBS client and configuration.
"""
resp = ctx.client.get_scene_list()
scenes = (
(scene.get('sceneName'), scene.get('sceneUuid'))
@ -62,13 +71,20 @@ def list_(
@app.command(name=['current', 'get'])
def current(
preview: Annotated[
bool, Parameter(help='Get the preview scene instead of the program scene')
] = False,
preview: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current program scene or preview scene."""
"""Get the current program scene or preview scene.
Parameters
----------
preview : bool
If True, get the preview scene instead of the program scene.
ctx : Context
The context containing the OBS client and configuration.
"""
if preview and not validate.studio_mode_enabled(ctx):
raise OBSWSCLIError(
'Studio mode is not enabled, cannot get preview scene.',
@ -89,16 +105,24 @@ def current(
@app.command(name=['switch', 'set'])
def switch(
scene_name: Annotated[str, Argument(hint='Name of the scene to switch to')],
scene_name: str,
/,
preview: Annotated[
bool,
Parameter(help='Switch to the preview scene instead of the program scene'),
] = False,
preview: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a scene."""
"""Switch to a scene.
Parameters
----------
scene_name : str
The name of the scene to switch to.
preview : bool
If True, switch to the preview scene instead of the program scene.
ctx : Context
The context containing the OBS client and configuration.
"""
if preview and not validate.studio_mode_enabled(ctx):
raise OBSWSCLIError(
'Studio mode is not enabled, cannot switch to preview scene.',

View File

@ -2,7 +2,7 @@
from typing import Annotated
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from rich.table import Table
from . import console, validate
@ -20,7 +20,14 @@ def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all scene collections."""
"""List all scene collections.
Parameters
----------
ctx : Context
The context containing the OBS client and configuration.
"""
resp = ctx.client.get_scene_collection_list()
table = Table(
@ -41,7 +48,14 @@ def current(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current scene collection."""
"""Get the current scene collection.
Parameters
----------
ctx : Context
The context containing the OBS client and configuration.
"""
resp = ctx.client.get_scene_collection_list()
console.out.print(
f'Current scene collection: {console.highlight(ctx, resp.current_scene_collection_name)}'
@ -50,14 +64,21 @@ def current(
@app.command(name=['switch', 'set'])
def switch(
scene_collection_name: Annotated[
str, Argument(hint='Name of the scene collection to switch to')
],
scene_collection_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a scene collection."""
"""Switch to a scene collection.
Parameters
----------
scene_collection_name : str
The name of the scene collection to switch to.
ctx : Context
The context containing the OBS client and configuration.
"""
if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.',
@ -81,14 +102,21 @@ def switch(
@app.command(name=['create', 'new'])
def create(
scene_collection_name: Annotated[
str, Argument(hint='Name of the scene collection to create')
],
scene_collection_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a new scene collection."""
"""Create a new scene collection.
Parameters
----------
scene_collection_name : str
The name of the scene collection to create.
ctx : Context
The context containing the OBS client and configuration.
"""
if validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.',

View File

@ -2,7 +2,7 @@
from typing import Annotated, Optional
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from rich.table import Table
from . import console, util, validate
@ -15,18 +15,25 @@ app = App(name='sceneitem', help='Commands for controlling scene items in OBS.')
@app.command(name=['list', 'ls'])
def list_(
scene_name: Annotated[
Optional[str],
Argument(
hint='Scene name to list items for',
),
] = None,
scene_name: Optional[str] = None,
/,
uuid: Annotated[bool, Parameter(help='Show UUIDs of scene items')] = False,
uuid: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all items in a scene."""
"""List all items in a scene.
Parameters
----------
scene_name : str, optional
The name of the scene to list items for. If not provided, the current program scene
will be used.
uuid : bool
Show UUIDs of scene items.
ctx : Context
The context containing the OBS client and configuration.
"""
if not scene_name:
scene_name = ctx.client.get_current_program_scene().scene_name
@ -61,21 +68,21 @@ def list_(
table = Table(
title=f'Items in Scene: {scene_name}',
padding=(0, 2),
border_style=ctx.obj['style'].border,
border_style=ctx.style.border,
)
if uuid:
columns = [
('Item ID', 'center', ctx.obj['style'].column),
('Item Name', 'left', ctx.obj['style'].column),
('In Group', 'left', ctx.obj['style'].column),
('Item ID', 'center', ctx.style.column),
('Item Name', 'left', ctx.style.column),
('In Group', 'left', ctx.style.column),
('Enabled', 'center', None),
('UUID', 'left', ctx.obj['style'].column),
('UUID', 'left', ctx.style.column),
]
else:
columns = [
('Item ID', 'center', ctx.obj['style'].column),
('Item Name', 'left', ctx.obj['style'].column),
('In Group', 'left', ctx.obj['style'].column),
('Item ID', 'center', ctx.style.column),
('Item Name', 'left', ctx.style.column),
('In Group', 'left', ctx.style.column),
('Enabled', 'center', None),
]
# Add columns to the table
@ -84,7 +91,7 @@ def list_(
for item_id, item_name, is_group, is_enabled, source_uuid in items:
if is_group:
resp = ctx.obj['obsws'].get_group_scene_item_list(item_name)
resp = ctx.client.get_group_scene_item_list(item_name)
group_items = sorted(
(
(
@ -175,7 +182,7 @@ def _get_scene_name_and_item_id(
):
"""Get the scene name and item ID for the given scene and item name."""
if group:
resp = ctx.obj['obsws'].get_group_scene_item_list(group)
resp = ctx.client.get_group_scene_item_list(group)
for item in resp.scene_items:
if item.get('sourceName') == item_name:
scene_name = group
@ -187,7 +194,7 @@ def _get_scene_name_and_item_id(
exit_code=ExitCode.ERROR,
)
else:
resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name)
resp = ctx.client.get_scene_item_id(scene_name, item_name)
scene_item_id = resp.scene_item_id
return scene_name, scene_item_id
@ -195,17 +202,27 @@ def _get_scene_name_and_item_id(
@app.command(name=['show', 'sh'])
def show(
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[
str,
Argument(hint='Item name to show in the scene'),
],
scene_name: str,
item_name: str,
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
group: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Show an item in a scene."""
"""Show an item in a scene.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to show in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
@ -213,7 +230,7 @@ def show(
ctx, scene_name, item_name, group
)
ctx.obj['obsws'].set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=True,
@ -236,17 +253,27 @@ def show(
@app.command(name=['hide', 'h'])
def hide(
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[
str,
Argument(hint='Item name to hide in the scene'),
],
scene_name: str,
item_name: str,
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
group: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Hide an item in a scene."""
"""Hide an item in a scene.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to hide in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
@ -254,7 +281,7 @@ def hide(
ctx, scene_name, item_name, group
)
ctx.obj['obsws'].set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=False,
@ -276,14 +303,27 @@ def hide(
@app.command(name=['toggle', 'tg'])
def toggle(
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[str, Argument(hint='Item name to toggle in the scene')],
scene_name: str,
item_name: str,
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
group: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle an item in a scene."""
"""Toggle an item in a scene.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to toggle in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
@ -331,16 +371,27 @@ def toggle(
@app.command(name=['visible', 'v'])
def visible(
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[
str, Argument(hint='Item name to check visibility in the scene')
],
scene_name: str,
item_name: str,
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
group: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Check if an item in a scene is visible."""
"""Check if an item in a scene is visible.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to check visibility in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
@ -371,59 +422,72 @@ def visible(
@app.command(name=['transform', 't'])
def transform(
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[str, Argument(hint='Item name to transform in the scene')],
scene_name: str,
item_name: str,
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
alignment: Annotated[
Optional[int], Parameter(help='Alignment of the item in the scene')
] = None,
bounds_alignment: Annotated[
Optional[int], Parameter(help='Bounds alignment of the item in the scene')
] = None,
bounds_height: Annotated[
Optional[float], Parameter(help='Height of the item in the scene')
] = None,
bounds_type: Annotated[
Optional[str], Parameter(help='Type of bounds for the item in the scene')
] = None,
bounds_width: Annotated[
Optional[float], Parameter(help='Width of the item in the scene')
] = None,
crop_to_bounds: Annotated[
Optional[bool], Parameter(help='Crop the item to the bounds')
] = None,
crop_bottom: Annotated[
Optional[float], Parameter(help='Bottom crop of the item in the scene')
] = None,
crop_left: Annotated[
Optional[float], Parameter(help='Left crop of the item in the scene')
] = None,
crop_right: Annotated[
Optional[float], Parameter(help='Right crop of the item in the scene')
] = None,
crop_top: Annotated[
Optional[float], Parameter(help='Top crop of the item in the scene')
] = None,
position_x: Annotated[
Optional[float], Parameter(help='X position of the item in the scene')
] = None,
position_y: Annotated[
Optional[float], Parameter(help='Y position of the item in the scene')
] = None,
rotation: Annotated[
Optional[float], Parameter(help='Rotation of the item in the scene')
] = None,
scale_x: Annotated[
Optional[float], Parameter(help='X scale of the item in the scene')
] = None,
scale_y: Annotated[
Optional[float], Parameter(help='Y scale of the item in the scene')
] = None,
group: Optional[str] = None,
alignment: Optional[int] = None,
bounds_alignment: Optional[int] = None,
bounds_height: Optional[float] = None,
bounds_type: Optional[str] = None,
bounds_width: Optional[float] = None,
crop_to_bounds: Optional[bool] = None,
crop_bottom: Optional[float] = None,
crop_left: Optional[float] = None,
crop_right: Optional[float] = None,
crop_top: Optional[float] = None,
position_x: Optional[float] = None,
position_y: Optional[float] = None,
rotation: Optional[float] = None,
scale_x: Optional[float] = None,
scale_y: Optional[float] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Set the transform of an item in a scene."""
"""Set the transform of an item in a scene.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to transform in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
alignment : int, optional
Alignment of the item in the scene.
bounds_alignment : int, optional
Bounds alignment of the item in the scene.
bounds_height : float, optional
Height of the item in the scene.
bounds_type : str, optional
Type of bounds for the item in the scene.
bounds_width : float, optional
Width of the item in the scene.
crop_to_bounds : bool, optional
Crop the item to the bounds.
crop_bottom : float, optional
Bottom crop of the item in the scene.
crop_left : float, optional
Left crop of the item in the scene.
crop_right : float, optional
Right crop of the item in the scene.
crop_top : float, optional
Top crop of the item in the scene.
position_x : float, optional
X position of the item in the scene.
position_y : float, optional
Y position of the item in the scene.
rotation : float, optional
Rotation of the item in the scene.
scale_x : float, optional
X scale of the item in the scene.
scale_y : float, optional
Y scale of the item in the scene.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name

View File

@ -4,7 +4,7 @@ from pathlib import Path
from typing import Annotated
import obsws_python as obsws
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter, validators
from . import console
from .context import Context
@ -16,43 +16,37 @@ app = App(name='screenshot', help='Commands for taking screenshots using OBS.')
@app.command(name=['save', 'sv'])
def save(
source_name: Annotated[
str,
Argument(
hint='Name of the source to take a screenshot of.',
),
],
output_path: Annotated[
Path,
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
Argument(
hint='Path to save the screenshot (must include file name and extension).',
),
],
source_name: str,
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
output_path: Path,
/,
width: Annotated[
float,
Parameter(
help='Width of the screenshot.',
),
] = 1920,
height: Annotated[
float,
Parameter(
help='Height of the screenshot.',
),
] = 1080,
width: float = 1920,
height: float = 1080,
quality: Annotated[
float,
Parameter(
help='Quality of the screenshot.',
),
] = -1,
float, Parameter(validator=validators.Number(gte=-1, lte=100))
] = -1.0,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Take a screenshot and save it to a file."""
"""Take a screenshot and save it to a file.
Parameters
----------
source_name : str
Name of the source to take a screenshot of.
output_path : Path
Path to save the screenshot (must include file name and extension).
width : float
Width of the screenshot.
height : float
Height of the screenshot.
quality : float
Quality of the screenshot. A value of -1 uses the default quality.
ctx : Context
Context containing the OBS WebSocket client instance.
"""
try:
ctx.client.save_source_screenshot(
name=source_name,

View File

@ -23,7 +23,14 @@ def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start streaming."""
"""Start streaming.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
active, _ = _get_streaming_status(ctx)
if active:
raise OBSWSCLIError(
@ -40,7 +47,14 @@ def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop streaming."""
"""Stop streaming.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
active, _ = _get_streaming_status(ctx)
if not active:
raise OBSWSCLIError(
@ -57,7 +71,14 @@ def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle streaming."""
"""Toggle streaming.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
resp = ctx.client.toggle_stream()
if resp.output_active:
console.out.print('Streaming started successfully.')
@ -70,7 +91,14 @@ def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get streaming status."""
"""Get streaming status.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
active, duration = _get_streaming_status(ctx)
if active:
if duration > 0:

View File

@ -15,8 +15,15 @@ def enable(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Enable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(True)
"""Enable studio mode.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
ctx.client.set_studio_mode_enabled(True)
console.out.print('Studio mode has been enabled.')
@ -25,7 +32,14 @@ def disable(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Disable studio mode."""
"""Disable studio mode.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
ctx.client.set_studio_mode_enabled(False)
console.out.print('Studio mode has been disabled.')
@ -35,7 +49,14 @@ def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle studio mode."""
"""Toggle studio mode.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
resp = ctx.client.get_studio_mode_enabled()
if resp.studio_mode_enabled:
ctx.client.set_studio_mode_enabled(False)
@ -50,7 +71,14 @@ def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of studio mode."""
"""Get the status of studio mode.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
resp = ctx.client.get_studio_mode_enabled()
if resp.studio_mode_enabled:
console.out.print('Studio mode is enabled.')

View File

@ -2,7 +2,7 @@
from typing import Annotated, Optional
from cyclopts import App, Argument, Parameter
from cyclopts import App, Parameter
from . import console, validate
from .context import Context
@ -14,11 +14,20 @@ app = App(name='text', help='Commands for controlling text inputs in OBS.')
@app.command(name=['current', 'get'])
def current(
input_name: Annotated[str, Argument(hint='Name of the text input to get.')],
input_name: str,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current text for a text input."""
"""Get the current text for a text input.
Parameters
----------
input_name : str
The name of the text input to retrieve the current text from.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.', code=ExitCode.ERROR
@ -41,16 +50,25 @@ def current(
@app.command(name=['update', 'set'])
def update(
input_name: Annotated[str, Argument(hint='Name of the text input to update.')],
new_text: Annotated[
Optional[str],
Argument(hint='The new text to set for the input.'),
] = None,
input_name: str,
new_text: Optional[str] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Update the text of a text input."""
"""Update the text of a text input.
Parameters
----------
input_name : str
The name of the text input to update.
new_text : Optional[str]
The new text to set for the input. If not provided, the text will be cleared
(set to an empty string).
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.', code=ExitCode.ERROR

View File

@ -12,27 +12,51 @@ app = App(name='virtualcam', help='Commands for controlling the virtual camera i
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start the virtual camera."""
"""Start the virtual camera.
Parameters
----------
ctx : Context
The context containing the OBS client and other settings.
"""
ctx.client.start_virtual_cam()
console.out.print('Virtual camera started.')
@app.command(name=['stop', 'p'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the virtual camera."""
"""Stop the virtual camera.
Parameters
----------
ctx : Context
The context containing the OBS client and other settings.
"""
ctx.client.stop_virtual_cam()
console.out.print('Virtual camera stopped.')
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the virtual camera."""
"""Toggle the virtual camera.
Parameters
----------
ctx : Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.toggle_virtual_cam()
if resp.output_active:
console.out.print('Virtual camera is enabled.')
@ -42,9 +66,17 @@ def toggle(
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the virtual camera."""
"""Get the status of the virtual camera.
Parameters
----------
ctx : Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.get_virtual_cam_status()
if resp.output_active:
console.out.print('Virtual camera is enabled.')