30 Commits

Author SHA1 Message Date
2535fe85c5 minor bump 2026-01-09 13:47:18 +00:00
7d4485ec05 add Settings to README 2026-01-09 13:45:32 +00:00
2c2501e017 implement settings command group 2026-01-09 13:45:25 +00:00
356684e5d4 rename Settings to Config 2026-01-09 09:39:19 +00:00
f7e51f8488 add 0.22.0 to CHANGELOG 2026-01-09 09:31:28 +00:00
8da29ce90e upd typer dep version
obsws-cli minor version bump
2026-01-09 09:21:43 +00:00
72c6bcee49 upd README with new input subcommands 2026-01-09 09:21:06 +00:00
dceafba065 extend input command group 2026-01-09 09:20:45 +00:00
7a73ec35f6 remove lazyimports env 2025-09-29 20:51:17 +01:00
48e0f6cecd bump typer dependency.
release 0.17.0 fixes slow rich imports, see https://github.com/fastapi/typer/releases/tag/0.17.0

This is related to issue #2.

minor version bump
2025-09-29 04:21:26 +01:00
52e13922dc upd test delays to 500ms 2025-07-30 08:42:11 +01:00
f335d8ffd2 move the version flag 2025-07-29 08:48:30 +01:00
286cda8066 raise typer.Exit() on empty list queries 2025-07-29 08:17:52 +01:00
e851219ced tests should now pass from fresh install 2025-07-29 08:03:24 +01:00
f852a733c3 upd publish action 2025-07-14 03:27:52 +01:00
44dadcee23 upd publish action 2025-07-14 03:25:52 +01:00
ed4531c305 revert publish action 2025-07-14 03:23:25 +01:00
ec42a4cdd9 patch bump 2025-07-14 03:21:29 +01:00
6123c92d00 upd publish action 2025-07-14 03:21:06 +01:00
1ceb95ab16 fix environment name 2025-07-14 03:12:35 +01:00
f06e2d3982 upd publish action 2025-07-14 03:10:04 +01:00
39dff3cc28 patch bump 2025-07-14 03:02:53 +01:00
967c4ab699 upd publish action 2025-07-14 02:58:25 +01:00
dc128720c7 hatch fmt 2025-07-14 02:48:21 +01:00
2e3f4267cd add workflows 2025-07-14 02:45:13 +01:00
000431ab82 add 0.20.0 to CHANGELOG 2025-07-14 02:32:59 +01:00
ec3e31cc4f add Text section to README 2025-07-14 02:32:21 +01:00
cda0bbedb9 minor bump 2025-07-14 02:32:09 +01:00
d0c96b853d add text unit tests 2025-07-14 02:31:47 +01:00
040a41daa7 add text command group 2025-07-14 02:31:35 +01:00
32 changed files with 1162 additions and 140 deletions

39
.github/workflows/publish.yml vendored Normal file
View File

@@ -0,0 +1,39 @@
name: Publish to PyPI
on:
release:
types: [published]
push:
tags:
- 'v*.*.*'
jobs:
deploy:
runs-on: ubuntu-latest
environment: pypi
permissions:
# This permission is needed for private repositories.
contents: read
# IMPORTANT: this permission is mandatory for trusted publishing
id-token: write
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Build package
run: hatch build
- name: Publish on PyPI
uses: pypa/gh-action-pypi-publish@release/v1

19
.github/workflows/ruff.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
name: Ruff
on:
push:
branches: [main]
pull_request:
branches: [main]
workflow_dispatch:
jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/ruff-action@v3
with:
args: 'format --check --diff'

View File

