Compare commits

...

13 Commits

Author SHA1 Message Date
f3c94d1dee add comments to for-else blocks 2025-06-13 00:52:27 +01:00
3c09f9fd5b upd CHANGELOG 2025-06-12 23:47:35 +01:00
51d49c9c93 add status to audio section 2025-06-12 23:47:23 +01:00
bc43c8483a add audio unit tests
add audio status command

patch bump
2025-06-12 23:47:07 +01:00
57e31a7e49 arguments appear to be required by default. 2025-06-12 23:25:07 +01:00
db6a9b5e84 add studiomode unit tests 2025-06-12 23:10:01 +01:00
03f1dac8ea rename leftmost column heading for audio list
patch bump
2025-06-12 22:17:48 +01:00
535b22bf8e add 0.9.0 to CHANGELOG 2025-06-12 22:13:58 +01:00
6e95e4d670 add scenecollection command group
minor bump
2025-06-12 22:13:46 +01:00
519db1b46e add Scene Collection to README 2025-06-12 22:04:15 +01:00
582587bed5 add ruff config
run files through formatter

add dosctrings to satisfy the linter
2025-06-12 20:34:14 +01:00
fecd13d345 patch bump 2025-06-12 18:50:43 +01:00
a1a22d0d00 scene list and audio list now print tables
patch bump
2025-06-12 18:49:54 +01:00
26 changed files with 772 additions and 219 deletions

View File

