74 Commits

Author SHA1 Message Date
35bd2522cf patch bump 2026-02-07 05:21:28 +00:00
bec10d5d05 add Shell Completion to README 2026-02-07 05:21:05 +00:00
9693d4f913 we lose aliases in the help output but this does fix shell completion, which is more important.
fixes #3
2026-02-07 05:20:37 +00:00
34067ca61d patch bump 2026-01-25 16:20:03 +00:00
3ca138ef6d add 0.24.6 to CHANGELOG 2026-01-25 16:19:37 +00:00
1b8bc72097 update Environment Variable section in README 2026-01-25 16:19:28 +00:00
3a8d4ef0f0 rename config module to envconfig
envconfig:
- add method for normalising the keys
- add has_key method
- update env var prefix to OBSWS_CLI_

update tests to reflect changes
2026-01-25 16:19:12 +00:00
1fc0bef237 dynamically load commands from obsws_cli.commands
no changes to files other than imports

patch bump
2026-01-24 22:29:33 +00:00
8bec6908e5 move studio mode enabled validation into callback
patch bump
2026-01-24 02:34:45 +00:00
2c03b28fc6 fix type annotations 2026-01-12 21:13:16 +00:00
f1e29e0d4f fix settings position 2026-01-10 14:21:30 +00:00
c7b60ecaf9 patch bump 2026-01-10 14:18:24 +00:00
a05fce26f2 add media and settings aliases on the root typer 2026-01-10 14:18:04 +00:00
5355d29a31 keep it consistent 2026-01-10 14:04:27 +00:00
add9743b00 patch bump 2026-01-10 14:00:20 +00:00
8aa1fb2c09 add validate.timecode_format
add None checks for callbacks with optional values
2026-01-10 13:59:55 +00:00
5c7fc24839 patch bump 2026-01-09 23:24:54 +00:00
e4ab4ae630 remove unused monitor_exists() function 2026-01-09 23:22:14 +00:00
9cdbc657fa profile_exists validation log now callbacks 2026-01-09 23:19:49 +00:00
f74ec9cd93 scene_collection validation logic now in callbacks 2026-01-09 23:14:34 +00:00
329aec084c scene_in_scenes validation now a callback 2026-01-09 23:07:06 +00:00
3eaa3992a0 bump version in CHANGELOG 2026-01-09 19:53:18 +00:00
7c86aa8a8b minor version bump 2026-01-09 19:51:31 +00:00
09ca892fcb add Media to README 2026-01-09 19:51:11 +00:00
81fcb4e504 implement media command group 2026-01-09 19:51:03 +00:00
3f3b331363 bump version in CHANGELOG 2026-01-09 13:48:52 +00:00
2535fe85c5 minor bump 2026-01-09 13:47:18 +00:00
7d4485ec05 add Settings to README 2026-01-09 13:45:32 +00:00
2c2501e017 implement settings command group 2026-01-09 13:45:25 +00:00
356684e5d4 rename Settings to Config 2026-01-09 09:39:19 +00:00
f7e51f8488 add 0.22.0 to CHANGELOG 2026-01-09 09:31:28 +00:00
8da29ce90e upd typer dep version
obsws-cli minor version bump
2026-01-09 09:21:43 +00:00
72c6bcee49 upd README with new input subcommands 2026-01-09 09:21:06 +00:00
dceafba065 extend input command group 2026-01-09 09:20:45 +00:00
7a73ec35f6 remove lazyimports env 2025-09-29 20:51:17 +01:00
48e0f6cecd bump typer dependency.
release 0.17.0 fixes slow rich imports, see https://github.com/fastapi/typer/releases/tag/0.17.0

This is related to issue #2.

minor version bump
2025-09-29 04:21:26 +01:00
52e13922dc upd test delays to 500ms 2025-07-30 08:42:11 +01:00
f335d8ffd2 move the version flag 2025-07-29 08:48:30 +01:00
286cda8066 raise typer.Exit() on empty list queries 2025-07-29 08:17:52 +01:00
e851219ced tests should now pass from fresh install 2025-07-29 08:03:24 +01:00
f852a733c3 upd publish action 2025-07-14 03:27:52 +01:00
44dadcee23 upd publish action 2025-07-14 03:25:52 +01:00
ed4531c305 revert publish action 2025-07-14 03:23:25 +01:00
ec42a4cdd9 patch bump 2025-07-14 03:21:29 +01:00
6123c92d00 upd publish action 2025-07-14 03:21:06 +01:00
1ceb95ab16 fix environment name 2025-07-14 03:12:35 +01:00
f06e2d3982 upd publish action 2025-07-14 03:10:04 +01:00
39dff3cc28 patch bump 2025-07-14 03:02:53 +01:00
967c4ab699 upd publish action 2025-07-14 02:58:25 +01:00
dc128720c7 hatch fmt 2025-07-14 02:48:21 +01:00
2e3f4267cd add workflows 2025-07-14 02:45:13 +01:00
000431ab82 add 0.20.0 to CHANGELOG 2025-07-14 02:32:59 +01:00
ec3e31cc4f add Text section to README 2025-07-14 02:32:21 +01:00
cda0bbedb9 minor bump 2025-07-14 02:32:09 +01:00
d0c96b853d add text unit tests 2025-07-14 02:31:47 +01:00
040a41daa7 add text command group 2025-07-14 02:31:35 +01:00
0c72a10fb7 bump obsws-python version 2025-07-01 09:30:04 +01:00
f882302d16 fixes missing argument 2025-07-01 09:29:56 +01:00
98e0d98cc7 typo 2025-06-27 13:45:24 +01:00
c6b22c7cf2 use console.highlight() 2025-06-27 13:29:39 +01:00
c3e55200db move style section
add link to style section in ToC.

add imgs.
2025-06-27 13:14:54 +01:00
4d37714aaf patch bump 2025-06-27 12:57:49 +01:00
157e1a167c fixes bug when setting --style=disabled (we were stil getting coloured check/cross marks) 2025-06-27 12:57:34 +01:00
d628c5d3a4 rename heading variables 2025-06-27 12:53:10 +01:00
4bf8edb692 add 0.19.0 to CHANGELOG 2025-06-23 09:11:26 +01:00
d68326f37a add record split/chapter to README 2025-06-23 09:11:15 +01:00
a001455dad add record split/chapter commands 2025-06-23 09:10:53 +01:00
4632260961 add --style validation
add Disabled class to style registry

patch bump
2025-06-22 12:35:21 +01:00
55a7da67db reword 2025-06-22 10:19:46 +01:00
7bec573ef9 by setting values in the default style to 'none' we avoid the rich markup errors in console.highlight
add comment to util.check_mark and test only NO_COLOR

patch bump
2025-06-22 10:14:46 +01:00
55e60ff977 in case NO_COLOR is set manually
patch bump
2025-06-22 02:49:32 +01:00
922efddf7a check if we're in colourless mode before passing back highlighted text.
pass context to check_mark so we can do the same there.

Fixes  rich.errors.MarkupError
2025-06-22 01:57:58 +01:00
4a0147aa8a import as version 2025-06-22 00:38:19 +01:00
cec76df1d1 add 0.18.0 to CHANGELOG 2025-06-21 23:47:19 +01:00
49 changed files with 2090 additions and 643 deletions

39
.github/workflows/publish.yml vendored Normal file
View File

@@ -0,0 +1,39 @@
name: Publish to PyPI
on:
release:
types: [published]
push:
tags:
- 'v*.*.*'
jobs:
deploy:
runs-on: ubuntu-latest
environment: pypi
permissions:
# This permission is needed for private repositories.
contents: read
# IMPORTANT: this permission is mandatory for trusted publishing
id-token: write
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Build package
run: hatch build
- name: Publish on PyPI
uses: pypa/gh-action-pypi-publish@release/v1

19
.github/workflows/ruff.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
name: Ruff
on:
push:
branches: [main]
pull_request:
branches: [main]
workflow_dispatch:
jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/ruff-action@v3
with:
args: 'format --check --diff'

View File