@@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.22.0] - 2026-01-09
### Added
- new subcommands added to input, see [Input](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#input)
# [0.20.0] - 2025-07-14
### Added
- text command group, see [Text](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#text)
# [0.19.0] - 2025-06-23 # [0.19.0] - 2025-06-23
### Added ### Added

117
README.md
View File

@@ -298,6 +298,20 @@ obsws-cli group status START "test_group"
#### Input #### Input
- create: Create a new input.
- args: <input_name> <input_kind>
```console
obsws-cli input create 'stream mix' 'wasapi_input_capture'
```
- remove: Remove an input.
- args: <input_name>
```console
obsws-cli input remove 'stream mix'
```
- list: List all inputs. - list: List all inputs.
- flags: - flags:
@@ -315,6 +329,12 @@ obsws-cli input list
obsws-cli input list --input --colour obsws-cli input list --input --colour
``` ```
- list-kinds: List all input kinds.
```console
obsws-cli input list-kinds
```
- mute: Mute an input. - mute: Mute an input.
- args: <input_name> - args: <input_name>
@@ -335,6 +355,48 @@ obsws-cli input unmute "Mic/Aux"
obsws-cli input toggle "Mic/Aux" obsws-cli input toggle "Mic/Aux"
``` ```
- volume: Set the volume of an input.
- args: <input_name> <volume>
```console
obsws-cli input volume -- 'Desktop Audio' -38.9
```
- show: Show information for an input in the current scene.
- args: <input_name>
- flags:
*optional*
- --verbose: List all available input devices.
```console
obsws-cli input show 'Mic/Aux' --verbose
```
- update: Name of the input to update.
- args: <input_name> <device_name>
```console
obsws-cli input update 'Mic/Aux' 'Voicemeeter Out B1 (VB-Audio Voicemeeter VAIO)'
```
#### Text
- current: Get the current text for a text input.
- args: <input_name>
```console
obsws-cli text current "My Text Input"
```
- update: Update the text of a text input.
- args: <input_name> <new_text>
```console
obsws-cli text update "My Text Input" "hi OBS!"
```
#### Record #### Record
- start: Start recording. - start: Start recording.
@@ -660,6 +722,61 @@ obsws-cli projector open --monitor-index=1 "test_group"
obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png" obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png"
``` ```
#### Settings
- show: Show current OBS settings.
- flags:
*optional*
- --video: Show video settings.
- --record: Show recording settings.
- --profile: Show profile settings.
```console
obsws-cli settings show --video --record
```
- profile: Get/set OBS profile settings.
- args: <category> <name> <value>
```console
obsws-cli settings profile SimpleOutput VBitrate
obsws-cli settings profile SimpleOutput VBitrate 6000
```
- stream-service: Get/set OBS stream service settings.
- flags:
- --key: Stream key.
- --server: Stream server URL.
*optional*
- args: <type>
```console
obsws-cli settings stream-service
obsws-cli settings stream-service --key='live_xyzxyzxyzxyz' rtmp_common
```
- video: Get/set OBS video settings.
- flags:
*optional*
- --base-width: Base (canvas) width.
- --base-height: Base (canvas) height.
- --output-width: Output (scaled) width.
- --output-height: Output (scaled) height.
- --fps-num: Frames per second numerator.
- --fps-den: Frames per second denominator.
```console
obsws-cli settings video
obsws-cli settings video --base-width=1920 --base-height=1080
```
## License ## License
`obsws-cli` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license. `obsws-cli` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license.

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online> # SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
# #
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
__version__ = "0.19.1" __version__ = '0.23.0'

View File

@@ -4,4 +4,4 @@
from .app import app from .app import app
__all__ = ["app"] __all__ = ['app']

View File

@@ -44,6 +44,8 @@ class RootTyperAliasGroup(typer.core.TyperGroup):
cmd_name = 'stream' cmd_name = 'stream'
case 'sm': case 'sm':
cmd_name = 'studiomode' cmd_name = 'studiomode'
case 't':
cmd_name = 'text'
case 'vc': case 'vc':
cmd_name = 'virtualcam' cmd_name = 'virtualcam'
return super().get_command(ctx, cmd_name) return super().get_command(ctx, cmd_name)

View File

@@ -9,7 +9,7 @@ import typer
from obsws_cli.__about__ import __version__ as version from obsws_cli.__about__ import __version__ as version
from . import console, settings, styles from . import config, console, styles
from .alias import RootTyperAliasGroup from .alias import RootTyperAliasGroup
app = typer.Typer(cls=RootTyperAliasGroup) app = typer.Typer(cls=RootTyperAliasGroup)
@@ -28,7 +28,9 @@ for sub_typer in (
'screenshot', 'screenshot',
'stream', 'stream',
'studiomode', 'studiomode',
'text',
'virtualcam', 'virtualcam',
'settings',
): ):
module = importlib.import_module(f'.{sub_typer}', package=__package__) module = importlib.import_module(f'.{sub_typer}', package=__package__)
app.add_typer(module.app, name=sub_typer) app.add_typer(module.app, name=sub_typer)
@@ -71,7 +73,7 @@ def main(
help='WebSocket host', help='WebSocket host',
show_default='localhost', show_default='localhost',
), ),
] = settings.get('host'), ] = config.get('host'),
port: Annotated[ port: Annotated[
int, int,
typer.Option( typer.Option(
@@ -81,7 +83,7 @@ def main(
help='WebSocket port', help='WebSocket port',
show_default=4455, show_default=4455,
), ),
] = settings.get('port'), ] = config.get('port'),
password: Annotated[ password: Annotated[
str, str,
typer.Option( typer.Option(
@@ -91,7 +93,7 @@ def main(
help='WebSocket password', help='WebSocket password',
show_default=False, show_default=False,
), ),
] = settings.get('password'), ] = config.get('password'),
timeout: Annotated[ timeout: Annotated[
int, int,
typer.Option( typer.Option(
@@ -101,7 +103,28 @@ def main(
help='WebSocket timeout', help='WebSocket timeout',
show_default=5, show_default=5,
), ),
] = settings.get('timeout'), ] = config.get('timeout'),
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBS_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
callback=validate_style,
),
] = config.get('style'),
no_border: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBS_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = config.get('style_no_border'),
version: Annotated[ version: Annotated[
bool, bool,
typer.Option( typer.Option(
@@ -113,27 +136,6 @@ def main(
callback=version_callback, callback=version_callback,
), ),
] = False, ] = False,
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBS_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
callback=validate_style,
),
] = settings.get('style'),
no_border: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBS_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = settings.get('style_no_border'),
debug: Annotated[ debug: Annotated[
bool, bool,
typer.Option( typer.Option(
@@ -146,11 +148,13 @@ def main(
callback=setup_logging, callback=setup_logging,
hidden=True, hidden=True,
), ),
] = settings.get('debug'), ] = config.get('debug'),
): ):
"""obsws_cli is a command line interface for the OBS WebSocket API.""" """obsws_cli is a command line interface for the OBS WebSocket API."""
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj['obsws'] = ctx.with_resource(obsws.ReqClient(**ctx.params)) ctx.obj['obsws'] = ctx.with_resource(
obsws.ReqClient(host=host, port=port, password=password, timeout=timeout)
)
ctx.obj['style'] = styles.request_style_obj(style, no_border) ctx.obj['style'] = styles.request_style_obj(style, no_border)

