Compare commits

...

8 Commits

21 changed files with 366 additions and 270 deletions

View File

@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.17.0] - 2025-06-20
### Added
- input list, scene list and sceneitem list now accept --uuid flag.
- Active column added to scene list table.
### Changed
- scene list no longer prints the UUIDs by default, enable it with the --uuid flag.
### Fixed
- Issue with input list not printing all inputs if no filters were applied.
# [0.16.8] - 2025-06-07 # [0.16.8] - 2025-06-07
### Added ### Added

View File

@ -81,6 +81,10 @@ obsws-cli obs-version
#### Scene #### Scene
- list: List all scenes. - list: List all scenes.
- flags:
*optional*
- --uuid: Show UUIDs of scenes
```console ```console
obsws-cli scene list obsws-cli scene list
@ -102,6 +106,10 @@ obsws-cli scene switch LIVE
#### Scene Item #### Scene Item
- list: List all items in a scene. - list: List all items in a scene.
- flags:
*optional*
- --uuid: Show UUIDs of scene items
*optional* *optional*
- args: <scene_name> - args: <scene_name>
@ -267,6 +275,7 @@ obsws-cli group status START "test_group"
- --colour: Filter by colour source type. - --colour: Filter by colour source type.
- --ffmpeg: Filter by ffmpeg source type. - --ffmpeg: Filter by ffmpeg source type.
- --vlc: Filter by VLC source type. - --vlc: Filter by VLC source type.
- --uuid: Show UUIDs of inputs.
```console ```console
obsws-cli input list obsws-cli input list

View File

@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online> # SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
# #
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
__version__ = "0.16.8" __version__ = "0.17.1"

View File

