Compare commits

...

27 Commits

Author SHA1 Message Date
34067ca61d patch bump 2026-01-25 16:20:03 +00:00
3ca138ef6d add 0.24.6 to CHANGELOG 2026-01-25 16:19:37 +00:00
1b8bc72097 update Environment Variable section in README 2026-01-25 16:19:28 +00:00
3a8d4ef0f0 rename config module to envconfig
envconfig:
- add method for normalising the keys
- add has_key method
- update env var prefix to OBSWS_CLI_

update tests to reflect changes
2026-01-25 16:19:12 +00:00
1fc0bef237 dynamically load commands from obsws_cli.commands
no changes to files other than imports

patch bump
2026-01-24 22:29:33 +00:00
8bec6908e5 move studio mode enabled validation into callback
patch bump
2026-01-24 02:34:45 +00:00
2c03b28fc6 fix type annotations 2026-01-12 21:13:16 +00:00
f1e29e0d4f fix settings position 2026-01-10 14:21:30 +00:00
c7b60ecaf9 patch bump 2026-01-10 14:18:24 +00:00
a05fce26f2 add media and settings aliases on the root typer 2026-01-10 14:18:04 +00:00
5355d29a31 keep it consistent 2026-01-10 14:04:27 +00:00
add9743b00 patch bump 2026-01-10 14:00:20 +00:00
8aa1fb2c09 add validate.timecode_format
add None checks for callbacks with optional values
2026-01-10 13:59:55 +00:00
5c7fc24839 patch bump 2026-01-09 23:24:54 +00:00
e4ab4ae630 remove unused monitor_exists() function 2026-01-09 23:22:14 +00:00
9cdbc657fa profile_exists validation log now callbacks 2026-01-09 23:19:49 +00:00
f74ec9cd93 scene_collection validation logic now in callbacks 2026-01-09 23:14:34 +00:00
329aec084c scene_in_scenes validation now a callback 2026-01-09 23:07:06 +00:00
3eaa3992a0 bump version in CHANGELOG 2026-01-09 19:53:18 +00:00
7c86aa8a8b minor version bump 2026-01-09 19:51:31 +00:00
09ca892fcb add Media to README 2026-01-09 19:51:11 +00:00
81fcb4e504 implement media command group 2026-01-09 19:51:03 +00:00
3f3b331363 bump version in CHANGELOG 2026-01-09 13:48:52 +00:00
2535fe85c5 minor bump 2026-01-09 13:47:18 +00:00
7d4485ec05 add Settings to README 2026-01-09 13:45:32 +00:00
2c2501e017 implement settings command group 2026-01-09 13:45:25 +00:00
356684e5d4 rename Settings to Config 2026-01-09 09:39:19 +00:00
28 changed files with 934 additions and 249 deletions

View File