80
obsws_cli/config.py Normal file
View File

@@ -0,0 +1,80 @@
"""module for settings management for obsws-cli."""
from collections import UserDict
from pathlib import Path
from dotenv import dotenv_values
ConfigValue = str | int
class Config(UserDict):
"""A class to manage config for obsws-cli.
This class extends UserDict to provide a dictionary-like interface for config.
It loads config from environment variables and .env files.
The config values are expected to be in uppercase and should start with 'OBS_'.
Example:
-------
config = Config()
host = config['OBS_HOST']
config['OBS_PORT'] = 4455
"""
PREFIX = 'OBS_'
def __init__(self, *args, **kwargs):
"""Initialize the Settings object."""
kwargs.update(
{
**dotenv_values('.env'),
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
}
)
super().__init__(*args, **kwargs)
def __getitem__(self, key: str) -> ConfigValue:
"""Get a setting value by key."""
key = key.upper()
if not key.startswith(Config.PREFIX):
key = f'{Config.PREFIX}{key}'
return self.data[key]
def __setitem__(self, key: str, value: ConfigValue):
"""Set a setting value by key."""
key = key.upper()
if not key.startswith(Config.PREFIX):
key = f'{Config.PREFIX}{key}'
self.data[key] = value
_config = Config(
OBS_HOST='localhost',
OBS_PORT=4455,
OBS_PASSWORD='',
OBS_TIMEOUT=5,
OBS_DEBUG=False,
OBS_STYLE='disabled',
OBS_STYLE_NO_BORDER=False,
)
def get(key: str) -> ConfigValue:
"""Get a setting value by key.
Args:
----
key (str): The key of the config to retrieve.
Returns:
-------
The value of the config.
Raises:
------
KeyError: If the key does not exist in the config.
"""
return _config[key]

View File