@ -1,15 +1,15 @@
"""Command line interface for the OBS WebSocket API.""" """Command line interface for the OBS WebSocket API."""
import importlib import importlib
import logging
from typing import Annotated from typing import Annotated
import obsws_python as obsws import obsws_python as obsws
import typer import typer
from rich.console import Console
from obsws_cli.__about__ import __version__ as obsws_cli_version from obsws_cli.__about__ import __version__ as obsws_cli_version
from . import settings from . import console, settings
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
@ -33,17 +33,23 @@ for sub_typer in (
module = importlib.import_module(f'.{sub_typer}', package=__package__) module = importlib.import_module(f'.{sub_typer}', package=__package__)
app.add_typer(module.app, name=sub_typer) app.add_typer(module.app, name=sub_typer)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
def version_callback(value: bool): def version_callback(value: bool):
"""Show the version of the CLI.""" """Show the version of the CLI."""
if value: if value:
out_console.print(f'obsws-cli version: {obsws_cli_version}') console.out.print(f'obsws-cli version: {obsws_cli_version}')
raise typer.Exit() raise typer.Exit()
def setup_logging(debug: bool):
"""Set up logging for the application."""
log_level = logging.DEBUG if debug else logging.CRITICAL
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
@app.callback() @app.callback()
def main( def main(
ctx: typer.Context, ctx: typer.Context,
@ -60,7 +66,11 @@ def main(
port: Annotated[ port: Annotated[
int, int,
typer.Option( typer.Option(
'--port', '-P', envvar='OBS_PORT', help='WebSocket port', show_default=4455 '--port',
'-P',
envvar='OBS_PORT',
help='WebSocket port',
show_default=4455,
), ),
] = settings.get('port'), ] = settings.get('port'),
password: Annotated[ password: Annotated[
@ -94,6 +104,19 @@ def main(
callback=version_callback, callback=version_callback,
), ),
] = False, ] = False,
debug: Annotated[
bool,
typer.Option(
'--debug',
'-d',
envvar='OBS_DEBUG',
is_eager=True,
help='Enable debug logging',
show_default=False,
callback=setup_logging,
hidden=True,
),
] = settings.get('debug'),
): ):
"""obsws_cli is a command line interface for the OBS WebSocket API.""" """obsws_cli is a command line interface for the OBS WebSocket API."""
ctx.obj = ctx.with_resource(obsws.ReqClient(**ctx.params)) ctx.obj = ctx.with_resource(obsws.ReqClient(**ctx.params))
@ -103,6 +126,6 @@ def main(
def obs_version(ctx: typer.Context): def obs_version(ctx: typer.Context):
"""Get the OBS Client and WebSocket versions.""" """Get the OBS Client and WebSocket versions."""
resp = ctx.obj.get_version() resp = ctx.obj.get_version()
out_console.print( console.out.print(
f'OBS Client version: {resp.obs_version} with WebSocket version: {resp.obs_web_socket_version}' f'OBS Client version: {resp.obs_version} with WebSocket version: {resp.obs_web_socket_version}'
) )

6
obsws_cli/console.py Normal file
View File

@ -0,0 +1,6 @@
"""module for console output handling in obsws_cli."""
from rich.console import Console
out = Console()
err = Console(stderr=True, style='bold red')

View File

@ -4,15 +4,12 @@ from typing import Annotated, Optional
import obsws_python as obsws import obsws_python as obsws
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import util from . import console, util
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -39,7 +36,7 @@ def list_(
resp = ctx.obj.get_source_filter_list(source_name) resp = ctx.obj.get_source_filter_list(source_name)
except obsws.error.OBSSDKRequestError as e: except obsws.error.OBSSDKRequestError as e:
if e.code == 600: if e.code == 600:
err_console.print( console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow].' f'No source was found by the name of [yellow]{source_name}[/yellow].'
) )
raise typer.Exit(1) raise typer.Exit(1)
@ -47,7 +44,7 @@ def list_(
raise raise
if not resp.filters: if not resp.filters:
out_console.print(f'No filters found for source [yellow]{source_name}[/yellow]') console.out.print(f'No filters found for source [yellow]{source_name}[/yellow]')
raise typer.Exit() raise typer.Exit()
table = Table(title=f'Filters for Source: {source_name}', padding=(0, 2)) table = Table(title=f'Filters for Source: {source_name}', padding=(0, 2))
@ -77,7 +74,7 @@ def list_(
), ),
) )
out_console.print(table) console.out.print(table)
def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str): def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str):
@ -104,13 +101,13 @@ def enable(
): ):
"""Enable a filter for a source.""" """Enable a filter for a source."""
if _get_filter_enabled(ctx, source_name, filter_name): if _get_filter_enabled(ctx, source_name, filter_name):
err_console.print( console.err.print(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]' f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]'
) )
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=True) ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=True)
out_console.print( console.out.print(
f'Enabled filter [green]{filter_name}[/green] for source [green]{source_name}[/green]' f'Enabled filter [green]{filter_name}[/green] for source [green]{source_name}[/green]'
) )
@ -133,13 +130,13 @@ def disable(
): ):
"""Disable a filter for a source.""" """Disable a filter for a source."""
if not _get_filter_enabled(ctx, source_name, filter_name): if not _get_filter_enabled(ctx, source_name, filter_name):
err_console.print( console.err.print(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]' f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]'
) )
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=False) ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=False)
out_console.print( console.out.print(
f'Disabled filter [green]{filter_name}[/green] for source [green]{source_name}[/green]' f'Disabled filter [green]{filter_name}[/green] for source [green]{source_name}[/green]'
) )
@ -166,11 +163,11 @@ def toggle(
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=new_state) ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=new_state)
if new_state: if new_state:
out_console.print( console.out.print(
f'Enabled filter [green]{filter_name}[/green] for source [green]{source_name}[/green]' f'Enabled filter [green]{filter_name}[/green] for source [green]{source_name}[/green]'
) )
else: else:
out_console.print( console.out.print(
f'Disabled filter [green]{filter_name}[/green] for source [green]{source_name}[/green]' f'Disabled filter [green]{filter_name}[/green] for source [green]{source_name}[/green]'
) )
@ -194,10 +191,10 @@ def status(
"""Get the status of a filter for a source.""" """Get the status of a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name) is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
if is_enabled: if is_enabled:
out_console.print( console.out.print(
f'Filter [green]{filter_name}[/green] is enabled for source [green]{source_name}[/green]' f'Filter [green]{filter_name}[/green] is enabled for source [green]{source_name}[/green]'
) )
else: else:
out_console.print( console.out.print(
f'Filter [green]{filter_name}[/green] is disabled for source [green]{source_name}[/green]' f'Filter [green]{filter_name}[/green] is disabled for source [green]{source_name}[/green]'
) )

View File

@ -3,16 +3,13 @@
from typing import Annotated, Optional from typing import Annotated, Optional
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import validate from . import console, validate
from .alias import AliasGroup from .alias import AliasGroup
from .protocols import DataclassProtocol from .protocols import DataclassProtocol
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -36,7 +33,7 @@ def list_(
scene_name = ctx.obj.get_current_program_scene().scene_name scene_name = ctx.obj.get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
@ -47,7 +44,7 @@ def list_(
] ]
if not groups: if not groups:
out_console.print(f"No groups found in scene '{scene_name}'.") console.out.print(f"No groups found in scene '{scene_name}'.")
raise typer.Exit() raise typer.Exit()
table = Table(title=f'Groups in Scene: {scene_name}', padding=(0, 2)) table = Table(title=f'Groups in Scene: {scene_name}', padding=(0, 2))
@ -67,7 +64,7 @@ def list_(
':white_heavy_check_mark:' if is_enabled else ':x:', ':white_heavy_check_mark:' if is_enabled else ':x:',
) )
out_console.print(table) console.out.print(table)
def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None: def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
@ -96,12 +93,12 @@ def show(
): ):
"""Show a group in a scene.""" """Show a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
err_console.print( console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].' f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
raise typer.Exit(1) raise typer.Exit(1)
@ -112,7 +109,7 @@ def show(
enabled=True, enabled=True,
) )
out_console.print(f'Group [green]{group_name}[/green] is now visible.') console.out.print(f'Group [green]{group_name}[/green] is now visible.')
@app.command('hide | h') @app.command('hide | h')
@ -127,12 +124,12 @@ def hide(
): ):
"""Hide a group in a scene.""" """Hide a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f'Scene [yellow]{scene_name}[/yellow] not found.') console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
err_console.print( console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].' f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
raise typer.Exit(1) raise typer.Exit(1)
@ -143,7 +140,7 @@ def hide(
enabled=False, enabled=False,
) )
out_console.print(f'Group [green]{group_name}[/green] is now hidden.') console.out.print(f'Group [green]{group_name}[/green] is now hidden.')
@app.command('toggle | tg') @app.command('toggle | tg')
@ -158,12 +155,12 @@ def toggle(
): ):
"""Toggle a group in a scene.""" """Toggle a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f'Scene [yellow]{scene_name}[/yellow] not found.') console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
err_console.print( console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].' f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
raise typer.Exit(1) raise typer.Exit(1)
@ -176,9 +173,9 @@ def toggle(
) )
if new_state: if new_state:
out_console.print(f'Group [green]{group_name}[/green] is now visible.') console.out.print(f'Group [green]{group_name}[/green] is now visible.')
else: else:
out_console.print(f'Group [green]{group_name}[/green] is now hidden.') console.out.print(f'Group [green]{group_name}[/green] is now hidden.')
@app.command('status | ss') @app.command('status | ss')
@ -193,12 +190,12 @@ def status(
): ):
"""Get the status of a group in a scene.""" """Get the status of a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f'Scene [yellow]{scene_name}[/yellow] not found.') console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
err_console.print( console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].' f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
raise typer.Exit(1) raise typer.Exit(1)
@ -209,6 +206,6 @@ def status(
) )
if enabled.scene_item_enabled: if enabled.scene_item_enabled:
out_console.print(f'Group [green]{group_name}[/green] is now visible.') console.out.print(f'Group [green]{group_name}[/green] is now visible.')
else: else:
out_console.print(f'Group [green]{group_name}[/green] is now hidden.') console.out.print(f'Group [green]{group_name}[/green] is now hidden.')

View File

@ -3,14 +3,12 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import console
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -31,7 +29,7 @@ def list_(
for hotkey in resp.hotkeys: for hotkey in resp.hotkeys:
table.add_row(hotkey) table.add_row(hotkey)
out_console.print(table) console.out.print(table)
@app.command('trigger | tr') @app.command('trigger | tr')

View File

@ -2,16 +2,14 @@
from typing import Annotated from typing import Annotated
import obsws_python as obsws
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import util, validate from . import console, util, validate
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -27,6 +25,7 @@ def list_(
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False, colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False, ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False,
vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False, vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of inputs.')] = False,
): ):
"""List all inputs.""" """List all inputs."""
resp = ctx.obj.get_input_list() resp = ctx.obj.get_input_list()
@ -43,11 +42,11 @@ def list_(
if vlc: if vlc:
kinds.append('vlc') kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]): if not any([input, output, colour, ffmpeg, vlc]):
kinds = ['input', 'output', 'color', 'ffmpeg', 'vlc'] kinds = ctx.obj.get_input_kind_list(False).input_kinds
inputs = sorted( inputs = sorted(
( (
(input_.get('inputName'), input_.get('inputKind')) (input_.get('inputName'), input_.get('inputKind'), input_.get('inputUuid'))
for input_ in filter( for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds), lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs, resp.inputs,
@ -57,34 +56,53 @@ def list_(
) )
if not inputs: if not inputs:
out_console.print('No inputs found.') console.out.print('No inputs found.')
raise typer.Exit() raise typer.Exit()
table = Table(title='Inputs', padding=(0, 2)) table = Table(title='Inputs', padding=(0, 2))
columns = [ if uuid:
('Input Name', 'left', 'cyan'), columns = [
('Kind', 'center', 'cyan'), ('Input Name', 'left', 'cyan'),
('Muted', 'center', None), ('Kind', 'center', 'cyan'),
] ('Muted', 'center', None),
('UUID', 'left', 'cyan'),
]
else:
table.title += ' (UUIDs hidden)'
columns = [
('Input Name', 'left', 'cyan'),
('Kind', 'center', 'cyan'),
('Muted', 'center', None),
]
for column, justify, style in columns: for column, justify, style in columns:
table.add_column(column, justify=justify, style=style) table.add_column(column, justify=justify, style=style)
for input_name, input_kind in inputs: for input_name, input_kind, input_uuid in inputs:
input_mark = '' input_mark = ''
if any( try:
kind in input_kind
for kind in ['input_capture', 'output_capture', 'ffmpeg', 'vlc']
):
input_muted = ctx.obj.get_input_mute(name=input_name).input_muted input_muted = ctx.obj.get_input_mute(name=input_name).input_muted
input_mark = ':white_heavy_check_mark:' if input_muted else ':x:' input_mark = ':white_heavy_check_mark:' if input_muted else ':x:'
except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio
input_mark = 'N/A'
else:
raise
table.add_row( if uuid:
input_name, table.add_row(
util.snakecase_to_titlecase(input_kind), input_name,
input_mark, util.snakecase_to_titlecase(input_kind),
) input_mark,
input_uuid,
)
else:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
)
out_console.print(table) console.out.print(table)
@app.command('mute | m') @app.command('mute | m')
@ -96,7 +114,7 @@ def mute(
): ):
"""Mute an input.""" """Mute an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
err_console.print(f'Input [yellow]{input_name}[/yellow] not found.') console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_input_mute( ctx.obj.set_input_mute(
@ -104,7 +122,7 @@ def mute(
muted=True, muted=True,
) )
out_console.print(f'Input [green]{input_name}[/green] muted.') console.out.print(f'Input [green]{input_name}[/green] muted.')
@app.command('unmute | um') @app.command('unmute | um')
@ -117,7 +135,7 @@ def unmute(
): ):
"""Unmute an input.""" """Unmute an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
err_console.print(f'Input [yellow]{input_name}[/yellow] not found.') console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_input_mute( ctx.obj.set_input_mute(
@ -125,7 +143,7 @@ def unmute(
muted=False, muted=False,
) )
out_console.print(f'Input [green]{input_name}[/green] unmuted.') console.out.print(f'Input [green]{input_name}[/green] unmuted.')
@app.command('toggle | tg') @app.command('toggle | tg')
@ -138,7 +156,7 @@ def toggle(
): ):
"""Toggle an input.""" """Toggle an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
err_console.print(f'Input [yellow]{input_name}[/yellow] not found.') console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_input_mute(name=input_name) resp = ctx.obj.get_input_mute(name=input_name)
@ -150,10 +168,10 @@ def toggle(
) )
if new_state: if new_state:
out_console.print( console.out.print(
f'Input [green]{input_name}[/green] muted.', f'Input [green]{input_name}[/green] muted.',
) )
else: else:
out_console.print( console.out.print(
f'Input [green]{input_name}[/green] unmuted.', f'Input [green]{input_name}[/green] unmuted.',
) )