@ -5,11 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.22.0] - 2026-01-09
# [0.24.6] - 2026-01-26
### Changed
- environment variables should now be prefixed with 'OBSWS_CLI_', see [Environment Variables](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#environment-variables)
# [0.24.0] - 2026-01-09
### Added
- new subcommands added to input, see [Input](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#input)
- settings command group, see [Settings](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#settings)
- media command group, see [Media](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#media)
# [0.20.0] - 2025-07-14

103
README.md
View File

@ -62,9 +62,9 @@ Store and load environment variables from:
- `user home directory / .config / obsws-cli / obsws.env`
```env
OBS_HOST=localhost
OBS_PORT=4455
OBS_PASSWORD=<websocket password>
OBSWS_CLI_HOST=localhost
OBSWS_CLI_PORT=4455
OBSWS_CLI_PASSWORD=<websocket password>
```
Flags can be used to override environment variables.
@ -96,8 +96,8 @@ obsws-cli --style="cyan" --no-border sceneitem list
Or with environment variables:
```env
OBS_STYLE=cyan
OBS_STYLE_NO_BORDER=true
OBSWS_CLI_STYLE=cyan
OBSWS_CLI_STYLE_NO_BORDER=true
```
## Root Typer
@ -722,6 +722,99 @@ obsws-cli projector open --monitor-index=1 "test_group"
obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png"
```
#### Settings
- show: Show current OBS settings.
- flags:
*optional*
- --video: Show video settings.
- --record: Show recording settings.
- --profile: Show profile settings.
```console
obsws-cli settings show --video --record
```
- profile: Get/set OBS profile settings.
- args: <category> <name> <value>
```console
obsws-cli settings profile SimpleOutput VBitrate
obsws-cli settings profile SimpleOutput VBitrate 6000
```
- stream-service: Get/set OBS stream service settings.
- flags:
- --key: Stream key.
- --server: Stream server URL.
*optional*
- args: <type>
```console
obsws-cli settings stream-service
obsws-cli settings stream-service --key='live_xyzxyzxyzxyz' rtmp_common
```
- video: Get/set OBS video settings.
- flags:
*optional*
- --base-width: Base (canvas) width.
- --base-height: Base (canvas) height.
- --output-width: Output (scaled) width.
- --output-height: Output (scaled) height.
- --fps-num: Frames per second numerator.
- --fps-den: Frames per second denominator.
```console
obsws-cli settings video
obsws-cli settings video --base-width=1920 --base-height=1080
```
#### Media
- cursor: Get/set the cursor position of a media input.
- args: InputName
*optional*
- TimeString
```console
obsws-cli media cursor "Media"
obsws-cli media cursor "Media" "00:08:30"
```
- play: Plays a media input.
```console
obsws-cli media play "Media"
```
- pause: Pauses a media input.
```console
obsws-cli media pause "Media"
```
- stop: Stops a media input.
```console
obsws-cli media stop "Media"
```
- restart: Restarts a media input.
```console
obsws-cli media restart "Media"
```
## License
`obsws-cli` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license.

View File

@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
#
# SPDX-License-Identifier: MIT
__version__ = '0.22.0'
__version__ = '0.24.6'

View File

@ -24,6 +24,8 @@ class RootTyperAliasGroup(typer.core.TyperGroup):
cmd_name = 'hotkey'
case 'i':
cmd_name = 'input'
case 'm':
cmd_name = 'media'
case 'prf':
cmd_name = 'profile'
case 'prj':
@ -40,6 +42,8 @@ class RootTyperAliasGroup(typer.core.TyperGroup):
cmd_name = 'sceneitem'
case 'ss':
cmd_name = 'screenshot'
case 'set':
cmd_name = 'settings'
case 'st':
cmd_name = 'stream'
case 'sm':

View File

@ -2,6 +2,7 @@
import importlib
import logging
import pkgutil
from typing import Annotated
import obsws_python as obsws
@ -9,30 +10,15 @@ import typer
from obsws_cli.__about__ import __version__ as version
from . import console, settings, styles
from . import commands, console, envconfig, styles
from .alias import RootTyperAliasGroup
app = typer.Typer(cls=RootTyperAliasGroup)
for sub_typer in (
'filter',
'group',
'hotkey',
'input',
'profile',
'projector',
'record',
'replaybuffer',
'scene',
'scenecollection',
'sceneitem',
'screenshot',
'stream',
'studiomode',
'text',
'virtualcam',
for importer, modname, ispkg in pkgutil.iter_modules(
commands.__path__, commands.__name__ + '.'
):
module = importlib.import_module(f'.{sub_typer}', package=__package__)
app.add_typer(module.app, name=sub_typer)
subtyper = importlib.import_module(modname)
app.add_typer(subtyper.app, name=modname.split('.')[-1])
def version_callback(value: bool):
@ -68,62 +54,62 @@ def main(
typer.Option(
'--host',
'-H',
envvar='OBS_HOST',
envvar='OBSWS_CLI_HOST',
help='WebSocket host',
show_default='localhost',
),
] = settings.get('host'),
] = envconfig.get('host'),
port: Annotated[
int,
typer.Option(
'--port',
'-P',
envvar='OBS_PORT',
envvar='OBSWS_CLI_PORT',
help='WebSocket port',
show_default=4455,
),
] = settings.get('port'),
] = envconfig.get('port'),
password: Annotated[
str,
typer.Option(
'--password',
'-p',
envvar='OBS_PASSWORD',
envvar='OBSWS_CLI_PASSWORD',
help='WebSocket password',
show_default=False,
),
] = settings.get('password'),
] = envconfig.get('password'),
timeout: Annotated[
int,
typer.Option(
'--timeout',
'-T',
envvar='OBS_TIMEOUT',
envvar='OBSWS_CLI_TIMEOUT',
help='WebSocket timeout',
show_default=5,
),
] = settings.get('timeout'),
] = envconfig.get('timeout'),
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBS_STYLE',
envvar='OBSWS_CLI_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
callback=validate_style,
),
] = settings.get('style'),
] = envconfig.get('style'),
no_border: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBS_STYLE_NO_BORDER',
envvar='OBSWS_CLI_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = settings.get('style_no_border'),
] = envconfig.get('style_no_border'),
version: Annotated[
bool,
typer.Option(
@ -140,14 +126,14 @@ def main(
typer.Option(
'--debug',
'-d',
envvar='OBS_DEBUG',
envvar='OBSWS_CLI_DEBUG',
is_eager=True,
help='Enable debug logging',
show_default=False,
callback=setup_logging,
hidden=True,
),
] = settings.get('debug'),
] = envconfig.get('debug'),
):
"""obsws_cli is a command line interface for the OBS WebSocket API."""
ctx.ensure_object(dict)

View File

@ -7,8 +7,8 @@ import typer
from rich.table import Table
from rich.text import Text
from . import console, util
from .alias import SubTyperAliasGroup
from obsws_cli import console, util
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -6,9 +6,9 @@ import typer
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .alias import SubTyperAliasGroup
from .protocols import DataclassProtocol
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
from obsws_cli.protocols import DataclassProtocol
app = typer.Typer(cls=SubTyperAliasGroup)
@ -26,17 +26,14 @@ def list_(
typer.Argument(
show_default='The current scene',
help='Scene name to list groups for',
callback=validate.scene_in_scenes,
),
] = None,
):
"""List groups in a scene."""
if not scene_name:
if scene_name is None:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
groups = [
(item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled'))
@ -92,17 +89,18 @@ def show(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Scene name the group is in'),
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to show')
],
):
"""Show a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
@ -123,17 +121,19 @@ def show(
def hide(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the group is in')
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to hide')
],
):
"""Hide a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
@ -154,17 +154,19 @@ def hide(
def toggle(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the group is in')
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to toggle')
],
):
"""Toggle a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
@ -189,17 +191,19 @@ def toggle(
def status(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the group is in')
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to check status')
],
):
"""Get the status of a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(

View File

@ -6,8 +6,8 @@ import typer
from rich.table import Table
from rich.text import Text
from . import console
from .alias import SubTyperAliasGroup
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -7,8 +7,8 @@ import typer
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .alias import SubTyperAliasGroup
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

101
obsws_cli/commands/media.py Normal file
View File

@ -0,0 +1,101 @@
"""module containing commands for media inputs."""
from typing import Annotated, Optional
import typer
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Commands for media inputs."""
@app.command('cursor | c')
def cursor(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
timecode: Annotated[
Optional[str],
typer.Argument(
...,
help='The timecode to set the cursor to (format: HH:MM:SS).',
callback=validate.timecode_format,
),
] = None,
):
"""Get/set the cursor position of a media input."""
if timecode is None:
resp = ctx.obj['obsws'].get_media_input_status(input_name)
console.out.print(
f'Cursor for {console.highlight(ctx, input_name)} is at {util.milliseconds_to_timecode(resp.media_cursor)}.'
)
return
cursor_position = util.timecode_to_milliseconds(timecode)
ctx.obj['obsws'].set_media_input_cursor(input_name, cursor_position)
console.out.print(
f'Cursor for {console.highlight(ctx, input_name)} set to {timecode}.'
)
@app.command('play | p')
def play(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Get/set the playing status of a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PLAY'
)
console.out.print(f'Playing media input {console.highlight(ctx, input_name)}.')
@app.command('pause | pa')
def pause(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Pause a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PAUSE'
)
console.out.print(f'Paused media input {console.highlight(ctx, input_name)}.')
@app.command('stop | s')
def stop(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Stop a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_STOP'
)
console.out.print(f'Stopped media input {console.highlight(ctx, input_name)}.')
@app.command('restart | r')
def restart(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Restart a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART'
)
console.out.print(f'Restarted media input {console.highlight(ctx, input_name)}.')

View File

@ -6,8 +6,8 @@ import typer
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .alias import SubTyperAliasGroup
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@ -62,15 +62,14 @@ def switch(
profile_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Name of the profile to switch to'
...,
show_default=False,
help='Name of the profile to switch to',
callback=validate.profile_exists,
),
],
):
"""Switch to a profile."""
if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_profile_list()
if resp.current_profile_name == profile_name:
console.err.print(
@ -87,14 +86,15 @@ def create(
ctx: typer.Context,
profile_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Name of the profile to create.'),
typer.Argument(
...,
show_default=False,
help='Name of the profile to create.',
callback=validate.profile_not_exists,
),
],
):
"""Create a new profile."""
if validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] already exists.')
raise typer.Exit(1)
ctx.obj['obsws'].create_profile(profile_name)
console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.')
@ -104,13 +104,14 @@ def remove(
ctx: typer.Context,
profile_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Name of the profile to remove.'),
typer.Argument(
...,
show_default=False,
help='Name of the profile to remove.',
callback=validate.profile_exists,
),
],
):
"""Remove a profile."""
if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1)
ctx.obj['obsws'].remove_profile(profile_name)
console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.')

View File

@ -6,8 +6,8 @@ import typer
from rich.table import Table
from rich.text import Text
from . import console
from .alias import SubTyperAliasGroup
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -5,8 +5,8 @@ from typing import Annotated, Optional
import typer
from . import console
from .alias import SubTyperAliasGroup
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -2,8 +2,8 @@
import typer
from . import console
from .alias import SubTyperAliasGroup
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -6,8 +6,8 @@ import typer
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .alias import SubTyperAliasGroup
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@ -70,14 +70,14 @@ def list_(
def current(
ctx: typer.Context,
preview: Annotated[
bool, typer.Option(help='Get the preview scene instead of the program scene')
bool,
typer.Option(
help='Get the preview scene instead of the program scene',
callback=validate.studio_mode_enabled,
),
] = False,
):
"""Get the current program scene or preview scene."""
if preview and not validate.studio_mode_enabled(ctx):
console.err.print('Studio mode is not enabled, cannot get preview scene.')
raise typer.Exit(1)
if preview:
resp = ctx.obj['obsws'].get_current_preview_scene()
console.out.print(
@ -94,22 +94,22 @@ def current(
def switch(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., help='Name of the scene to switch to')
str,
typer.Argument(
...,
help='Name of the scene to switch to',
callback=validate.scene_in_scenes,
),
],
preview: Annotated[
bool,
typer.Option(help='Switch to the preview scene instead of the program scene'),
typer.Option(
help='Switch to the preview scene instead of the program scene',
callback=validate.studio_mode_enabled,
),
] = False,
):
"""Switch to a scene."""
if preview and not validate.studio_mode_enabled(ctx):
console.err.print('Studio mode is not enabled, cannot set the preview scene.')
raise typer.Exit(1)
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
if preview:
ctx.obj['obsws'].set_current_preview_scene(scene_name)
console.out.print(

View File

@ -5,8 +5,8 @@ from typing import Annotated
import typer
from rich.table import Table
from . import console, validate
from .alias import SubTyperAliasGroup
from obsws_cli import console, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@ -53,16 +53,15 @@ def current(ctx: typer.Context):
def switch(
ctx: typer.Context,
scene_collection_name: Annotated[
str, typer.Argument(..., help='Name of the scene collection to switch to')
str,
typer.Argument(
...,
help='Name of the scene collection to switch to',
callback=validate.scene_collection_in_scene_collections,
),
],
):
"""Switch to a scene collection."""
if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.'
)
raise typer.Exit(1)
current_scene_collection = (
ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name
)
@ -82,16 +81,15 @@ def switch(
def create(
ctx: typer.Context,
scene_collection_name: Annotated[
str, typer.Argument(..., help='Name of the scene collection to create')
str,
typer.Argument(
...,
help='Name of the scene collection to create',
callback=validate.scene_collection_not_in_scene_collections,
),
],
):
"""Create a new scene collection."""
if validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.'
)
raise typer.Exit(1)
ctx.obj['obsws'].create_scene_collection(scene_collection_name)
console.out.print(
f'Created scene collection {console.highlight(ctx, scene_collection_name)}.'

View File

@ -5,8 +5,8 @@ from typing import Annotated, Optional
import typer
from rich.table import Table
from . import console, util, validate
from .alias import SubTyperAliasGroup
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@ -24,18 +24,15 @@ def list_(
typer.Argument(
show_default='The current scene',
help='Scene name to list items for',
callback=validate.scene_in_scenes,
),
] = None,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scene items')] = False,
):
"""List all items in a scene."""
if not scene_name:
if scene_name is None:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
items = sorted(
(

View File

@ -6,8 +6,8 @@ from typing import Annotated
import obsws_python as obsws
import typer
from . import console
from .alias import SubTyperAliasGroup
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -0,0 +1,337 @@
"""module for settings management."""
from typing import Annotated, Optional
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Manage OBS settings."""
@app.command('show | sh')
def show(
ctx: typer.Context,
video: Annotated[
bool, typer.Option('--video', '-v', help='Show video settings.')
] = False,
record: Annotated[
bool, typer.Option('--record', '-r', help='Show recording settings.')
] = False,
profile: Annotated[
bool, typer.Option('--profile', '-p', help='Show profile settings.')
] = False,
):
"""Show current OBS settings."""
if not any([video, record, profile]):
video = True
record = True
profile = True
resp = ctx.obj['obsws'].get_video_settings()
video_table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
video_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in video_columns:
video_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
video_table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if video_table.row_count % 2 == 0 else 'dim',
)
if video:
console.out.print(video_table)
resp = ctx.obj['obsws'].get_record_directory()
record_table = Table(
title='Recording Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
record_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in record_columns:
record_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
record_table.add_row(
'Directory',
resp.record_directory,
style='' if record_table.row_count % 2 == 0 else 'dim',
)
if record:
console.out.print(record_table)
profile_table = Table(
title='Profile Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
profile_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in profile_columns:
profile_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
params = [
('Output', 'Mode', 'Output Mode'),
('SimpleOutput', 'StreamEncoder', 'Simple Streaming Encoder'),
('SimpleOutput', 'RecEncoder', 'Simple Recording Encoder'),
('SimpleOutput', 'RecFormat2', 'Simple Recording Video Format'),
('SimpleOutput', 'RecAudioEncoder', 'Simple Recording Audio Format'),
('SimpleOutput', 'RecQuality', 'Simple Recording Quality'),
('AdvOut', 'Encoder', 'Advanced Streaming Encoder'),
('AdvOut', 'RecEncoder', 'Advanced Recording Encoder'),
('AdvOut', 'RecType', 'Advanced Recording Type'),
('AdvOut', 'RecFormat2', 'Advanced Recording Video Format'),
('AdvOut', 'RecAudioEncoder', 'Advanced Recording Audio Format'),
]
for category, name, display_name in params:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
if resp.parameter_value is not None:
profile_table.add_row(
display_name,
str(resp.parameter_value),
style='' if profile_table.row_count % 2 == 0 else 'dim',
)
if profile:
console.out.print(profile_table)
@app.command('profile | pr')
def profile(
ctx: typer.Context,
category: Annotated[
str,
typer.Argument(
...,
help='Profile parameter category (e.g., SimpleOutput, AdvOut).',
),
],
name: Annotated[
str,
typer.Argument(
...,
help='Profile parameter name (e.g., StreamEncoder, RecFormat2).',
),
],
value: Annotated[
Optional[str],
typer.Argument(
...,
help='Value to set for the profile parameter. If omitted, the current value is retrieved.',
),
] = None,
):
"""Get/set OBS profile settings."""
if value is None:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
console.out.print(
f'Parameter Value for [bold]{name}[/bold]: '
f'[green]{resp.parameter_value}[/green]'
)
else:
ctx.obj['obsws'].set_profile_parameter(
category=category,
name=name,
value=value,
)
console.out.print(
f'Set Parameter [bold]{name}[/bold] to [green]{value}[/green]'
)
@app.command('stream-service | ss')
def stream_service(
ctx: typer.Context,
type_: Annotated[
Optional[str],
typer.Argument(
...,
help='Stream service type (e.g., Twitch, YouTube). If omitted, current settings are retrieved.',
),
] = None,
key: Annotated[
Optional[str],
typer.Option('--key', '-k', help='Stream key to set. Optional.'),
] = None,
server: Annotated[
Optional[str],
typer.Option('--server', '-s', help='Stream server to set. Optional.'),
] = None,
):
"""Get/set OBS stream service settings."""
if type_ is None:
resp = ctx.obj['obsws'].get_stream_service_settings()
table = Table(
title='Stream Service Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
table.add_row(
'Type',
resp.stream_service_type,
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Server',
resp.stream_service_settings.get('server', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Key',
resp.stream_service_settings.get('key', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_stream_service_settings()
if key is None:
key = current_settings.stream_service_settings.get('key', '')
if server is None:
server = current_settings.stream_service_settings.get('server', '')
ctx.obj['obsws'].set_stream_service_settings(
ss_type=type_,
ss_settings={'key': key, 'server': server},
)
console.out.print('Stream service settings updated.')
@app.command('video | vi')
def video(
ctx: typer.Context,
base_width: Annotated[
Optional[int],
typer.Option('--base-width', '-bw', help='Set base (canvas) width.'),
] = None,
base_height: Annotated[
Optional[int],
typer.Option('--base-height', '-bh', help='Set base (canvas) height.'),
] = None,
output_width: Annotated[
Optional[int],
typer.Option('--output-width', '-ow', help='Set output (scaled) width.'),
] = None,
output_height: Annotated[
Optional[int],
typer.Option('--output-height', '-oh', help='Set output (scaled) height.'),
] = None,
fps_num: Annotated[
Optional[int],
typer.Option('--fps-num', '-fn', help='Set FPS numerator.'),
] = None,
fps_den: Annotated[
Optional[int],
typer.Option('--fps-den', '-fd', help='Set FPS denominator.'),
] = None,
):
"""Get/set OBS video settings."""
if not any(
[
base_width,
base_height,
output_width,
output_height,
fps_num,
fps_den,
]
):
resp = ctx.obj['obsws'].get_video_settings()
table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_video_settings()
if base_width is None:
base_width = current_settings.base_width
if base_height is None:
base_height = current_settings.base_height
if output_width is None:
output_width = current_settings.output_width
if output_height is None:
output_height = current_settings.output_height
if fps_num is None:
fps_num = current_settings.fps_num
if fps_den is None:
fps_den = current_settings.fps_den
ctx.obj['obsws'].set_video_settings(
base_width=base_width,
base_height=base_height,
out_width=output_width,
out_height=output_height,
numerator=fps_num,
denominator=fps_den,
)
console.out.print('Video settings updated.')

View File

@ -2,8 +2,8 @@
import typer
from . import console
from .alias import SubTyperAliasGroup
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -2,8 +2,8 @@
import typer
from . import console
from .alias import SubTyperAliasGroup
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -4,8 +4,8 @@ from typing import Annotated, Optional
import typer
from . import console, validate
from .alias import SubTyperAliasGroup
from obsws_cli import console, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

View File

@ -2,8 +2,8 @@
import typer
from . import console
from .alias import SubTyperAliasGroup
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)

146
obsws_cli/envconfig.py Normal file
View File

@ -0,0 +1,146 @@
"""module for settings management for obsws-cli."""
from collections import UserDict
from pathlib import Path
from typing import Any, Union
from dotenv import dotenv_values
ConfigValue = Union[str, int, bool]
class EnvConfig(UserDict):
"""A class to manage .env config for obsws-cli.
This class extends UserDict to provide a dictionary-like interface for config.
It loads config from .env files in the following priority:
1. Local .env file
2. User config file (~/.config/obsws-cli/obsws.env)
3. Default values
Note, typer handles reading from environment variables automatically. They take precedence
over values set in this class.
The config values are expected to be in uppercase and should start with 'OBSWS_CLI_'.
Example:
-------
config = EnvConfig()
host = config['OBSWS_CLI_HOST']
config['OBSWS_CLI_PORT'] = 4455
# Or with defaults
timeout = config.get('OBSWS_CLI_TIMEOUT', 30)
# Keys will be normalised to uppercase with prefix
debug = config.get('debug', False) # Equivalent to 'OBSWS_CLI_DEBUG'
"""
PREFIX = 'OBSWS_CLI_'
def __init__(self, *args, **kwargs):
"""Initialize the Config object with hierarchical loading."""
kwargs.update(
{
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
**dotenv_values('.env'),
}
)
super().__init__(*args, **self._convert_types(kwargs))
def _convert_types(self, config_data: dict[str, Any]) -> dict[str, ConfigValue]:
"""Convert string values to appropriate types."""
converted = {}
for key, value in config_data.items():
if isinstance(value, str):
if value.lower() in ('true', 'false'):
converted[key] = value.lower() == 'true'
elif value.isdigit():
converted[key] = int(value)
else:
converted[key] = value
else:
converted[key] = value
return converted
def __getitem__(self, key: str) -> ConfigValue:
"""Get a setting value by key."""
normalised_key = self._normalise_key(key)
try:
return self.data[normalised_key]
except KeyError as e:
raise KeyError(
f"Config key '{key}' not found. Available keys: {list(self.data.keys())}"
) from e
def __setitem__(self, key: str, value: ConfigValue):
"""Set a setting value by key."""
normalised_key = self._normalise_key(key)
self.data[normalised_key] = value
def _normalise_key(self, key: str) -> str:
"""Normalise a key to uppercase with OBS_ prefix."""
key = key.upper()
if not key.startswith(self.PREFIX):
key = f'{self.PREFIX}{key}'
return key
def get(self, key: str, default=None) -> ConfigValue:
"""Get a config value with optional default.
Args:
----
key (str): The key to retrieve
default: Default value if key is not found
Returns:
-------
The config value or default
"""
normalised_key = self._normalise_key(key)
if not self.has_key(normalised_key):
return default
return self[normalised_key]
def has_key(self, key: str) -> bool:
"""Check if a config key exists.
Args:
----
key (str): The key to check
Returns:
-------
bool: True if key exists, False otherwise
"""
normalised_key = self._normalise_key(key)
return normalised_key in self.data
_envconfig = EnvConfig(
OBSWS_CLI_HOST='localhost',
OBSWS_CLI_PORT=4455,
OBSWS_CLI_PASSWORD='',
OBSWS_CLI_TIMEOUT=5,
OBSWS_CLI_DEBUG=False,
OBSWS_CLI_STYLE='disabled',
OBSWS_CLI_STYLE_NO_BORDER=False,
)
def get(key: str) -> ConfigValue:
"""Get a setting value by key from the global config instance.
Args:
----
key (str): The key of the config to retrieve.
default: Default value if key is not found.
Returns:
-------
The value of the config or default value.
"""
return _envconfig.get(key)

View File

@ -1,80 +0,0 @@
"""module for settings management for obsws-cli."""
from collections import UserDict
from pathlib import Path
from dotenv import dotenv_values
SettingsValue = str | int
class Settings(UserDict):
"""A class to manage settings for obsws-cli.
This class extends UserDict to provide a dictionary-like interface for settings.
It loads settings from environment variables and .env files.
The settings are expected to be in uppercase and should start with 'OBS_'.
Example:
-------
settings = Settings()
host = settings['OBS_HOST']
settings['OBS_PORT'] = 4455
"""
PREFIX = 'OBS_'
def __init__(self, *args, **kwargs):
"""Initialize the Settings object."""
kwargs.update(
{
**dotenv_values('.env'),
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
}
)
super().__init__(*args, **kwargs)
def __getitem__(self, key: str) -> SettingsValue:
"""Get a setting value by key."""
key = key.upper()
if not key.startswith(Settings.PREFIX):
key = f'{Settings.PREFIX}{key}'
return self.data[key]
def __setitem__(self, key: str, value: SettingsValue):
"""Set a setting value by key."""
key = key.upper()
if not key.startswith(Settings.PREFIX):
key = f'{Settings.PREFIX}{key}'
self.data[key] = value
_settings = Settings(
OBS_HOST='localhost',
OBS_PORT=4455,
OBS_PASSWORD='',
OBS_TIMEOUT=5,
OBS_DEBUG=False,
OBS_STYLE='disabled',
OBS_STYLE_NO_BORDER=False,
)
def get(key: str) -> SettingsValue:
"""Get a setting value by key.
Args:
----
key (str): The key of the setting to retrieve.
Returns:
-------
The value of the setting.
Raises:
------
KeyError: If the key does not exist in the settings.
"""
return _settings[key]

View File

@ -20,3 +20,28 @@ def check_mark(value: bool, empty_if_false: bool = False) -> str:
if os.getenv('NO_COLOR', '') != '':
return '' if value else ''
return '' if value else ''
def timecode_to_milliseconds(timecode: str) -> int:
"""Convert a timecode string (HH:MM:SS) to total milliseconds."""
match timecode.split(':'):
case [mm, ss]:
hours = 0
minutes = int(mm)
seconds = int(ss)
case [hh, mm, ss]:
hours = int(hh)
minutes = int(mm)
seconds = int(ss)
return (hours * 3600 + minutes * 60 + seconds) * 1000
def milliseconds_to_timecode(milliseconds: int) -> str:
"""Convert total milliseconds to a timecode string (HH:MM:SS)."""
total_seconds = milliseconds // 1000
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
if hours == 0:
return f'{minutes:02}:{seconds:02}'
return f'{hours:02}:{minutes:02}:{seconds:02}'

View File

@ -1,5 +1,7 @@
"""module containing validation functions."""
from typing import Optional
import typer
from . import console
@ -8,7 +10,7 @@ from . import console
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
def input_in_inputs(ctx: typer.Context, input_name: str) -> str:
"""Ensure the given input exists in the list of inputs."""
resp = ctx.obj['obsws'].get_input_list()
if not any(input.get('inputName') == input_name for input in resp.inputs):
@ -17,7 +19,7 @@ def input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
return input_name
def input_not_in_inputs(ctx: typer.Context, input_name: str) -> bool:
def input_not_in_inputs(ctx: typer.Context, input_name: str) -> str:
"""Ensure an input does not already exist in the list of inputs."""
resp = ctx.obj['obsws'].get_input_list()
if any(input.get('inputName') == input_name for input in resp.inputs):
@ -26,26 +28,57 @@ def input_not_in_inputs(ctx: typer.Context, input_name: str) -> bool:
return input_name
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool:
def scene_in_scenes(ctx: typer.Context, scene_name: Optional[str]) -> str | None:
"""Check if a scene exists in the list of scenes."""
if scene_name is None:
return
resp = ctx.obj['obsws'].get_scene_list()
return any(scene.get('sceneName') == scene_name for scene in resp.scenes)
if not any(scene.get('sceneName') == scene_name for scene in resp.scenes):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
return scene_name
def studio_mode_enabled(ctx: typer.Context) -> bool:
"""Check if studio mode is enabled."""
def studio_mode_enabled(ctx: typer.Context, preview: bool) -> bool:
"""Ensure studio mode is enabled if preview option is used."""
resp = ctx.obj['obsws'].get_studio_mode_enabled()
return resp.studio_mode_enabled
if preview and not resp.studio_mode_enabled:
console.err.print(
'Studio mode is disabled. This action requires it to be enabled.'
)
raise typer.Exit(1)
return preview
def scene_collection_in_scene_collections(
ctx: typer.Context, scene_collection_name: str
) -> bool:
"""Check if a scene collection exists."""
) -> str:
"""Ensure a scene collection exists in the list of scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list()
return any(
if not any(
collection == scene_collection_name for collection in resp.scene_collections
):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.'
)
raise typer.Exit(1)
return scene_collection_name
def scene_collection_not_in_scene_collections(
ctx: typer.Context, scene_collection_name: str
) -> str:
"""Ensure a scene collection does not already exist in the list of scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list()
if any(
collection == scene_collection_name for collection in resp.scene_collections
):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.'
)
raise typer.Exit(1)
return scene_collection_name
def item_in_scene_item_list(
@ -56,16 +89,22 @@ def item_in_scene_item_list(
return any(item.get('sourceName') == item_name for item in resp.scene_items)
def profile_exists(ctx: typer.Context, profile_name: str) -> bool:
"""Check if a profile exists."""
def profile_exists(ctx: typer.Context, profile_name: str) -> str:
"""Ensure a profile exists."""
resp = ctx.obj['obsws'].get_profile_list()
return any(profile == profile_name for profile in resp.profiles)
if not any(profile == profile_name for profile in resp.profiles):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1)
return profile_name
def monitor_exists(ctx: typer.Context, monitor_index: int) -> bool:
"""Check if a monitor exists."""
resp = ctx.obj['obsws'].get_monitor_list()
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors)
def profile_not_exists(ctx: typer.Context, profile_name: str) -> str:
"""Ensure a profile does not exist."""
resp = ctx.obj['obsws'].get_profile_list()
if any(profile == profile_name for profile in resp.profiles):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] already exists.')
raise typer.Exit(1)
return profile_name
def kind_in_input_kinds(ctx: typer.Context, input_kind: str) -> str:
@ -75,3 +114,29 @@ def kind_in_input_kinds(ctx: typer.Context, input_kind: str) -> str:
console.err.print(f'Input kind [yellow]{input_kind}[/yellow] not found.')
raise typer.Exit(1)
return input_kind
def timecode_format(ctx: typer.Context, timecode: Optional[str]) -> str | None:
"""Validate that a timecode is in HH:MM:SS or MM:SS format."""
if timecode is None:
return
match timecode.split(':'):
case [mm, ss]:
if not (mm.isdigit() and ss.isdigit()):
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
case [hh, mm, ss]:
if not (hh.isdigit() and mm.isdigit() and ss.isdigit()):
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
case _:
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
return timecode

View File

@ -21,9 +21,9 @@ def pytest_sessionstart(session):
"""
# Initialize the OBS WebSocket client
session.obsws = obsws.ReqClient(
host=os.environ['OBS_HOST'],
port=os.environ['OBS_PORT'],
password=os.environ['OBS_PASSWORD'],
host=os.environ['OBSWS_CLI_HOST'],
port=os.environ['OBSWS_CLI_PORT'],
password=os.environ['OBSWS_CLI_PASSWORD'],
timeout=5,
)
resp = session.obsws.get_version()