@@ -24,6 +24,10 @@ def list_(
"""List all hotkeys.""" """List all hotkeys."""
resp = ctx.obj['obsws'].get_hotkey_list() resp = ctx.obj['obsws'].get_hotkey_list()
if not resp.hotkeys:
console.out.print('No hotkeys found.')
raise typer.Exit()
table = Table( table = Table(
title='Hotkeys', title='Hotkeys',
padding=(0, 2), padding=(0, 2),

View File

@@ -18,6 +18,69 @@ def main():
"""Control inputs in OBS.""" """Control inputs in OBS."""
@app.command('create | add')
def create(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to create.',
callback=validate.input_not_in_inputs,
),
],
input_kind: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Kind of the input to create.',
callback=validate.kind_in_input_kinds,
),
],
):
"""Create a new input."""
current_scene = (
ctx.obj['obsws'].get_current_program_scene().current_program_scene_name
)
try:
ctx.obj['obsws'].create_input(
inputName=input_name,
inputKind=input_kind,
sceneItemEnabled=True,
sceneName=current_scene,
inputSettings={},
)
except obsws.error.OBSSDKRequestError as e:
console.err.print(f'Failed to create input: [yellow]{e}[/yellow]')
raise typer.Exit(1)
console.out.print(
f'Input {console.highlight(ctx, input_name)} of kind '
f'{console.highlight(ctx, input_kind)} created.',
)
@app.command('remove | rm')
def remove(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to remove.',
callback=validate.input_in_inputs,
),
],
):
"""Remove an input."""
ctx.obj['obsws'].remove_input(name=input_name)
console.out.print(f'Input {console.highlight(ctx, input_name)} removed.')
@app.command('list | ls') @app.command('list | ls')
def list_( def list_(
ctx: typer.Context, ctx: typer.Context,
@@ -105,18 +168,47 @@ def list_(
console.out.print(table) console.out.print(table)
@app.command('list-kinds | ls-k')
def list_kinds(
ctx: typer.Context,
):
"""List all input kinds."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
kinds = sorted(resp.input_kinds)
if not kinds:
console.out.print('No input kinds found.')
raise typer.Exit()
table = Table(
title='Input Kinds', padding=(0, 2), border_style=ctx.obj['style'].border
)
table.add_column(
Text('Input Kind', justify='center'),
justify='left',
style=ctx.obj['style'].column,
)
for kind in kinds:
table.add_row(util.snakecase_to_titlecase(kind))
console.out.print(table)
@app.command('mute | m') @app.command('mute | m')
def mute( def mute(
ctx: typer.Context, ctx: typer.Context,
input_name: Annotated[ input_name: Annotated[
str, typer.Argument(..., show_default=False, help='Name of the input to mute.') str,
typer.Argument(
...,
show_default=False,
help='Name of the input to mute.',
callback=validate.input_in_inputs,
),
], ],
): ):
"""Mute an input.""" """Mute an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
ctx.obj['obsws'].set_input_mute( ctx.obj['obsws'].set_input_mute(
name=input_name, name=input_name,
muted=True, muted=True,
@@ -130,14 +222,15 @@ def unmute(
ctx: typer.Context, ctx: typer.Context,
input_name: Annotated[ input_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Name of the input to unmute.'), typer.Argument(
...,
show_default=False,
help='Name of the input to unmute.',
callback=validate.input_in_inputs,
),
], ],
): ):
"""Unmute an input.""" """Unmute an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
ctx.obj['obsws'].set_input_mute( ctx.obj['obsws'].set_input_mute(
name=input_name, name=input_name,
muted=False, muted=False,
@@ -151,14 +244,15 @@ def toggle(
ctx: typer.Context, ctx: typer.Context,
input_name: Annotated[ input_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Name of the input to toggle.'), typer.Argument(
...,
show_default=False,
help='Name of the input to toggle.',
callback=validate.input_in_inputs,
),
], ],
): ):
"""Toggle an input.""" """Toggle an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_input_mute(name=input_name) resp = ctx.obj['obsws'].get_input_mute(name=input_name)
new_state = not resp.input_muted new_state = not resp.input_muted
@@ -175,3 +269,188 @@ def toggle(
console.out.print( console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.', f'Input {console.highlight(ctx, input_name)} unmuted.',
) )
@app.command('volume | vol')
def volume(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to set volume for.',
callback=validate.input_in_inputs,
),
],
volume: Annotated[
float,
typer.Argument(
...,
show_default=False,
help='Volume level to set (-90 to 0).',
min=-90,
max=0,
),
],
):
"""Set the volume of an input."""
ctx.obj['obsws'].set_input_volume(
name=input_name,
vol_db=volume,
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} volume set to {console.highlight(ctx, volume)}.',
)
@app.command('show | s')
def show(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to show.',
callback=validate.input_in_inputs,
),
],
verbose: Annotated[
bool, typer.Option(help='List all available input devices.')
] = False,
):
"""Show information for an input in the current scene."""
input_list = ctx.obj['obsws'].get_input_list()
for input_ in input_list.inputs:
if input_['inputName'] == input_name:
input_kind = input_['inputKind']
break
for prop in ['device', 'device_id']:
try:
device_id = (
ctx.obj['obsws']
.get_input_settings(
name=input_name,
)
.input_settings.get(prop)
)
if device_id:
break
except obsws.error.OBSSDKRequestError:
continue
else:
device_id = '(N/A)'
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemValue') == device_id:
device_id = device.get('itemName')
break
table = Table(
title='Input Information', padding=(0, 2), border_style=ctx.obj['style'].border
)
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Device', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
device_id,
)
console.out.print(table)
if verbose:
resp = ctx.obj['obsws'].get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
table = Table(
title='Devices',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = [
(Text('Name', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for i, item in enumerate(resp.property_items):
table.add_row(
item.get('itemName'),
style='' if i % 2 == 0 else 'dim',
)
console.out.print(table)
@app.command('update | upd')
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to update.',
callback=validate.input_in_inputs,
),
],
device_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the device to set for the input.',
),
],
):
"""Update a setting for an input."""
device_id = None
for prop in ['device', 'device_id']:
try:
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemName') == device_name:
device_id = device.get('itemValue')
break
except obsws.error.OBSSDKRequestError:
continue
if device_id:
break
if not device_id:
console.err.print(
f'Failed to find device ID for device '
f'{console.highlight(ctx, device_name)}.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name, settings={prop: device_id}, overlay=True
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} updated to use device '
f'{console.highlight(ctx, device_name)}.',
)

View File

@@ -22,6 +22,10 @@ def list_(ctx: typer.Context):
"""List profiles.""" """List profiles."""
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
if not resp.profiles:
console.out.print('No profiles found.')
raise typer.Exit()
table = Table( table = Table(
title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border
) )

View File

@@ -21,16 +21,15 @@ def main():
def list_monitors(ctx: typer.Context): def list_monitors(ctx: typer.Context):
"""List available monitors.""" """List available monitors."""
resp = ctx.obj['obsws'].get_monitor_list() resp = ctx.obj['obsws'].get_monitor_list()
if not resp.monitors:
console.out.print('No monitors found.')
return
monitors = sorted( monitors = sorted(
((m['monitorIndex'], m['monitorName']) for m in resp.monitors), ((m['monitorIndex'], m['monitorName']) for m in resp.monitors),
key=lambda m: m[0], key=lambda m: m[0],
) )
if not monitors:
console.out.print('No monitors found.')
raise typer.Exit()
table = Table( table = Table(
title='Available Monitors', title='Available Monitors',
padding=(0, 2), padding=(0, 2),

View File

@@ -29,6 +29,10 @@ def list_(
for scene in reversed(resp.scenes) for scene in reversed(resp.scenes)
) )
if not scenes:
console.out.print('No scenes found.')
raise typer.Exit()
active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border) table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border)

View File