View File

@ -3,15 +3,12 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import validate from . import console, validate
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -38,14 +35,14 @@ def list_(ctx: typer.Context):
':white_heavy_check_mark:' if profile == resp.current_profile_name else '', ':white_heavy_check_mark:' if profile == resp.current_profile_name else '',
) )
out_console.print(table) console.out.print(table)
@app.command('current | get') @app.command('current | get')
def current(ctx: typer.Context): def current(ctx: typer.Context):
"""Get the current profile.""" """Get the current profile."""
resp = ctx.obj.get_profile_list() resp = ctx.obj.get_profile_list()
out_console.print(resp.current_profile_name) console.out.print(resp.current_profile_name)
@app.command('switch | set') @app.command('switch | set')
@ -60,18 +57,18 @@ def switch(
): ):
"""Switch to a profile.""" """Switch to a profile."""
if not validate.profile_exists(ctx, profile_name): if not validate.profile_exists(ctx, profile_name):
err_console.print(f'Profile [yellow]{profile_name}[/yellow] not found.') console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_profile_list() resp = ctx.obj.get_profile_list()
if resp.current_profile_name == profile_name: if resp.current_profile_name == profile_name:
err_console.print( console.err.print(
f'Profile [yellow]{profile_name}[/yellow] is already the current profile.' f'Profile [yellow]{profile_name}[/yellow] is already the current profile.'
) )
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_current_profile(profile_name) ctx.obj.set_current_profile(profile_name)
out_console.print(f'Switched to profile [green]{profile_name}[/green].') console.out.print(f'Switched to profile [green]{profile_name}[/green].')
@app.command('create | new') @app.command('create | new')
@ -84,11 +81,11 @@ def create(
): ):
"""Create a new profile.""" """Create a new profile."""
if validate.profile_exists(ctx, profile_name): if validate.profile_exists(ctx, profile_name):
err_console.print(f'Profile [yellow]{profile_name}[/yellow] already exists.') console.err.print(f'Profile [yellow]{profile_name}[/yellow] already exists.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.create_profile(profile_name) ctx.obj.create_profile(profile_name)
out_console.print(f'Created profile [green]{profile_name}[/green].') console.out.print(f'Created profile [green]{profile_name}[/green].')
@app.command('remove | rm') @app.command('remove | rm')
@ -101,8 +98,8 @@ def remove(
): ):
"""Remove a profile.""" """Remove a profile."""
if not validate.profile_exists(ctx, profile_name): if not validate.profile_exists(ctx, profile_name):
err_console.print(f'Profile [yellow]{profile_name}[/yellow] not found.') console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.remove_profile(profile_name) ctx.obj.remove_profile(profile_name)
out_console.print(f'Removed profile [green]{profile_name}[/green].') console.out.print(f'Removed profile [green]{profile_name}[/green].')

