Compare commits

...

13 Commits

Author SHA1 Message Date
f3c94d1dee add comments to for-else blocks 2025-06-13 00:52:27 +01:00
3c09f9fd5b upd CHANGELOG 2025-06-12 23:47:35 +01:00
51d49c9c93 add status to audio section 2025-06-12 23:47:23 +01:00
bc43c8483a add audio unit tests
add audio status command

patch bump
2025-06-12 23:47:07 +01:00
57e31a7e49 arguments appear to be required by default. 2025-06-12 23:25:07 +01:00
db6a9b5e84 add studiomode unit tests 2025-06-12 23:10:01 +01:00
03f1dac8ea rename leftmost column heading for audio list
patch bump
2025-06-12 22:17:48 +01:00
535b22bf8e add 0.9.0 to CHANGELOG 2025-06-12 22:13:58 +01:00
6e95e4d670 add scenecollection command group
minor bump
2025-06-12 22:13:46 +01:00
519db1b46e add Scene Collection to README 2025-06-12 22:04:15 +01:00
582587bed5 add ruff config
run files through formatter

add dosctrings to satisfy the linter
2025-06-12 20:34:14 +01:00
fecd13d345 patch bump 2025-06-12 18:50:43 +01:00
a1a22d0d00 scene list and audio list now print tables
patch bump
2025-06-12 18:49:54 +01:00
26 changed files with 772 additions and 219 deletions

View File