@@ -5,6 +5,43 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.24.6] - 2026-01-26
### Changed
- environment variables should now be prefixed with 'OBSWS_CLI_', see [Environment Variables](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#environment-variables)
# [0.24.0] - 2026-01-09
### Added
- new subcommands added to input, see [Input](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#input)
- settings command group, see [Settings](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#settings)
- media command group, see [Media](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#media)
# [0.20.0] - 2025-07-14
### Added
- text command group, see [Text](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#text)
# [0.19.0] - 2025-06-23
### Added
- record split and record chapter commands, see [Record](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#record)
- As of OBS 30.2.0, the only file format supporting *record chapter* is Hybrid MP4.
# [0.18.0] - 2025-06-21
### Added
- Various colouring styles, see [Style](https://github.com/onyx-and-iris/obsws-cli/tree/main?tab=readme-ov-file#style)
- colouring is applied to list tables as well as highlighted information in stdout/stderr output.
- table border styling may be optionally disabled with the --no-border flag.
# [0.17.3] - 2025-06-20 # [0.17.3] - 2025-06-20
### Added ### Added

231
README.md
View File

@@ -14,7 +14,9 @@ For an outline of past/future changes refer to: [CHANGELOG](CHANGELOG.md)
- [Installation](#installation) - [Installation](#installation)
- [Configuration](#configuration) - [Configuration](#configuration)
- [Style](#style)
- [Commands](#root-typer) - [Commands](#root-typer)
- [Shell Completion](#shell-completion)
- [License](#license) - [License](#license)
## Requirements ## Requirements
@@ -61,13 +63,44 @@ Store and load environment variables from:
- `user home directory / .config / obsws-cli / obsws.env` - `user home directory / .config / obsws-cli / obsws.env`
```env ```env
OBS_HOST=localhost OBSWS_CLI_HOST=localhost
OBS_PORT=4455 OBSWS_CLI_PORT=4455
OBS_PASSWORD=<websocket password> OBSWS_CLI_PASSWORD=<websocket password>
``` ```
Flags can be used to override environment variables. Flags can be used to override environment variables.
## Style
Styling is opt-in, by default you will get a colourless output:
![colourless](./img/colourless.png)
You may enable styling with the --style/-s flag:
```console
obsws-cli --style="cyan" sceneitem list
```
Available styles: _red, magenta, purple, blue, cyan, green, yellow, orange, white, grey, navy, black_
![coloured](./img/coloured-border.png)
Optionally you may disable border colouring with the --no-border flag:
![coloured-no-border](./img/coloured-no-border.png)
```console
obsws-cli --style="cyan" --no-border sceneitem list
```
Or with environment variables:
```env
OBSWS_CLI_STYLE=cyan
OBSWS_CLI_STYLE_NO_BORDER=true
```
## Root Typer ## Root Typer
- obs-version: Get the OBS Client and WebSocket versions. - obs-version: Get the OBS Client and WebSocket versions.
@@ -266,6 +299,20 @@ obsws-cli group status START "test_group"
#### Input #### Input
- create: Create a new input.
- args: <input_name> <input_kind>
```console
obsws-cli input create 'stream mix' 'wasapi_input_capture'
```
- remove: Remove an input.
- args: <input_name>
```console
obsws-cli input remove 'stream mix'
```
- list: List all inputs. - list: List all inputs.
- flags: - flags:
@@ -283,6 +330,12 @@ obsws-cli input list
obsws-cli input list --input --colour obsws-cli input list --input --colour
``` ```
- list-kinds: List all input kinds.
```console
obsws-cli input list-kinds
```
- mute: Mute an input. - mute: Mute an input.
- args: <input_name> - args: <input_name>
@@ -303,6 +356,48 @@ obsws-cli input unmute "Mic/Aux"
obsws-cli input toggle "Mic/Aux" obsws-cli input toggle "Mic/Aux"
``` ```
- volume: Set the volume of an input.
- args: <input_name> <volume>
```console
obsws-cli input volume -- 'Desktop Audio' -38.9
```
- show: Show information for an input in the current scene.
- args: <input_name>
- flags:
*optional*
- --verbose: List all available input devices.
```console
obsws-cli input show 'Mic/Aux' --verbose
```
- update: Name of the input to update.
- args: <input_name> <device_name>
```console
obsws-cli input update 'Mic/Aux' 'Voicemeeter Out B1 (VB-Audio Voicemeeter VAIO)'
```
#### Text
- current: Get the current text for a text input.
- args: <input_name>
```console
obsws-cli text current "My Text Input"
```
- update: Update the text of a text input.
- args: <input_name> <new_text>
```console
obsws-cli text update "My Text Input" "hi OBS!"
```
#### Record #### Record
- start: Start recording. - start: Start recording.
@@ -354,6 +449,21 @@ obsws-cli record directory "/home/me/obs-vids/"
obsws-cli record directory "C:/Users/me/Videos" obsws-cli record directory "C:/Users/me/Videos"
``` ```
- split: Split the current recording.
```console
obsws-cli record split
```
- chapter: Create a chapter in the current recording.
*optional*
- args: <chapter_name>
```console
obsws-cli record chapter "Chapter Name"
```
#### Stream #### Stream
- start: Start streaming. - start: Start streaming.
@@ -613,34 +723,105 @@ obsws-cli projector open --monitor-index=1 "test_group"
obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png" obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png"
``` ```
## Style #### Settings
By default styling is disabled but you may enable it with: - show: Show current OBS settings.
- flags:
- --style/-s: Style used in output. *optional*
- OBS_STYLE - --video: Show video settings.
- --no-border/-b: Disable table border styling in output. - --record: Show recording settings.
- OBS_STYLE_NO_BORDER - --profile: Show profile settings.
Available styles:
- red
- magenta
- purple
- blue
- cyan
- green
- yellow
- orange
- white
- grey
- navy
- black
```console ```console
obsws-cli --style=cyan --no-border scene list obsws-cli settings show --video --record
``` ```
- profile: Get/set OBS profile settings.
- args: <category> <name> <value>
```console
obsws-cli settings profile SimpleOutput VBitrate
obsws-cli settings profile SimpleOutput VBitrate 6000
```
- stream-service: Get/set OBS stream service settings.
- flags:
- --key: Stream key.
- --server: Stream server URL.
*optional*
- args: <type>
```console
obsws-cli settings stream-service
obsws-cli settings stream-service --key='live_xyzxyzxyzxyz' rtmp_common
```
- video: Get/set OBS video settings.
- flags:
*optional*
- --base-width: Base (canvas) width.
- --base-height: Base (canvas) height.
- --output-width: Output (scaled) width.
- --output-height: Output (scaled) height.
- --fps-num: Frames per second numerator.
- --fps-den: Frames per second denominator.
```console
obsws-cli settings video
obsws-cli settings video --base-width=1920 --base-height=1080
```
#### Media
- cursor: Get/set the cursor position of a media input.
- args: InputName
*optional*
- TimeString
```console
obsws-cli media cursor "Media"
obsws-cli media cursor "Media" "00:08:30"
```
- play: Plays a media input.
```console
obsws-cli media play "Media"
```
- pause: Pauses a media input.
```console
obsws-cli media pause "Media"
```
- stop: Stops a media input.
```console
obsws-cli media stop "Media"
```
- restart: Restarts a media input.
```console
obsws-cli media restart "Media"
```
## Shell Completion
```console
obsws-cli --install-completion
```
## License ## License
`obsws-cli` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license. `obsws-cli` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license.

BIN
img/coloured-border.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

BIN
img/coloured-no-border.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

BIN
img/colourless.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.4 KiB

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online> # SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
# #
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
__version__ = "0.17.6" __version__ = '0.24.7'

View File

@@ -4,4 +4,4 @@
from .app import app from .app import app
__all__ = ["app"] __all__ = ['app']

View File

@@ -1,7 +1,5 @@
"""module defining a custom group class for handling command name aliases.""" """module defining a custom group class for handling command name aliases."""
import re
import typer import typer
@@ -24,6 +22,8 @@ class RootTyperAliasGroup(typer.core.TyperGroup):
cmd_name = 'hotkey' cmd_name = 'hotkey'
case 'i': case 'i':
cmd_name = 'input' cmd_name = 'input'
case 'm':
cmd_name = 'media'
case 'prf': case 'prf':
cmd_name = 'profile' cmd_name = 'profile'
case 'prj': case 'prj':
@@ -40,32 +40,14 @@ class RootTyperAliasGroup(typer.core.TyperGroup):
cmd_name = 'sceneitem' cmd_name = 'sceneitem'
case 'ss': case 'ss':
cmd_name = 'screenshot' cmd_name = 'screenshot'
case 'set':
cmd_name = 'settings'
case 'st': case 'st':
cmd_name = 'stream' cmd_name = 'stream'
case 'sm': case 'sm':
cmd_name = 'studiomode' cmd_name = 'studiomode'
case 't':
cmd_name = 'text'
case 'vc': case 'vc':
cmd_name = 'virtualcam' cmd_name = 'virtualcam'
return super().get_command(ctx, cmd_name) return super().get_command(ctx, cmd_name)
class SubTyperAliasGroup(typer.core.TyperGroup):
"""A custom group class to handle command name aliases for sub typers."""
_CMD_SPLIT_P = re.compile(r' ?[,|] ?')
def __init__(self, *args, **kwargs):
"""Initialize the AliasGroup."""
super().__init__(*args, **kwargs)
self.no_args_is_help = True
def get_command(self, ctx, cmd_name):
"""Get a command by name."""
cmd_name = self._group_cmd_name(cmd_name)
return super().get_command(ctx, cmd_name)
def _group_cmd_name(self, default_name):
for cmd in self.commands.values():
if cmd.name and default_name in self._CMD_SPLIT_P.split(cmd.name):
return cmd.name
return default_name

View File

@@ -2,42 +2,29 @@
import importlib import importlib
import logging import logging
import pkgutil
from typing import Annotated from typing import Annotated
import obsws_python as obsws import obsws_python as obsws
import typer import typer
from obsws_cli.__about__ import __version__ as obsws_cli_version from obsws_cli.__about__ import __version__ as version
from . import console, settings, styles from . import commands, console, envconfig, styles
from .alias import RootTyperAliasGroup from .alias import RootTyperAliasGroup
app = typer.Typer(cls=RootTyperAliasGroup) app = typer.Typer(cls=RootTyperAliasGroup)
for sub_typer in ( for importer, modname, ispkg in pkgutil.iter_modules(
'filter', commands.__path__, commands.__name__ + '.'
'group',
'hotkey',
'input',
'profile',
'projector',
'record',
'replaybuffer',
'scene',
'scenecollection',
'sceneitem',
'screenshot',
'stream',
'studiomode',
'virtualcam',
): ):
module = importlib.import_module(f'.{sub_typer}', package=__package__) subtyper = importlib.import_module(modname)
app.add_typer(module.app, name=sub_typer) app.add_typer(subtyper.app, name=modname.split('.')[-1])
def version_callback(value: bool): def version_callback(value: bool):
"""Show the version of the CLI.""" """Show the version of the CLI."""
if value: if value:
console.out.print(f'obsws-cli version: {obsws_cli_version}') console.out.print(f'obsws-cli version: {version}')
raise typer.Exit() raise typer.Exit()
@@ -50,6 +37,15 @@ def setup_logging(debug: bool):
) )
def validate_style(value: str):
"""Validate and return the style."""
if value not in styles.registry:
raise typer.BadParameter(
f'Invalid style: {value}. Available styles: {", ".join(styles.registry.keys())}'
)
return value
@app.callback() @app.callback()
def main( def main(
ctx: typer.Context, ctx: typer.Context,
@@ -58,41 +54,62 @@ def main(
typer.Option( typer.Option(
'--host', '--host',
'-H', '-H',
envvar='OBS_HOST', envvar='OBSWS_CLI_HOST',
help='WebSocket host', help='WebSocket host',
show_default='localhost', show_default='localhost',
), ),
] = settings.get('host'), ] = envconfig.get('host'),
port: Annotated[ port: Annotated[
int, int,
typer.Option( typer.Option(
'--port', '--port',
'-P', '-P',
envvar='OBS_PORT', envvar='OBSWS_CLI_PORT',
help='WebSocket port', help='WebSocket port',
show_default=4455, show_default=4455,
), ),
] = settings.get('port'), ] = envconfig.get('port'),
password: Annotated[ password: Annotated[
str, str,
typer.Option( typer.Option(
'--password', '--password',
'-p', '-p',
envvar='OBS_PASSWORD', envvar='OBSWS_CLI_PASSWORD',
help='WebSocket password', help='WebSocket password',
show_default=False, show_default=False,
), ),
] = settings.get('password'), ] = envconfig.get('password'),
timeout: Annotated[ timeout: Annotated[
int, int,
typer.Option( typer.Option(
'--timeout', '--timeout',
'-T', '-T',
envvar='OBS_TIMEOUT', envvar='OBSWS_CLI_TIMEOUT',
help='WebSocket timeout', help='WebSocket timeout',
show_default=5, show_default=5,
), ),
] = settings.get('timeout'), ] = envconfig.get('timeout'),
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBSWS_CLI_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
callback=validate_style,
),
] = envconfig.get('style'),
no_border: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBSWS_CLI_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = envconfig.get('style_no_border'),
version: Annotated[ version: Annotated[
bool, bool,
typer.Option( typer.Option(
@@ -104,43 +121,25 @@ def main(
callback=version_callback, callback=version_callback,
), ),
] = False, ] = False,
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBS_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
),
] = settings.get('style'),
no_border: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBS_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = settings.get('style_no_border'),
debug: Annotated[ debug: Annotated[
bool, bool,
typer.Option( typer.Option(
'--debug', '--debug',
'-d', '-d',
envvar='OBS_DEBUG', envvar='OBSWS_CLI_DEBUG',
is_eager=True, is_eager=True,
help='Enable debug logging', help='Enable debug logging',
show_default=False, show_default=False,
callback=setup_logging, callback=setup_logging,
hidden=True, hidden=True,
), ),
] = settings.get('debug'), ] = envconfig.get('debug'),
): ):
"""obsws_cli is a command line interface for the OBS WebSocket API.""" """obsws_cli is a command line interface for the OBS WebSocket API."""
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj['obsws'] = ctx.with_resource(obsws.ReqClient(**ctx.params)) ctx.obj['obsws'] = ctx.with_resource(
obsws.ReqClient(host=host, port=port, password=password, timeout=timeout)
)
ctx.obj['style'] = styles.request_style_obj(style, no_border) ctx.obj['style'] = styles.request_style_obj(style, no_border)

View File

@@ -7,10 +7,9 @@ import typer
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util from obsws_cli import console, util
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -18,7 +17,8 @@ def main():
"""Control filters in OBS scenes.""" """Control filters in OBS scenes."""
@app.command('list | ls') @app.command('list')
@app.command('ls', hidden=True)
def list_( def list_(
ctx: typer.Context, ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
@@ -62,8 +62,8 @@ def list_(
(Text('Enabled', justify='center'), 'center', None), (Text('Enabled', justify='center'), 'center', None),
(Text('Settings', justify='center'), 'center', ctx.obj['style'].column), (Text('Settings', justify='center'), 'center', ctx.obj['style'].column),
] ]
for name, justify, style in columns: for heading, justify, style in columns:
table.add_column(name, justify=justify, style=style) table.add_column(heading, justify=justify, style=style)
for filter in resp.filters: for filter in resp.filters:
resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind']) resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind'])
@@ -90,7 +90,8 @@ def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str):
return resp.filter_enabled return resp.filter_enabled
@app.command('enable | on') @app.command('enable')
@app.command('on', hidden=True)
def enable( def enable(
ctx: typer.Context, ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
@@ -119,7 +120,8 @@ def enable(
) )
@app.command('disable | off') @app.command('disable')
@app.command('off', hidden=True)
def disable( def disable(
ctx: typer.Context, ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
@@ -148,7 +150,8 @@ def disable(
) )
@app.command('toggle | tg') @app.command('toggle')
@app.command('tg', hidden=True)
def toggle( def toggle(
ctx: typer.Context, ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
@@ -181,7 +184,8 @@ def toggle(
) )
@app.command('status | ss') @app.command('status')
@app.command('ss', hidden=True)
def status( def status(
ctx: typer.Context, ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[

View File

@@ -6,11 +6,10 @@ import typer
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util, validate from obsws_cli import console, util, validate
from .alias import SubTyperAliasGroup from obsws_cli.protocols import DataclassProtocol
from .protocols import DataclassProtocol
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -18,7 +17,8 @@ def main():
"""Control groups in OBS scenes.""" """Control groups in OBS scenes."""
@app.command('list | ls') @app.command('list')
@app.command('ls', hidden=True)
def list_( def list_(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
@@ -26,17 +26,14 @@ def list_(
typer.Argument( typer.Argument(
show_default='The current scene', show_default='The current scene',
help='Scene name to list groups for', help='Scene name to list groups for',
callback=validate.scene_in_scenes,
), ),
] = None, ] = None,
): ):
"""List groups in a scene.""" """List groups in a scene."""
if not scene_name: if scene_name is None:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
groups = [ groups = [
(item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled')) (item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled'))
@@ -61,8 +58,8 @@ def list_(
(Text('Group Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Group Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Enabled', justify='center'), 'center', None), (Text('Enabled', justify='center'), 'center', None),
] ]
for column, justify, style in columns: for heading, justify, style in columns:
table.add_column(column, justify=justify, style=style) table.add_column(heading, justify=justify, style=style)
for item_id, group_name, is_enabled in groups: for item_id, group_name, is_enabled in groups:
table.add_row( table.add_row(
@@ -87,22 +84,24 @@ def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
return group return group
@app.command('show | sh') @app.command('show')
@app.command('sh', hidden=True)
def show( def show(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Scene name the group is in'), typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
], ],
group_name: Annotated[ group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to show') str, typer.Argument(..., show_default=False, help='Group name to show')
], ],
): ):
"""Show a group in a scene.""" """Show a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
console.err.print( console.err.print(
@@ -119,21 +118,24 @@ def show(
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.') console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
@app.command('hide | h') @app.command('hide')
@app.command('h', hidden=True)
def hide( def hide(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the group is in') str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
], ],
group_name: Annotated[ group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to hide') str, typer.Argument(..., show_default=False, help='Group name to hide')
], ],
): ):
"""Hide a group in a scene.""" """Hide a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
console.err.print( console.err.print(
@@ -150,21 +152,24 @@ def hide(
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.') console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('toggle | tg') @app.command('toggle')
@app.command('tg', hidden=True)
def toggle( def toggle(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the group is in') str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
], ],
group_name: Annotated[ group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to toggle') str, typer.Argument(..., show_default=False, help='Group name to toggle')
], ],
): ):
"""Toggle a group in a scene.""" """Toggle a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
console.err.print( console.err.print(
@@ -185,21 +190,24 @@ def toggle(
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.') console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('status | ss') @app.command('status')
@app.command('ss', hidden=True)
def status( def status(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the group is in') str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
], ],
group_name: Annotated[ group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to check status') str, typer.Argument(..., show_default=False, help='Group name to check status')
], ],
): ):
"""Get the status of a group in a scene.""" """Get the status of a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
console.err.print( console.err.print(

View File

@@ -6,10 +6,9 @@ import typer
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console from obsws_cli import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -17,13 +16,18 @@ def main():
"""Control hotkeys in OBS.""" """Control hotkeys in OBS."""
@app.command('list | ls') @app.command('list')
@app.command('ls', hidden=True)
def list_( def list_(
ctx: typer.Context, ctx: typer.Context,
): ):
"""List all hotkeys.""" """List all hotkeys."""
resp = ctx.obj['obsws'].get_hotkey_list() resp = ctx.obj['obsws'].get_hotkey_list()
if not resp.hotkeys:
console.out.print('No hotkeys found.')
raise typer.Exit()
table = Table( table = Table(
title='Hotkeys', title='Hotkeys',
padding=(0, 2), padding=(0, 2),
@@ -41,7 +45,8 @@ def list_(
console.out.print(table) console.out.print(table)
@app.command('trigger | tr') @app.command('trigger')
@app.command('tr', hidden=True)
def trigger( def trigger(
ctx: typer.Context, ctx: typer.Context,
hotkey: Annotated[ hotkey: Annotated[
@@ -52,7 +57,8 @@ def trigger(
ctx.obj['obsws'].trigger_hotkey_by_name(hotkey) ctx.obj['obsws'].trigger_hotkey_by_name(hotkey)
@app.command('trigger-sequence | trs') @app.command('trigger-sequence')
@app.command('trs', hidden=True)
def trigger_sequence( def trigger_sequence(
ctx: typer.Context, ctx: typer.Context,
key_id: Annotated[ key_id: Annotated[

465
obsws_cli/commands/input.py Normal file
View File

@@ -0,0 +1,465 @@
"""module containing commands for manipulating inputs."""
from typing import Annotated
import obsws_python as obsws
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
app = typer.Typer()
@app.callback()
def main():
"""Control inputs in OBS."""
@app.command('create')
@app.command('cr', hidden=True)
def create(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to create.',
callback=validate.input_not_in_inputs,
),
],
input_kind: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Kind of the input to create.',
callback=validate.kind_in_input_kinds,
),
],
):
"""Create a new input."""
current_scene = (
ctx.obj['obsws'].get_current_program_scene().current_program_scene_name
)
try:
ctx.obj['obsws'].create_input(
inputName=input_name,
inputKind=input_kind,
sceneItemEnabled=True,
sceneName=current_scene,
inputSettings={},
)
except obsws.error.OBSSDKRequestError as e:
console.err.print(f'Failed to create input: [yellow]{e}[/yellow]')
raise typer.Exit(1)
console.out.print(
f'Input {console.highlight(ctx, input_name)} of kind '
f'{console.highlight(ctx, input_kind)} created.',
)
@app.command('remove')
@app.command('rm', hidden=True)
def remove(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to remove.',
callback=validate.input_in_inputs,
),
],
):
"""Remove an input."""
ctx.obj['obsws'].remove_input(name=input_name)
console.out.print(f'Input {console.highlight(ctx, input_name)} removed.')
@app.command('list')
@app.command('ls', hidden=True)
def list_(
ctx: typer.Context,
input: Annotated[bool, typer.Option(help='Filter by input type.')] = False,
output: Annotated[bool, typer.Option(help='Filter by output type.')] = False,
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False,
vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of inputs.')] = False,
):
"""List all inputs."""
resp = ctx.obj['obsws'].get_input_list()
kinds = []
if input:
kinds.append('input')
if output:
kinds.append('output')
if colour:
kinds.append('color')
if ffmpeg:
kinds.append('ffmpeg')
if vlc:
kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]):
kinds = ctx.obj['obsws'].get_input_kind_list(False).input_kinds
inputs = sorted(
(
(input_.get('inputName'), input_.get('inputKind'), input_.get('inputUuid'))
for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs,
)
),
key=lambda x: x[0], # Sort by input name
)
if not inputs:
console.out.print('No inputs found.')
raise typer.Exit()
table = Table(title='Inputs', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
]
else:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for input_name, input_kind, input_uuid in inputs:
input_mark = ''
try:
input_muted = ctx.obj['obsws'].get_input_mute(name=input_name).input_muted
input_mark = util.check_mark(input_muted)
except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio
input_mark = 'N/A'
else:
raise
if uuid:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
input_uuid,
)
else:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
)
console.out.print(table)
@app.command('list-kinds')
@app.command('ls-k', hidden=True)
def list_kinds(
ctx: typer.Context,
):
"""List all input kinds."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
kinds = sorted(resp.input_kinds)
if not kinds:
console.out.print('No input kinds found.')
raise typer.Exit()
table = Table(
title='Input Kinds', padding=(0, 2), border_style=ctx.obj['style'].border
)
table.add_column(
Text('Input Kind', justify='center'),
justify='left',
style=ctx.obj['style'].column,
)
for kind in kinds:
table.add_row(util.snakecase_to_titlecase(kind))
console.out.print(table)
@app.command('mute')
@app.command('m', hidden=True)
def mute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to mute.',
callback=validate.input_in_inputs,
),
],
):
"""Mute an input."""
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=True,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} muted.')
@app.command('unmute')
@app.command('um', hidden=True)
def unmute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to unmute.',
callback=validate.input_in_inputs,
),
],
):
"""Unmute an input."""
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=False,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.')
@app.command('toggle')
@app.command('tg', hidden=True)
def toggle(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to toggle.',
callback=validate.input_in_inputs,
),
],
):
"""Toggle an input."""
resp = ctx.obj['obsws'].get_input_mute(name=input_name)
new_state = not resp.input_muted
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=new_state,
)
if new_state:
console.out.print(
f'Input {console.highlight(ctx, input_name)} muted.',
)
else:
console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.',
)
@app.command('volume')
@app.command('vol', hidden=True)
def volume(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to set volume for.',
callback=validate.input_in_inputs,
),
],
volume: Annotated[
float,
typer.Argument(
...,
show_default=False,
help='Volume level to set (-90 to 0).',
min=-90,
max=0,
),
],
):
"""Set the volume of an input."""
ctx.obj['obsws'].set_input_volume(
name=input_name,
vol_db=volume,
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} volume set to {console.highlight(ctx, volume)}.',
)
@app.command('show')
@app.command('s', hidden=True)
def show(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to show.',
callback=validate.input_in_inputs,
),
],
verbose: Annotated[
bool, typer.Option(help='List all available input devices.')
] = False,
):
"""Show information for an input in the current scene."""
input_list = ctx.obj['obsws'].get_input_list()
for input_ in input_list.inputs:
if input_['inputName'] == input_name:
input_kind = input_['inputKind']
break
for prop in ['device', 'device_id']:
try:
device_id = (
ctx.obj['obsws']
.get_input_settings(
name=input_name,
)
.input_settings.get(prop)
)
if device_id:
break
except obsws.error.OBSSDKRequestError:
continue
else:
device_id = '(N/A)'
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemValue') == device_id:
device_id = device.get('itemName')
break
table = Table(
title='Input Information', padding=(0, 2), border_style=ctx.obj['style'].border
)
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Device', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
device_id,
)
console.out.print(table)
if verbose:
resp = ctx.obj['obsws'].get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
table = Table(
title='Devices',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = [
(Text('Name', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for i, item in enumerate(resp.property_items):
table.add_row(
item.get('itemName'),
style='' if i % 2 == 0 else 'dim',
)
console.out.print(table)
@app.command('update')
@app.command('upd', hidden=True)
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to update.',
callback=validate.input_in_inputs,
),
],
device_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the device to set for the input.',
),
],
):
"""Update a setting for an input."""
device_id = None
for prop in ['device', 'device_id']:
try:
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemName') == device_name:
device_id = device.get('itemValue')
break
except obsws.error.OBSSDKRequestError:
continue
if device_id:
break
if not device_id:
console.err.print(
f'Failed to find device ID for device '
f'{console.highlight(ctx, device_name)}.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name, settings={prop: device_id}, overlay=True
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} updated to use device '
f'{console.highlight(ctx, device_name)}.',
)

105
obsws_cli/commands/media.py Normal file
View File

@@ -0,0 +1,105 @@
"""module containing commands for media inputs."""
from typing import Annotated, Optional
import typer
from obsws_cli import console, util, validate
app = typer.Typer()
@app.callback()
def main():
"""Commands for media inputs."""
@app.command('cursor')
@app.command('cur', hidden=True)
def cursor(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
timecode: Annotated[
Optional[str],
typer.Argument(
...,
help='The timecode to set the cursor to (format: HH:MM:SS).',
callback=validate.timecode_format,
),
] = None,
):
"""Get/set the cursor position of a media input."""
if timecode is None:
resp = ctx.obj['obsws'].get_media_input_status(input_name)
console.out.print(
f'Cursor for {console.highlight(ctx, input_name)} is at {util.milliseconds_to_timecode(resp.media_cursor)}.'
)
return
cursor_position = util.timecode_to_milliseconds(timecode)
ctx.obj['obsws'].set_media_input_cursor(input_name, cursor_position)
console.out.print(
f'Cursor for {console.highlight(ctx, input_name)} set to {timecode}.'
)
@app.command('play')
@app.command('p', hidden=True)
def play(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Get/set the playing status of a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PLAY'
)
console.out.print(f'Playing media input {console.highlight(ctx, input_name)}.')
@app.command('pause')
@app.command('pa', hidden=True)
def pause(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Pause a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PAUSE'
)
console.out.print(f'Paused media input {console.highlight(ctx, input_name)}.')
@app.command('stop')
@app.command('s', hidden=True)
def stop(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Stop a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_STOP'
)
console.out.print(f'Stopped media input {console.highlight(ctx, input_name)}.')
@app.command('restart')
@app.command('r', hidden=True)
def restart(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Restart a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART'
)
console.out.print(f'Restarted media input {console.highlight(ctx, input_name)}.')

View File

@@ -6,10 +6,9 @@ import typer
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util, validate from obsws_cli import console, util, validate
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -17,11 +16,16 @@ def main():
"""Control profiles in OBS.""" """Control profiles in OBS."""
@app.command('list | ls') @app.command('list')
@app.command('ls', hidden=True)
def list_(ctx: typer.Context): def list_(ctx: typer.Context):
"""List profiles.""" """List profiles."""
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
if not resp.profiles:
console.out.print('No profiles found.')
raise typer.Exit()
table = Table( table = Table(
title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border
) )
@@ -29,19 +33,22 @@ def list_(ctx: typer.Context):
(Text('Profile Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Profile Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Current', justify='center'), 'center', None), (Text('Current', justify='center'), 'center', None),
] ]
for column, justify, style in columns: for heading, justify, style in columns:
table.add_column(column, justify=justify, style=style) table.add_column(heading, justify=justify, style=style)
for profile in resp.profiles: for profile in resp.profiles:
table.add_row( table.add_row(
profile, profile,
util.check_mark(profile == resp.current_profile_name, empty_if_false=True), util.check_mark(
ctx, profile == resp.current_profile_name, empty_if_false=True
),
) )
console.out.print(table) console.out.print(table)
@app.command('current | get') @app.command('current')
@app.command('get', hidden=True)
def current(ctx: typer.Context): def current(ctx: typer.Context):
"""Get the current profile.""" """Get the current profile."""
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
@@ -50,21 +57,21 @@ def current(ctx: typer.Context):
) )
@app.command('switch | set') @app.command('switch')
@app.command('set', hidden=True)
def switch( def switch(
ctx: typer.Context, ctx: typer.Context,
profile_name: Annotated[ profile_name: Annotated[
str, str,
typer.Argument( typer.Argument(
..., show_default=False, help='Name of the profile to switch to' ...,
show_default=False,
help='Name of the profile to switch to',
callback=validate.profile_exists,
), ),
], ],
): ):
"""Switch to a profile.""" """Switch to a profile."""
if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
if resp.current_profile_name == profile_name: if resp.current_profile_name == profile_name:
console.err.print( console.err.print(
@@ -76,35 +83,39 @@ def switch(
console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.') console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.')
@app.command('create | new') @app.command('create')
@app.command('new', hidden=True)
def create( def create(
ctx: typer.Context, ctx: typer.Context,
profile_name: Annotated[ profile_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Name of the profile to create.'), typer.Argument(
...,
show_default=False,
help='Name of the profile to create.',
callback=validate.profile_not_exists,
),
], ],
): ):
"""Create a new profile.""" """Create a new profile."""
if validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] already exists.')
raise typer.Exit(1)
ctx.obj['obsws'].create_profile(profile_name) ctx.obj['obsws'].create_profile(profile_name)
console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.') console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.')
@app.command('remove | rm') @app.command('remove')
@app.command('rm', hidden=True)
def remove( def remove(
ctx: typer.Context, ctx: typer.Context,
profile_name: Annotated[ profile_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Name of the profile to remove.'), typer.Argument(
...,
show_default=False,
help='Name of the profile to remove.',
callback=validate.profile_exists,
),
], ],
): ):
"""Remove a profile.""" """Remove a profile."""
if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1)
ctx.obj['obsws'].remove_profile(profile_name) ctx.obj['obsws'].remove_profile(profile_name)
console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.') console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.')

View File

@@ -6,10 +6,9 @@ import typer
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console from obsws_cli import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -17,20 +16,20 @@ def main():
"""Control projectors in OBS.""" """Control projectors in OBS."""
@app.command('list-monitors | ls-m') @app.command('list-monitors')
@app.command('ls-m', hidden=True)
def list_monitors(ctx: typer.Context): def list_monitors(ctx: typer.Context):
"""List available monitors.""" """List available monitors."""
resp = ctx.obj['obsws'].get_monitor_list() resp = ctx.obj['obsws'].get_monitor_list()
if not resp.monitors:
console.out.print('No monitors found.')
return
monitors = sorted( monitors = sorted(
((m['monitorIndex'], m['monitorName']) for m in resp.monitors), ((m['monitorIndex'], m['monitorName']) for m in resp.monitors),
key=lambda m: m[0], key=lambda m: m[0],
) )
if not monitors:
console.out.print('No monitors found.')
raise typer.Exit()
table = Table( table = Table(
title='Available Monitors', title='Available Monitors',
padding=(0, 2), padding=(0, 2),
@@ -49,7 +48,8 @@ def list_monitors(ctx: typer.Context):
console.out.print(table) console.out.print(table)
@app.command('open | o') @app.command('open')
@app.command('o', hidden=True)
def open( def open(
ctx: typer.Context, ctx: typer.Context,
monitor_index: Annotated[ monitor_index: Annotated[

View File

@@ -5,10 +5,9 @@ from typing import Annotated, Optional
import typer import typer
from . import console from obsws_cli import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -22,7 +21,8 @@ def _get_recording_status(ctx: typer.Context) -> tuple:
return resp.output_active, resp.output_paused return resp.output_active, resp.output_paused
@app.command('start | s') @app.command('start')
@app.command('s', hidden=True)
def start(ctx: typer.Context): def start(ctx: typer.Context):
"""Start recording.""" """Start recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
@@ -38,7 +38,8 @@ def start(ctx: typer.Context):
console.out.print('Recording started successfully.') console.out.print('Recording started successfully.')
@app.command('stop | st') @app.command('stop')
@app.command('st', hidden=True)
def stop(ctx: typer.Context): def stop(ctx: typer.Context):
"""Stop recording.""" """Stop recording."""
active, _ = _get_recording_status(ctx) active, _ = _get_recording_status(ctx)
@@ -52,7 +53,8 @@ def stop(ctx: typer.Context):
) )
@app.command('toggle | tg') @app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle recording.""" """Toggle recording."""
resp = ctx.obj['obsws'].toggle_record() resp = ctx.obj['obsws'].toggle_record()
@@ -62,7 +64,8 @@ def toggle(ctx: typer.Context):
console.out.print('Recording stopped successfully.') console.out.print('Recording stopped successfully.')
@app.command('status | ss') @app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context): def status(ctx: typer.Context):
"""Get recording status.""" """Get recording status."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
@@ -75,7 +78,8 @@ def status(ctx: typer.Context):
console.out.print('Recording is not in progress.') console.out.print('Recording is not in progress.')
@app.command('resume | r') @app.command('resume')
@app.command('r', hidden=True)
def resume(ctx: typer.Context): def resume(ctx: typer.Context):
"""Resume recording.""" """Resume recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
@@ -90,7 +94,8 @@ def resume(ctx: typer.Context):
console.out.print('Recording resumed successfully.') console.out.print('Recording resumed successfully.')
@app.command('pause | p') @app.command('pause')
@app.command('p', hidden=True)
def pause(ctx: typer.Context): def pause(ctx: typer.Context):
"""Pause recording.""" """Pause recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
@@ -105,7 +110,8 @@ def pause(ctx: typer.Context):
console.out.print('Recording paused successfully.') console.out.print('Recording paused successfully.')
@app.command('directory | d') @app.command('directory')
@app.command('d', hidden=True)
def directory( def directory(
ctx: typer.Context, ctx: typer.Context,
record_directory: Annotated[ record_directory: Annotated[
@@ -130,3 +136,45 @@ def directory(
console.out.print( console.out.print(
f'Recording directory: {console.highlight(ctx, resp.record_directory)}' f'Recording directory: {console.highlight(ctx, resp.record_directory)}'
) )
@app.command('split')
@app.command('sp', hidden=True)
def split(ctx: typer.Context):
"""Split the current recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot split.')
raise typer.Exit(1)
if paused:
console.err.print('Recording is paused, cannot split.')
raise typer.Exit(1)
ctx.obj['obsws'].split_record_file()
console.out.print('Recording split successfully.')
@app.command('chapter')
@app.command('ch', hidden=True)
def chapter(
ctx: typer.Context,
chapter_name: Annotated[
Optional[str],
typer.Argument(
help='Name of the chapter to create.',
),
] = None,
):
"""Create a chapter in the current recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot create chapter.')
raise typer.Exit(1)
if paused:
console.err.print('Recording is paused, cannot create chapter.')
raise typer.Exit(1)
ctx.obj['obsws'].create_record_chapter(chapter_name)
console.out.print(
f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.'
)

View File

@@ -2,10 +2,9 @@
import typer import typer
from . import console from obsws_cli import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -13,7 +12,8 @@ def main():
"""Control profiles in OBS.""" """Control profiles in OBS."""
@app.command('start | s') @app.command('start')
@app.command('s', hidden=True)
def start(ctx: typer.Context): def start(ctx: typer.Context):
"""Start the replay buffer.""" """Start the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status() resp = ctx.obj['obsws'].get_replay_buffer_status()
@@ -25,7 +25,8 @@ def start(ctx: typer.Context):
console.out.print('Replay buffer started.') console.out.print('Replay buffer started.')
@app.command('stop | st') @app.command('stop')
@app.command('st', hidden=True)
def stop(ctx: typer.Context): def stop(ctx: typer.Context):
"""Stop the replay buffer.""" """Stop the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status() resp = ctx.obj['obsws'].get_replay_buffer_status()
@@ -37,7 +38,8 @@ def stop(ctx: typer.Context):
console.out.print('Replay buffer stopped.') console.out.print('Replay buffer stopped.')
@app.command('toggle | tg') @app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle the replay buffer.""" """Toggle the replay buffer."""
resp = ctx.obj['obsws'].toggle_replay_buffer() resp = ctx.obj['obsws'].toggle_replay_buffer()
@@ -47,7 +49,8 @@ def toggle(ctx: typer.Context):
console.out.print('Replay buffer is not active.') console.out.print('Replay buffer is not active.')
@app.command('status | ss') @app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context): def status(ctx: typer.Context):
"""Get the status of the replay buffer.""" """Get the status of the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status() resp = ctx.obj['obsws'].get_replay_buffer_status()
@@ -57,7 +60,8 @@ def status(ctx: typer.Context):
console.out.print('Replay buffer is not active.') console.out.print('Replay buffer is not active.')
@app.command('save | sv') @app.command('save')
@app.command('sv', hidden=True)
def save(ctx: typer.Context): def save(ctx: typer.Context):
"""Save the replay buffer.""" """Save the replay buffer."""
ctx.obj['obsws'].save_replay_buffer() ctx.obj['obsws'].save_replay_buffer()

View File

@@ -6,10 +6,9 @@ import typer
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util, validate from obsws_cli import console, util, validate
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -17,7 +16,8 @@ def main():
"""Control OBS scenes.""" """Control OBS scenes."""
@app.command('list | ls') @app.command('list')
@app.command('ls', hidden=True)
def list_( def list_(
ctx: typer.Context, ctx: typer.Context,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scenes')] = False, uuid: Annotated[bool, typer.Option(help='Show UUIDs of scenes')] = False,
@@ -29,6 +29,10 @@ def list_(
for scene in reversed(resp.scenes) for scene in reversed(resp.scenes)
) )
if not scenes:
console.out.print('No scenes found.')
raise typer.Exit()
active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border) table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border)
@@ -43,8 +47,8 @@ def list_(
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Active', justify='center'), 'center', None), (Text('Active', justify='center'), 'center', None),
] ]
for column, justify, style in columns: for heading, justify, style in columns:
table.add_column(column, justify=justify, style=style) table.add_column(heading, justify=justify, style=style)
for scene_name, scene_uuid in scenes: for scene_name, scene_uuid in scenes:
if uuid: if uuid:
@@ -62,18 +66,19 @@ def list_(
console.out.print(table) console.out.print(table)
@app.command('current | get') @app.command('current')
@app.command('get', hidden=True)
def current( def current(
ctx: typer.Context, ctx: typer.Context,
preview: Annotated[ preview: Annotated[
bool, typer.Option(help='Get the preview scene instead of the program scene') bool,
typer.Option(
help='Get the preview scene instead of the program scene',
callback=validate.studio_mode_enabled,
),
] = False, ] = False,
): ):
"""Get the current program scene or preview scene.""" """Get the current program scene or preview scene."""
if preview and not validate.studio_mode_enabled(ctx):
console.err.print('Studio mode is not enabled, cannot get preview scene.')
raise typer.Exit(1)
if preview: if preview:
resp = ctx.obj['obsws'].get_current_preview_scene() resp = ctx.obj['obsws'].get_current_preview_scene()
console.out.print( console.out.print(
@@ -86,29 +91,34 @@ def current(
) )
@app.command('switch | set') @app.command('switch')
@app.command('set', hidden=True)
def switch( def switch(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
str, typer.Argument(..., help='Name of the scene to switch to') str,
typer.Argument(
...,
help='Name of the scene to switch to',
callback=validate.scene_in_scenes,
),
], ],
preview: Annotated[ preview: Annotated[
bool, bool,
typer.Option(help='Switch to the preview scene instead of the program scene'), typer.Option(
help='Switch to the preview scene instead of the program scene',
callback=validate.studio_mode_enabled,
),
] = False, ] = False,
): ):
"""Switch to a scene.""" """Switch to a scene."""
if preview and not validate.studio_mode_enabled(ctx):
console.err.print('Studio mode is not enabled, cannot set the preview scene.')
raise typer.Exit(1)
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
if preview: if preview:
ctx.obj['obsws'].set_current_preview_scene(scene_name) ctx.obj['obsws'].set_current_preview_scene(scene_name)
console.out.print(f'Switched to preview scene: [green]{scene_name}[/green]') console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene_name)}'
)
else: else:
ctx.obj['obsws'].set_current_program_scene(scene_name) ctx.obj['obsws'].set_current_program_scene(scene_name)
console.out.print(f'Switched to program scene: [green]{scene_name}[/green]') console.out.print(
f'Switched to program scene: {console.highlight(ctx, scene_name)}'
)

View File

@@ -5,10 +5,9 @@ from typing import Annotated
import typer import typer
from rich.table import Table from rich.table import Table
from . import console, validate from obsws_cli import console, validate
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -16,11 +15,16 @@ def main():
"""Control scene collections in OBS.""" """Control scene collections in OBS."""
@app.command('list | ls') @app.command('list')
@app.command('ls', hidden=True)
def list_(ctx: typer.Context): def list_(ctx: typer.Context):
"""List all scene collections.""" """List all scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.obj['obsws'].get_scene_collection_list()
if not resp.scene_collections:
console.out.print('No scene collections found.')
raise typer.Exit()
table = Table( table = Table(
title='Scene Collections', title='Scene Collections',
padding=(0, 2), padding=(0, 2),
@@ -36,7 +40,8 @@ def list_(ctx: typer.Context):
console.out.print(table) console.out.print(table)
@app.command('current | get') @app.command('current')
@app.command('get', hidden=True)
def current(ctx: typer.Context): def current(ctx: typer.Context):
"""Get the current scene collection.""" """Get the current scene collection."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.obj['obsws'].get_scene_collection_list()
@@ -45,20 +50,20 @@ def current(ctx: typer.Context):
) )
@app.command('switch | set') @app.command('switch')
@app.command('set', hidden=True)
def switch( def switch(
ctx: typer.Context, ctx: typer.Context,
scene_collection_name: Annotated[ scene_collection_name: Annotated[
str, typer.Argument(..., help='Name of the scene collection to switch to') str,
typer.Argument(
...,
help='Name of the scene collection to switch to',
callback=validate.scene_collection_in_scene_collections,
),
], ],
): ):
"""Switch to a scene collection.""" """Switch to a scene collection."""
if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.'
)
raise typer.Exit(1)
current_scene_collection = ( current_scene_collection = (
ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name
) )
@@ -74,20 +79,20 @@ def switch(
) )
@app.command('create | new') @app.command('create')
@app.command('new', hidden=True)
def create( def create(
ctx: typer.Context, ctx: typer.Context,
scene_collection_name: Annotated[ scene_collection_name: Annotated[
str, typer.Argument(..., help='Name of the scene collection to create') str,
typer.Argument(
...,
help='Name of the scene collection to create',
callback=validate.scene_collection_not_in_scene_collections,
),
], ],
): ):
"""Create a new scene collection.""" """Create a new scene collection."""
if validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.'
)
raise typer.Exit(1)
ctx.obj['obsws'].create_scene_collection(scene_collection_name) ctx.obj['obsws'].create_scene_collection(scene_collection_name)
console.out.print( console.out.print(
f'Created scene collection {console.highlight(ctx, scene_collection_name)}.' f'Created scene collection {console.highlight(ctx, scene_collection_name)}.'

View File

@@ -5,10 +5,9 @@ from typing import Annotated, Optional
import typer import typer
from rich.table import Table from rich.table import Table
from . import console, util, validate from obsws_cli import console, util, validate
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -16,7 +15,8 @@ def main():
"""Control items in OBS scenes.""" """Control items in OBS scenes."""
@app.command('list | ls') @app.command('list')
@app.command('ls', hidden=True)
def list_( def list_(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
@@ -24,18 +24,15 @@ def list_(
typer.Argument( typer.Argument(
show_default='The current scene', show_default='The current scene',
help='Scene name to list items for', help='Scene name to list items for',
callback=validate.scene_in_scenes,
), ),
] = None, ] = None,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scene items')] = False, uuid: Annotated[bool, typer.Option(help='Show UUIDs of scene items')] = False,
): ):
"""List all items in a scene.""" """List all items in a scene."""
if not scene_name: if scene_name is None:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
items = sorted( items = sorted(
( (
@@ -78,8 +75,8 @@ def list_(
('Enabled', 'center', None), ('Enabled', 'center', None),
] ]
# Add columns to the table # Add columns to the table
for column, justify, style in columns: for heading, justify, style in columns:
table.add_column(column, justify=justify, style=style) table.add_column(heading, justify=justify, style=style)
for item_id, item_name, is_group, is_enabled, source_uuid in items: for item_id, item_name, is_group, is_enabled, source_uuid in items:
if is_group: if is_group:
@@ -189,7 +186,8 @@ def _get_scene_name_and_item_id(
return scene_name, scene_item_id return scene_name, scene_item_id
@app.command('show | sh') @app.command('show')
@app.command('sh', hidden=True)
def show( def show(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
@@ -231,7 +229,8 @@ def show(
) )
@app.command('hide | h') @app.command('hide')
@app.command('h', hidden=True)
def hide( def hide(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
@@ -272,7 +271,8 @@ def hide(
) )
@app.command('toggle | tg') @app.command('toggle')
@app.command('tg', hidden=True)
def toggle( def toggle(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
@@ -333,7 +333,8 @@ def toggle(
) )
@app.command('visible | v') @app.command('visible')
@app.command('v', hidden=True)
def visible( def visible(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
@@ -377,7 +378,8 @@ def visible(
) )
@app.command('transform | t') @app.command('transform')
@app.command('t', hidden=True)
def transform( def transform(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[

View File

@@ -6,10 +6,9 @@ from typing import Annotated
import obsws_python as obsws import obsws_python as obsws
import typer import typer
from . import console from obsws_cli import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -17,7 +16,8 @@ def main():
"""Take screenshots using OBS.""" """Take screenshots using OBS."""
@app.command('save | sv') @app.command('save')
@app.command('s', hidden=True)
def save( def save(
ctx: typer.Context, ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[

View File

@@ -0,0 +1,340 @@
"""module for settings management."""
from typing import Annotated, Optional
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util
app = typer.Typer()
@app.callback()
def main():
"""Manage OBS settings."""
@app.command('show')
@app.command('sh', hidden=True)
def show(
ctx: typer.Context,
video: Annotated[
bool, typer.Option('--video', '-v', help='Show video settings.')
] = False,
record: Annotated[
bool, typer.Option('--record', '-r', help='Show recording settings.')
] = False,
profile: Annotated[
bool, typer.Option('--profile', '-p', help='Show profile settings.')
] = False,
):
"""Show current OBS settings."""
if not any([video, record, profile]):
video = True
record = True
profile = True
resp = ctx.obj['obsws'].get_video_settings()
video_table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
video_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in video_columns:
video_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
video_table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if video_table.row_count % 2 == 0 else 'dim',
)
if video:
console.out.print(video_table)
resp = ctx.obj['obsws'].get_record_directory()
record_table = Table(
title='Recording Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
record_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in record_columns:
record_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
record_table.add_row(
'Directory',
resp.record_directory,
style='' if record_table.row_count % 2 == 0 else 'dim',
)
if record:
console.out.print(record_table)
profile_table = Table(
title='Profile Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
profile_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in profile_columns:
profile_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
params = [
('Output', 'Mode', 'Output Mode'),
('SimpleOutput', 'StreamEncoder', 'Simple Streaming Encoder'),
('SimpleOutput', 'RecEncoder', 'Simple Recording Encoder'),
('SimpleOutput', 'RecFormat2', 'Simple Recording Video Format'),
('SimpleOutput', 'RecAudioEncoder', 'Simple Recording Audio Format'),
('SimpleOutput', 'RecQuality', 'Simple Recording Quality'),
('AdvOut', 'Encoder', 'Advanced Streaming Encoder'),
('AdvOut', 'RecEncoder', 'Advanced Recording Encoder'),
('AdvOut', 'RecType', 'Advanced Recording Type'),
('AdvOut', 'RecFormat2', 'Advanced Recording Video Format'),
('AdvOut', 'RecAudioEncoder', 'Advanced Recording Audio Format'),
]
for category, name, display_name in params:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
if resp.parameter_value is not None:
profile_table.add_row(
display_name,
str(resp.parameter_value),
style='' if profile_table.row_count % 2 == 0 else 'dim',
)
if profile:
console.out.print(profile_table)
@app.command('profile')
@app.command('pr', hidden=True)
def profile(
ctx: typer.Context,
category: Annotated[
str,
typer.Argument(
...,
help='Profile parameter category (e.g., SimpleOutput, AdvOut).',
),
],
name: Annotated[
str,
typer.Argument(
...,
help='Profile parameter name (e.g., StreamEncoder, RecFormat2).',
),
],
value: Annotated[
Optional[str],
typer.Argument(
...,
help='Value to set for the profile parameter. If omitted, the current value is retrieved.',
),
] = None,
):
"""Get/set OBS profile settings."""
if value is None:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
console.out.print(
f'Parameter Value for [bold]{name}[/bold]: '
f'[green]{resp.parameter_value}[/green]'
)
else:
ctx.obj['obsws'].set_profile_parameter(
category=category,
name=name,
value=value,
)
console.out.print(
f'Set Parameter [bold]{name}[/bold] to [green]{value}[/green]'
)
@app.command('stream-service')
@app.command('ss', hidden=True)
def stream_service(
ctx: typer.Context,
type_: Annotated[
Optional[str],
typer.Argument(
...,
help='Stream service type (e.g., Twitch, YouTube). If omitted, current settings are retrieved.',
),
] = None,
key: Annotated[
Optional[str],
typer.Option('--key', '-k', help='Stream key to set. Optional.'),
] = None,
server: Annotated[
Optional[str],
typer.Option('--server', '-s', help='Stream server to set. Optional.'),
] = None,
):
"""Get/set OBS stream service settings."""
if type_ is None:
resp = ctx.obj['obsws'].get_stream_service_settings()
table = Table(
title='Stream Service Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
table.add_row(
'Type',
resp.stream_service_type,
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Server',
resp.stream_service_settings.get('server', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Key',
resp.stream_service_settings.get('key', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_stream_service_settings()
if key is None:
key = current_settings.stream_service_settings.get('key', '')
if server is None:
server = current_settings.stream_service_settings.get('server', '')
ctx.obj['obsws'].set_stream_service_settings(
ss_type=type_,
ss_settings={'key': key, 'server': server},
)
console.out.print('Stream service settings updated.')
@app.command('video')
@app.command('vi', hidden=True)
def video(
ctx: typer.Context,
base_width: Annotated[
Optional[int],
typer.Option('--base-width', '-bw', help='Set base (canvas) width.'),
] = None,
base_height: Annotated[
Optional[int],
typer.Option('--base-height', '-bh', help='Set base (canvas) height.'),
] = None,
output_width: Annotated[
Optional[int],
typer.Option('--output-width', '-ow', help='Set output (scaled) width.'),
] = None,
output_height: Annotated[
Optional[int],
typer.Option('--output-height', '-oh', help='Set output (scaled) height.'),
] = None,
fps_num: Annotated[
Optional[int],
typer.Option('--fps-num', '-fn', help='Set FPS numerator.'),
] = None,
fps_den: Annotated[
Optional[int],
typer.Option('--fps-den', '-fd', help='Set FPS denominator.'),
] = None,
):
"""Get/set OBS video settings."""
if not any(
[
base_width,
base_height,
output_width,
output_height,
fps_num,
fps_den,
]
):
resp = ctx.obj['obsws'].get_video_settings()
table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_video_settings()
if base_width is None:
base_width = current_settings.base_width
if base_height is None:
base_height = current_settings.base_height
if output_width is None:
output_width = current_settings.output_width
if output_height is None:
output_height = current_settings.output_height
if fps_num is None:
fps_num = current_settings.fps_num
if fps_den is None:
fps_den = current_settings.fps_den
ctx.obj['obsws'].set_video_settings(
base_width=base_width,
base_height=base_height,
out_width=output_width,
out_height=output_height,
numerator=fps_num,
denominator=fps_den,
)
console.out.print('Video settings updated.')

View File

@@ -2,10 +2,9 @@
import typer import typer
from . import console from obsws_cli import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -19,7 +18,8 @@ def _get_streaming_status(ctx: typer.Context) -> tuple:
return resp.output_active, resp.output_duration return resp.output_active, resp.output_duration
@app.command('start | s') @app.command('start')
@app.command('s', hidden=True)
def start(ctx: typer.Context): def start(ctx: typer.Context):
"""Start streaming.""" """Start streaming."""
active, _ = _get_streaming_status(ctx) active, _ = _get_streaming_status(ctx)
@@ -31,7 +31,8 @@ def start(ctx: typer.Context):
console.out.print('Streaming started successfully.') console.out.print('Streaming started successfully.')
@app.command('stop | st') @app.command('stop')
@app.command('st', hidden=True)
def stop(ctx: typer.Context): def stop(ctx: typer.Context):
"""Stop streaming.""" """Stop streaming."""
active, _ = _get_streaming_status(ctx) active, _ = _get_streaming_status(ctx)
@@ -43,7 +44,8 @@ def stop(ctx: typer.Context):
console.out.print('Streaming stopped successfully.') console.out.print('Streaming stopped successfully.')
@app.command('toggle | tg') @app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle streaming.""" """Toggle streaming."""
resp = ctx.obj['obsws'].toggle_stream() resp = ctx.obj['obsws'].toggle_stream()
@@ -53,7 +55,8 @@ def toggle(ctx: typer.Context):
console.out.print('Streaming stopped successfully.') console.out.print('Streaming stopped successfully.')
@app.command('status | ss') @app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context): def status(ctx: typer.Context):
"""Get streaming status.""" """Get streaming status."""
active, duration = _get_streaming_status(ctx) active, duration = _get_streaming_status(ctx)

View File

@@ -2,10 +2,9 @@
import typer import typer
from . import console from obsws_cli import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -13,21 +12,24 @@ def main():
"""Control studio mode in OBS.""" """Control studio mode in OBS."""
@app.command('enable | on') @app.command('enable')
@app.command('on', hidden=True)
def enable(ctx: typer.Context): def enable(ctx: typer.Context):
"""Enable studio mode.""" """Enable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(True) ctx.obj['obsws'].set_studio_mode_enabled(True)
console.out.print('Studio mode has been enabled.') console.out.print('Studio mode has been enabled.')
@app.command('disable | off') @app.command('disable')
@app.command('off', hidden=True)
def disable(ctx: typer.Context): def disable(ctx: typer.Context):
"""Disable studio mode.""" """Disable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(False) ctx.obj['obsws'].set_studio_mode_enabled(False)
console.out.print('Studio mode has been disabled.') console.out.print('Studio mode has been disabled.')
@app.command('toggle | tg') @app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle studio mode.""" """Toggle studio mode."""
resp = ctx.obj['obsws'].get_studio_mode_enabled() resp = ctx.obj['obsws'].get_studio_mode_enabled()
@@ -39,7 +41,8 @@ def toggle(ctx: typer.Context):
console.out.print('Studio mode is now enabled.') console.out.print('Studio mode is now enabled.')
@app.command('status | ss') @app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context): def status(ctx: typer.Context):
"""Get the status of studio mode.""" """Get the status of studio mode."""
resp = ctx.obj['obsws'].get_studio_mode_enabled() resp = ctx.obj['obsws'].get_studio_mode_enabled()

View File

@@ -0,0 +1,79 @@
"""module containing commands for manipulating text inputs."""
from typing import Annotated, Optional
import typer
from obsws_cli import console, validate
app = typer.Typer()
@app.callback()
def main():
"""Control text inputs in OBS."""
@app.command('current')
@app.command('get', hidden=True)
def current(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to get.', callback=validate.input_in_inputs
),
],
):
"""Get the current text for a text input."""
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
)
raise typer.Exit(1)
current_text = resp.input_settings.get('text', '')
if not current_text:
current_text = '(empty)'
console.out.print(
f'Current text for input {console.highlight(ctx, input_name)}: {current_text}',
)
@app.command('update')
@app.command('set', hidden=True)
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to update.', callback=validate.input_in_inputs
),
],
new_text: Annotated[
Optional[str],
typer.Argument(
help='The new text to set for the input.',
),
] = None,
):
"""Update the text of a text input."""
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name,
settings={'text': new_text},
overlay=True,
)
if not new_text:
new_text = '(empty)'
console.out.print(
f'Text for input {console.highlight(ctx, input_name)} updated to: {new_text}',
)

View File

@@ -2,10 +2,9 @@
import typer import typer
from . import console from obsws_cli import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup) app = typer.Typer()
@app.callback() @app.callback()
@@ -13,21 +12,24 @@ def main():
"""Control virtual camera in OBS.""" """Control virtual camera in OBS."""
@app.command('start | s') @app.command('start')
@app.command('s', hidden=True)
def start(ctx: typer.Context): def start(ctx: typer.Context):
"""Start the virtual camera.""" """Start the virtual camera."""
ctx.obj['obsws'].start_virtual_cam() ctx.obj['obsws'].start_virtual_cam()
console.out.print('Virtual camera started.') console.out.print('Virtual camera started.')
@app.command('stop | p') @app.command('stop')
@app.command('p', hidden=True)
def stop(ctx: typer.Context): def stop(ctx: typer.Context):
"""Stop the virtual camera.""" """Stop the virtual camera."""
ctx.obj['obsws'].stop_virtual_cam() ctx.obj['obsws'].stop_virtual_cam()
console.out.print('Virtual camera stopped.') console.out.print('Virtual camera stopped.')
@app.command('toggle | tg') @app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle the virtual camera.""" """Toggle the virtual camera."""
resp = ctx.obj['obsws'].toggle_virtual_cam() resp = ctx.obj['obsws'].toggle_virtual_cam()
@@ -37,7 +39,8 @@ def toggle(ctx: typer.Context):
console.out.print('Virtual camera is disabled.') console.out.print('Virtual camera is disabled.')
@app.command('status | ss') @app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context): def status(ctx: typer.Context):
"""Get the status of the virtual camera.""" """Get the status of the virtual camera."""
resp = ctx.obj['obsws'].get_virtual_cam_status() resp = ctx.obj['obsws'].get_virtual_cam_status()

146
obsws_cli/envconfig.py Normal file
View File

@@ -0,0 +1,146 @@
"""module for settings management for obsws-cli."""
from collections import UserDict
from pathlib import Path
from typing import Any, Union
from dotenv import dotenv_values
ConfigValue = Union[str, int, bool]
class EnvConfig(UserDict):
"""A class to manage .env config for obsws-cli.
This class extends UserDict to provide a dictionary-like interface for config.
It loads config from .env files in the following priority:
1. Local .env file
2. User config file (~/.config/obsws-cli/obsws.env)
3. Default values
Note, typer handles reading from environment variables automatically. They take precedence
over values set in this class.
The config values are expected to be in uppercase and should start with 'OBSWS_CLI_'.
Example:
-------
config = EnvConfig()
host = config['OBSWS_CLI_HOST']
config['OBSWS_CLI_PORT'] = 4455
# Or with defaults
timeout = config.get('OBSWS_CLI_TIMEOUT', 30)
# Keys will be normalised to uppercase with prefix
debug = config.get('debug', False) # Equivalent to 'OBSWS_CLI_DEBUG'
"""
PREFIX = 'OBSWS_CLI_'
def __init__(self, *args, **kwargs):
"""Initialize the Config object with hierarchical loading."""
kwargs.update(
{
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
**dotenv_values('.env'),
}
)
super().__init__(*args, **self._convert_types(kwargs))
def _convert_types(self, config_data: dict[str, Any]) -> dict[str, ConfigValue]:
"""Convert string values to appropriate types."""
converted = {}
for key, value in config_data.items():
if isinstance(value, str):
if value.lower() in ('true', 'false'):
converted[key] = value.lower() == 'true'
elif value.isdigit():
converted[key] = int(value)
else:
converted[key] = value
else:
converted[key] = value
return converted
def __getitem__(self, key: str) -> ConfigValue:
"""Get a setting value by key."""
normalised_key = self._normalise_key(key)
try:
return self.data[normalised_key]
except KeyError as e:
raise KeyError(
f"Config key '{key}' not found. Available keys: {list(self.data.keys())}"
) from e
def __setitem__(self, key: str, value: ConfigValue):
"""Set a setting value by key."""
normalised_key = self._normalise_key(key)
self.data[normalised_key] = value
def _normalise_key(self, key: str) -> str:
"""Normalise a key to uppercase with OBS_ prefix."""
key = key.upper()
if not key.startswith(self.PREFIX):
key = f'{self.PREFIX}{key}'
return key
def get(self, key: str, default=None) -> ConfigValue:
"""Get a config value with optional default.
Args:
----
key (str): The key to retrieve
default: Default value if key is not found
Returns:
-------
The config value or default
"""
normalised_key = self._normalise_key(key)
if not self.has_key(normalised_key):
return default
return self[normalised_key]
def has_key(self, key: str) -> bool:
"""Check if a config key exists.
Args:
----
key (str): The key to check
Returns:
-------
bool: True if key exists, False otherwise
"""
normalised_key = self._normalise_key(key)
return normalised_key in self.data
_envconfig = EnvConfig(
OBSWS_CLI_HOST='localhost',
OBSWS_CLI_PORT=4455,
OBSWS_CLI_PASSWORD='',
OBSWS_CLI_TIMEOUT=5,
OBSWS_CLI_DEBUG=False,
OBSWS_CLI_STYLE='disabled',
OBSWS_CLI_STYLE_NO_BORDER=False,
)
def get(key: str) -> ConfigValue:
"""Get a setting value by key from the global config instance.
Args:
----
key (str): The key of the config to retrieve.
default: Default value if key is not found.
Returns:
-------
The value of the config or default value.
"""
return _envconfig.get(key)

View File

@@ -1,177 +0,0 @@
"""module containing commands for manipulating inputs."""
from typing import Annotated
import obsws_python as obsws
import typer
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control inputs in OBS."""
@app.command('list | ls')
def list_(
ctx: typer.Context,
input: Annotated[bool, typer.Option(help='Filter by input type.')] = False,
output: Annotated[bool, typer.Option(help='Filter by output type.')] = False,
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False,
vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of inputs.')] = False,
):
"""List all inputs."""
resp = ctx.obj['obsws'].get_input_list()
kinds = []
if input:
kinds.append('input')
if output:
kinds.append('output')
if colour:
kinds.append('color')
if ffmpeg:
kinds.append('ffmpeg')
if vlc:
kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]):
kinds = ctx.obj['obsws'].get_input_kind_list(False).input_kinds
inputs = sorted(
(
(input_.get('inputName'), input_.get('inputKind'), input_.get('inputUuid'))
for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs,
)
),
key=lambda x: x[0], # Sort by input name
)
if not inputs:
console.out.print('No inputs found.')
raise typer.Exit()
table = Table(title='Inputs', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
]
else:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
]
for column, justify, style in columns:
table.add_column(column, justify=justify, style=style)
for input_name, input_kind, input_uuid in inputs:
input_mark = ''
try:
input_muted = ctx.obj['obsws'].get_input_mute(name=input_name).input_muted
input_mark = util.check_mark(input_muted)
except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio
input_mark = 'N/A'
else:
raise
if uuid:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
input_uuid,
)
else:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
)
console.out.print(table)
@app.command('mute | m')
def mute(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., show_default=False, help='Name of the input to mute.')
],
):
"""Mute an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=True,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} muted.')
@app.command('unmute | um')
def unmute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Name of the input to unmute.'),
],
):
"""Unmute an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=False,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.')
@app.command('toggle | tg')
def toggle(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Name of the input to toggle.'),
],
):
"""Toggle an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_input_mute(name=input_name)
new_state = not resp.input_muted
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=new_state,
)
if new_state:
console.out.print(
f'Input {console.highlight(ctx, input_name)} muted.',
)
else:
console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.',
)

View File

@@ -1,76 +0,0 @@
"""module for settings management for obsws-cli."""
from collections import UserDict
from pathlib import Path
from dotenv import dotenv_values
SettingsValue = str | int
class Settings(UserDict):
"""A class to manage settings for obsws-cli.
This class extends UserDict to provide a dictionary-like interface for settings.
It loads settings from environment variables and .env files.
The settings are expected to be in uppercase and should start with 'OBS_'.
Example:
settings = Settings()
host = settings['OBS_HOST']
settings['OBS_PORT'] = 4455
"""
PREFIX = 'OBS_'
def __init__(self, *args, **kwargs):
"""Initialize the Settings object."""
kwargs.update(
{
**dotenv_values('.env'),
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
}
)
super().__init__(*args, **kwargs)
def __getitem__(self, key: str) -> SettingsValue:
"""Get a setting value by key."""
key = key.upper()
if not key.startswith(Settings.PREFIX):
key = f'{Settings.PREFIX}{key}'
return self.data[key]
def __setitem__(self, key: str, value: SettingsValue):
"""Set a setting value by key."""
key = key.upper()
if not key.startswith(Settings.PREFIX):
key = f'{Settings.PREFIX}{key}'
self.data[key] = value
_settings = Settings(
OBS_HOST='localhost',
OBS_PORT=4455,
OBS_PASSWORD='',
OBS_TIMEOUT=5,
OBS_DEBUG=False,
OBS_STYLE='disabled',
OBS_STYLE_NO_BORDER=False,
)
def get(key: str) -> SettingsValue:
"""Get a setting value by key.
Args:
key (str): The key of the setting to retrieve.
Returns:
The value of the setting.
Raises:
KeyError: If the key does not exist in the settings.
"""
return _settings[key]

View File

@@ -3,15 +3,15 @@
import os import os
from dataclasses import dataclass from dataclasses import dataclass
_registry = {} registry = {}
def register_style(cls): def register_style(cls):
"""Register a style class.""" """Register a style class."""
key = cls.__name__.lower() key = cls.__name__.lower()
if key in _registry: if key in registry:
raise ValueError(f'Style {key} is already registered.') raise ValueError(f'Style {key} is already registered.')
_registry[key] = cls registry[key] = cls
return cls return cls
@@ -19,11 +19,10 @@ def register_style(cls):
class Style: class Style:
"""Base class for styles.""" """Base class for styles."""
name: str = 'no_colour' name: str
description: str = 'Style disabled' border: str
border: str | None = None column: str
column: str | None = None highlight: str
highlight: str | None = None
no_border: bool = False no_border: bool = False
def __post_init__(self): def __post_init__(self):
@@ -32,9 +31,16 @@ class Style:
if self.no_border: if self.no_border:
self.border = None self.border = None
def __str__(self):
"""Return a string representation of the style.""" @register_style
return f'{self.name} - {self.description}' @dataclass
class Disabled(Style):
"""Disabled style."""
name: str = 'disabled'
border: str = 'none'
column: str = 'none'
highlight: str = 'none'
@register_style @register_style
@@ -43,10 +49,9 @@ class Red(Style):
"""Red style.""" """Red style."""
name: str = 'red' name: str = 'red'
description: str = 'Red text color'
border: str = 'red3' border: str = 'red3'
highlight: str = 'red1'
column: str = 'red1' column: str = 'red1'
highlight: str = 'red1'
@register_style @register_style
@@ -55,10 +60,9 @@ class Magenta(Style):
"""Magenta style.""" """Magenta style."""
name: str = 'magenta' name: str = 'magenta'
description: str = 'Magenta text color'
border: str = 'magenta3' border: str = 'magenta3'
highlight: str = 'orchid1'
column: str = 'orchid1' column: str = 'orchid1'
highlight: str = 'orchid1'
@register_style @register_style
@@ -67,10 +71,9 @@ class Purple(Style):
"""Purple style.""" """Purple style."""
name: str = 'purple' name: str = 'purple'
description: str = 'Purple text color'
border: str = 'medium_purple4' border: str = 'medium_purple4'
highlight: str = 'medium_purple'
column: str = 'medium_purple' column: str = 'medium_purple'
highlight: str = 'medium_purple'
@register_style @register_style
@@ -79,10 +82,9 @@ class Blue(Style):
"""Blue style.""" """Blue style."""
name: str = 'blue' name: str = 'blue'
description: str = 'Blue text color'
border: str = 'cornflower_blue' border: str = 'cornflower_blue'
highlight: str = 'sky_blue2'
column: str = 'sky_blue2' column: str = 'sky_blue2'
highlight: str = 'sky_blue2'
@register_style @register_style
@@ -91,10 +93,9 @@ class Cyan(Style):
"""Cyan style.""" """Cyan style."""
name: str = 'cyan' name: str = 'cyan'
description: str = 'Cyan text color'
border: str = 'dark_cyan' border: str = 'dark_cyan'
highlight: str = 'cyan'
column: str = 'cyan' column: str = 'cyan'
highlight: str = 'cyan'
@register_style @register_style
@@ -103,10 +104,9 @@ class Green(Style):
"""Green style.""" """Green style."""
name: str = 'green' name: str = 'green'
description: str = 'Green text color'
border: str = 'green4' border: str = 'green4'
highlight: str = 'spring_green3'
column: str = 'spring_green3' column: str = 'spring_green3'
highlight: str = 'spring_green3'
@register_style @register_style
@@ -115,10 +115,9 @@ class Yellow(Style):
"""Yellow style.""" """Yellow style."""
name: str = 'yellow' name: str = 'yellow'
description: str = 'Yellow text color'
border: str = 'yellow3' border: str = 'yellow3'
highlight: str = 'wheat1'
column: str = 'wheat1' column: str = 'wheat1'
highlight: str = 'wheat1'
@register_style @register_style
@@ -127,10 +126,9 @@ class Orange(Style):
"""Orange style.""" """Orange style."""
name: str = 'orange' name: str = 'orange'
description: str = 'Orange text color'
border: str = 'dark_orange' border: str = 'dark_orange'
highlight: str = 'orange1'
column: str = 'orange1' column: str = 'orange1'
highlight: str = 'orange1'
@register_style @register_style
@@ -139,10 +137,9 @@ class White(Style):
"""White style.""" """White style."""
name: str = 'white' name: str = 'white'
description: str = 'White text color'
border: str = 'grey82' border: str = 'grey82'
highlight: str = 'grey100'
column: str = 'grey100' column: str = 'grey100'
highlight: str = 'grey100'
@register_style @register_style
@@ -151,10 +148,9 @@ class Grey(Style):
"""Grey style.""" """Grey style."""
name: str = 'grey' name: str = 'grey'
description: str = 'Grey text color'
border: str = 'grey50' border: str = 'grey50'
highlight: str = 'grey70'
column: str = 'grey70' column: str = 'grey70'
highlight: str = 'grey70'
@register_style @register_style
@@ -163,10 +159,9 @@ class Navy(Style):
"""Navy Blue style.""" """Navy Blue style."""
name: str = 'navyblue' name: str = 'navyblue'
description: str = 'Navy Blue text color'
border: str = 'deep_sky_blue4' border: str = 'deep_sky_blue4'
highlight: str = 'light_sky_blue3'
column: str = 'light_sky_blue3' column: str = 'light_sky_blue3'
highlight: str = 'light_sky_blue3'
@register_style @register_style
@@ -175,17 +170,14 @@ class Black(Style):
"""Black style.""" """Black style."""
name: str = 'black' name: str = 'black'
description: str = 'Black text color'
border: str = 'grey19' border: str = 'grey19'
column: str = 'grey11' column: str = 'grey11'
highlight: str = 'grey11'
def request_style_obj(style_name: str, no_border: bool) -> Style: def request_style_obj(style_name: str, no_border: bool) -> Style:
"""Entry point for style objects. Returns a Style object based on the style name.""" """Entry point for style objects. Returns a Style object based on the style name."""
style_name = str(style_name).lower() # coerce the type to string and lowercase it if style_name == 'disabled':
os.environ['NO_COLOR'] = '1'
if style_name not in _registry: return registry[style_name.lower()](no_border=no_border)
os.environ['NO_COLOR'] = '1' # Disable colour output
return Style()
return _registry[style_name](no_border=no_border)

View File

@@ -13,6 +13,35 @@ def check_mark(value: bool, empty_if_false: bool = False) -> str:
if empty_if_false and not value: if empty_if_false and not value:
return '' return ''
# rich gracefully handles the absence of colour throughout the rest of the application,
# but here we must handle it manually.
# If NO_COLOR is set, we return plain text symbols.
# Otherwise, we return coloured symbols.
if os.getenv('NO_COLOR', '') != '': if os.getenv('NO_COLOR', '') != '':
return '' if value else '' return '' if value else ''
return '' if value else '' return '' if value else ''
def timecode_to_milliseconds(timecode: str) -> int:
"""Convert a timecode string (HH:MM:SS) to total milliseconds."""
match timecode.split(':'):
case [mm, ss]:
hours = 0
minutes = int(mm)
seconds = int(ss)
case [hh, mm, ss]:
hours = int(hh)
minutes = int(mm)
seconds = int(ss)
return (hours * 3600 + minutes * 60 + seconds) * 1000
def milliseconds_to_timecode(milliseconds: int) -> str:
"""Convert total milliseconds to a timecode string (HH:MM:SS)."""
total_seconds = milliseconds // 1000
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
if hours == 0:
return f'{minutes:02}:{seconds:02}'
return f'{hours:02}:{minutes:02}:{seconds:02}'

View File

@@ -1,37 +1,84 @@
"""module containing validation functions.""" """module containing validation functions."""
from typing import Optional
import typer import typer
from . import console
# type alias for an option that is skipped when the command is run # type alias for an option that is skipped when the command is run
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False) skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool: def input_in_inputs(ctx: typer.Context, input_name: str) -> str:
"""Check if an input is in the input list.""" """Ensure the given input exists in the list of inputs."""
inputs = ctx.obj['obsws'].get_input_list().inputs resp = ctx.obj['obsws'].get_input_list()
return any(input_.get('inputName') == input_name for input_ in inputs) if not any(input.get('inputName') == input_name for input in resp.inputs):
console.err.print(f'Input [yellow]{input_name}[/yellow] does not exist.')
raise typer.Exit(1)
return input_name
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool: def input_not_in_inputs(ctx: typer.Context, input_name: str) -> str:
"""Ensure an input does not already exist in the list of inputs."""
resp = ctx.obj['obsws'].get_input_list()
if any(input.get('inputName') == input_name for input in resp.inputs):
console.err.print(f'Input [yellow]{input_name}[/yellow] already exists.')
raise typer.Exit(1)
return input_name
def scene_in_scenes(ctx: typer.Context, scene_name: Optional[str]) -> str | None:
"""Check if a scene exists in the list of scenes.""" """Check if a scene exists in the list of scenes."""
if scene_name is None:
return
resp = ctx.obj['obsws'].get_scene_list() resp = ctx.obj['obsws'].get_scene_list()
return any(scene.get('sceneName') == scene_name for scene in resp.scenes) if not any(scene.get('sceneName') == scene_name for scene in resp.scenes):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
return scene_name
def studio_mode_enabled(ctx: typer.Context) -> bool: def studio_mode_enabled(ctx: typer.Context, preview: bool) -> bool:
"""Check if studio mode is enabled.""" """Ensure studio mode is enabled if preview option is used."""
resp = ctx.obj['obsws'].get_studio_mode_enabled() resp = ctx.obj['obsws'].get_studio_mode_enabled()
return resp.studio_mode_enabled if preview and not resp.studio_mode_enabled:
console.err.print(
'Studio mode is disabled. This action requires it to be enabled.'
)
raise typer.Exit(1)
return preview
def scene_collection_in_scene_collections( def scene_collection_in_scene_collections(
ctx: typer.Context, scene_collection_name: str ctx: typer.Context, scene_collection_name: str
) -> bool: ) -> str:
"""Check if a scene collection exists.""" """Ensure a scene collection exists in the list of scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.obj['obsws'].get_scene_collection_list()
return any( if not any(
collection == scene_collection_name for collection in resp.scene_collections collection == scene_collection_name for collection in resp.scene_collections
) ):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.'
)
raise typer.Exit(1)
return scene_collection_name
def scene_collection_not_in_scene_collections(
ctx: typer.Context, scene_collection_name: str
) -> str:
"""Ensure a scene collection does not already exist in the list of scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list()
if any(
collection == scene_collection_name for collection in resp.scene_collections
):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.'
)
raise typer.Exit(1)
return scene_collection_name
def item_in_scene_item_list( def item_in_scene_item_list(
@@ -42,13 +89,54 @@ def item_in_scene_item_list(
return any(item.get('sourceName') == item_name for item in resp.scene_items) return any(item.get('sourceName') == item_name for item in resp.scene_items)
def profile_exists(ctx: typer.Context, profile_name: str) -> bool: def profile_exists(ctx: typer.Context, profile_name: str) -> str:
"""Check if a profile exists.""" """Ensure a profile exists."""
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
return any(profile == profile_name for profile in resp.profiles) if not any(profile == profile_name for profile in resp.profiles):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1)
return profile_name
def monitor_exists(ctx: typer.Context, monitor_index: int) -> bool: def profile_not_exists(ctx: typer.Context, profile_name: str) -> str:
"""Check if a monitor exists.""" """Ensure a profile does not exist."""
resp = ctx.obj['obsws'].get_monitor_list() resp = ctx.obj['obsws'].get_profile_list()
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors) if any(profile == profile_name for profile in resp.profiles):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] already exists.')
raise typer.Exit(1)
return profile_name
def kind_in_input_kinds(ctx: typer.Context, input_kind: str) -> str:
"""Check if an input kind is valid."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
if not any(kind == input_kind for kind in resp.input_kinds):
console.err.print(f'Input kind [yellow]{input_kind}[/yellow] not found.')
raise typer.Exit(1)
return input_kind
def timecode_format(ctx: typer.Context, timecode: Optional[str]) -> str | None:
"""Validate that a timecode is in HH:MM:SS or MM:SS format."""
if timecode is None:
return
match timecode.split(':'):
case [mm, ss]:
if not (mm.isdigit() and ss.isdigit()):
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
case [hh, mm, ss]:
if not (hh.isdigit() and mm.isdigit() and ss.isdigit()):
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
case _:
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
return timecode

View File

@@ -21,7 +21,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: Python :: Implementation :: PyPy",
] ]
dependencies = ["typer>=0.16.0", "obsws-python>=1.7.2", "python-dotenv>=1.1.0"] dependencies = ["typer>=0.21.1", "obsws-python>=1.8.0", "python-dotenv>=1.1.0"]
[project.urls] [project.urls]
@@ -42,9 +42,6 @@ dependencies = ["click-man>=0.5.1"]
cli = "obsws-cli {args:}" cli = "obsws-cli {args:}"
man = "python man/generate.py --output=./man" man = "python man/generate.py --output=./man"
[tool.hatch.envs.lazyimports.scripts]
cli = "obsws-cli {args:}"
[tool.hatch.envs.hatch-test] [tool.hatch.envs.hatch-test]
randomize = true randomize = true

View File

@@ -1,6 +1,7 @@
"""pytest configuration file.""" """pytest configuration file."""
import os import os
import time
import obsws_python as obsws import obsws_python as obsws
from dotenv import find_dotenv, load_dotenv from dotenv import find_dotenv, load_dotenv
@@ -20,9 +21,9 @@ def pytest_sessionstart(session):
""" """
# Initialize the OBS WebSocket client # Initialize the OBS WebSocket client
session.obsws = obsws.ReqClient( session.obsws = obsws.ReqClient(
host=os.environ['OBS_HOST'], host=os.environ['OBSWS_CLI_HOST'],
port=os.environ['OBS_PORT'], port=os.environ['OBSWS_CLI_PORT'],
password=os.environ['OBS_PASSWORD'], password=os.environ['OBSWS_CLI_PASSWORD'],
timeout=5, timeout=5,
) )
resp = session.obsws.get_version() resp = session.obsws.get_version()
@@ -44,9 +45,54 @@ def pytest_sessionstart(session):
}, },
) )
session.obsws.set_current_scene_collection('test-collection') session.obsws.create_profile('pytest_profile')
time.sleep(0.1) # Wait for the profile to be created
session.obsws.set_profile_parameter(
'SimpleOutput',
'RecRB',
'true',
)
# hack to ensure the replay buffer is enabled
session.obsws.set_current_profile('Untitled')
session.obsws.set_current_profile('pytest_profile')
session.obsws.create_scene('pytest_scene') session.obsws.create_scene('pytest_scene')
# Ensure Desktop Audio is created.
desktop_audio_kinds = {
'windows': 'wasapi_output_capture',
'linux': 'pulse_output_capture',
'darwin': 'coreaudio_output_capture',
}
platform = os.environ.get('OBS_TESTS_PLATFORM', os.uname().sysname.lower())
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Desktop Audio',
inputKind=desktop_audio_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
# Ensure Mic/Aux is created.
mic_kinds = {
'windows': 'wasapi_input_capture',
'linux': 'pulse_input_capture',
'darwin': 'coreaudio_input_capture',
}
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Mic/Aux',
inputKind=mic_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
session.obsws.create_input( session.obsws.create_input(
sceneName='pytest_scene', sceneName='pytest_scene',
inputName='pytest_input', inputName='pytest_input',
@@ -71,6 +117,13 @@ def pytest_sessionstart(session):
}, },
sceneItemEnabled=True, sceneItemEnabled=True,
) )
session.obsws.create_input(
sceneName='pytest_scene',
inputName='pytest_text_input',
inputKind='text_gdiplus_v3',
inputSettings={'text': 'Hello, OBS!'},
sceneItemEnabled=True,
)
resp = session.obsws.get_scene_item_list('pytest_scene') resp = session.obsws.get_scene_item_list('pytest_scene')
for item in resp.scene_items: for item in resp.scene_items:
if item['sourceName'] == 'pytest_input_2': if item['sourceName'] == 'pytest_input_2':
@@ -124,7 +177,7 @@ def pytest_sessionfinish(session, exitstatus):
session.obsws.remove_scene('pytest_scene') session.obsws.remove_scene('pytest_scene')
session.obsws.set_current_scene_collection('default') session.obsws.set_current_scene_collection('Untitled')
resp = session.obsws.get_stream_status() resp = session.obsws.get_stream_status()
if resp.output_active: if resp.output_active:
@@ -142,6 +195,8 @@ def pytest_sessionfinish(session, exitstatus):
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
session.obsws.set_studio_mode_enabled(False) session.obsws.set_studio_mode_enabled(False)
session.obsws.remove_profile('pytest_profile')
# Close the OBS WebSocket client connection # Close the OBS WebSocket client connection
session.obsws.disconnect() session.obsws.disconnect()

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_filter_list(): def test_filter_list():