View File

@ -3,14 +3,12 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import console
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -24,7 +22,7 @@ def list_monitors(ctx: typer.Context):
resp = ctx.obj.get_monitor_list() resp = ctx.obj.get_monitor_list()
if not resp.monitors: if not resp.monitors:
out_console.print('No monitors found.') console.out.print('No monitors found.')
return return
monitors = sorted( monitors = sorted(
@ -39,7 +37,7 @@ def list_monitors(ctx: typer.Context):
for index, monitor in monitors: for index, monitor in monitors:
table.add_row(str(index), monitor) table.add_row(str(index), monitor)
out_console.print(table) console.out.print(table)
@app.command('open | o') @app.command('open | o')
@ -69,13 +67,13 @@ def open(
monitor_index=monitor_index, monitor_index=monitor_index,
) )
out_console.print( console.out.print(
f'Opened projector for source [green]{source_name}[/] on monitor [green]{monitor["monitorName"]}[/].' f'Opened projector for source [green]{source_name}[/] on monitor [green]{monitor["monitorName"]}[/].'
) )
break break
else: else:
err_console.print( console.err.print(
f'Monitor with index [yellow]{monitor_index}[/yellow] not found.' f'Monitor with index [yellow]{monitor_index}[/yellow] not found.'
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)

View File

@ -4,13 +4,11 @@ from pathlib import Path
from typing import Annotated, Optional from typing import Annotated, Optional
import typer import typer
from rich.console import Console
from . import console
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -33,11 +31,11 @@ def start(ctx: typer.Context):
if paused: if paused:
err_msg += ' Try resuming it.' err_msg += ' Try resuming it.'
err_console.print(err_msg) console.err.print(err_msg)
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.start_record() ctx.obj.start_record()
out_console.print('Recording started successfully.') console.out.print('Recording started successfully.')
@app.command('stop | st') @app.command('stop | st')
@ -45,11 +43,11 @@ def stop(ctx: typer.Context):
"""Stop recording.""" """Stop recording."""
active, _ = _get_recording_status(ctx) active, _ = _get_recording_status(ctx)
if not active: if not active:
err_console.print('Recording is not in progress, cannot stop.') console.err.print('Recording is not in progress, cannot stop.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.stop_record() resp = ctx.obj.stop_record()
out_console.print( console.out.print(
f'Recording stopped successfully. Saved to: [green]{resp.output_path}[/green]' f'Recording stopped successfully. Saved to: [green]{resp.output_path}[/green]'
) )
@ -59,9 +57,9 @@ def toggle(ctx: typer.Context):
"""Toggle recording.""" """Toggle recording."""
resp = ctx.obj.toggle_record() resp = ctx.obj.toggle_record()
if resp.output_active: if resp.output_active:
out_console.print('Recording started successfully.') console.out.print('Recording started successfully.')
else: else:
out_console.print('Recording stopped successfully.') console.out.print('Recording stopped successfully.')
@app.command('status | ss') @app.command('status | ss')
@ -70,11 +68,11 @@ def status(ctx: typer.Context):
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if active: if active:
if paused: if paused:
out_console.print('Recording is in progress and paused.') console.out.print('Recording is in progress and paused.')
else: else:
out_console.print('Recording is in progress.') console.out.print('Recording is in progress.')
else: else:
out_console.print('Recording is not in progress.') console.out.print('Recording is not in progress.')
@app.command('resume | r') @app.command('resume | r')
@ -82,14 +80,14 @@ def resume(ctx: typer.Context):
"""Resume recording.""" """Resume recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
err_console.print('Recording is not in progress, cannot resume.') console.err.print('Recording is not in progress, cannot resume.')
raise typer.Exit(1) raise typer.Exit(1)
if not paused: if not paused:
err_console.print('Recording is in progress but not paused, cannot resume.') console.err.print('Recording is in progress but not paused, cannot resume.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.resume_record() ctx.obj.resume_record()
out_console.print('Recording resumed successfully.') console.out.print('Recording resumed successfully.')
@app.command('pause | p') @app.command('pause | p')
@ -97,14 +95,14 @@ def pause(ctx: typer.Context):
"""Pause recording.""" """Pause recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
err_console.print('Recording is not in progress, cannot pause.') console.err.print('Recording is not in progress, cannot pause.')
raise typer.Exit(1) raise typer.Exit(1)
if paused: if paused:
err_console.print('Recording is in progress but already paused, cannot pause.') console.err.print('Recording is in progress but already paused, cannot pause.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.pause_record() ctx.obj.pause_record()
out_console.print('Recording paused successfully.') console.out.print('Recording paused successfully.')
@app.command('directory | d') @app.command('directory | d')
@ -124,8 +122,8 @@ def directory(
"""Get or set the recording directory.""" """Get or set the recording directory."""
if record_directory is not None: if record_directory is not None:
ctx.obj.set_record_directory(str(record_directory)) ctx.obj.set_record_directory(str(record_directory))
out_console.print(f'Recording directory updated to: {record_directory}') console.out.print(f'Recording directory updated to: {record_directory}')
else: else:
out_console.print( console.out.print(
f'Recording directory: [green]{ctx.obj.get_record_directory().record_directory}[/green]' f'Recording directory: [green]{ctx.obj.get_record_directory().record_directory}[/green]'
) )

View File

@ -1,13 +1,11 @@
"""module containing commands for manipulating the replay buffer in OBS.""" """module containing commands for manipulating the replay buffer in OBS."""
import typer import typer
from rich.console import Console
from . import console
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -20,11 +18,11 @@ def start(ctx: typer.Context):
"""Start the replay buffer.""" """Start the replay buffer."""
resp = ctx.obj.get_replay_buffer_status() resp = ctx.obj.get_replay_buffer_status()
if resp.output_active: if resp.output_active:
err_console.print('Replay buffer is already active.') console.err.print('Replay buffer is already active.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.start_replay_buffer() ctx.obj.start_replay_buffer()
out_console.print('Replay buffer started.') console.out.print('Replay buffer started.')
@app.command('stop | st') @app.command('stop | st')
@ -32,11 +30,11 @@ def stop(ctx: typer.Context):
"""Stop the replay buffer.""" """Stop the replay buffer."""
resp = ctx.obj.get_replay_buffer_status() resp = ctx.obj.get_replay_buffer_status()
if not resp.output_active: if not resp.output_active:
err_console.print('Replay buffer is not active.') console.err.print('Replay buffer is not active.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.stop_replay_buffer() ctx.obj.stop_replay_buffer()
out_console.print('Replay buffer stopped.') console.out.print('Replay buffer stopped.')
@app.command('toggle | tg') @app.command('toggle | tg')
@ -44,9 +42,9 @@ def toggle(ctx: typer.Context):
"""Toggle the replay buffer.""" """Toggle the replay buffer."""
resp = ctx.obj.toggle_replay_buffer() resp = ctx.obj.toggle_replay_buffer()
if resp.output_active: if resp.output_active:
out_console.print('Replay buffer is active.') console.out.print('Replay buffer is active.')
else: else:
out_console.print('Replay buffer is not active.') console.out.print('Replay buffer is not active.')
@app.command('status | ss') @app.command('status | ss')
@ -54,13 +52,13 @@ def status(ctx: typer.Context):
"""Get the status of the replay buffer.""" """Get the status of the replay buffer."""
resp = ctx.obj.get_replay_buffer_status() resp = ctx.obj.get_replay_buffer_status()
if resp.output_active: if resp.output_active:
out_console.print('Replay buffer is active.') console.out.print('Replay buffer is active.')
else: else:
out_console.print('Replay buffer is not active.') console.out.print('Replay buffer is not active.')
@app.command('save | sv') @app.command('save | sv')
def save(ctx: typer.Context): def save(ctx: typer.Context):
"""Save the replay buffer.""" """Save the replay buffer."""
ctx.obj.save_replay_buffer() ctx.obj.save_replay_buffer()
out_console.print('Replay buffer saved.') console.out.print('Replay buffer saved.')

View File

@ -3,15 +3,12 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import validate from . import console, validate
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -20,7 +17,10 @@ def main():
@app.command('list | ls') @app.command('list | ls')
def list_(ctx: typer.Context): def list_(
ctx: typer.Context,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scenes')] = False,
):
"""List all scenes.""" """List all scenes."""
resp = ctx.obj.get_scene_list() resp = ctx.obj.get_scene_list()
scenes = ( scenes = (
@ -28,21 +28,43 @@ def list_(ctx: typer.Context):
for scene in reversed(resp.scenes) for scene in reversed(resp.scenes)
) )
active_scene = ctx.obj.get_current_program_scene().scene_name
table = Table(title='Scenes', padding=(0, 2)) table = Table(title='Scenes', padding=(0, 2))
columns = [ if uuid:
('Scene Name', 'left', 'cyan'), columns = [
('UUID', 'left', 'cyan'), ('Scene Name', 'left', 'cyan'),
] ('Active', 'center', None),
('UUID', 'left', 'cyan'),
]
else:
table.title += ' (UUIDs hidden)'
columns = [
('Scene Name', 'left', 'cyan'),
('Active', 'center', None),
]
for column, justify, style in columns: for column, justify, style in columns:
table.add_column(column, justify=justify, style=style) table.add_column(column, justify=justify, style=style)
for scene_name, scene_uuid in scenes: for scene_name, scene_uuid in scenes:
table.add_row( if scene_name == active_scene:
scene_name, scene_output = f'[bold green]{scene_name}[/bold green]'
scene_uuid, else:
) scene_output = f'[dim]{scene_name}[/dim]'
out_console.print(table) if uuid:
table.add_row(
scene_output,
':white_heavy_check_mark:' if scene_name == active_scene else '',
scene_uuid,
)
else:
table.add_row(
scene_output,
':white_heavy_check_mark:' if scene_name == active_scene else '',
)
console.out.print(table)
@app.command('current | get') @app.command('current | get')
@ -54,15 +76,15 @@ def current(
): ):
"""Get the current program scene or preview scene.""" """Get the current program scene or preview scene."""
if preview and not validate.studio_mode_enabled(ctx): if preview and not validate.studio_mode_enabled(ctx):
err_console.print('Studio mode is not enabled, cannot get preview scene.') console.err.print('Studio mode is not enabled, cannot get preview scene.')
raise typer.Exit(1) raise typer.Exit(1)
if preview: if preview:
resp = ctx.obj.get_current_preview_scene() resp = ctx.obj.get_current_preview_scene()
out_console.print(resp.current_preview_scene_name) console.out.print(resp.current_preview_scene_name)
else: else:
resp = ctx.obj.get_current_program_scene() resp = ctx.obj.get_current_program_scene()
out_console.print(resp.current_program_scene_name) console.out.print(resp.current_program_scene_name)
@app.command('switch | set') @app.command('switch | set')
@ -78,16 +100,16 @@ def switch(
): ):
"""Switch to a scene.""" """Switch to a scene."""
if preview and not validate.studio_mode_enabled(ctx): if preview and not validate.studio_mode_enabled(ctx):
err_console.print('Studio mode is not enabled, cannot set the preview scene.') console.err.print('Studio mode is not enabled, cannot set the preview scene.')
raise typer.Exit(1) raise typer.Exit(1)
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f'Scene [yellow]{scene_name}[/yellow] not found.') console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
if preview: if preview:
ctx.obj.set_current_preview_scene(scene_name) ctx.obj.set_current_preview_scene(scene_name)
out_console.print(f'Switched to preview scene: [green]{scene_name}[/green]') console.out.print(f'Switched to preview scene: [green]{scene_name}[/green]')
else: else:
ctx.obj.set_current_program_scene(scene_name) ctx.obj.set_current_program_scene(scene_name)
out_console.print(f'Switched to program scene: [green]{scene_name}[/green]') console.out.print(f'Switched to program scene: [green]{scene_name}[/green]')

View File

@ -3,15 +3,12 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import validate from . import console, validate
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -30,14 +27,14 @@ def list_(ctx: typer.Context):
for scene_collection_name in resp.scene_collections: for scene_collection_name in resp.scene_collections:
table.add_row(scene_collection_name) table.add_row(scene_collection_name)
out_console.print(table) console.out.print(table)
@app.command('current | get') @app.command('current | get')
def current(ctx: typer.Context): def current(ctx: typer.Context):
"""Get the current scene collection.""" """Get the current scene collection."""
resp = ctx.obj.get_scene_collection_list() resp = ctx.obj.get_scene_collection_list()
out_console.print(resp.current_scene_collection_name) console.out.print(resp.current_scene_collection_name)
@app.command('switch | set') @app.command('switch | set')
@ -49,7 +46,7 @@ def switch(
): ):
"""Switch to a scene collection.""" """Switch to a scene collection."""
if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name): if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
err_console.print( console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.' f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.'
) )
raise typer.Exit(1) raise typer.Exit(1)
@ -58,13 +55,13 @@ def switch(
ctx.obj.get_scene_collection_list().current_scene_collection_name ctx.obj.get_scene_collection_list().current_scene_collection_name
) )
if scene_collection_name == current_scene_collection: if scene_collection_name == current_scene_collection:
err_console.print( console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.' f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.'
) )
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_current_scene_collection(scene_collection_name) ctx.obj.set_current_scene_collection(scene_collection_name)
out_console.print( console.out.print(
f'Switched to scene collection [green]{scene_collection_name}[/green].' f'Switched to scene collection [green]{scene_collection_name}[/green].'
) )
@ -78,12 +75,12 @@ def create(
): ):
"""Create a new scene collection.""" """Create a new scene collection."""
if validate.scene_collection_in_scene_collections(ctx, scene_collection_name): if validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
err_console.print( console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.' f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.'
) )
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.create_scene_collection(scene_collection_name) ctx.obj.create_scene_collection(scene_collection_name)
out_console.print( console.out.print(
f'Created scene collection [green]{scene_collection_name}[/green].' f'Created scene collection [green]{scene_collection_name}[/green].'
) )