@@ -21,6 +21,10 @@ def list_(ctx: typer.Context):
"""List all scene collections.""" """List all scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.obj['obsws'].get_scene_collection_list()
if not resp.scene_collections:
console.out.print('No scene collections found.')
raise typer.Exit()
table = Table( table = Table(
title='Scene Collections', title='Scene Collections',
padding=(0, 2), padding=(0, 2),

View File

@@ -1,76 +1,337 @@
"""module for settings management for obsws-cli.""" """module for settings management."""
from collections import UserDict from typing import Annotated, Optional
from pathlib import Path
from dotenv import dotenv_values import typer
from rich.table import Table
from rich.text import Text
SettingsValue = str | int from . import console, util
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
class Settings(UserDict): @app.callback()
"""A class to manage settings for obsws-cli. def main():
"""Manage OBS settings."""
This class extends UserDict to provide a dictionary-like interface for settings.
It loads settings from environment variables and .env files.
The settings are expected to be in uppercase and should start with 'OBS_'.
Example: @app.command('show | sh')
settings = Settings() def show(
host = settings['OBS_HOST'] ctx: typer.Context,
settings['OBS_PORT'] = 4455 video: Annotated[
bool, typer.Option('--video', '-v', help='Show video settings.')
] = False,
record: Annotated[
bool, typer.Option('--record', '-r', help='Show recording settings.')
] = False,
profile: Annotated[
bool, typer.Option('--profile', '-p', help='Show profile settings.')
] = False,
):
"""Show current OBS settings."""
if not any([video, record, profile]):
video = True
record = True
profile = True
""" resp = ctx.obj['obsws'].get_video_settings()
video_table = Table(
PREFIX = 'OBS_' title='Video Settings',
padding=(0, 2),
def __init__(self, *args, **kwargs): border_style=ctx.obj['style'].border,
"""Initialize the Settings object.""" )
kwargs.update( video_columns = (
{ ('Setting', 'left', ctx.obj['style'].column),
**dotenv_values('.env'), ('Value', 'left', ctx.obj['style'].column),
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
}
) )
super().__init__(*args, **kwargs)
def __getitem__(self, key: str) -> SettingsValue: for header_text, justify, style in video_columns:
"""Get a setting value by key.""" video_table.add_column(
key = key.upper() Text(header_text, justify='center'),
if not key.startswith(Settings.PREFIX): justify=justify,
key = f'{Settings.PREFIX}{key}' style=style,
return self.data[key] )
def __setitem__(self, key: str, value: SettingsValue): for setting in resp.attrs():
"""Set a setting value by key.""" video_table.add_row(
key = key.upper() util.snakecase_to_titlecase(setting),
if not key.startswith(Settings.PREFIX): str(getattr(resp, setting)),
key = f'{Settings.PREFIX}{key}' style='' if video_table.row_count % 2 == 0 else 'dim',
self.data[key] = value )
if video:
console.out.print(video_table)
resp = ctx.obj['obsws'].get_record_directory()
record_table = Table(
title='Recording Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
record_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in record_columns:
record_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
record_table.add_row(
'Directory',
resp.record_directory,
style='' if record_table.row_count % 2 == 0 else 'dim',
)
if record:
console.out.print(record_table)
profile_table = Table(
title='Profile Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
profile_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in profile_columns:
profile_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
params = [
('Output', 'Mode', 'Output Mode'),
('SimpleOutput', 'StreamEncoder', 'Simple Streaming Encoder'),
('SimpleOutput', 'RecEncoder', 'Simple Recording Encoder'),
('SimpleOutput', 'RecFormat2', 'Simple Recording Video Format'),
('SimpleOutput', 'RecAudioEncoder', 'Simple Recording Audio Format'),
('SimpleOutput', 'RecQuality', 'Simple Recording Quality'),
('AdvOut', 'Encoder', 'Advanced Streaming Encoder'),
('AdvOut', 'RecEncoder', 'Advanced Recording Encoder'),
('AdvOut', 'RecType', 'Advanced Recording Type'),
('AdvOut', 'RecFormat2', 'Advanced Recording Video Format'),
('AdvOut', 'RecAudioEncoder', 'Advanced Recording Audio Format'),
]
for category, name, display_name in params:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
if resp.parameter_value is not None:
profile_table.add_row(
display_name,
str(resp.parameter_value),
style='' if profile_table.row_count % 2 == 0 else 'dim',
)
if profile:
console.out.print(profile_table)
_settings = Settings( @app.command('profile | pr')
OBS_HOST='localhost', def profile(
OBS_PORT=4455, ctx: typer.Context,
OBS_PASSWORD='', category: Annotated[
OBS_TIMEOUT=5, str,
OBS_DEBUG=False, typer.Argument(
OBS_STYLE='disabled', ...,
OBS_STYLE_NO_BORDER=False, help='Profile parameter category (e.g., SimpleOutput, AdvOut).',
),
],
name: Annotated[
str,
typer.Argument(
...,
help='Profile parameter name (e.g., StreamEncoder, RecFormat2).',
),
],
value: Annotated[
Optional[str],
typer.Argument(
...,
help='Value to set for the profile parameter. If omitted, the current value is retrieved.',
),
] = None,
):
"""Get/set OBS profile settings."""
if value is None:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
console.out.print(
f'Parameter Value for [bold]{name}[/bold]: '
f'[green]{resp.parameter_value}[/green]'
)
else:
ctx.obj['obsws'].set_profile_parameter(
category=category,
name=name,
value=value,
)
console.out.print(
f'Set Parameter [bold]{name}[/bold] to [green]{value}[/green]'
) )
def get(key: str) -> SettingsValue: @app.command('stream-service | ss')
"""Get a setting value by key. def stream_service(
ctx: typer.Context,
type_: Annotated[
Optional[str],
typer.Argument(
...,
help='Stream service type (e.g., Twitch, YouTube). If omitted, current settings are retrieved.',
),
] = None,
key: Annotated[
Optional[str],
typer.Option('--key', '-k', help='Stream key to set. Optional.'),
] = None,
server: Annotated[
Optional[str],
typer.Option('--server', '-s', help='Stream server to set. Optional.'),
] = None,
):
"""Get/set OBS stream service settings."""
if type_ is None:
resp = ctx.obj['obsws'].get_stream_service_settings()
table = Table(
title='Stream Service Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
table.add_row(
'Type',
resp.stream_service_type,
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Server',
resp.stream_service_settings.get('server', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Key',
resp.stream_service_settings.get('key', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_stream_service_settings()
if key is None:
key = current_settings.stream_service_settings.get('key', '')
if server is None:
server = current_settings.stream_service_settings.get('server', '')
Args: ctx.obj['obsws'].set_stream_service_settings(
key (str): The key of the setting to retrieve. ss_type=type_,
ss_settings={'key': key, 'server': server},
)
console.out.print('Stream service settings updated.')
Returns:
The value of the setting.
Raises: @app.command('video | vi')
KeyError: If the key does not exist in the settings. def video(
ctx: typer.Context,
base_width: Annotated[
Optional[int],
typer.Option('--base-width', '-bw', help='Set base (canvas) width.'),
] = None,
base_height: Annotated[
Optional[int],
typer.Option('--base-height', '-bh', help='Set base (canvas) height.'),
] = None,
output_width: Annotated[
Optional[int],
typer.Option('--output-width', '-ow', help='Set output (scaled) width.'),
] = None,
output_height: Annotated[
Optional[int],
typer.Option('--output-height', '-oh', help='Set output (scaled) height.'),
] = None,
fps_num: Annotated[
Optional[int],
typer.Option('--fps-num', '-fn', help='Set FPS numerator.'),
] = None,
fps_den: Annotated[
Optional[int],
typer.Option('--fps-den', '-fd', help='Set FPS denominator.'),
] = None,
):
"""Get/set OBS video settings."""
if not any(
[
base_width,
base_height,
output_width,
output_height,
fps_num,
fps_den,
]
):
resp = ctx.obj['obsws'].get_video_settings()
table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_video_settings()
if base_width is None:
base_width = current_settings.base_width
if base_height is None:
base_height = current_settings.base_height
if output_width is None:
output_width = current_settings.output_width
if output_height is None:
output_height = current_settings.output_height
if fps_num is None:
fps_num = current_settings.fps_num
if fps_den is None:
fps_den = current_settings.fps_den
""" ctx.obj['obsws'].set_video_settings(
return _settings[key] base_width=base_width,
base_height=base_height,
out_width=output_width,
out_height=output_height,
numerator=fps_num,
denominator=fps_den,
)
console.out.print('Video settings updated.')

78
obsws_cli/text.py Normal file
View File

@@ -0,0 +1,78 @@
"""module containing commands for manipulating text inputs."""
from typing import Annotated, Optional
import typer
from . import console, validate
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control text inputs in OBS."""
@app.command('current | get')
def current(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to get.', callback=validate.input_in_inputs
),
],
):
"""Get the current text for a text input."""
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
)
raise typer.Exit(1)
current_text = resp.input_settings.get('text', '')
if not current_text:
current_text = '(empty)'
console.out.print(
f'Current text for input {console.highlight(ctx, input_name)}: {current_text}',
)
@app.command('update | set')
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to update.', callback=validate.input_in_inputs
),
],
new_text: Annotated[
Optional[str],
typer.Argument(
help='The new text to set for the input.',
),
] = None,
):
"""Update the text of a text input."""
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name,
settings={'text': new_text},
overlay=True,
)
if not new_text:
new_text = '(empty)'
console.out.print(
f'Text for input {console.highlight(ctx, input_name)} updated to: {new_text}',
)