@ -5,7 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.8.3] - 2025-06-12 # [0.9.0] - 2025-06-12
### Added
- scenecollection command group, see [Scene Collection](https://github.com/onyx-and-iris/slobs-cli/tree/main?tab=readme-ov-file#scene-collection)
- add audio status commmand.
# [0.8.4] - 2025-06-12
### Added ### Added
@ -16,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- scene list now shows which scene is the current active scene. - scene list now shows which scene is the current active scene.
- audio list now shows mute states. - audio list now shows mute states.
- scene list and audio list commands now print as tables
- --id option added to scene and audio commands to show the source ID in the output - --id option added to scene and audio commands to show the source ID in the output
- by default this is no longer displayed - by default this is no longer displayed

View File

@ -189,6 +189,12 @@ slobs-cli audio unmute "Mic/Aux"
slobs-cli audio toggle "Mic/Aux" slobs-cli audio toggle "Mic/Aux"
``` ```
- status: Get the mute status of an audio source by name.
```console
slobs-cli audio status "Mic/Aux"
```
#### Replay Buffer #### Replay Buffer
- start: Start the replay buffer. - start: Start the replay buffer.
@ -247,6 +253,46 @@ slobs-cli studiomode status
slobs-cli studiomode force-transition slobs-cli studiomode force-transition
``` ```
#### Scene Collection
- list: List all scene collections.
- flags:
*optional*
- --id: Include scenecollection IDs in the output.
```console
slobs-cli scenecollection list
```
- create: Create a new scene collection.
- args: <scenecollection_name>
```console
slobs-cli scenecollection create "NewCollection"
```
- delete: Delete a scene collection by name.
- args: <scenecollection_name>
```console
slobs-cli scenecollection delete "ExistingCollection"
```
- load: Load a scene collection by name.
- args: <scenecollection_name>
```console
slobs-cli scenecollection load "ExistingCollection"
```
- rename: Rename a scene collection.
- args: <scenecollection_name> <new_name>
```console
slobs-cli scenecollection rename "ExistingCollection" "NewName"
```
## Special Thanks ## Special Thanks
- [Julian-0](https://github.com/Julian-O) For writing the [PySLOBS wrapper](https://github.com/Julian-O/PySLOBS) on which this CLI depends. - [Julian-0](https://github.com/Julian-O) For writing the [PySLOBS wrapper](https://github.com/Julian-O/PySLOBS) on which this CLI depends.

40
pdm.lock generated
View File

@ -5,7 +5,7 @@
groups = ["default", "dev"] groups = ["default", "dev"]
strategy = ["inherit_metadata"] strategy = ["inherit_metadata"]
lock_version = "4.5.0" lock_version = "4.5.0"
content_hash = "sha256:87714b892affeadd7dba57d9430f0af3dc46f50cc9d095942367e4fca103f61e" content_hash = "sha256:c1f6a22d9f4fca9c52692b2931ca64dada84a1e99c9013dad4be26bdd786cc6e"
[[metadata.targets]] [[metadata.targets]]
requires_python = ">=3.11" requires_python = ">=3.11"
@ -238,6 +238,33 @@ files = [
{file = "pytest_randomly-3.16.0.tar.gz", hash = "sha256:11bf4d23a26484de7860d82f726c0629837cf4064b79157bd18ec9d41d7feb26"}, {file = "pytest_randomly-3.16.0.tar.gz", hash = "sha256:11bf4d23a26484de7860d82f726c0629837cf4064b79157bd18ec9d41d7feb26"},
] ]
[[package]]
name = "ruff"
version = "0.11.13"
requires_python = ">=3.7"
summary = "An extremely fast Python linter and code formatter, written in Rust."
groups = ["dev"]
files = [
{file = "ruff-0.11.13-py3-none-linux_armv6l.whl", hash = "sha256:4bdfbf1240533f40042ec00c9e09a3aade6f8c10b6414cf11b519488d2635d46"},
{file = "ruff-0.11.13-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:aef9c9ed1b5ca28bb15c7eac83b8670cf3b20b478195bd49c8d756ba0a36cf48"},
{file = "ruff-0.11.13-py3-none-macosx_11_0_arm64.whl", hash = "sha256:53b15a9dfdce029c842e9a5aebc3855e9ab7771395979ff85b7c1dedb53ddc2b"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ab153241400789138d13f362c43f7edecc0edfffce2afa6a68434000ecd8f69a"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:6c51f93029d54a910d3d24f7dd0bb909e31b6cd989a5e4ac513f4eb41629f0dc"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1808b3ed53e1a777c2ef733aca9051dc9bf7c99b26ece15cb59a0320fbdbd629"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:d28ce58b5ecf0f43c1b71edffabe6ed7f245d5336b17805803312ec9bc665933"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:55e4bc3a77842da33c16d55b32c6cac1ec5fb0fbec9c8c513bdce76c4f922165"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:633bf2c6f35678c56ec73189ba6fa19ff1c5e4807a78bf60ef487b9dd272cc71"},
{file = "ruff-0.11.13-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4ffbc82d70424b275b089166310448051afdc6e914fdab90e08df66c43bb5ca9"},
{file = "ruff-0.11.13-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:4a9ddd3ec62a9a89578c85842b836e4ac832d4a2e0bfaad3b02243f930ceafcc"},
{file = "ruff-0.11.13-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:d237a496e0778d719efb05058c64d28b757c77824e04ffe8796c7436e26712b7"},
{file = "ruff-0.11.13-py3-none-musllinux_1_2_i686.whl", hash = "sha256:26816a218ca6ef02142343fd24c70f7cd8c5aa6c203bca284407adf675984432"},
{file = "ruff-0.11.13-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:51c3f95abd9331dc5b87c47ac7f376db5616041173826dfd556cfe3d4977f492"},
{file = "ruff-0.11.13-py3-none-win32.whl", hash = "sha256:96c27935418e4e8e77a26bb05962817f28b8ef3843a6c6cc49d8783b5507f250"},
{file = "ruff-0.11.13-py3-none-win_amd64.whl", hash = "sha256:29c3189895a8a6a657b7af4e97d330c8a3afd2c9c8f46c81e2fc5a31866517e3"},
{file = "ruff-0.11.13-py3-none-win_arm64.whl", hash = "sha256:b4385285e9179d608ff1d2fb9922062663c658605819a6876d8beef0c30b7f3b"},
{file = "ruff-0.11.13.tar.gz", hash = "sha256:26fa247dc68d1d4e72c179e08889a25ac0c7ba4d78aecfc835d49cbfd60bf514"},
]
[[package]] [[package]]
name = "sniffio" name = "sniffio"
version = "1.3.1" version = "1.3.1"
@ -249,6 +276,17 @@ files = [
{file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"},
] ]
[[package]]
name = "terminaltables3"
version = "4.0.0"
requires_python = ">=3.8"
summary = "Generate simple tables in terminals from a nested list of strings. Fork of terminaltables."
groups = ["default"]
files = [
{file = "terminaltables3-4.0.0-py3-none-any.whl", hash = "sha256:93b4c722f35400a7869cd630e2bbab616b129d1c47c628765c7f47baab2ca270"},
{file = "terminaltables3-4.0.0.tar.gz", hash = "sha256:4e3eefe209aa89005a0a34d1525739424569729ee29b5e64a8dd51c5ebdab77f"},
]
[[package]] [[package]]
name = "tox" name = "tox"
version = "4.26.0" version = "4.26.0"

View File

@ -2,7 +2,7 @@
name = "slobs-cli" name = "slobs-cli"
description = "A command line application for Streamlabs Desktop" description = "A command line application for Streamlabs Desktop"
authors = [{ name = "onyx-and-iris", email = "code@onyxandiris.online" }] authors = [{ name = "onyx-and-iris", email = "code@onyxandiris.online" }]
dependencies = ["pyslobs>=2.0.5", "asyncclick>=8.1.8"] dependencies = ["pyslobs>=2.0.5", "asyncclick>=8.1.8", "terminaltables3>=4.0.0"]
requires-python = ">=3.11" requires-python = ">=3.11"
readme = "README.md" readme = "README.md"
license = { text = "MIT" } license = { text = "MIT" }
@ -24,18 +24,22 @@ source = "file"
path = "src/slobs_cli/__about__.py" path = "src/slobs_cli/__about__.py"
[tool.pdm.scripts] [tool.pdm.scripts]
_.env_file = ".env"
cli.cmd = "slobs-cli {args}" cli.cmd = "slobs-cli {args}"
cli.env_file = ".env"
pre_test.cmd = "python tests/setup.py" pre_test.cmd = "python tests/setup.py"
test.cmd = "pytest {args}" test.cmd = "pytest {args}"
test.env_file = ".env"
post_test.cmd = "python tests/teardown.py" post_test.cmd = "python tests/teardown.py"
fmt.cmd = "ruff format {args}"
lint.cmd = "ruff check {args}"
[dependency-groups] [dependency-groups]
dev = [ dev = [
"tox-pdm>=0.7.2", "tox-pdm>=0.7.2",
"pytest>=8.4.0", "pytest>=8.4.0",
"virtualenv-pyenv>=0.5.0",
"pytest-randomly>=3.16.0", "pytest-randomly>=3.16.0",
"virtualenv-pyenv>=0.5.0",
"ruff>=0.11.13",
] ]

79
ruff.toml Normal file
View File

@ -0,0 +1,79 @@
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hatch",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.11
target-version = "py311"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
# Enable pydocstyle (`D`) codes by default.
select = ["E4", "E7", "E9", "F", "D"]
ignore = ["D203", "D213"]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Unlike Black, use single quotes for strings.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"

View File

@ -1 +1,3 @@
__version__ = "0.8.3" """module for package metadata."""
__version__ = '0.9.2'

View File

@ -1,9 +1,21 @@
"""Package slobs_cli provides a command-line interface for interacting with SLOBS (Streamlabs OBS)."""
from .audio import audio from .audio import audio
from .cli import cli from .cli import cli
from .record import record from .record import record
from .replaybuffer import replaybuffer from .replaybuffer import replaybuffer
from .scene import scene from .scene import scene
from .scenecollection import scenecollection
from .stream import stream from .stream import stream
from .studiomode import studiomode from .studiomode import studiomode
__all__ = ["cli", "scene", "stream", "record", "audio", "replaybuffer", "studiomode"] __all__ = [
'cli',
'scene',
'stream',
'record',
'audio',
'replaybuffer',
'studiomode',
'scenecollection',
]

View File

@ -1,6 +1,9 @@
"""module for managing audio sources in Slobs CLI."""
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import AudioService from pyslobs import AudioService
from terminaltables3 import AsciiTable
from .cli import cli from .cli import cli
from .errors import SlobsCliError from .errors import SlobsCliError
@ -8,33 +11,47 @@ from .errors import SlobsCliError
@cli.group() @cli.group()
def audio(): def audio():
"""Audio management commands.""" """Manage audio sources in Slobs CLI."""
@audio.command() @audio.command()
@click.option("--id", is_flag=True, help="Include audio source IDs in the output.") @click.option('--id', is_flag=True, help='Include audio source IDs in the output.')
@click.pass_context @click.pass_context
async def list(ctx: click.Context, id: bool = False): async def list(ctx: click.Context, id: bool = False):
"""List all audio sources.""" """List all audio sources."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
as_ = AudioService(conn) as_ = AudioService(conn)
async def _run(): async def _run():
sources = await as_.get_sources() sources = await as_.get_sources()
if not sources: if not sources:
click.echo("No audio sources found.") click.echo('No audio sources found.')
conn.close() conn.close()
return return
click.echo("Available audio sources:") table_data = [
['Audio Device Name', 'ID', 'Muted']
if id
else ['Audio Device Name', 'Muted']
]
for source in sources: for source in sources:
model = await source.get_model() model = await source.get_model()
click.echo(
f"- {click.style(model.name, fg='blue')} " to_append = [click.style(model.name, fg='blue')]
f"{f'ID: {model.source_id}, ' if id else ''}" if id:
f"Muted: {click.style('', fg='green') if model.muted else click.style('', fg='red')}" to_append.append(model.source_id)
) to_append.append('' if model.muted else '')
table_data.append(to_append)
table = AsciiTable(table_data)
table.justify_columns = {
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -43,12 +60,11 @@ async def list(ctx: click.Context, id: bool = False):
@audio.command() @audio.command()
@click.argument("source_name") @click.argument('source_name')
@click.pass_context @click.pass_context
async def mute(ctx: click.Context, source_name: str): async def mute(ctx: click.Context, source_name: str):
"""Mute an audio source by name.""" """Mute an audio source by name."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
as_ = AudioService(conn) as_ = AudioService(conn)
async def _run(): async def _run():
@ -59,10 +75,10 @@ async def mute(ctx: click.Context, source_name: str):
break break
else: # If no source by the given name was found else: # If no source by the given name was found
conn.close() conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.") raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(True) await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}") click.echo(f'{source_name} muted successfully.')
conn.close() conn.close()
try: try:
@ -75,12 +91,11 @@ async def mute(ctx: click.Context, source_name: str):
@audio.command() @audio.command()
@click.argument("source_name") @click.argument('source_name')
@click.pass_context @click.pass_context
async def unmute(ctx: click.Context, source_name: str): async def unmute(ctx: click.Context, source_name: str):
"""Unmute an audio source by name.""" """Unmute an audio source by name."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
as_ = AudioService(conn) as_ = AudioService(conn)
async def _run(): async def _run():
@ -91,10 +106,10 @@ async def unmute(ctx: click.Context, source_name: str):
break break
else: # If no source by the given name was found else: # If no source by the given name was found
conn.close() conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.") raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(False) await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}") click.echo(f'{source_name} unmuted successfully.')
conn.close() conn.close()
try: try:
@ -107,12 +122,11 @@ async def unmute(ctx: click.Context, source_name: str):
@audio.command() @audio.command()
@click.argument("source_name") @click.argument('source_name')
@click.pass_context @click.pass_context
async def toggle(ctx: click.Context, source_name: str): async def toggle(ctx: click.Context, source_name: str):
"""Toggle mute state of an audio source by name.""" """Toggle mute state of an audio source by name."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
as_ = AudioService(conn) as_ = AudioService(conn)
async def _run(): async def _run():
@ -122,15 +136,46 @@ async def toggle(ctx: click.Context, source_name: str):
if model.name.lower() == source_name.lower(): if model.name.lower() == source_name.lower():
if model.muted: if model.muted:
await source.set_muted(False) await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}") click.echo(f'{source_name} unmuted successfully.')
else: else:
await source.set_muted(True) await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}") click.echo(f'{source_name} muted successfully.')
conn.close() conn.close()
break break
else: # If no source by the given name was found else: # If no source by the given name was found
conn.close() conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.") raise SlobsCliError(f'Audio source "{source_name}" not found.')
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@audio.command()
@click.argument('source_name')
@click.pass_context
async def status(ctx: click.Context, source_name: str):
"""Get the mute status of an audio source by name."""
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
sources = await as_.get_sources()
for source in sources:
model = await source.get_model()
if model.name.lower() == source_name.lower():
click.echo(
f'"{source_name}" is {"muted" if model.muted else "unmuted"}.'
)
conn.close()
return
else:
conn.close()
raise SlobsCliError(f'Audio source "{source_name}" not found.')
try: try:
async with create_task_group() as tg: async with create_task_group() as tg:

View File

@ -1,3 +1,5 @@
"""module defining the entry point for the Streamlabs Desktop CLI application."""
import anyio import anyio
import asyncclick as click import asyncclick as click
from pyslobs import ConnectionConfig, SlobsConnection from pyslobs import ConnectionConfig, SlobsConnection
@ -7,33 +9,33 @@ from .__about__ import __version__ as version
@click.group() @click.group()
@click.option( @click.option(
"-d", '-d',
"--domain", '--domain',
default="127.0.0.1", default='127.0.0.1',
envvar='SLOBS_DOMAIN',
show_default=True, show_default=True,
show_envvar=True, show_envvar=True,
help="The domain of the SLOBS server.", help='The domain of the SLOBS server.',
envvar="SLOBS_DOMAIN",
) )
@click.option( @click.option(
"-p", '-p',
"--port", '--port',
default=59650, default=59650,
envvar='SLOBS_PORT',
show_default=True, show_default=True,
show_envvar=True, show_envvar=True,
help="The port of the SLOBS server.", help='The port of the SLOBS server.',
envvar="SLOBS_PORT",
) )
@click.option( @click.option(
"-t", '-t',
"--token", '--token',
help="The token for the SLOBS server.", envvar='SLOBS_TOKEN',
envvar="SLOBS_TOKEN",
show_envvar=True, show_envvar=True,
required=True, required=True,
help='The token for the SLOBS server.',
) )
@click.version_option( @click.version_option(
version, "-v", "--version", message="%(prog)s version: %(version)s" version, '-v', '--version', message='%(prog)s version: %(version)s'
) )
@click.pass_context @click.pass_context
async def cli(ctx: click.Context, domain: str, port: int, token: str): async def cli(ctx: click.Context, domain: str, port: int, token: str):
@ -44,7 +46,7 @@ async def cli(ctx: click.Context, domain: str, port: int, token: str):
port=port, port=port,
token=token, token=token,
) )
ctx.obj["connection"] = SlobsConnection(config) ctx.obj['connection'] = SlobsConnection(config)
def run(): def run():

View File