View File

@ -2,17 +2,13 @@
from typing import Annotated, Optional from typing import Annotated, Optional
import obsws_python as obsws
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import validate from . import console, validate
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -30,13 +26,14 @@ def list_(
help='Scene name to list items for', help='Scene name to list items for',
), ),
] = None, ] = None,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scene items')] = False,
): ):
"""List all items in a scene.""" """List all items in a scene."""
if not scene_name: if not scene_name:
scene_name = ctx.obj.get_current_program_scene().scene_name scene_name = ctx.obj.get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f'Scene [yellow]{scene_name}[/yellow] not found.') console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj.get_scene_item_list(scene_name)
@ -47,6 +44,7 @@ def list_(
item.get('sourceName'), item.get('sourceName'),
item.get('isGroup'), item.get('isGroup'),
item.get('sceneItemEnabled'), item.get('sceneItemEnabled'),
item.get('sourceUuid', 'N/A'), # Include source UUID
) )
for item in resp.scene_items for item in resp.scene_items
), ),
@ -54,21 +52,31 @@ def list_(
) )
if not items: if not items:
out_console.print(f'No items found in scene [green]{scene_name}[/green].') console.out.print(f'No items found in scene [green]{scene_name}[/green].')
raise typer.Exit() raise typer.Exit()
table = Table(title=f'Items in Scene: {scene_name}', padding=(0, 2)) table = Table(title=f'Items in Scene: {scene_name}', padding=(0, 2))
columns = [ if uuid:
('Item ID', 'center', 'cyan'), columns = [
('Item Name', 'left', 'cyan'), ('Item ID', 'center', 'cyan'),
('In Group', 'left', 'cyan'), ('Item Name', 'left', 'cyan'),
('Enabled', 'center', None), ('In Group', 'left', 'cyan'),
] ('Enabled', 'center', None),
('UUID', 'left', 'cyan'),
]
else:
table.title += ' (UUIDs hidden)'
columns = [
('Item ID', 'center', 'cyan'),
('Item Name', 'left', 'cyan'),
('In Group', 'left', 'cyan'),
('Enabled', 'center', None),
]
# Add columns to the table # Add columns to the table
for column, justify, style in columns: for column, justify, style in columns:
table.add_column(column, justify=justify, style=style) table.add_column(column, justify=justify, style=style)
for item_id, item_name, is_group, is_enabled in items: for item_id, item_name, is_group, is_enabled, source_uuid in items:
if is_group: if is_group:
resp = ctx.obj.get_group_scene_item_list(item_name) resp = ctx.obj.get_group_scene_item_list(item_name)
group_items = sorted( group_items = sorted(
@ -77,29 +85,55 @@ def list_(
gi.get('sceneItemId'), gi.get('sceneItemId'),
gi.get('sourceName'), gi.get('sourceName'),
gi.get('sceneItemEnabled'), gi.get('sceneItemEnabled'),
gi.get('sourceUuid', 'N/A'), # Include source UUID
) )
for gi in resp.scene_items for gi in resp.scene_items
), ),
key=lambda x: x[0], # Sort by sceneItemId key=lambda x: x[0], # Sort by sceneItemId
) )
for group_item_id, group_item_name, group_item_enabled in group_items: for (
table.add_row( group_item_id,
str(group_item_id), group_item_name,
group_item_name, group_item_enabled,
item_name, group_item_source_uuid,
':white_heavy_check_mark:' ) in group_items:
if is_enabled and group_item_enabled if uuid:
else ':x:', table.add_row(
) str(group_item_id),
group_item_name,
item_name,
':white_heavy_check_mark:'
if is_enabled and group_item_enabled
else ':x:',
group_item_source_uuid,
)
else:
table.add_row(
str(group_item_id),
group_item_name,
item_name,
':white_heavy_check_mark:'
if is_enabled and group_item_enabled
else ':x:',
)
else: else:
table.add_row( if uuid:
str(item_id), table.add_row(
item_name, str(item_id),
'', item_name,
':white_heavy_check_mark:' if is_enabled else ':x:', '',
) ':white_heavy_check_mark:' if is_enabled else ':x:',
source_uuid,
)
else:
table.add_row(
str(item_id),
item_name,
'',
':white_heavy_check_mark:' if is_enabled else ':x:',
)
out_console.print(table) console.out.print(table)
def _validate_sources( def _validate_sources(
@ -110,19 +144,21 @@ def _validate_sources(
) -> bool: ) -> bool:
"""Validate the scene name and item name.""" """Validate the scene name and item name."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f'Scene [yellow]{scene_name}[/yellow] not found.') console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
return False return False
if group: if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group): if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print( console.err.print(
f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].' f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
return False return False
else: else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name): if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print( console.err.print(
f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].' f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow]. Is the item in a group? '
f'If so use the [yellow]--group[/yellow] option to specify the parent group.\n'
'Use `obsws-cli sceneitem list` for a list of items in the scene.'
) )
return False return False
@ -141,23 +177,12 @@ def _get_scene_name_and_item_id(
scene_item_id = item.get('sceneItemId') scene_item_id = item.get('sceneItemId')
break break
else: else:
err_console.print( console.err.print(
f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].' f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].'
) )
raise typer.Exit(1) raise typer.Exit(1)
else: else:
try: resp = ctx.obj.get_scene_item_id(scene_name, item_name)
resp = ctx.obj.get_scene_item_id(scene_name, item_name)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
err_console.print(
f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow]. Is the item in a group? '
'If so use the --group option to specify the parent group. '
'Use `obsws-cli sceneitem list` for a list of items in the scene.'
)
raise typer.Exit(1)
else:
raise
scene_item_id = resp.scene_item_id scene_item_id = resp.scene_item_id
return scene_name, scene_item_id return scene_name, scene_item_id
@ -191,7 +216,7 @@ def show(
) )
if group: if group:
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been shown.' f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been shown.'
) )
else: else:
@ -199,7 +224,7 @@ def show(
# This is to avoid confusion with the parent group name # This is to avoid confusion with the parent group name
# which is not the same as the scene name # which is not the same as the scene name
# and is not needed in this case # and is not needed in this case
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been shown.' f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been shown.'
) )
@ -232,7 +257,7 @@ def hide(
) )
if group: if group:
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been hidden.' f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been hidden.'
) )
else: else:
@ -240,7 +265,7 @@ def hide(
# This is to avoid confusion with the parent group name # This is to avoid confusion with the parent group name
# which is not the same as the scene name # which is not the same as the scene name
# and is not needed in this case # and is not needed in this case
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been hidden.' f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been hidden.'
) )
@ -282,11 +307,11 @@ def toggle(
if group: if group:
if new_state: if new_state:
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been shown.' f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been shown.'
) )
else: else:
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been hidden.' f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been hidden.'
) )
else: else:
@ -295,11 +320,11 @@ def toggle(
# which is not the same as the scene name # which is not the same as the scene name
# and is not needed in this case # and is not needed in this case
if new_state: if new_state:
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been shown.' f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been shown.'
) )
else: else:
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been hidden.' f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been hidden.'
) )
@ -333,7 +358,7 @@ def visible(
) )
if group: if group:
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] is currently {"visible" if enabled.scene_item_enabled else "hidden"}.' f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] is currently {"visible" if enabled.scene_item_enabled else "hidden"}.'
) )
else: else:
@ -341,7 +366,7 @@ def visible(
# This is to avoid confusion with the parent group name # This is to avoid confusion with the parent group name
# which is not the same as the scene name # which is not the same as the scene name
# and is not needed in this case # and is not needed in this case
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] is currently {"visible" if enabled.scene_item_enabled else "hidden"}.' f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] is currently {"visible" if enabled.scene_item_enabled else "hidden"}.'
) )
@ -447,7 +472,7 @@ def transform(
transform['scaleY'] = scale_y transform['scaleY'] = scale_y
if not transform: if not transform:
err_console.print('No transform options provided.') console.err.print('No transform options provided.')
raise typer.Exit(1) raise typer.Exit(1)
transform = ctx.obj.set_scene_item_transform( transform = ctx.obj.set_scene_item_transform(
@ -457,7 +482,7 @@ def transform(
) )
if group: if group:
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been transformed.' f'Item [green]{item_name}[/green] in group [green]{group}[/green] in scene [green]{old_scene_name}[/green] has been transformed.'
) )
else: else:
@ -465,6 +490,6 @@ def transform(
# This is to avoid confusion with the parent group name # This is to avoid confusion with the parent group name
# which is not the same as the scene name # which is not the same as the scene name
# and is not needed in this case # and is not needed in this case
out_console.print( console.out.print(
f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been transformed.' f'Item [green]{item_name}[/green] in scene [green]{scene_name}[/green] has been transformed.'
) )

View File

@ -5,15 +5,11 @@ from typing import Annotated
import obsws_python as obsws import obsws_python as obsws
import typer import typer
from rich.console import Console
from . import console
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(
stderr=True,
)
@app.callback() @app.callback()
@ -78,17 +74,17 @@ def save(
except obsws.error.OBSSDKRequestError as e: except obsws.error.OBSSDKRequestError as e:
match e.code: match e.code:
case 403: case 403:
err_console.print( console.err.print(
'The [yellow]image format[/yellow] (file extension) must be included in the file name, ' 'The [yellow]image format[/yellow] (file extension) must be included in the file name, '
"for example: '/path/to/screenshot.png'.", "for example: '/path/to/screenshot.png'.",
) )
raise typer.Exit(1) raise typer.Exit(1)
case 600: case 600:
err_console.print( console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow]' f'No source was found by the name of [yellow]{source_name}[/yellow]'
) )
raise typer.Exit(1) raise typer.Exit(1)
case _: case _:
raise raise
out_console.print(f'Screenshot saved to [green]{output_path}[/green].') console.out.print(f'Screenshot saved to [green]{output_path}[/green].')

View File

@ -22,6 +22,8 @@ class Settings(UserDict):
""" """
PREFIX = 'OBS_'
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
"""Initialize the Settings object.""" """Initialize the Settings object."""
kwargs.update( kwargs.update(
@ -34,19 +36,25 @@ class Settings(UserDict):
def __getitem__(self, key: str) -> SettingsValue: def __getitem__(self, key: str) -> SettingsValue:
"""Get a setting value by key.""" """Get a setting value by key."""
if not key.startswith('OBS_'): key = key.upper()
key = f'OBS_{key}' if not key.startswith(Settings.PREFIX):
return self.data[key.upper()] key = f'{Settings.PREFIX}{key}'
return self.data[key]
def __setitem__(self, key: str, value: SettingsValue): def __setitem__(self, key: str, value: SettingsValue):
"""Set a setting value by key.""" """Set a setting value by key."""
if not key.startswith('OBS_'): key = key.upper()
key = f'OBS_{key}' if not key.startswith(Settings.PREFIX):
self.data[key.upper()] = value key = f'{Settings.PREFIX}{key}'
self.data[key] = value
_settings = Settings( _settings = Settings(
OBS_HOST='localhost', OBS_PORT=4455, OBS_PASSWORD='', OBS_TIMEOUT=5 OBS_HOST='localhost',
OBS_PORT=4455,
OBS_PASSWORD='',
OBS_TIMEOUT=5,
OBS_DEBUG=False,
) )

View File

@ -1,13 +1,11 @@
"""module for controlling OBS stream functionality.""" """module for controlling OBS stream functionality."""
import typer import typer
from rich.console import Console
from . import console
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -26,11 +24,11 @@ def start(ctx: typer.Context):
"""Start streaming.""" """Start streaming."""
active, _ = _get_streaming_status(ctx) active, _ = _get_streaming_status(ctx)
if active: if active:
err_console.print('Streaming is already in progress, cannot start.') console.err.print('Streaming is already in progress, cannot start.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.start_stream() ctx.obj.start_stream()
out_console.print('Streaming started successfully.') console.out.print('Streaming started successfully.')
@app.command('stop | st') @app.command('stop | st')
@ -38,11 +36,11 @@ def stop(ctx: typer.Context):
"""Stop streaming.""" """Stop streaming."""
active, _ = _get_streaming_status(ctx) active, _ = _get_streaming_status(ctx)
if not active: if not active:
err_console.print('Streaming is not in progress, cannot stop.') console.err.print('Streaming is not in progress, cannot stop.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.stop_stream() ctx.obj.stop_stream()
out_console.print('Streaming stopped successfully.') console.out.print('Streaming stopped successfully.')
@app.command('toggle | tg') @app.command('toggle | tg')
@ -50,9 +48,9 @@ def toggle(ctx: typer.Context):
"""Toggle streaming.""" """Toggle streaming."""
resp = ctx.obj.toggle_stream() resp = ctx.obj.toggle_stream()
if resp.output_active: if resp.output_active:
out_console.print('Streaming started successfully.') console.out.print('Streaming started successfully.')
else: else:
out_console.print('Streaming stopped successfully.') console.out.print('Streaming stopped successfully.')
@app.command('status | ss') @app.command('status | ss')
@ -65,19 +63,19 @@ def status(ctx: typer.Context):
minutes = int(seconds // 60) minutes = int(seconds // 60)
seconds = int(seconds % 60) seconds = int(seconds % 60)
if minutes > 0: if minutes > 0:
out_console.print( console.out.print(
f'Streaming is in progress for {minutes} minutes and {seconds} seconds.' f'Streaming is in progress for {minutes} minutes and {seconds} seconds.'
) )
else: else:
if seconds > 0: if seconds > 0:
out_console.print( console.out.print(
f'Streaming is in progress for {seconds} seconds.' f'Streaming is in progress for {seconds} seconds.'
) )
else: else:
out_console.print( console.out.print(
'Streaming is in progress for less than a second.' 'Streaming is in progress for less than a second.'
) )
else: else:
out_console.print('Streaming is in progress.') console.out.print('Streaming is in progress.')
else: else:
out_console.print('Streaming is not in progress.') console.out.print('Streaming is not in progress.')

View File

@ -1,13 +1,11 @@
"""module containing commands for manipulating studio mode in OBS.""" """module containing commands for manipulating studio mode in OBS."""
import typer import typer
from rich.console import Console
from . import console
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -19,14 +17,14 @@ def main():
def enable(ctx: typer.Context): def enable(ctx: typer.Context):
"""Enable studio mode.""" """Enable studio mode."""
ctx.obj.set_studio_mode_enabled(True) ctx.obj.set_studio_mode_enabled(True)
out_console.print('Studio mode has been enabled.') console.out.print('Studio mode has been enabled.')
@app.command('disable | off') @app.command('disable | off')
def disable(ctx: typer.Context): def disable(ctx: typer.Context):
"""Disable studio mode.""" """Disable studio mode."""
ctx.obj.set_studio_mode_enabled(False) ctx.obj.set_studio_mode_enabled(False)
out_console.print('Studio mode has been disabled.') console.out.print('Studio mode has been disabled.')
@app.command('toggle | tg') @app.command('toggle | tg')
@ -35,10 +33,10 @@ def toggle(ctx: typer.Context):
resp = ctx.obj.get_studio_mode_enabled() resp = ctx.obj.get_studio_mode_enabled()
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
ctx.obj.set_studio_mode_enabled(False) ctx.obj.set_studio_mode_enabled(False)
out_console.print('Studio mode is now disabled.') console.out.print('Studio mode is now disabled.')
else: else:
ctx.obj.set_studio_mode_enabled(True) ctx.obj.set_studio_mode_enabled(True)
out_console.print('Studio mode is now enabled.') console.out.print('Studio mode is now enabled.')
@app.command('status | ss') @app.command('status | ss')
@ -46,6 +44,6 @@ def status(ctx: typer.Context):
"""Get the status of studio mode.""" """Get the status of studio mode."""
resp = ctx.obj.get_studio_mode_enabled() resp = ctx.obj.get_studio_mode_enabled()
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
out_console.print('Studio mode is enabled.') console.out.print('Studio mode is enabled.')
else: else:
out_console.print('Studio mode is disabled.') console.out.print('Studio mode is disabled.')

View File

@ -1,13 +1,11 @@
"""module containing commands for manipulating virtual camera in OBS.""" """module containing commands for manipulating virtual camera in OBS."""
import typer import typer
from rich.console import Console
from . import console
from .alias import AliasGroup from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True, style='bold red')
@app.callback() @app.callback()
@ -19,14 +17,14 @@ def main():
def start(ctx: typer.Context): def start(ctx: typer.Context):
"""Start the virtual camera.""" """Start the virtual camera."""
ctx.obj.start_virtual_cam() ctx.obj.start_virtual_cam()
out_console.print('Virtual camera started.') console.out.print('Virtual camera started.')
@app.command('stop | p') @app.command('stop | p')
def stop(ctx: typer.Context): def stop(ctx: typer.Context):
"""Stop the virtual camera.""" """Stop the virtual camera."""
ctx.obj.stop_virtual_cam() ctx.obj.stop_virtual_cam()
out_console.print('Virtual camera stopped.') console.out.print('Virtual camera stopped.')
@app.command('toggle | tg') @app.command('toggle | tg')
@ -34,9 +32,9 @@ def toggle(ctx: typer.Context):
"""Toggle the virtual camera.""" """Toggle the virtual camera."""
resp = ctx.obj.toggle_virtual_cam() resp = ctx.obj.toggle_virtual_cam()
if resp.output_active: if resp.output_active:
out_console.print('Virtual camera is enabled.') console.out.print('Virtual camera is enabled.')
else: else:
out_console.print('Virtual camera is disabled.') console.out.print('Virtual camera is disabled.')
@app.command('status | ss') @app.command('status | ss')
@ -44,6 +42,6 @@ def status(ctx: typer.Context):
"""Get the status of the virtual camera.""" """Get the status of the virtual camera."""
resp = ctx.obj.get_virtual_cam_status() resp = ctx.obj.get_virtual_cam_status()
if resp.output_active: if resp.output_active:
out_console.print('Virtual camera is enabled.') console.out.print('Virtual camera is enabled.')
else: else:
out_console.print('Virtual camera is disabled.') console.out.print('Virtual camera is disabled.')