View File

@@ -1,10 +1,18 @@
"""Unit tests for the group command in the OBS WebSocket CLI.""" """Unit tests for the group command in the OBS WebSocket CLI."""
import os
import pytest
from typer.testing import CliRunner from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
if os.environ.get('OBS_TESTS_SKIP_GROUP_TESTS'):
pytest.skip(
'Skipping group tests as per environment variable', allow_module_level=True
)
def test_group_list(): def test_group_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_hotkey_list(): def test_hotkey_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_input_list(): def test_input_list():
@@ -13,10 +13,7 @@ def test_input_list():
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Desktop Audio' in result.stdout assert 'Desktop Audio' in result.stdout
assert 'Mic/Aux' in result.stdout assert 'Mic/Aux' in result.stdout
assert all( assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2'))
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
def test_input_list_filter_input(): def test_input_list_filter_input():
@@ -39,9 +36,6 @@ def test_input_list_filter_colour():
"""Test the input list command with colour filter.""" """Test the input list command with colour filter."""
result = runner.invoke(app, ['input', 'list', '--colour']) result = runner.invoke(app, ['input', 'list', '--colour'])
assert result.exit_code == 0 assert result.exit_code == 0
assert all( assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2'))
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
assert 'Desktop Audio' not in result.stdout assert 'Desktop Audio' not in result.stdout
assert 'Mic/Aux' not in result.stdout assert 'Mic/Aux' not in result.stdout

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_record_start(): def test_record_start():
@@ -49,7 +49,9 @@ def test_record_toggle():
result = runner.invoke(app, ['record', 'toggle']) result = runner.invoke(app, ['record', 'toggle'])
assert result.exit_code == 0 assert result.exit_code == 0
time.sleep(0.5) # Wait for the recording to toggle time.sleep(0.5) # Wait for the recording to toggle
if active: if active:
assert 'Recording stopped successfully.' in result.stdout assert 'Recording stopped successfully.' in result.stdout
else: else:

View File

@@ -1,10 +1,20 @@
"""Unit tests for the replaybuffer command in the OBS WebSocket CLI.""" """Unit tests for the replaybuffer command in the OBS WebSocket CLI."""
import os
import time
import pytest
from typer.testing import CliRunner from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
if os.environ.get('OBS_TESTS_SKIP_REPLAYBUFFER_TESTS'):
pytest.skip(
'Skipping replaybuffer tests as per environment variable',
allow_module_level=True,
)
def test_replaybuffer_start(): def test_replaybuffer_start():
@@ -14,6 +24,9 @@ def test_replaybuffer_start():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'start']) resp = runner.invoke(app, ['replaybuffer', 'start'])
time.sleep(0.5) # Wait for the replay buffer to start
if active: if active:
assert resp.exit_code != 0 assert resp.exit_code != 0
assert 'Replay buffer is already active.' in resp.stderr assert 'Replay buffer is already active.' in resp.stderr
@@ -29,6 +42,9 @@ def test_replaybuffer_stop():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'stop']) resp = runner.invoke(app, ['replaybuffer', 'stop'])
time.sleep(0.5) # Wait for the replay buffer to stop
if not active: if not active:
assert resp.exit_code != 0 assert resp.exit_code != 0
assert 'Replay buffer is not active.' in resp.stderr assert 'Replay buffer is not active.' in resp.stderr
@@ -44,9 +60,11 @@ def test_replaybuffer_toggle():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'toggle']) resp = runner.invoke(app, ['replaybuffer', 'toggle'])
assert resp.exit_code == 0
time.sleep(0.5) # Wait for the replay buffer to toggle
if active: if active:
assert resp.exit_code == 0
assert 'Replay buffer is not active.' in resp.stdout assert 'Replay buffer is not active.' in resp.stdout
else: else:
assert resp.exit_code == 0
assert 'Replay buffer is active.' in resp.stdout assert 'Replay buffer is active.' in resp.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_scene_list(): def test_scene_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_sceneitem_list(): def test_sceneitem_list():

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_stream_start(): def test_stream_start():
@@ -23,7 +23,7 @@ def test_stream_start():
else: else:
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Streaming started successfully.' in result.stdout assert 'Streaming started successfully.' in result.stdout
time.sleep(1) # Wait for the streaming to start time.sleep(0.5) # Wait for the streaming to start
def test_stream_stop(): def test_stream_stop():
@@ -37,7 +37,7 @@ def test_stream_stop():
if active: if active:
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Streaming stopped successfully.' in result.stdout assert 'Streaming stopped successfully.' in result.stdout
time.sleep(1) # Wait for the streaming to stop time.sleep(0.5) # Wait for the streaming to stop
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert 'Streaming is not in progress, cannot stop.' in result.stderr assert 'Streaming is not in progress, cannot stop.' in result.stderr
@@ -52,7 +52,7 @@ def test_stream_toggle():
result = runner.invoke(app, ['stream', 'toggle']) result = runner.invoke(app, ['stream', 'toggle'])
assert result.exit_code == 0 assert result.exit_code == 0
time.sleep(1) # Wait for the stream to toggle time.sleep(0.5) # Wait for the stream to toggle
if active: if active:
assert 'Streaming stopped successfully.' in result.stdout assert 'Streaming stopped successfully.' in result.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_studio_enable(): def test_studio_enable():

18
tests/test_text.py Normal file
View File

@@ -0,0 +1,18 @@
"""Unit tests for the text command in the OBS WebSocket CLI."""
from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
def test_text_update():
"""Test the text update command."""
result = runner.invoke(app, ['text', 'current', 'pytest_text_input'])
assert result.exit_code == 0
assert 'Current text for input pytest_text_input: Hello, OBS!' in result.stdout
result = runner.invoke(app, ['text', 'update', 'pytest_text_input', 'New Text'])
assert result.exit_code == 0
assert 'Text for input pytest_text_input updated to: New Text' in result.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_version(): def test_version():