@ -1,3 +1,5 @@
"""module for custom exceptions in Slobs CLI."""
import asyncclick as click import asyncclick as click
@ -5,9 +7,10 @@ class SlobsCliError(click.ClickException):
"""Base class for all Slobs CLI errors.""" """Base class for all Slobs CLI errors."""
def __init__(self, message: str): def __init__(self, message: str):
"""Initialize the SlobsCliError with a message."""
super().__init__(message) super().__init__(message)
self.exit_code = 1 self.exit_code = 1
def show(self): def show(self):
"""Display the error message in red.""" """Display the error message in red."""
click.secho(f"Error: {self.message}", fg="red", err=True) click.secho(f'Error: {self.message}', fg='red', err=True)

View File

@ -1,3 +1,5 @@
"""module for managing recording commands in Slobs CLI."""
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group() @cli.group()
def record(): def record():
"""Recording management commands.""" """Manage recording in Slobs CLI."""
@record.command() @record.command()
@click.pass_context @click.pass_context
async def start(ctx: click.Context): async def start(ctx: click.Context):
"""Start recording.""" """Start recording."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.recording_status != "offline" active = model.recording_status != 'offline'
if active: if active:
conn.close() conn.close()
raise SlobsCliError("Recording is already active.") raise SlobsCliError('Recording is already active.')
await ss.toggle_recording() await ss.toggle_recording()
click.echo("Recording started.") click.echo('Recording started.')
conn.close() conn.close()
@ -45,20 +46,19 @@ async def start(ctx: click.Context):
@click.pass_context @click.pass_context
async def stop(ctx: click.Context): async def stop(ctx: click.Context):
"""Stop recording.""" """Stop recording."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.recording_status != "offline" active = model.recording_status != 'offline'
if not active: if not active:
conn.close() conn.close()
raise SlobsCliError("Recording is already inactive.") raise SlobsCliError('Recording is already inactive.')
await ss.toggle_recording() await ss.toggle_recording()
click.echo("Recording stopped.") click.echo('Recording stopped.')
conn.close() conn.close()
@ -75,18 +75,17 @@ async def stop(ctx: click.Context):
@click.pass_context @click.pass_context
async def status(ctx: click.Context): async def status(ctx: click.Context):
"""Get recording status.""" """Get recording status."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.recording_status != "offline" active = model.recording_status != 'offline'
if active: if active:
click.echo("Recording is currently active.") click.echo('Recording is currently active.')
else: else:
click.echo("Recording is currently inactive.") click.echo('Recording is currently inactive.')
conn.close() conn.close()
@ -99,20 +98,18 @@ async def status(ctx: click.Context):
@click.pass_context @click.pass_context
async def toggle(ctx: click.Context): async def toggle(ctx: click.Context):
"""Toggle recording status.""" """Toggle recording status."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.recording_status != "offline" active = model.recording_status != 'offline'
await ss.toggle_recording()
if active: if active:
await ss.toggle_recording() click.echo('Recording stopped.')
click.echo("Recording stopped.")
else: else:
await ss.toggle_recording() click.echo('Recording started.')
click.echo("Recording started.")
conn.close() conn.close()

View File