@ -5,7 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.8.3] - 2025-06-12
# [0.9.0] - 2025-06-12
### Added
- scenecollection command group, see [Scene Collection](https://github.com/onyx-and-iris/slobs-cli/tree/main?tab=readme-ov-file#scene-collection)
- add audio status commmand.
# [0.8.4] - 2025-06-12
### Added
@ -16,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- scene list now shows which scene is the current active scene.
- audio list now shows mute states.
- scene list and audio list commands now print as tables
- --id option added to scene and audio commands to show the source ID in the output
- by default this is no longer displayed

View File

@ -189,6 +189,12 @@ slobs-cli audio unmute "Mic/Aux"
slobs-cli audio toggle "Mic/Aux"
```
- status: Get the mute status of an audio source by name.
```console
slobs-cli audio status "Mic/Aux"
```
#### Replay Buffer
- start: Start the replay buffer.
@ -247,6 +253,46 @@ slobs-cli studiomode status
slobs-cli studiomode force-transition
```
#### Scene Collection
- list: List all scene collections.
- flags:
*optional*
- --id: Include scenecollection IDs in the output.
```console
slobs-cli scenecollection list
```
- create: Create a new scene collection.
- args: <scenecollection_name>
```console
slobs-cli scenecollection create "NewCollection"
```
- delete: Delete a scene collection by name.
- args: <scenecollection_name>
```console
slobs-cli scenecollection delete "ExistingCollection"
```
- load: Load a scene collection by name.
- args: <scenecollection_name>
```console
slobs-cli scenecollection load "ExistingCollection"
```
- rename: Rename a scene collection.
- args: <scenecollection_name> <new_name>
```console
slobs-cli scenecollection rename "ExistingCollection" "NewName"
```
## Special Thanks
- [Julian-0](https://github.com/Julian-O) For writing the [PySLOBS wrapper](https://github.com/Julian-O/PySLOBS) on which this CLI depends.

40
pdm.lock generated
View File

@ -5,7 +5,7 @@
groups = ["default", "dev"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:87714b892affeadd7dba57d9430f0af3dc46f50cc9d095942367e4fca103f61e"
content_hash = "sha256:c1f6a22d9f4fca9c52692b2931ca64dada84a1e99c9013dad4be26bdd786cc6e"
[[metadata.targets]]
requires_python = ">=3.11"
@ -238,6 +238,33 @@ files = [
{file = "pytest_randomly-3.16.0.tar.gz", hash = "sha256:11bf4d23a26484de7860d82f726c0629837cf4064b79157bd18ec9d41d7feb26"},
]
[[package]]
name = "ruff"
version = "0.11.13"
requires_python = ">=3.7"
summary = "An extremely fast Python linter and code formatter, written in Rust."
groups = ["dev"]
files = [
{file = "ruff-0.11.13-py3-none-linux_armv6l.whl", hash = "sha256:4bdfbf1240533f40042ec00c9e09a3aade6f8c10b6414cf11b519488d2635d46"},
{file = "ruff-0.11.13-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:aef9c9ed1b5ca28bb15c7eac83b8670cf3b20b478195bd49c8d756ba0a36cf48"},
{file = "ruff-0.11.13-py3-none-macosx_11_0_arm64.whl", hash = "sha256:53b15a9dfdce029c842e9a5aebc3855e9ab7771395979ff85b7c1dedb53ddc2b"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ab153241400789138d13f362c43f7edecc0edfffce2afa6a68434000ecd8f69a"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:6c51f93029d54a910d3d24f7dd0bb909e31b6cd989a5e4ac513f4eb41629f0dc"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1808b3ed53e1a777c2ef733aca9051dc9bf7c99b26ece15cb59a0320fbdbd629"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:d28ce58b5ecf0f43c1b71edffabe6ed7f245d5336b17805803312ec9bc665933"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:55e4bc3a77842da33c16d55b32c6cac1ec5fb0fbec9c8c513bdce76c4f922165"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:633bf2c6f35678c56ec73189ba6fa19ff1c5e4807a78bf60ef487b9dd272cc71"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4ffbc82d70424b275b089166310448051afdc6e914fdab90e08df66c43bb5ca9"},
{file = "ruff-0.11.13-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:4a9ddd3ec62a9a89578c85842b836e4ac832d4a2e0bfaad3b02243f930ceafcc"},
{file = "ruff-0.11.13-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:d237a496e0778d719efb05058c64d28b757c77824e04ffe8796c7436e26712b7"},
{file = "ruff-0.11.13-py3-none-musllinux_1_2_i686.whl", hash = "sha256:26816a218ca6ef02142343fd24c70f7cd8c5aa6c203bca284407adf675984432"},
{file = "ruff-0.11.13-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:51c3f95abd9331dc5b87c47ac7f376db5616041173826dfd556cfe3d4977f492"},
{file = "ruff-0.11.13-py3-none-win32.whl", hash = "sha256:96c27935418e4e8e77a26bb05962817f28b8ef3843a6c6cc49d8783b5507f250"},
{file = "ruff-0.11.13-py3-none-win_amd64.whl", hash = "sha256:29c3189895a8a6a657b7af4e97d330c8a3afd2c9c8f46c81e2fc5a31866517e3"},
{file = "ruff-0.11.13-py3-none-win_arm64.whl", hash = "sha256:b4385285e9179d608ff1d2fb9922062663c658605819a6876d8beef0c30b7f3b"},
{file = "ruff-0.11.13.tar.gz", hash = "sha256:26fa247dc68d1d4e72c179e08889a25ac0c7ba4d78aecfc835d49cbfd60bf514"},
]
[[package]]
name = "sniffio"
version = "1.3.1"
@ -249,6 +276,17 @@ files = [
{file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"},
]
[[package]]
name = "terminaltables3"
version = "4.0.0"
requires_python = ">=3.8"
summary = "Generate simple tables in terminals from a nested list of strings. Fork of terminaltables."
groups = ["default"]
files = [
{file = "terminaltables3-4.0.0-py3-none-any.whl", hash = "sha256:93b4c722f35400a7869cd630e2bbab616b129d1c47c628765c7f47baab2ca270"},
{file = "terminaltables3-4.0.0.tar.gz", hash = "sha256:4e3eefe209aa89005a0a34d1525739424569729ee29b5e64a8dd51c5ebdab77f"},
]
[[package]]
name = "tox"
version = "4.26.0"

View File

@ -2,7 +2,7 @@
name = "slobs-cli"
description = "A command line application for Streamlabs Desktop"
authors = [{ name = "onyx-and-iris", email = "code@onyxandiris.online" }]
dependencies = ["pyslobs>=2.0.5", "asyncclick>=8.1.8"]
dependencies = ["pyslobs>=2.0.5", "asyncclick>=8.1.8", "terminaltables3>=4.0.0"]
requires-python = ">=3.11"
readme = "README.md"
license = { text = "MIT" }
@ -24,18 +24,22 @@ source = "file"
path = "src/slobs_cli/__about__.py"
[tool.pdm.scripts]
_.env_file = ".env"
cli.cmd = "slobs-cli {args}"
cli.env_file = ".env"
pre_test.cmd = "python tests/setup.py"
test.cmd = "pytest {args}"
test.env_file = ".env"
post_test.cmd = "python tests/teardown.py"
fmt.cmd = "ruff format {args}"
lint.cmd = "ruff check {args}"
[dependency-groups]
dev = [
"tox-pdm>=0.7.2",
"pytest>=8.4.0",
"virtualenv-pyenv>=0.5.0",
"pytest-randomly>=3.16.0",
"virtualenv-pyenv>=0.5.0",
"ruff>=0.11.13",
]

79
ruff.toml Normal file
View File

@ -0,0 +1,79 @@
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hatch",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.11
target-version = "py311"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
# Enable pydocstyle (`D`) codes by default.
select = ["E4", "E7", "E9", "F", "D"]
ignore = ["D203", "D213"]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Unlike Black, use single quotes for strings.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"

View File

@ -1 +1,3 @@
__version__ = "0.8.3"
"""module for package metadata."""
__version__ = '0.9.2'

View File

@ -1,9 +1,21 @@
"""Package slobs_cli provides a command-line interface for interacting with SLOBS (Streamlabs OBS)."""
from .audio import audio
from .cli import cli
from .record import record
from .replaybuffer import replaybuffer
from .scene import scene
from .scenecollection import scenecollection
from .stream import stream
from .studiomode import studiomode
__all__ = ["cli", "scene", "stream", "record", "audio", "replaybuffer", "studiomode"]
__all__ = [
'cli',
'scene',
'stream',
'record',
'audio',
'replaybuffer',
'studiomode',
'scenecollection',
]

View File

@ -1,6 +1,9 @@
"""module for managing audio sources in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import AudioService
from terminaltables3 import AsciiTable
from .cli import cli
from .errors import SlobsCliError
@ -8,33 +11,47 @@ from .errors import SlobsCliError
@cli.group()
def audio():
"""Audio management commands."""
"""Manage audio sources in Slobs CLI."""
@audio.command()
@click.option("--id", is_flag=True, help="Include audio source IDs in the output.")
@click.option('--id', is_flag=True, help='Include audio source IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool = False):
"""List all audio sources."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
sources = await as_.get_sources()
if not sources:
click.echo("No audio sources found.")
click.echo('No audio sources found.')
conn.close()
return
click.echo("Available audio sources:")
table_data = [
['Audio Device Name', 'ID', 'Muted']
if id
else ['Audio Device Name', 'Muted']
]
for source in sources:
model = await source.get_model()
click.echo(
f"- {click.style(model.name, fg='blue')} "
f"{f'ID: {model.source_id}, ' if id else ''}"
f"Muted: {click.style('', fg='green') if model.muted else click.style('', fg='red')}"
)
to_append = [click.style(model.name, fg='blue')]
if id:
to_append.append(model.source_id)
to_append.append('' if model.muted else '')
table_data.append(to_append)
table = AsciiTable(table_data)
table.justify_columns = {
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
conn.close()
async with create_task_group() as tg:
@ -43,12 +60,11 @@ async def list(ctx: click.Context, id: bool = False):
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def mute(ctx: click.Context, source_name: str):
"""Mute an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@ -59,10 +75,10 @@ async def mute(ctx: click.Context, source_name: str):
break
else: # If no source by the given name was found
conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.")
raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}")
click.echo(f'{source_name} muted successfully.')
conn.close()
try:
@ -75,12 +91,11 @@ async def mute(ctx: click.Context, source_name: str):
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def unmute(ctx: click.Context, source_name: str):
"""Unmute an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@ -91,10 +106,10 @@ async def unmute(ctx: click.Context, source_name: str):
break
else: # If no source by the given name was found
conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.")
raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}")
click.echo(f'{source_name} unmuted successfully.')
conn.close()
try:
@ -107,12 +122,11 @@ async def unmute(ctx: click.Context, source_name: str):
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def toggle(ctx: click.Context, source_name: str):
"""Toggle mute state of an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@ -122,15 +136,46 @@ async def toggle(ctx: click.Context, source_name: str):
if model.name.lower() == source_name.lower():
if model.muted:
await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}")
click.echo(f'{source_name} unmuted successfully.')
else:
await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}")
click.echo(f'{source_name} muted successfully.')
conn.close()
break
else: # If no source by the given name was found
conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.")
raise SlobsCliError(f'Audio source "{source_name}" not found.')
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@audio.command()
@click.argument('source_name')
@click.pass_context
async def status(ctx: click.Context, source_name: str):
"""Get the mute status of an audio source by name."""
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
sources = await as_.get_sources()
for source in sources:
model = await source.get_model()
if model.name.lower() == source_name.lower():
click.echo(
f'"{source_name}" is {"muted" if model.muted else "unmuted"}.'
)
conn.close()
return
else:
conn.close()
raise SlobsCliError(f'Audio source "{source_name}" not found.')
try:
async with create_task_group() as tg:

View File

@ -1,3 +1,5 @@
"""module defining the entry point for the Streamlabs Desktop CLI application."""
import anyio
import asyncclick as click
from pyslobs import ConnectionConfig, SlobsConnection
@ -7,33 +9,33 @@ from .__about__ import __version__ as version
@click.group()
@click.option(
"-d",
"--domain",
default="127.0.0.1",
'-d',
'--domain',
default='127.0.0.1',
envvar='SLOBS_DOMAIN',
show_default=True,
show_envvar=True,
help="The domain of the SLOBS server.",
envvar="SLOBS_DOMAIN",
help='The domain of the SLOBS server.',
)
@click.option(
"-p",
"--port",
'-p',
'--port',
default=59650,
envvar='SLOBS_PORT',
show_default=True,
show_envvar=True,
help="The port of the SLOBS server.",
envvar="SLOBS_PORT",
help='The port of the SLOBS server.',
)
@click.option(
"-t",
"--token",
help="The token for the SLOBS server.",
envvar="SLOBS_TOKEN",
'-t',
'--token',
envvar='SLOBS_TOKEN',
show_envvar=True,
required=True,
help='The token for the SLOBS server.',
)
@click.version_option(
version, "-v", "--version", message="%(prog)s version: %(version)s"
version, '-v', '--version', message='%(prog)s version: %(version)s'
)
@click.pass_context
async def cli(ctx: click.Context, domain: str, port: int, token: str):
@ -44,7 +46,7 @@ async def cli(ctx: click.Context, domain: str, port: int, token: str):
port=port,
token=token,
)
ctx.obj["connection"] = SlobsConnection(config)
ctx.obj['connection'] = SlobsConnection(config)
def run():

View File

@ -1,3 +1,5 @@
"""module for custom exceptions in Slobs CLI."""
import asyncclick as click
@ -5,9 +7,10 @@ class SlobsCliError(click.ClickException):
"""Base class for all Slobs CLI errors."""
def __init__(self, message: str):
"""Initialize the SlobsCliError with a message."""
super().__init__(message)
self.exit_code = 1
def show(self):
"""Display the error message in red."""
click.secho(f"Error: {self.message}", fg="red", err=True)
click.secho(f'Error: {self.message}', fg='red', err=True)

View File

@ -1,3 +1,5 @@
"""module for managing recording commands in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group()
def record():
"""Recording management commands."""
"""Manage recording in Slobs CLI."""
@record.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start recording."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Recording is already active.")
raise SlobsCliError('Recording is already active.')
await ss.toggle_recording()
click.echo("Recording started.")
click.echo('Recording started.')
conn.close()
@ -45,20 +46,19 @@ async def start(ctx: click.Context):
@click.pass_context
async def stop(ctx: click.Context):
"""Stop recording."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Recording is already inactive.")
raise SlobsCliError('Recording is already inactive.')
await ss.toggle_recording()
click.echo("Recording stopped.")
click.echo('Recording stopped.')
conn.close()
@ -75,18 +75,17 @@ async def stop(ctx: click.Context):
@click.pass_context
async def status(ctx: click.Context):
"""Get recording status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if active:
click.echo("Recording is currently active.")
click.echo('Recording is currently active.')
else:
click.echo("Recording is currently inactive.")
click.echo('Recording is currently inactive.')
conn.close()
@ -99,20 +98,18 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle recording status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
await ss.toggle_recording()
if active:
await ss.toggle_recording()
click.echo("Recording stopped.")
click.echo('Recording stopped.')
else:
await ss.toggle_recording()
click.echo("Recording started.")
click.echo('Recording started.')
conn.close()

View File

@ -1,3 +1,5 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group()
def replaybuffer():
"""Replay buffer management commands."""
"""Manage the replay buffer in Slobs CLI."""
@replaybuffer.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Replay buffer is already active.")
raise SlobsCliError('Replay buffer is already active.')
await ss.start_replay_buffer()
click.echo("Replay buffer started.")
click.echo('Replay buffer started.')
conn.close()
try:
@ -44,20 +45,19 @@ async def start(ctx: click.Context):
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Replay buffer is already inactive.")
raise SlobsCliError('Replay buffer is already inactive.')
await ss.stop_replay_buffer()
click.echo("Replay buffer stopped.")
click.echo('Replay buffer stopped.')
conn.close()
try:
@ -73,17 +73,16 @@ async def stop(ctx: click.Context):
@click.pass_context
async def status(ctx: click.Context):
"""Get the current status of the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if active:
click.echo("Replay buffer is currently active.")
click.echo('Replay buffer is currently active.')
else:
click.echo("Replay buffer is currently inactive.")
click.echo('Replay buffer is currently inactive.')
conn.close()
async with create_task_group() as tg:
@ -95,13 +94,12 @@ async def status(ctx: click.Context):
@click.pass_context
async def save(ctx: click.Context):
"""Save the current replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
await ss.save_replay()
click.echo("Replay buffer saved.")
click.echo('Replay buffer saved.')
conn.close()
async with create_task_group() as tg:

View File

@ -1,6 +1,9 @@
"""module for managing scenes in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import ScenesService, TransitionsService
from terminaltables3 import AsciiTable
from .cli import cli
from .errors import SlobsCliError
@ -8,39 +11,48 @@ from .errors import SlobsCliError
@cli.group()
def scene():
"""Scene management commands."""
"""Manage scenes in Slobs CLI."""
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool = False):
"""List all available scenes."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
async def _run():
scenes = await ss.get_scenes()
if not scenes:
click.echo("No scenes found.")
click.echo('No scenes found.')
conn.close()
return
active_scene = await ss.active_scene()
click.echo("Available scenes:")
table_data = [
['Scene Name', 'ID', 'Active'] if id else ['Scene Name', 'Active']
]
for scene in scenes:
if scene.id == active_scene.id:
click.echo(
f"- {click.style(scene.name, fg='green')} "
f"{f'(ID: {scene.id})' if id else ''} [Active]"
)
to_append = [click.style(scene.name, fg='green')]
else:
click.echo(
f"- {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id})' if id else ''}"
)
to_append = [click.style(scene.name, fg='blue')]
if id:
to_append.append(scene.id)
if scene.id == active_scene.id:
to_append.append('')
table_data.append(to_append)
table = AsciiTable(table_data)
table.justify_columns = {
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
conn.close()
@ -50,19 +62,18 @@ async def list(ctx: click.Context, id: bool = False):
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context
async def current(ctx: click.Context, id: bool = False):
"""Show the currently active scene."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
async def _run():
active_scene = await ss.active_scene()
click.echo(
f"Current active scene: {click.style(active_scene.name, fg='green')} "
f"{f'(ID: {active_scene.id})' if id else ''}"
f'Current active scene: {click.style(active_scene.name, fg="green")} '
f'{f"(ID: {active_scene.id})" if id else ""}'
)
conn.close()
@ -72,20 +83,19 @@ async def current(ctx: click.Context, id: bool = False):
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.argument("scene_name", type=str)
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.argument('scene_name')
@click.option(
"--preview",
'--preview',
is_flag=True,
help="Switch the preview scene only.",
help='Switch the preview scene only.',
)
@click.pass_context
async def switch(
ctx: click.Context, scene_name: str, preview: bool = False, id: bool = False
):
"""Switch to a scene by its name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
ts = TransitionsService(conn)
@ -99,29 +109,29 @@ async def switch(
await ss.make_scene_active(scene.id)
if preview:
click.echo(
f"Switched to preview scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
f'Switched to preview scene: {click.style(scene.name, fg="blue")} '
f'{f"(ID: {scene.id})." if id else ""}'
)
else:
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
f'Switched to scene: {click.style(scene.name, fg="blue")} '
f'{f"(ID: {scene.id})." if id else ""}'
)
await ts.execute_studio_mode_transition()
click.echo(
"Executed studio mode transition to make the scene active."
'Executed studio mode transition to make the scene active.'
)
else:
if preview:
conn.close()
raise SlobsCliError(
"Cannot switch the preview scene in non-studio mode."
'Cannot switch the preview scene in non-studio mode.'
)
await ss.make_scene_active(scene.id)
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
f'Switched to scene: {click.style(scene.name, fg="blue")} '
f'{f"(ID: {scene.id})." if id else ""}'
)
conn.close()

View File

@ -0,0 +1,173 @@
"""module for scene collection management in SLOBS CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import ISceneCollectionCreateOptions, SceneCollectionsService
from terminaltables3 import AsciiTable
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def scenecollection():
"""Manage scene collections in Slobs CLI."""
@scenecollection.command()
@click.option('--id', is_flag=True, help='Include scene collection IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool):
"""List all scene collections."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
if not collections:
click.echo('No scene collections found.')
conn.close()
return
active_collection = await scs.active_collection()
table_data = [
['Scene Collection Name', 'ID', 'Active']
if id
else ['Scene Collection Name', 'Active']
]
for collection in collections:
if collection.id == active_collection.id:
to_append = [click.style(collection.name, fg='green')]
else:
to_append = [click.style(collection.name, fg='blue')]
if id:
to_append.append(collection.id)
if collection.id == active_collection.id:
to_append.append('')
table_data.append(to_append)
table = AsciiTable(table_data)
table.justify_columns = {
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def load(ctx: click.Context, scenecollection_name: str):
"""Load a scene collection by name."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.load(collection.id)
click.echo(f'Scene collection "{scenecollection_name}" loaded successfully.')
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def create(ctx: click.Context, scenecollection_name: str):
"""Create a new scene collection."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
await scs.create(ISceneCollectionCreateOptions(scenecollection_name))
click.echo(f'Scene collection "{scenecollection_name}" created successfully.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def delete(ctx: click.Context, scenecollection_name: str):
"""Delete a scene collection by name."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.delete(collection.id)
click.echo(f'Scene collection "{scenecollection_name}" deleted successfully.')
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@scenecollection.command()
@click.argument('scenecollection_name')
@click.argument('new_name')
@click.pass_context
async def rename(ctx: click.Context, scenecollection_name: str, new_name: str):
"""Rename a scene collection."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.rename(new_name, collection.id)
click.echo(
f'Scene collection "{scenecollection_name}" renamed to "{new_name}".'
)
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e

View File

@ -1,3 +1,5 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group()
def stream():
"""Stream management commands."""
"""Manage streaming in Slobs CLI."""
@stream.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the stream."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Stream is already active.")
raise SlobsCliError('Stream is already active.')
await ss.toggle_streaming()
click.echo("Stream started.")
click.echo('Stream started.')
conn.close()
try:
@ -44,20 +45,19 @@ async def start(ctx: click.Context):
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the stream."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Stream is already inactive.")
raise SlobsCliError('Stream is already inactive.')
await ss.toggle_streaming()
click.echo("Stream stopped.")
click.echo('Stream stopped.')
conn.close()
try:
@ -73,18 +73,17 @@ async def stop(ctx: click.Context):
@click.pass_context
async def status(ctx: click.Context):
"""Get the current stream status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if active:
click.echo("Stream is currently active.")
click.echo('Stream is currently active.')
else:
click.echo("Stream is currently inactive.")
click.echo('Stream is currently inactive.')
conn.close()
async with create_task_group() as tg:
@ -96,19 +95,18 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle the stream status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
await ss.toggle_streaming()
if active:
click.echo("Stream stopped.")
click.echo('Stream stopped.')
else:
click.echo("Stream started.")
click.echo('Stream started.')
conn.close()

View File

@ -1,3 +1,5 @@
"""module for managing studio mode in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import TransitionsService
@ -8,25 +10,24 @@ from .errors import SlobsCliError
@cli.group()
def studiomode():
"""Studio mode management commands."""
"""Manage studio mode in Slobs CLI."""
@studiomode.command()
@click.pass_context
async def enable(ctx: click.Context):
"""Enable studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is already enabled.")
raise SlobsCliError('Studio mode is already enabled.')
await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.")
click.echo('Studio mode enabled successfully.')
conn.close()
try:
@ -42,18 +43,17 @@ async def enable(ctx: click.Context):
@click.pass_context
async def disable(ctx: click.Context):
"""Disable studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if not model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is already disabled.")
raise SlobsCliError('Studio mode is already disabled.')
await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.")
click.echo('Studio mode disabled successfully.')
conn.close()
try:
@ -69,16 +69,15 @@ async def disable(ctx: click.Context):
@click.pass_context
async def status(ctx: click.Context):
"""Check the status of studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
click.echo("Studio mode is currently enabled.")
click.echo('Studio mode is currently enabled.')
else:
click.echo("Studio mode is currently disabled.")
click.echo('Studio mode is currently disabled.')
conn.close()
async with create_task_group() as tg:
@ -90,18 +89,17 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.")
click.echo('Studio mode disabled successfully.')
else:
await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.")
click.echo('Studio mode enabled successfully.')
conn.close()
async with create_task_group() as tg:
@ -113,18 +111,17 @@ async def toggle(ctx: click.Context):
@click.pass_context
async def force_transition(ctx: click.Context):
"""Force a transition in studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if not model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is not enabled.")
raise SlobsCliError('Studio mode is not enabled.')
await ts.execute_studio_mode_transition()
click.echo("Forced studio mode transition.")
click.echo('Forced studio mode transition.')
conn.close()
try:

View File

@ -0,0 +1 @@
"""Test suite for the slobs_cli package."""

View File

@ -1,6 +1,9 @@
"""pytest configuration for async tests using anyio."""
import pytest
@pytest.fixture
def anyio_backend():
return "asyncio"
"""Return the backend to use for async tests."""
return 'asyncio'

View File

@ -1,3 +1,10 @@
"""Create test scenes in Streamlabs.
Usage:
Run this script as a standalone program to setup the test environment.
Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set.
"""
import os
import anyio
@ -6,20 +13,22 @@ from pyslobs import ConnectionConfig, ScenesService, SlobsConnection
async def setup(conn: SlobsConnection):
"""Set up test scenes in Streamlabs OBS."""
ss = ScenesService(conn)
await ss.create_scene("slobs-test-scene-1")
await ss.create_scene("slobs-test-scene-2")
await ss.create_scene("slobs-test-scene-3")
await ss.create_scene('slobs-test-scene-1')
await ss.create_scene('slobs-test-scene-2')
await ss.create_scene('slobs-test-scene-3')
conn.close()
async def main():
"""Establish connection and set up scenes."""
conn = SlobsConnection(
ConnectionConfig(
domain=os.environ["SLOBS_DOMAIN"],
domain=os.environ['SLOBS_DOMAIN'],
port=59650,
token=os.environ["SLOBS_TOKEN"],
token=os.environ['SLOBS_TOKEN'],
)
)
@ -28,5 +37,5 @@ async def main():
tg.start_soon(setup, conn)
if __name__ == "__main__":
if __name__ == '__main__':
anyio.run(main)

View File

@ -1,3 +1,10 @@
"""Remove test scenes in Streamlabs, disable streaming, recording, and replay buffer.
Usage:
Run this script as a standalone program to tear down the test environment.
Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set.
"""
import os
import anyio
@ -6,30 +13,32 @@ from pyslobs import ConnectionConfig, ScenesService, SlobsConnection, StreamingS
async def cleanup(conn: SlobsConnection):
"""Clean up test scenes and ensure streaming, recording, and replay buffer are stopped."""
ss = ScenesService(conn)
scenes = await ss.get_scenes()
for scene in scenes:
if scene.name.startswith("slobs-test-scene-"):
if scene.name.startswith('slobs-test-scene-'):
await ss.remove_scene(scene.id)
ss = StreamingService(conn)
model = await ss.get_model()
if model.streaming_status != "offline":
if model.streaming_status != 'offline':
await ss.toggle_streaming()
if model.replay_buffer_status != "offline":
if model.replay_buffer_status != 'offline':
await ss.stop_replay_buffer()
if model.recording_status != "offline":
if model.recording_status != 'offline':
await ss.toggle_recording()
conn.close()
async def main():
"""Establish connection and clean up test scenes."""
conn = SlobsConnection(
ConnectionConfig(
domain=os.environ["SLOBS_DOMAIN"],
domain=os.environ['SLOBS_DOMAIN'],
port=59650,
token=os.environ["SLOBS_TOKEN"],
token=os.environ['SLOBS_TOKEN'],
)
)
@ -38,5 +47,5 @@ async def main():
tg.start_soon(cleanup, conn)
if __name__ == "__main__":
if __name__ == '__main__':
anyio.run(main)

43
tests/test_audio.py Normal file
View File

@ -0,0 +1,43 @@
"""Test cases for audio commands in slobs_cli."""
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_audio_list():
"""Test the list audio sources command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'list'])
assert result.exit_code == 0
assert 'Desktop Audio' in result.output
assert 'Mic/Aux' in result.output
@pytest.mark.anyio
async def test_audio_mute():
"""Test the mute audio source command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'mute', 'Mic/Aux'])
assert result.exit_code == 0
assert 'Mic/Aux muted successfully' in result.output
@pytest.mark.anyio
async def test_audio_unmute():
"""Test the unmute audio source command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'unmute', 'Mic/Aux'])
assert result.exit_code == 0
assert 'Mic/Aux unmuted successfully' in result.output
@pytest.mark.anyio
async def test_audio_invalid_source():
"""Test handling of invalid audio source."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'mute', 'InvalidSource'])
assert result.exit_code != 0
assert 'Audio source "InvalidSource" not found' in result.output

View File

@ -1,3 +1,5 @@
"""Test cases for the recording commands of the slobs_cli CLI application."""
import anyio
import pytest
from asyncclick.testing import CliRunner
@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_record_start():
"""Test the start recording command."""
runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"])
result = await runner.invoke(cli, ['record', 'status'])
assert result.exit_code == 0
active = "Recording is currently active." in result.output
active = 'Recording is currently active.' in result.output
result = await runner.invoke(cli, ["record", "start"])
result = await runner.invoke(cli, ['record', 'start'])
if not active:
assert result.exit_code == 0
assert "Recording started" in result.output
assert 'Recording started' in result.output
await anyio.sleep(0.2) # Allow some time for the recording to start
else:
assert result.exit_code != 0
assert "Recording is already active." in result.output
assert 'Recording is already active.' in result.output
@pytest.mark.anyio
async def test_record_stop():
"""Test the stop recording command."""
runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"])
result = await runner.invoke(cli, ['record', 'status'])
assert result.exit_code == 0
active = "Recording is currently active." in result.output
active = 'Recording is currently active.' in result.output
result = await runner.invoke(cli, ["record", "stop"])
result = await runner.invoke(cli, ['record', 'stop'])
if active:
assert result.exit_code == 0
assert "Recording stopped" in result.output
assert 'Recording stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the recording to stop
else:
assert result.exit_code != 0
assert "Recording is already inactive." in result.output
assert 'Recording is already inactive.' in result.output

View File

@ -1,3 +1,5 @@
"""Test cases for the replay buffer commands in slobs_cli."""
import anyio
import pytest
from asyncclick.testing import CliRunner
@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_replaybuffer_start():
"""Test the start replay buffer command."""
runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"])
result = await runner.invoke(cli, ['replaybuffer', 'status'])
assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output
active = 'Replay buffer is currently active.' in result.output
result = await runner.invoke(cli, ["replaybuffer", "start"])
result = await runner.invoke(cli, ['replaybuffer', 'start'])
if not active:
assert result.exit_code == 0
assert "Replay buffer started" in result.output
assert 'Replay buffer started' in result.output
await anyio.sleep(0.2) # Allow some time for the replay buffer to start
else:
assert result.exit_code != 0
assert "Replay buffer is already active." in result.output
assert 'Replay buffer is already active.' in result.output
@pytest.mark.anyio
async def test_replaybuffer_stop():
"""Test the stop replay buffer command."""
runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"])
result = await runner.invoke(cli, ['replaybuffer', 'status'])
assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output
active = 'Replay buffer is currently active.' in result.output
result = await runner.invoke(cli, ["replaybuffer", "stop"])
result = await runner.invoke(cli, ['replaybuffer', 'stop'])
if active:
assert result.exit_code == 0
assert "Replay buffer stopped" in result.output
assert 'Replay buffer stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the replay buffer to stop
else:
assert result.exit_code != 0
assert "Replay buffer is already inactive." in result.output
assert 'Replay buffer is already inactive.' in result.output

View File

@ -1,3 +1,5 @@
"""Test cases for scene commands in slobs_cli."""
import pytest
from asyncclick.testing import CliRunner
@ -6,20 +8,22 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_scene_list():
"""Test the list scenes command."""
runner = CliRunner()
result = await runner.invoke(cli, ["scene", "list"])
result = await runner.invoke(cli, ['scene', 'list'])
assert result.exit_code == 0
assert "slobs-test-scene-1" in result.output
assert "slobs-test-scene-2" in result.output
assert "slobs-test-scene-3" in result.output
assert 'slobs-test-scene-1' in result.output
assert 'slobs-test-scene-2' in result.output
assert 'slobs-test-scene-3' in result.output
@pytest.mark.anyio
async def test_scene_current():
"""Test the current scene command."""
runner = CliRunner()
result = await runner.invoke(cli, ["scene", "switch", "slobs-test-scene-2"])
result = await runner.invoke(cli, ['scene', 'switch', 'slobs-test-scene-2'])
assert result.exit_code == 0
result = await runner.invoke(cli, ["scene", "current"])
result = await runner.invoke(cli, ['scene', 'current'])
assert result.exit_code == 0
assert "Current active scene: slobs-test-scene-2" in result.output
assert 'Current active scene: slobs-test-scene-2' in result.output

View File

@ -1,3 +1,5 @@
"""Tests for the stream commands in slobs_cli."""
import anyio
import pytest
from asyncclick.testing import CliRunner
@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_stream_start():
"""Test the start stream command."""
runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"])
result = await runner.invoke(cli, ['stream', 'status'])
assert result.exit_code == 0
active = "Stream is currently active." in result.output
active = 'Stream is currently active.' in result.output
result = await runner.invoke(cli, ["stream", "start"])
result = await runner.invoke(cli, ['stream', 'start'])
if not active:
assert result.exit_code == 0
assert "Stream started" in result.output
assert 'Stream started' in result.output
await anyio.sleep(0.2) # Allow some time for the stream to start
else:
assert result.exit_code != 0
assert "Stream is already active." in result.output
assert 'Stream is already active.' in result.output
@pytest.mark.anyio
async def test_stream_stop():
"""Test the stop stream command."""
runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"])
result = await runner.invoke(cli, ['stream', 'status'])
assert result.exit_code == 0
active = "Stream is currently active." in result.output
active = 'Stream is currently active.' in result.output
result = await runner.invoke(cli, ["stream", "stop"])
result = await runner.invoke(cli, ['stream', 'stop'])
if active:
assert result.exit_code == 0
assert "Stream stopped" in result.output
assert 'Stream stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the stream to stop
else:
assert result.exit_code != 0
assert "Stream is already inactive." in result.output
assert 'Stream is already inactive.' in result.output

60
tests/test_studiomode.py Normal file
View File

@ -0,0 +1,60 @@
"""Test cases for the studio mode commands of the slobs_cli CLI application."""
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_studiomode_enable():
"""Test the enable studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'enable'])
if active:
assert result.exit_code != 0
assert 'Studio mode is already enabled.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode enabled successfully.' in result.output
@pytest.mark.anyio
async def test_studiomode_disable():
"""Test the disable studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'disable'])
if not active:
assert result.exit_code != 0
assert 'Studio mode is already disabled.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode disabled successfully.' in result.output
@pytest.mark.anyio
async def test_studiomode_toggle():
"""Test the toggle studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'toggle'])
if active:
assert result.exit_code == 0
assert 'Studio mode disabled successfully.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode enabled successfully.' in result.output