View File

@@ -2,14 +2,28 @@
import typer import typer
from . import console
# type alias for an option that is skipped when the command is run # type alias for an option that is skipped when the command is run
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False) skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool: def input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
"""Check if an input is in the input list.""" """Ensure the given input exists in the list of inputs."""
inputs = ctx.obj['obsws'].get_input_list().inputs resp = ctx.obj['obsws'].get_input_list()
return any(input_.get('inputName') == input_name for input_ in inputs) if not any(input.get('inputName') == input_name for input in resp.inputs):
console.err.print(f'Input [yellow]{input_name}[/yellow] does not exist.')
raise typer.Exit(1)
return input_name
def input_not_in_inputs(ctx: typer.Context, input_name: str) -> bool:
"""Ensure an input does not already exist in the list of inputs."""
resp = ctx.obj['obsws'].get_input_list()
if any(input.get('inputName') == input_name for input in resp.inputs):
console.err.print(f'Input [yellow]{input_name}[/yellow] already exists.')
raise typer.Exit(1)
return input_name
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool: def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool:
@@ -52,3 +66,12 @@ def monitor_exists(ctx: typer.Context, monitor_index: int) -> bool:
"""Check if a monitor exists.""" """Check if a monitor exists."""
resp = ctx.obj['obsws'].get_monitor_list() resp = ctx.obj['obsws'].get_monitor_list()
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors) return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors)
def kind_in_input_kinds(ctx: typer.Context, input_kind: str) -> str:
"""Check if an input kind is valid."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
if not any(kind == input_kind for kind in resp.input_kinds):
console.err.print(f'Input kind [yellow]{input_kind}[/yellow] not found.')
raise typer.Exit(1)
return input_kind

View File

@@ -21,7 +21,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: Python :: Implementation :: PyPy",
] ]
dependencies = ["typer>=0.16.0", "obsws-python>=1.8.0", "python-dotenv>=1.1.0"] dependencies = ["typer>=0.21.1", "obsws-python>=1.8.0", "python-dotenv>=1.1.0"]
[project.urls] [project.urls]
@@ -42,9 +42,6 @@ dependencies = ["click-man>=0.5.1"]
cli = "obsws-cli {args:}" cli = "obsws-cli {args:}"
man = "python man/generate.py --output=./man" man = "python man/generate.py --output=./man"
[tool.hatch.envs.lazyimports.scripts]
cli = "obsws-cli {args:}"
[tool.hatch.envs.hatch-test] [tool.hatch.envs.hatch-test]
randomize = true randomize = true

View File

@@ -1,6 +1,7 @@
"""pytest configuration file.""" """pytest configuration file."""
import os import os
import time
import obsws_python as obsws import obsws_python as obsws
from dotenv import find_dotenv, load_dotenv from dotenv import find_dotenv, load_dotenv
@@ -44,9 +45,54 @@ def pytest_sessionstart(session):
}, },
) )
session.obsws.set_current_scene_collection('test-collection') session.obsws.create_profile('pytest_profile')
time.sleep(0.1) # Wait for the profile to be created
session.obsws.set_profile_parameter(
'SimpleOutput',
'RecRB',
'true',
)
# hack to ensure the replay buffer is enabled
session.obsws.set_current_profile('Untitled')
session.obsws.set_current_profile('pytest_profile')
session.obsws.create_scene('pytest_scene') session.obsws.create_scene('pytest_scene')
# Ensure Desktop Audio is created.
desktop_audio_kinds = {
'windows': 'wasapi_output_capture',
'linux': 'pulse_output_capture',
'darwin': 'coreaudio_output_capture',
}
platform = os.environ.get('OBS_TESTS_PLATFORM', os.uname().sysname.lower())
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Desktop Audio',
inputKind=desktop_audio_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
# Ensure Mic/Aux is created.
mic_kinds = {
'windows': 'wasapi_input_capture',
'linux': 'pulse_input_capture',
'darwin': 'coreaudio_input_capture',
}
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Mic/Aux',
inputKind=mic_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
session.obsws.create_input( session.obsws.create_input(
sceneName='pytest_scene', sceneName='pytest_scene',
inputName='pytest_input', inputName='pytest_input',
@@ -71,6 +117,13 @@ def pytest_sessionstart(session):
}, },
sceneItemEnabled=True, sceneItemEnabled=True,
) )
session.obsws.create_input(
sceneName='pytest_scene',
inputName='pytest_text_input',
inputKind='text_gdiplus_v3',
inputSettings={'text': 'Hello, OBS!'},
sceneItemEnabled=True,
)
resp = session.obsws.get_scene_item_list('pytest_scene') resp = session.obsws.get_scene_item_list('pytest_scene')
for item in resp.scene_items: for item in resp.scene_items:
if item['sourceName'] == 'pytest_input_2': if item['sourceName'] == 'pytest_input_2':
@@ -124,7 +177,7 @@ def pytest_sessionfinish(session, exitstatus):
session.obsws.remove_scene('pytest_scene') session.obsws.remove_scene('pytest_scene')
session.obsws.set_current_scene_collection('default') session.obsws.set_current_scene_collection('Untitled')
resp = session.obsws.get_stream_status() resp = session.obsws.get_stream_status()
if resp.output_active: if resp.output_active:
@@ -142,6 +195,8 @@ def pytest_sessionfinish(session, exitstatus):
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
session.obsws.set_studio_mode_enabled(False) session.obsws.set_studio_mode_enabled(False)
session.obsws.remove_profile('pytest_profile')
# Close the OBS WebSocket client connection # Close the OBS WebSocket client connection
session.obsws.disconnect() session.obsws.disconnect()

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_filter_list(): def test_filter_list():

View File

@@ -1,10 +1,18 @@
"""Unit tests for the group command in the OBS WebSocket CLI.""" """Unit tests for the group command in the OBS WebSocket CLI."""
import os
import pytest
from typer.testing import CliRunner from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
if os.environ.get('OBS_TESTS_SKIP_GROUP_TESTS'):
pytest.skip(
'Skipping group tests as per environment variable', allow_module_level=True
)
def test_group_list(): def test_group_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_hotkey_list(): def test_hotkey_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_input_list(): def test_input_list():
@@ -13,10 +13,7 @@ def test_input_list():
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Desktop Audio' in result.stdout assert 'Desktop Audio' in result.stdout
assert 'Mic/Aux' in result.stdout assert 'Mic/Aux' in result.stdout
assert all( assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2'))
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
def test_input_list_filter_input(): def test_input_list_filter_input():
@@ -39,9 +36,6 @@ def test_input_list_filter_colour():
"""Test the input list command with colour filter.""" """Test the input list command with colour filter."""
result = runner.invoke(app, ['input', 'list', '--colour']) result = runner.invoke(app, ['input', 'list', '--colour'])
assert result.exit_code == 0 assert result.exit_code == 0
assert all( assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2'))
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
assert 'Desktop Audio' not in result.stdout assert 'Desktop Audio' not in result.stdout
assert 'Mic/Aux' not in result.stdout assert 'Mic/Aux' not in result.stdout

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_record_start(): def test_record_start():
@@ -49,7 +49,9 @@ def test_record_toggle():
result = runner.invoke(app, ['record', 'toggle']) result = runner.invoke(app, ['record', 'toggle'])
assert result.exit_code == 0 assert result.exit_code == 0
time.sleep(0.5) # Wait for the recording to toggle time.sleep(0.5) # Wait for the recording to toggle
if active: if active:
assert 'Recording stopped successfully.' in result.stdout assert 'Recording stopped successfully.' in result.stdout
else: else:

View File

@@ -1,10 +1,20 @@
"""Unit tests for the replaybuffer command in the OBS WebSocket CLI.""" """Unit tests for the replaybuffer command in the OBS WebSocket CLI."""
import os
import time
import pytest
from typer.testing import CliRunner from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
if os.environ.get('OBS_TESTS_SKIP_REPLAYBUFFER_TESTS'):
pytest.skip(
'Skipping replaybuffer tests as per environment variable',
allow_module_level=True,
)
def test_replaybuffer_start(): def test_replaybuffer_start():
@@ -14,6 +24,9 @@ def test_replaybuffer_start():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'start']) resp = runner.invoke(app, ['replaybuffer', 'start'])
time.sleep(0.5) # Wait for the replay buffer to start
if active: if active:
assert resp.exit_code != 0 assert resp.exit_code != 0
assert 'Replay buffer is already active.' in resp.stderr assert 'Replay buffer is already active.' in resp.stderr
@@ -29,6 +42,9 @@ def test_replaybuffer_stop():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'stop']) resp = runner.invoke(app, ['replaybuffer', 'stop'])
time.sleep(0.5) # Wait for the replay buffer to stop
if not active: if not active:
assert resp.exit_code != 0 assert resp.exit_code != 0
assert 'Replay buffer is not active.' in resp.stderr assert 'Replay buffer is not active.' in resp.stderr
@@ -44,9 +60,11 @@ def test_replaybuffer_toggle():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'toggle']) resp = runner.invoke(app, ['replaybuffer', 'toggle'])
if active:
assert resp.exit_code == 0 assert resp.exit_code == 0
time.sleep(0.5) # Wait for the replay buffer to toggle
if active:
assert 'Replay buffer is not active.' in resp.stdout assert 'Replay buffer is not active.' in resp.stdout
else: else:
assert resp.exit_code == 0
assert 'Replay buffer is active.' in resp.stdout assert 'Replay buffer is active.' in resp.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_scene_list(): def test_scene_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_sceneitem_list(): def test_sceneitem_list():

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_stream_start(): def test_stream_start():
@@ -23,7 +23,7 @@ def test_stream_start():
else: else:
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Streaming started successfully.' in result.stdout assert 'Streaming started successfully.' in result.stdout
time.sleep(1) # Wait for the streaming to start time.sleep(0.5) # Wait for the streaming to start
def test_stream_stop(): def test_stream_stop():
@@ -37,7 +37,7 @@ def test_stream_stop():
if active: if active:
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Streaming stopped successfully.' in result.stdout assert 'Streaming stopped successfully.' in result.stdout
time.sleep(1) # Wait for the streaming to stop time.sleep(0.5) # Wait for the streaming to stop
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert 'Streaming is not in progress, cannot stop.' in result.stderr assert 'Streaming is not in progress, cannot stop.' in result.stderr
@@ -52,7 +52,7 @@ def test_stream_toggle():
result = runner.invoke(app, ['stream', 'toggle']) result = runner.invoke(app, ['stream', 'toggle'])
assert result.exit_code == 0 assert result.exit_code == 0
time.sleep(1) # Wait for the stream to toggle time.sleep(0.5) # Wait for the stream to toggle
if active: if active:
assert 'Streaming stopped successfully.' in result.stdout assert 'Streaming stopped successfully.' in result.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_studio_enable(): def test_studio_enable():

18
tests/test_text.py Normal file
View File

@@ -0,0 +1,18 @@
"""Unit tests for the text command in the OBS WebSocket CLI."""
from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
def test_text_update():
"""Test the text update command."""
result = runner.invoke(app, ['text', 'current', 'pytest_text_input'])
assert result.exit_code == 0
assert 'Current text for input pytest_text_input: Hello, OBS!' in result.stdout
result = runner.invoke(app, ['text', 'update', 'pytest_text_input', 'New Text'])
assert result.exit_code == 0
assert 'Text for input pytest_text_input updated to: New Text' in result.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner(mix_stderr=False) runner = CliRunner()
def test_version(): def test_version():