@ -1,3 +1,5 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group() @cli.group()
def replaybuffer(): def replaybuffer():
"""Replay buffer management commands.""" """Manage the replay buffer in Slobs CLI."""
@replaybuffer.command() @replaybuffer.command()
@click.pass_context @click.pass_context
async def start(ctx: click.Context): async def start(ctx: click.Context):
"""Start the replay buffer.""" """Start the replay buffer."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.replay_buffer_status != "offline" active = model.replay_buffer_status != 'offline'
if active: if active:
conn.close() conn.close()
raise SlobsCliError("Replay buffer is already active.") raise SlobsCliError('Replay buffer is already active.')
await ss.start_replay_buffer() await ss.start_replay_buffer()
click.echo("Replay buffer started.") click.echo('Replay buffer started.')
conn.close() conn.close()
try: try:
@ -44,20 +45,19 @@ async def start(ctx: click.Context):
@click.pass_context @click.pass_context
async def stop(ctx: click.Context): async def stop(ctx: click.Context):
"""Stop the replay buffer.""" """Stop the replay buffer."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.replay_buffer_status != "offline" active = model.replay_buffer_status != 'offline'
if not active: if not active:
conn.close() conn.close()
raise SlobsCliError("Replay buffer is already inactive.") raise SlobsCliError('Replay buffer is already inactive.')
await ss.stop_replay_buffer() await ss.stop_replay_buffer()
click.echo("Replay buffer stopped.") click.echo('Replay buffer stopped.')
conn.close() conn.close()
try: try:
@ -73,17 +73,16 @@ async def stop(ctx: click.Context):
@click.pass_context @click.pass_context
async def status(ctx: click.Context): async def status(ctx: click.Context):
"""Get the current status of the replay buffer.""" """Get the current status of the replay buffer."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.replay_buffer_status != "offline" active = model.replay_buffer_status != 'offline'
if active: if active:
click.echo("Replay buffer is currently active.") click.echo('Replay buffer is currently active.')
else: else:
click.echo("Replay buffer is currently inactive.") click.echo('Replay buffer is currently inactive.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -95,13 +94,12 @@ async def status(ctx: click.Context):
@click.pass_context @click.pass_context
async def save(ctx: click.Context): async def save(ctx: click.Context):
"""Save the current replay buffer.""" """Save the current replay buffer."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
await ss.save_replay() await ss.save_replay()
click.echo("Replay buffer saved.") click.echo('Replay buffer saved.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:

View File

@ -1,6 +1,9 @@
"""module for managing scenes in Slobs CLI."""
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import ScenesService, TransitionsService from pyslobs import ScenesService, TransitionsService
from terminaltables3 import AsciiTable
from .cli import cli from .cli import cli
from .errors import SlobsCliError from .errors import SlobsCliError
@ -8,39 +11,48 @@ from .errors import SlobsCliError
@cli.group() @cli.group()
def scene(): def scene():
"""Scene management commands.""" """Manage scenes in Slobs CLI."""
@scene.command() @scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.") @click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context @click.pass_context
async def list(ctx: click.Context, id: bool = False): async def list(ctx: click.Context, id: bool = False):
"""List all available scenes.""" """List all available scenes."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = ScenesService(conn) ss = ScenesService(conn)
async def _run(): async def _run():
scenes = await ss.get_scenes() scenes = await ss.get_scenes()
if not scenes: if not scenes:
click.echo("No scenes found.") click.echo('No scenes found.')
conn.close() conn.close()
return return
active_scene = await ss.active_scene() active_scene = await ss.active_scene()
click.echo("Available scenes:") table_data = [
['Scene Name', 'ID', 'Active'] if id else ['Scene Name', 'Active']
]
for scene in scenes: for scene in scenes:
if scene.id == active_scene.id: if scene.id == active_scene.id:
click.echo( to_append = [click.style(scene.name, fg='green')]
f"- {click.style(scene.name, fg='green')} "
f"{f'(ID: {scene.id})' if id else ''} [Active]"
)
else: else:
click.echo( to_append = [click.style(scene.name, fg='blue')]
f"- {click.style(scene.name, fg='blue')} " if id:
f"{f'(ID: {scene.id})' if id else ''}" to_append.append(scene.id)
) if scene.id == active_scene.id:
to_append.append('')
table_data.append(to_append)
table = AsciiTable(table_data)
table.justify_columns = {
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
conn.close() conn.close()
@ -50,19 +62,18 @@ async def list(ctx: click.Context, id: bool = False):
@scene.command() @scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.") @click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context @click.pass_context
async def current(ctx: click.Context, id: bool = False): async def current(ctx: click.Context, id: bool = False):
"""Show the currently active scene.""" """Show the currently active scene."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = ScenesService(conn) ss = ScenesService(conn)
async def _run(): async def _run():
active_scene = await ss.active_scene() active_scene = await ss.active_scene()
click.echo( click.echo(
f"Current active scene: {click.style(active_scene.name, fg='green')} " f'Current active scene: {click.style(active_scene.name, fg="green")} '
f"{f'(ID: {active_scene.id})' if id else ''}" f'{f"(ID: {active_scene.id})" if id else ""}'
) )
conn.close() conn.close()
@ -72,20 +83,19 @@ async def current(ctx: click.Context, id: bool = False):
@scene.command() @scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.") @click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.argument("scene_name", type=str) @click.argument('scene_name')
@click.option( @click.option(
"--preview", '--preview',
is_flag=True, is_flag=True,
help="Switch the preview scene only.", help='Switch the preview scene only.',
) )
@click.pass_context @click.pass_context
async def switch( async def switch(
ctx: click.Context, scene_name: str, preview: bool = False, id: bool = False ctx: click.Context, scene_name: str, preview: bool = False, id: bool = False
): ):
"""Switch to a scene by its name.""" """Switch to a scene by its name."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = ScenesService(conn) ss = ScenesService(conn)
ts = TransitionsService(conn) ts = TransitionsService(conn)
@ -99,29 +109,29 @@ async def switch(
await ss.make_scene_active(scene.id) await ss.make_scene_active(scene.id)
if preview: if preview:
click.echo( click.echo(
f"Switched to preview scene: {click.style(scene.name, fg='blue')} " f'Switched to preview scene: {click.style(scene.name, fg="blue")} '
f"{f'(ID: {scene.id}).' if id else ''}" f'{f"(ID: {scene.id})." if id else ""}'
) )
else: else:
click.echo( click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} " f'Switched to scene: {click.style(scene.name, fg="blue")} '
f"{f'(ID: {scene.id}).' if id else ''}" f'{f"(ID: {scene.id})." if id else ""}'
) )
await ts.execute_studio_mode_transition() await ts.execute_studio_mode_transition()
click.echo( click.echo(
"Executed studio mode transition to make the scene active." 'Executed studio mode transition to make the scene active.'
) )
else: else:
if preview: if preview:
conn.close() conn.close()
raise SlobsCliError( raise SlobsCliError(
"Cannot switch the preview scene in non-studio mode." 'Cannot switch the preview scene in non-studio mode.'
) )
await ss.make_scene_active(scene.id) await ss.make_scene_active(scene.id)
click.echo( click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} " f'Switched to scene: {click.style(scene.name, fg="blue")} '
f"{f'(ID: {scene.id}).' if id else ''}" f'{f"(ID: {scene.id})." if id else ""}'
) )
conn.close() conn.close()

View File

@ -0,0 +1,173 @@
"""module for scene collection management in SLOBS CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import ISceneCollectionCreateOptions, SceneCollectionsService
from terminaltables3 import AsciiTable
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def scenecollection():
"""Manage scene collections in Slobs CLI."""
@scenecollection.command()
@click.option('--id', is_flag=True, help='Include scene collection IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool):
"""List all scene collections."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
if not collections:
click.echo('No scene collections found.')
conn.close()
return
active_collection = await scs.active_collection()
table_data = [
['Scene Collection Name', 'ID', 'Active']
if id
else ['Scene Collection Name', 'Active']
]
for collection in collections:
if collection.id == active_collection.id:
to_append = [click.style(collection.name, fg='green')]
else:
to_append = [click.style(collection.name, fg='blue')]
if id:
to_append.append(collection.id)
if collection.id == active_collection.id:
to_append.append('')
table_data.append(to_append)
table = AsciiTable(table_data)
table.justify_columns = {
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def load(ctx: click.Context, scenecollection_name: str):
"""Load a scene collection by name."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.load(collection.id)
click.echo(f'Scene collection "{scenecollection_name}" loaded successfully.')
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def create(ctx: click.Context, scenecollection_name: str):
"""Create a new scene collection."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
await scs.create(ISceneCollectionCreateOptions(scenecollection_name))
click.echo(f'Scene collection "{scenecollection_name}" created successfully.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def delete(ctx: click.Context, scenecollection_name: str):
"""Delete a scene collection by name."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.delete(collection.id)
click.echo(f'Scene collection "{scenecollection_name}" deleted successfully.')
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
@scenecollection.command()
@click.argument('scenecollection_name')
@click.argument('new_name')
@click.pass_context
async def rename(ctx: click.Context, scenecollection_name: str, new_name: str):
"""Rename a scene collection."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.rename(new_name, collection.id)
click.echo(
f'Scene collection "{scenecollection_name}" renamed to "{new_name}".'
)
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e

View File

@ -1,3 +1,5 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group() @cli.group()
def stream(): def stream():
"""Stream management commands.""" """Manage streaming in Slobs CLI."""
@stream.command() @stream.command()
@click.pass_context @click.pass_context
async def start(ctx: click.Context): async def start(ctx: click.Context):
"""Start the stream.""" """Start the stream."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.streaming_status != "offline" active = model.streaming_status != 'offline'
if active: if active:
conn.close() conn.close()
raise SlobsCliError("Stream is already active.") raise SlobsCliError('Stream is already active.')
await ss.toggle_streaming() await ss.toggle_streaming()
click.echo("Stream started.") click.echo('Stream started.')
conn.close() conn.close()
try: try:
@ -44,20 +45,19 @@ async def start(ctx: click.Context):
@click.pass_context @click.pass_context
async def stop(ctx: click.Context): async def stop(ctx: click.Context):
"""Stop the stream.""" """Stop the stream."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.streaming_status != "offline" active = model.streaming_status != 'offline'
if not active: if not active:
conn.close() conn.close()
raise SlobsCliError("Stream is already inactive.") raise SlobsCliError('Stream is already inactive.')
await ss.toggle_streaming() await ss.toggle_streaming()
click.echo("Stream stopped.") click.echo('Stream stopped.')
conn.close() conn.close()
try: try:
@ -73,18 +73,17 @@ async def stop(ctx: click.Context):
@click.pass_context @click.pass_context
async def status(ctx: click.Context): async def status(ctx: click.Context):
"""Get the current stream status.""" """Get the current stream status."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.streaming_status != "offline" active = model.streaming_status != 'offline'
if active: if active:
click.echo("Stream is currently active.") click.echo('Stream is currently active.')
else: else:
click.echo("Stream is currently inactive.") click.echo('Stream is currently inactive.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -96,19 +95,18 @@ async def status(ctx: click.Context):
@click.pass_context @click.pass_context
async def toggle(ctx: click.Context): async def toggle(ctx: click.Context):
"""Toggle the stream status.""" """Toggle the stream status."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ss = StreamingService(conn) ss = StreamingService(conn)
async def _run(): async def _run():
model = await ss.get_model() model = await ss.get_model()
active = model.streaming_status != "offline" active = model.streaming_status != 'offline'
await ss.toggle_streaming() await ss.toggle_streaming()
if active: if active:
click.echo("Stream stopped.") click.echo('Stream stopped.')
else: else:
click.echo("Stream started.") click.echo('Stream started.')
conn.close() conn.close()

View File

@ -1,3 +1,5 @@
"""module for managing studio mode in Slobs CLI."""
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import TransitionsService from pyslobs import TransitionsService
@ -8,25 +10,24 @@ from .errors import SlobsCliError
@cli.group() @cli.group()
def studiomode(): def studiomode():
"""Studio mode management commands.""" """Manage studio mode in Slobs CLI."""
@studiomode.command() @studiomode.command()
@click.pass_context @click.pass_context
async def enable(ctx: click.Context): async def enable(ctx: click.Context):
"""Enable studio mode.""" """Enable studio mode."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ts = TransitionsService(conn) ts = TransitionsService(conn)
async def _run(): async def _run():
model = await ts.get_model() model = await ts.get_model()
if model.studio_mode: if model.studio_mode:
conn.close() conn.close()
raise SlobsCliError("Studio mode is already enabled.") raise SlobsCliError('Studio mode is already enabled.')
await ts.enable_studio_mode() await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.") click.echo('Studio mode enabled successfully.')
conn.close() conn.close()
try: try:
@ -42,18 +43,17 @@ async def enable(ctx: click.Context):
@click.pass_context @click.pass_context
async def disable(ctx: click.Context): async def disable(ctx: click.Context):
"""Disable studio mode.""" """Disable studio mode."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ts = TransitionsService(conn) ts = TransitionsService(conn)
async def _run(): async def _run():
model = await ts.get_model() model = await ts.get_model()
if not model.studio_mode: if not model.studio_mode:
conn.close() conn.close()
raise SlobsCliError("Studio mode is already disabled.") raise SlobsCliError('Studio mode is already disabled.')
await ts.disable_studio_mode() await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.") click.echo('Studio mode disabled successfully.')
conn.close() conn.close()
try: try:
@ -69,16 +69,15 @@ async def disable(ctx: click.Context):
@click.pass_context @click.pass_context
async def status(ctx: click.Context): async def status(ctx: click.Context):
"""Check the status of studio mode.""" """Check the status of studio mode."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ts = TransitionsService(conn) ts = TransitionsService(conn)
async def _run(): async def _run():
model = await ts.get_model() model = await ts.get_model()
if model.studio_mode: if model.studio_mode:
click.echo("Studio mode is currently enabled.") click.echo('Studio mode is currently enabled.')
else: else:
click.echo("Studio mode is currently disabled.") click.echo('Studio mode is currently disabled.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -90,18 +89,17 @@ async def status(ctx: click.Context):
@click.pass_context @click.pass_context
async def toggle(ctx: click.Context): async def toggle(ctx: click.Context):
"""Toggle studio mode.""" """Toggle studio mode."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ts = TransitionsService(conn) ts = TransitionsService(conn)
async def _run(): async def _run():
model = await ts.get_model() model = await ts.get_model()
if model.studio_mode: if model.studio_mode:
await ts.disable_studio_mode() await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.") click.echo('Studio mode disabled successfully.')
else: else:
await ts.enable_studio_mode() await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.") click.echo('Studio mode enabled successfully.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -113,18 +111,17 @@ async def toggle(ctx: click.Context):
@click.pass_context @click.pass_context
async def force_transition(ctx: click.Context): async def force_transition(ctx: click.Context):
"""Force a transition in studio mode.""" """Force a transition in studio mode."""
conn = ctx.obj['connection']
conn = ctx.obj["connection"]
ts = TransitionsService(conn) ts = TransitionsService(conn)
async def _run(): async def _run():
model = await ts.get_model() model = await ts.get_model()
if not model.studio_mode: if not model.studio_mode:
conn.close() conn.close()
raise SlobsCliError("Studio mode is not enabled.") raise SlobsCliError('Studio mode is not enabled.')
await ts.execute_studio_mode_transition() await ts.execute_studio_mode_transition()
click.echo("Forced studio mode transition.") click.echo('Forced studio mode transition.')
conn.close() conn.close()
try: try:

View File

@ -0,0 +1 @@
"""Test suite for the slobs_cli package."""

View File

@ -1,6 +1,9 @@
"""pytest configuration for async tests using anyio."""
import pytest import pytest
@pytest.fixture @pytest.fixture
def anyio_backend(): def anyio_backend():
return "asyncio" """Return the backend to use for async tests."""
return 'asyncio'

View File

@ -1,3 +1,10 @@
"""Create test scenes in Streamlabs.
Usage:
Run this script as a standalone program to setup the test environment.
Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set.
"""
import os import os
import anyio import anyio
@ -6,20 +13,22 @@ from pyslobs import ConnectionConfig, ScenesService, SlobsConnection
async def setup(conn: SlobsConnection): async def setup(conn: SlobsConnection):
"""Set up test scenes in Streamlabs OBS."""
ss = ScenesService(conn) ss = ScenesService(conn)
await ss.create_scene("slobs-test-scene-1") await ss.create_scene('slobs-test-scene-1')
await ss.create_scene("slobs-test-scene-2") await ss.create_scene('slobs-test-scene-2')
await ss.create_scene("slobs-test-scene-3") await ss.create_scene('slobs-test-scene-3')
conn.close() conn.close()
async def main(): async def main():
"""Establish connection and set up scenes."""
conn = SlobsConnection( conn = SlobsConnection(
ConnectionConfig( ConnectionConfig(
domain=os.environ["SLOBS_DOMAIN"], domain=os.environ['SLOBS_DOMAIN'],
port=59650, port=59650,
token=os.environ["SLOBS_TOKEN"], token=os.environ['SLOBS_TOKEN'],
) )
) )
@ -28,5 +37,5 @@ async def main():
tg.start_soon(setup, conn) tg.start_soon(setup, conn)
if __name__ == "__main__": if __name__ == '__main__':
anyio.run(main) anyio.run(main)

View File

@ -1,3 +1,10 @@
"""Remove test scenes in Streamlabs, disable streaming, recording, and replay buffer.
Usage:
Run this script as a standalone program to tear down the test environment.
Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set.
"""
import os import os
import anyio import anyio
@ -6,30 +13,32 @@ from pyslobs import ConnectionConfig, ScenesService, SlobsConnection, StreamingS
async def cleanup(conn: SlobsConnection): async def cleanup(conn: SlobsConnection):
"""Clean up test scenes and ensure streaming, recording, and replay buffer are stopped."""
ss = ScenesService(conn) ss = ScenesService(conn)
scenes = await ss.get_scenes() scenes = await ss.get_scenes()
for scene in scenes: for scene in scenes:
if scene.name.startswith("slobs-test-scene-"): if scene.name.startswith('slobs-test-scene-'):
await ss.remove_scene(scene.id) await ss.remove_scene(scene.id)
ss = StreamingService(conn) ss = StreamingService(conn)
model = await ss.get_model() model = await ss.get_model()
if model.streaming_status != "offline": if model.streaming_status != 'offline':
await ss.toggle_streaming() await ss.toggle_streaming()
if model.replay_buffer_status != "offline": if model.replay_buffer_status != 'offline':
await ss.stop_replay_buffer() await ss.stop_replay_buffer()
if model.recording_status != "offline": if model.recording_status != 'offline':
await ss.toggle_recording() await ss.toggle_recording()
conn.close() conn.close()
async def main(): async def main():
"""Establish connection and clean up test scenes."""
conn = SlobsConnection( conn = SlobsConnection(
ConnectionConfig( ConnectionConfig(
domain=os.environ["SLOBS_DOMAIN"], domain=os.environ['SLOBS_DOMAIN'],
port=59650, port=59650,
token=os.environ["SLOBS_TOKEN"], token=os.environ['SLOBS_TOKEN'],
) )
) )
@ -38,5 +47,5 @@ async def main():
tg.start_soon(cleanup, conn) tg.start_soon(cleanup, conn)
if __name__ == "__main__": if __name__ == '__main__':
anyio.run(main) anyio.run(main)

43
tests/test_audio.py Normal file
View File

@ -0,0 +1,43 @@
"""Test cases for audio commands in slobs_cli."""
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_audio_list():
"""Test the list audio sources command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'list'])
assert result.exit_code == 0
assert 'Desktop Audio' in result.output
assert 'Mic/Aux' in result.output
@pytest.mark.anyio
async def test_audio_mute():
"""Test the mute audio source command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'mute', 'Mic/Aux'])
assert result.exit_code == 0
assert 'Mic/Aux muted successfully' in result.output
@pytest.mark.anyio
async def test_audio_unmute():
"""Test the unmute audio source command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'unmute', 'Mic/Aux'])
assert result.exit_code == 0
assert 'Mic/Aux unmuted successfully' in result.output
@pytest.mark.anyio
async def test_audio_invalid_source():
"""Test handling of invalid audio source."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'mute', 'InvalidSource'])
assert result.exit_code != 0
assert 'Audio source "InvalidSource" not found' in result.output

View File

@ -1,3 +1,5 @@
"""Test cases for the recording commands of the slobs_cli CLI application."""
import anyio import anyio
import pytest import pytest
from asyncclick.testing import CliRunner from asyncclick.testing import CliRunner
@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio @pytest.mark.anyio
async def test_record_start(): async def test_record_start():
"""Test the start recording command."""
runner = CliRunner() runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"]) result = await runner.invoke(cli, ['record', 'status'])
assert result.exit_code == 0 assert result.exit_code == 0
active = "Recording is currently active." in result.output active = 'Recording is currently active.' in result.output
result = await runner.invoke(cli, ["record", "start"]) result = await runner.invoke(cli, ['record', 'start'])
if not active: if not active:
assert result.exit_code == 0 assert result.exit_code == 0
assert "Recording started" in result.output assert 'Recording started' in result.output
await anyio.sleep(0.2) # Allow some time for the recording to start await anyio.sleep(0.2) # Allow some time for the recording to start
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert "Recording is already active." in result.output assert 'Recording is already active.' in result.output
@pytest.mark.anyio @pytest.mark.anyio
async def test_record_stop(): async def test_record_stop():
"""Test the stop recording command."""
runner = CliRunner() runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"]) result = await runner.invoke(cli, ['record', 'status'])
assert result.exit_code == 0 assert result.exit_code == 0
active = "Recording is currently active." in result.output active = 'Recording is currently active.' in result.output
result = await runner.invoke(cli, ["record", "stop"]) result = await runner.invoke(cli, ['record', 'stop'])
if active: if active:
assert result.exit_code == 0 assert result.exit_code == 0
assert "Recording stopped" in result.output assert 'Recording stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the recording to stop await anyio.sleep(0.2) # Allow some time for the recording to stop
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert "Recording is already inactive." in result.output assert 'Recording is already inactive.' in result.output

View File

@ -1,3 +1,5 @@
"""Test cases for the replay buffer commands in slobs_cli."""
import anyio import anyio
import pytest import pytest
from asyncclick.testing import CliRunner from asyncclick.testing import CliRunner
@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio @pytest.mark.anyio
async def test_replaybuffer_start(): async def test_replaybuffer_start():
"""Test the start replay buffer command."""
runner = CliRunner() runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"]) result = await runner.invoke(cli, ['replaybuffer', 'status'])
assert result.exit_code == 0 assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output active = 'Replay buffer is currently active.' in result.output
result = await runner.invoke(cli, ["replaybuffer", "start"]) result = await runner.invoke(cli, ['replaybuffer', 'start'])
if not active: if not active:
assert result.exit_code == 0 assert result.exit_code == 0
assert "Replay buffer started" in result.output assert 'Replay buffer started' in result.output
await anyio.sleep(0.2) # Allow some time for the replay buffer to start await anyio.sleep(0.2) # Allow some time for the replay buffer to start
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert "Replay buffer is already active." in result.output assert 'Replay buffer is already active.' in result.output
@pytest.mark.anyio @pytest.mark.anyio
async def test_replaybuffer_stop(): async def test_replaybuffer_stop():
"""Test the stop replay buffer command."""
runner = CliRunner() runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"]) result = await runner.invoke(cli, ['replaybuffer', 'status'])
assert result.exit_code == 0 assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output active = 'Replay buffer is currently active.' in result.output
result = await runner.invoke(cli, ["replaybuffer", "stop"]) result = await runner.invoke(cli, ['replaybuffer', 'stop'])
if active: if active:
assert result.exit_code == 0 assert result.exit_code == 0
assert "Replay buffer stopped" in result.output assert 'Replay buffer stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the replay buffer to stop await anyio.sleep(0.2) # Allow some time for the replay buffer to stop
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert "Replay buffer is already inactive." in result.output assert 'Replay buffer is already inactive.' in result.output

View File

@ -1,3 +1,5 @@
"""Test cases for scene commands in slobs_cli."""
import pytest import pytest
from asyncclick.testing import CliRunner from asyncclick.testing import CliRunner
@ -6,20 +8,22 @@ from slobs_cli import cli
@pytest.mark.anyio @pytest.mark.anyio
async def test_scene_list(): async def test_scene_list():
"""Test the list scenes command."""
runner = CliRunner() runner = CliRunner()
result = await runner.invoke(cli, ["scene", "list"]) result = await runner.invoke(cli, ['scene', 'list'])
assert result.exit_code == 0 assert result.exit_code == 0
assert "slobs-test-scene-1" in result.output assert 'slobs-test-scene-1' in result.output
assert "slobs-test-scene-2" in result.output assert 'slobs-test-scene-2' in result.output
assert "slobs-test-scene-3" in result.output assert 'slobs-test-scene-3' in result.output
@pytest.mark.anyio @pytest.mark.anyio
async def test_scene_current(): async def test_scene_current():
"""Test the current scene command."""
runner = CliRunner() runner = CliRunner()
result = await runner.invoke(cli, ["scene", "switch", "slobs-test-scene-2"]) result = await runner.invoke(cli, ['scene', 'switch', 'slobs-test-scene-2'])
assert result.exit_code == 0 assert result.exit_code == 0
result = await runner.invoke(cli, ["scene", "current"]) result = await runner.invoke(cli, ['scene', 'current'])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Current active scene: slobs-test-scene-2" in result.output assert 'Current active scene: slobs-test-scene-2' in result.output

View File

@ -1,3 +1,5 @@
"""Tests for the stream commands in slobs_cli."""
import anyio import anyio
import pytest import pytest
from asyncclick.testing import CliRunner from asyncclick.testing import CliRunner
@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio @pytest.mark.anyio
async def test_stream_start(): async def test_stream_start():
"""Test the start stream command."""
runner = CliRunner() runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"]) result = await runner.invoke(cli, ['stream', 'status'])
assert result.exit_code == 0 assert result.exit_code == 0
active = "Stream is currently active." in result.output active = 'Stream is currently active.' in result.output
result = await runner.invoke(cli, ["stream", "start"]) result = await runner.invoke(cli, ['stream', 'start'])
if not active: if not active:
assert result.exit_code == 0 assert result.exit_code == 0
assert "Stream started" in result.output assert 'Stream started' in result.output
await anyio.sleep(0.2) # Allow some time for the stream to start await anyio.sleep(0.2) # Allow some time for the stream to start
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert "Stream is already active." in result.output assert 'Stream is already active.' in result.output
@pytest.mark.anyio @pytest.mark.anyio
async def test_stream_stop(): async def test_stream_stop():
"""Test the stop stream command."""
runner = CliRunner() runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"]) result = await runner.invoke(cli, ['stream', 'status'])
assert result.exit_code == 0 assert result.exit_code == 0
active = "Stream is currently active." in result.output active = 'Stream is currently active.' in result.output
result = await runner.invoke(cli, ["stream", "stop"]) result = await runner.invoke(cli, ['stream', 'stop'])
if active: if active:
assert result.exit_code == 0 assert result.exit_code == 0
assert "Stream stopped" in result.output assert 'Stream stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the stream to stop await anyio.sleep(0.2) # Allow some time for the stream to stop
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert "Stream is already inactive." in result.output assert 'Stream is already inactive.' in result.output

60
tests/test_studiomode.py Normal file
View File

@ -0,0 +1,60 @@
"""Test cases for the studio mode commands of the slobs_cli CLI application."""
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_studiomode_enable():
"""Test the enable studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'enable'])
if active:
assert result.exit_code != 0
assert 'Studio mode is already enabled.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode enabled successfully.' in result.output
@pytest.mark.anyio
async def test_studiomode_disable():
"""Test the disable studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'disable'])
if not active:
assert result.exit_code != 0
assert 'Studio mode is already disabled.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode disabled successfully.' in result.output
@pytest.mark.anyio
async def test_studiomode_toggle():
"""Test the toggle studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'toggle'])
if active:
assert result.exit_code == 0
assert 'Studio mode disabled successfully.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode enabled successfully.' in result.output