diff --git a/pdm.lock b/pdm.lock index 0b869ab..56215d0 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:e24474fa487e6f512b9c1f5a97aaabf58ed6164fb084f12e954ec59f1c4f8545" +content_hash = "sha256:c1f6a22d9f4fca9c52692b2931ca64dada84a1e99c9013dad4be26bdd786cc6e" [[metadata.targets]] requires_python = ">=3.11" @@ -238,6 +238,33 @@ files = [ {file = "pytest_randomly-3.16.0.tar.gz", hash = "sha256:11bf4d23a26484de7860d82f726c0629837cf4064b79157bd18ec9d41d7feb26"}, ] +[[package]] +name = "ruff" +version = "0.11.13" +requires_python = ">=3.7" +summary = "An extremely fast Python linter and code formatter, written in Rust." +groups = ["dev"] +files = [ + {file = "ruff-0.11.13-py3-none-linux_armv6l.whl", hash = "sha256:4bdfbf1240533f40042ec00c9e09a3aade6f8c10b6414cf11b519488d2635d46"}, + {file = "ruff-0.11.13-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:aef9c9ed1b5ca28bb15c7eac83b8670cf3b20b478195bd49c8d756ba0a36cf48"}, + {file = "ruff-0.11.13-py3-none-macosx_11_0_arm64.whl", hash = "sha256:53b15a9dfdce029c842e9a5aebc3855e9ab7771395979ff85b7c1dedb53ddc2b"}, + {file = "ruff-0.11.13-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ab153241400789138d13f362c43f7edecc0edfffce2afa6a68434000ecd8f69a"}, + {file = "ruff-0.11.13-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:6c51f93029d54a910d3d24f7dd0bb909e31b6cd989a5e4ac513f4eb41629f0dc"}, + {file = "ruff-0.11.13-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1808b3ed53e1a777c2ef733aca9051dc9bf7c99b26ece15cb59a0320fbdbd629"}, + {file = "ruff-0.11.13-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:d28ce58b5ecf0f43c1b71edffabe6ed7f245d5336b17805803312ec9bc665933"}, + {file = "ruff-0.11.13-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:55e4bc3a77842da33c16d55b32c6cac1ec5fb0fbec9c8c513bdce76c4f922165"}, + {file = "ruff-0.11.13-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:633bf2c6f35678c56ec73189ba6fa19ff1c5e4807a78bf60ef487b9dd272cc71"}, + {file = "ruff-0.11.13-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4ffbc82d70424b275b089166310448051afdc6e914fdab90e08df66c43bb5ca9"}, + {file = "ruff-0.11.13-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:4a9ddd3ec62a9a89578c85842b836e4ac832d4a2e0bfaad3b02243f930ceafcc"}, + {file = "ruff-0.11.13-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:d237a496e0778d719efb05058c64d28b757c77824e04ffe8796c7436e26712b7"}, + {file = "ruff-0.11.13-py3-none-musllinux_1_2_i686.whl", hash = "sha256:26816a218ca6ef02142343fd24c70f7cd8c5aa6c203bca284407adf675984432"}, + {file = "ruff-0.11.13-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:51c3f95abd9331dc5b87c47ac7f376db5616041173826dfd556cfe3d4977f492"}, + {file = "ruff-0.11.13-py3-none-win32.whl", hash = "sha256:96c27935418e4e8e77a26bb05962817f28b8ef3843a6c6cc49d8783b5507f250"}, + {file = "ruff-0.11.13-py3-none-win_amd64.whl", hash = "sha256:29c3189895a8a6a657b7af4e97d330c8a3afd2c9c8f46c81e2fc5a31866517e3"}, + {file = "ruff-0.11.13-py3-none-win_arm64.whl", hash = "sha256:b4385285e9179d608ff1d2fb9922062663c658605819a6876d8beef0c30b7f3b"}, + {file = "ruff-0.11.13.tar.gz", hash = "sha256:26fa247dc68d1d4e72c179e08889a25ac0c7ba4d78aecfc835d49cbfd60bf514"}, +] + [[package]] name = "sniffio" version = "1.3.1" diff --git a/pyproject.toml b/pyproject.toml index 9152650..780f3d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,18 +24,22 @@ source = "file" path = "src/slobs_cli/__about__.py" [tool.pdm.scripts] +_.env_file = ".env" + cli.cmd = "slobs-cli {args}" -cli.env_file = ".env" pre_test.cmd = "python tests/setup.py" test.cmd = "pytest {args}" -test.env_file = ".env" post_test.cmd = "python tests/teardown.py" +fmt.cmd = "ruff format {args}" +lint.cmd = "ruff check {args}" + [dependency-groups] dev = [ "tox-pdm>=0.7.2", "pytest>=8.4.0", "pytest-randomly>=3.16.0", "virtualenv-pyenv>=0.5.0", + "ruff>=0.11.13", ] diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..eeb513e --- /dev/null +++ b/ruff.toml @@ -0,0 +1,79 @@ +# Exclude a variety of commonly ignored directories. +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hatch", + ".hg", + ".ipynb_checkpoints", + ".mypy_cache", + ".nox", + ".pants.d", + ".pyenv", + ".pytest_cache", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + ".vscode", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "site-packages", + "venv", +] + +# Same as Black. +line-length = 88 +indent-width = 4 + +# Assume Python 3.11 +target-version = "py311" + +[lint] +# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. +# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or +# McCabe complexity (`C901`) by default. +# Enable pydocstyle (`D`) codes by default. +select = ["E4", "E7", "E9", "F", "D"] +ignore = ["D203", "D213"] + +# Allow fix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] +unfixable = [] + +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" + +[format] +# Unlike Black, use single quotes for strings. +quote-style = "single" + +# Like Black, indent with spaces, rather than tabs. +indent-style = "space" + +# Like Black, respect magic trailing commas. +skip-magic-trailing-comma = false + +# Like Black, automatically detect the appropriate line ending. +line-ending = "auto" + +# Enable auto-formatting of code examples in docstrings. Markdown, +# reStructuredText code/literal blocks and doctests are all supported. +# +# This is currently disabled by default, but it is planned for this +# to be opt-out in the future. +docstring-code-format = false + +# Set the line length limit used when formatting code snippets in +# docstrings. +# +# This only has an effect when the `docstring-code-format` setting is +# enabled. +docstring-code-line-length = "dynamic" diff --git a/src/slobs_cli/__about__.py b/src/slobs_cli/__about__.py index fa3ddd8..c61f9dc 100644 --- a/src/slobs_cli/__about__.py +++ b/src/slobs_cli/__about__.py @@ -1 +1,3 @@ -__version__ = "0.8.4" +"""module for package metadata.""" + +__version__ = '0.8.4' diff --git a/src/slobs_cli/__init__.py b/src/slobs_cli/__init__.py index b7e1834..197efb6 100644 --- a/src/slobs_cli/__init__.py +++ b/src/slobs_cli/__init__.py @@ -1,3 +1,5 @@ +"""Package slobs_cli provides a command-line interface for interacting with SLOBS (Streamlabs OBS).""" + from .audio import audio from .cli import cli from .record import record @@ -6,4 +8,4 @@ from .scene import scene from .stream import stream from .studiomode import studiomode -__all__ = ["cli", "scene", "stream", "record", "audio", "replaybuffer", "studiomode"] +__all__ = ['cli', 'scene', 'stream', 'record', 'audio', 'replaybuffer', 'studiomode'] diff --git a/src/slobs_cli/audio.py b/src/slobs_cli/audio.py index 6f48488..755391a 100644 --- a/src/slobs_cli/audio.py +++ b/src/slobs_cli/audio.py @@ -1,3 +1,5 @@ +"""module for managing audio sources in Slobs CLI.""" + import asyncclick as click from anyio import create_task_group from pyslobs import AudioService @@ -9,41 +11,40 @@ from .errors import SlobsCliError @cli.group() def audio(): - """Audio management commands.""" + """Manage audio sources in Slobs CLI.""" @audio.command() -@click.option("--id", is_flag=True, help="Include audio source IDs in the output.") +@click.option('--id', is_flag=True, help='Include audio source IDs in the output.') @click.pass_context async def list(ctx: click.Context, id: bool = False): """List all audio sources.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] as_ = AudioService(conn) async def _run(): sources = await as_.get_sources() if not sources: - click.echo("No audio sources found.") + click.echo('No audio sources found.') conn.close() return - table_data = [["Audio Name", "ID", "Muted"] if id else ["Name", "Muted"]] + table_data = [['Audio Name', 'ID', 'Muted'] if id else ['Name', 'Muted']] for source in sources: model = await source.get_model() - to_append = [f"{click.style(model.name, fg='blue')}"] + to_append = [click.style(model.name, fg='blue')] if id: - to_append.append(f"{model.source_id}") - to_append.append("✅" if model.muted else "❌") + to_append.append(model.source_id) + to_append.append('✅' if model.muted else '❌') table_data.append(to_append) table = AsciiTable(table_data) table.justify_columns = { - 0: "left", - 1: "left" if id else "center", - 2: "center" if id else None, + 0: 'left', + 1: 'left' if id else 'center', + 2: 'center' if id else None, } click.echo(table.table) @@ -55,12 +56,11 @@ async def list(ctx: click.Context, id: bool = False): @audio.command() -@click.argument("source_name") +@click.argument('source_name') @click.pass_context async def mute(ctx: click.Context, source_name: str): """Mute an audio source by name.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] as_ = AudioService(conn) async def _run(): @@ -74,7 +74,7 @@ async def mute(ctx: click.Context, source_name: str): raise SlobsCliError(f"Source '{source_name}' not found.") await source.set_muted(True) - click.echo(f"Muted audio source: {source_name}") + click.echo(f'Muted audio source: {source_name}') conn.close() try: @@ -87,12 +87,11 @@ async def mute(ctx: click.Context, source_name: str): @audio.command() -@click.argument("source_name") +@click.argument('source_name') @click.pass_context async def unmute(ctx: click.Context, source_name: str): """Unmute an audio source by name.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] as_ = AudioService(conn) async def _run(): @@ -106,7 +105,7 @@ async def unmute(ctx: click.Context, source_name: str): raise SlobsCliError(f"Source '{source_name}' not found.") await source.set_muted(False) - click.echo(f"Unmuted audio source: {source_name}") + click.echo(f'Unmuted audio source: {source_name}') conn.close() try: @@ -119,12 +118,11 @@ async def unmute(ctx: click.Context, source_name: str): @audio.command() -@click.argument("source_name") +@click.argument('source_name') @click.pass_context async def toggle(ctx: click.Context, source_name: str): """Toggle mute state of an audio source by name.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] as_ = AudioService(conn) async def _run(): @@ -134,10 +132,10 @@ async def toggle(ctx: click.Context, source_name: str): if model.name.lower() == source_name.lower(): if model.muted: await source.set_muted(False) - click.echo(f"Unmuted audio source: {source_name}") + click.echo(f'Unmuted audio source: {source_name}') else: await source.set_muted(True) - click.echo(f"Muted audio source: {source_name}") + click.echo(f'Muted audio source: {source_name}') conn.close() break else: # If no source by the given name was found diff --git a/src/slobs_cli/cli.py b/src/slobs_cli/cli.py index c05a0fe..d5167a3 100644 --- a/src/slobs_cli/cli.py +++ b/src/slobs_cli/cli.py @@ -1,3 +1,5 @@ +"""module defining the entry point for the Streamlabs Desktop CLI application.""" + import anyio import asyncclick as click from pyslobs import ConnectionConfig, SlobsConnection @@ -7,33 +9,33 @@ from .__about__ import __version__ as version @click.group() @click.option( - "-d", - "--domain", - default="127.0.0.1", + '-d', + '--domain', + default='127.0.0.1', + envvar='SLOBS_DOMAIN', show_default=True, show_envvar=True, - help="The domain of the SLOBS server.", - envvar="SLOBS_DOMAIN", + help='The domain of the SLOBS server.', ) @click.option( - "-p", - "--port", + '-p', + '--port', default=59650, + envvar='SLOBS_PORT', show_default=True, show_envvar=True, - help="The port of the SLOBS server.", - envvar="SLOBS_PORT", + help='The port of the SLOBS server.', ) @click.option( - "-t", - "--token", - help="The token for the SLOBS server.", - envvar="SLOBS_TOKEN", + '-t', + '--token', + envvar='SLOBS_TOKEN', show_envvar=True, required=True, + help='The token for the SLOBS server.', ) @click.version_option( - version, "-v", "--version", message="%(prog)s version: %(version)s" + version, '-v', '--version', message='%(prog)s version: %(version)s' ) @click.pass_context async def cli(ctx: click.Context, domain: str, port: int, token: str): @@ -44,7 +46,7 @@ async def cli(ctx: click.Context, domain: str, port: int, token: str): port=port, token=token, ) - ctx.obj["connection"] = SlobsConnection(config) + ctx.obj['connection'] = SlobsConnection(config) def run(): diff --git a/src/slobs_cli/errors.py b/src/slobs_cli/errors.py index 5164f0e..42cadc7 100644 --- a/src/slobs_cli/errors.py +++ b/src/slobs_cli/errors.py @@ -1,3 +1,5 @@ +"""module for custom exceptions in Slobs CLI.""" + import asyncclick as click @@ -5,9 +7,10 @@ class SlobsCliError(click.ClickException): """Base class for all Slobs CLI errors.""" def __init__(self, message: str): + """Initialize the SlobsCliError with a message.""" super().__init__(message) self.exit_code = 1 def show(self): """Display the error message in red.""" - click.secho(f"Error: {self.message}", fg="red", err=True) + click.secho(f'Error: {self.message}', fg='red', err=True) diff --git a/src/slobs_cli/record.py b/src/slobs_cli/record.py index 226443a..8b36387 100644 --- a/src/slobs_cli/record.py +++ b/src/slobs_cli/record.py @@ -1,3 +1,5 @@ +"""module for managing recording commands in Slobs CLI.""" + import asyncclick as click from anyio import create_task_group from pyslobs import StreamingService @@ -8,27 +10,26 @@ from .errors import SlobsCliError @cli.group() def record(): - """Recording management commands.""" + """Manage recording in Slobs CLI.""" @record.command() @click.pass_context async def start(ctx: click.Context): """Start recording.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.recording_status != "offline" + active = model.recording_status != 'offline' if active: conn.close() - raise SlobsCliError("Recording is already active.") + raise SlobsCliError('Recording is already active.') await ss.toggle_recording() - click.echo("Recording started.") + click.echo('Recording started.') conn.close() @@ -45,20 +46,19 @@ async def start(ctx: click.Context): @click.pass_context async def stop(ctx: click.Context): """Stop recording.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.recording_status != "offline" + active = model.recording_status != 'offline' if not active: conn.close() - raise SlobsCliError("Recording is already inactive.") + raise SlobsCliError('Recording is already inactive.') await ss.toggle_recording() - click.echo("Recording stopped.") + click.echo('Recording stopped.') conn.close() @@ -75,18 +75,17 @@ async def stop(ctx: click.Context): @click.pass_context async def status(ctx: click.Context): """Get recording status.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.recording_status != "offline" + active = model.recording_status != 'offline' if active: - click.echo("Recording is currently active.") + click.echo('Recording is currently active.') else: - click.echo("Recording is currently inactive.") + click.echo('Recording is currently inactive.') conn.close() @@ -99,20 +98,19 @@ async def status(ctx: click.Context): @click.pass_context async def toggle(ctx: click.Context): """Toggle recording status.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.recording_status != "offline" + active = model.recording_status != 'offline' if active: await ss.toggle_recording() - click.echo("Recording stopped.") + click.echo('Recording stopped.') else: await ss.toggle_recording() - click.echo("Recording started.") + click.echo('Recording started.') conn.close() diff --git a/src/slobs_cli/replaybuffer.py b/src/slobs_cli/replaybuffer.py index d274487..bad52d4 100644 --- a/src/slobs_cli/replaybuffer.py +++ b/src/slobs_cli/replaybuffer.py @@ -1,3 +1,5 @@ +"""module for managing the replay buffer in Slobs CLI.""" + import asyncclick as click from anyio import create_task_group from pyslobs import StreamingService @@ -8,27 +10,26 @@ from .errors import SlobsCliError @cli.group() def replaybuffer(): - """Replay buffer management commands.""" + """Manage the replay buffer in Slobs CLI.""" @replaybuffer.command() @click.pass_context async def start(ctx: click.Context): """Start the replay buffer.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.replay_buffer_status != "offline" + active = model.replay_buffer_status != 'offline' if active: conn.close() - raise SlobsCliError("Replay buffer is already active.") + raise SlobsCliError('Replay buffer is already active.') await ss.start_replay_buffer() - click.echo("Replay buffer started.") + click.echo('Replay buffer started.') conn.close() try: @@ -44,20 +45,19 @@ async def start(ctx: click.Context): @click.pass_context async def stop(ctx: click.Context): """Stop the replay buffer.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.replay_buffer_status != "offline" + active = model.replay_buffer_status != 'offline' if not active: conn.close() - raise SlobsCliError("Replay buffer is already inactive.") + raise SlobsCliError('Replay buffer is already inactive.') await ss.stop_replay_buffer() - click.echo("Replay buffer stopped.") + click.echo('Replay buffer stopped.') conn.close() try: @@ -73,17 +73,16 @@ async def stop(ctx: click.Context): @click.pass_context async def status(ctx: click.Context): """Get the current status of the replay buffer.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.replay_buffer_status != "offline" + active = model.replay_buffer_status != 'offline' if active: - click.echo("Replay buffer is currently active.") + click.echo('Replay buffer is currently active.') else: - click.echo("Replay buffer is currently inactive.") + click.echo('Replay buffer is currently inactive.') conn.close() async with create_task_group() as tg: @@ -95,13 +94,12 @@ async def status(ctx: click.Context): @click.pass_context async def save(ctx: click.Context): """Save the current replay buffer.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): await ss.save_replay() - click.echo("Replay buffer saved.") + click.echo('Replay buffer saved.') conn.close() async with create_task_group() as tg: diff --git a/src/slobs_cli/scene.py b/src/slobs_cli/scene.py index e859f1f..31cb10c 100644 --- a/src/slobs_cli/scene.py +++ b/src/slobs_cli/scene.py @@ -1,3 +1,5 @@ +"""module for managing scenes in Slobs CLI.""" + import asyncclick as click from anyio import create_task_group from pyslobs import ScenesService, TransitionsService @@ -9,46 +11,45 @@ from .errors import SlobsCliError @cli.group() def scene(): - """Scene management commands.""" + """Manage scenes in Slobs CLI.""" @scene.command() -@click.option("--id", is_flag=True, help="Include scene IDs in the output.") +@click.option('--id', is_flag=True, help='Include scene IDs in the output.') @click.pass_context async def list(ctx: click.Context, id: bool = False): """List all available scenes.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = ScenesService(conn) async def _run(): scenes = await ss.get_scenes() if not scenes: - click.echo("No scenes found.") + click.echo('No scenes found.') conn.close() return active_scene = await ss.active_scene() table_data = [ - ["Scene Name", "ID", "Active"] if id else ["Scene Name", "Active"] + ['Scene Name', 'ID', 'Active'] if id else ['Scene Name', 'Active'] ] for scene in scenes: if scene.id == active_scene.id: - to_append = [f"{click.style(scene.name, fg='green')}"] + to_append = [click.style(scene.name, fg='green')] else: - to_append = [f"{click.style(scene.name, fg='blue')}"] + to_append = [click.style(scene.name, fg='blue')] if id: - to_append.append(f"{scene.id}") - to_append.append("✅" if scene.id == active_scene.id else "") + to_append.append(scene.id) + to_append.append('✅' if scene.id == active_scene.id else '') table_data.append(to_append) table = AsciiTable(table_data) table.justify_columns = { - 0: "left", - 1: "left" if id else "center", - 2: "center" if id else None, + 0: 'left', + 1: 'left' if id else 'center', + 2: 'center' if id else None, } click.echo(table.table) @@ -60,19 +61,18 @@ async def list(ctx: click.Context, id: bool = False): @scene.command() -@click.option("--id", is_flag=True, help="Include scene IDs in the output.") +@click.option('--id', is_flag=True, help='Include scene IDs in the output.') @click.pass_context async def current(ctx: click.Context, id: bool = False): """Show the currently active scene.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = ScenesService(conn) async def _run(): active_scene = await ss.active_scene() click.echo( - f"Current active scene: {click.style(active_scene.name, fg='green')} " - f"{f'(ID: {active_scene.id})' if id else ''}" + f'Current active scene: {click.style(active_scene.name, fg="green")} ' + f'{f"(ID: {active_scene.id})" if id else ""}' ) conn.close() @@ -82,20 +82,19 @@ async def current(ctx: click.Context, id: bool = False): @scene.command() -@click.option("--id", is_flag=True, help="Include scene IDs in the output.") -@click.argument("scene_name", type=str) +@click.option('--id', is_flag=True, help='Include scene IDs in the output.') +@click.argument('scene_name', type=str) @click.option( - "--preview", + '--preview', is_flag=True, - help="Switch the preview scene only.", + help='Switch the preview scene only.', ) @click.pass_context async def switch( ctx: click.Context, scene_name: str, preview: bool = False, id: bool = False ): """Switch to a scene by its name.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = ScenesService(conn) ts = TransitionsService(conn) @@ -109,29 +108,29 @@ async def switch( await ss.make_scene_active(scene.id) if preview: click.echo( - f"Switched to preview scene: {click.style(scene.name, fg='blue')} " - f"{f'(ID: {scene.id}).' if id else ''}" + f'Switched to preview scene: {click.style(scene.name, fg="blue")} ' + f'{f"(ID: {scene.id})." if id else ""}' ) else: click.echo( - f"Switched to scene: {click.style(scene.name, fg='blue')} " - f"{f'(ID: {scene.id}).' if id else ''}" + f'Switched to scene: {click.style(scene.name, fg="blue")} ' + f'{f"(ID: {scene.id})." if id else ""}' ) await ts.execute_studio_mode_transition() click.echo( - "Executed studio mode transition to make the scene active." + 'Executed studio mode transition to make the scene active.' ) else: if preview: conn.close() raise SlobsCliError( - "Cannot switch the preview scene in non-studio mode." + 'Cannot switch the preview scene in non-studio mode.' ) await ss.make_scene_active(scene.id) click.echo( - f"Switched to scene: {click.style(scene.name, fg='blue')} " - f"{f'(ID: {scene.id}).' if id else ''}" + f'Switched to scene: {click.style(scene.name, fg="blue")} ' + f'{f"(ID: {scene.id})." if id else ""}' ) conn.close() diff --git a/src/slobs_cli/stream.py b/src/slobs_cli/stream.py index 11752f9..e6ba7c9 100644 --- a/src/slobs_cli/stream.py +++ b/src/slobs_cli/stream.py @@ -1,3 +1,5 @@ +"""module for managing the replay buffer in Slobs CLI.""" + import asyncclick as click from anyio import create_task_group from pyslobs import StreamingService @@ -8,27 +10,26 @@ from .errors import SlobsCliError @cli.group() def stream(): - """Stream management commands.""" + """Manage streaming in Slobs CLI.""" @stream.command() @click.pass_context async def start(ctx: click.Context): """Start the stream.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.streaming_status != "offline" + active = model.streaming_status != 'offline' if active: conn.close() - raise SlobsCliError("Stream is already active.") + raise SlobsCliError('Stream is already active.') await ss.toggle_streaming() - click.echo("Stream started.") + click.echo('Stream started.') conn.close() try: @@ -44,20 +45,19 @@ async def start(ctx: click.Context): @click.pass_context async def stop(ctx: click.Context): """Stop the stream.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.streaming_status != "offline" + active = model.streaming_status != 'offline' if not active: conn.close() - raise SlobsCliError("Stream is already inactive.") + raise SlobsCliError('Stream is already inactive.') await ss.toggle_streaming() - click.echo("Stream stopped.") + click.echo('Stream stopped.') conn.close() try: @@ -73,18 +73,17 @@ async def stop(ctx: click.Context): @click.pass_context async def status(ctx: click.Context): """Get the current stream status.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.streaming_status != "offline" + active = model.streaming_status != 'offline' if active: - click.echo("Stream is currently active.") + click.echo('Stream is currently active.') else: - click.echo("Stream is currently inactive.") + click.echo('Stream is currently inactive.') conn.close() async with create_task_group() as tg: @@ -96,19 +95,18 @@ async def status(ctx: click.Context): @click.pass_context async def toggle(ctx: click.Context): """Toggle the stream status.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ss = StreamingService(conn) async def _run(): model = await ss.get_model() - active = model.streaming_status != "offline" + active = model.streaming_status != 'offline' await ss.toggle_streaming() if active: - click.echo("Stream stopped.") + click.echo('Stream stopped.') else: - click.echo("Stream started.") + click.echo('Stream started.') conn.close() diff --git a/src/slobs_cli/studiomode.py b/src/slobs_cli/studiomode.py index 1151dc7..a6cd0c1 100644 --- a/src/slobs_cli/studiomode.py +++ b/src/slobs_cli/studiomode.py @@ -1,3 +1,5 @@ +"""module for managing studio mode in Slobs CLI.""" + import asyncclick as click from anyio import create_task_group from pyslobs import TransitionsService @@ -8,25 +10,24 @@ from .errors import SlobsCliError @cli.group() def studiomode(): - """Studio mode management commands.""" + """Manage studio mode in Slobs CLI.""" @studiomode.command() @click.pass_context async def enable(ctx: click.Context): """Enable studio mode.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ts = TransitionsService(conn) async def _run(): model = await ts.get_model() if model.studio_mode: conn.close() - raise SlobsCliError("Studio mode is already enabled.") + raise SlobsCliError('Studio mode is already enabled.') await ts.enable_studio_mode() - click.echo("Studio mode enabled successfully.") + click.echo('Studio mode enabled successfully.') conn.close() try: @@ -42,18 +43,17 @@ async def enable(ctx: click.Context): @click.pass_context async def disable(ctx: click.Context): """Disable studio mode.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ts = TransitionsService(conn) async def _run(): model = await ts.get_model() if not model.studio_mode: conn.close() - raise SlobsCliError("Studio mode is already disabled.") + raise SlobsCliError('Studio mode is already disabled.') await ts.disable_studio_mode() - click.echo("Studio mode disabled successfully.") + click.echo('Studio mode disabled successfully.') conn.close() try: @@ -69,16 +69,15 @@ async def disable(ctx: click.Context): @click.pass_context async def status(ctx: click.Context): """Check the status of studio mode.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ts = TransitionsService(conn) async def _run(): model = await ts.get_model() if model.studio_mode: - click.echo("Studio mode is currently enabled.") + click.echo('Studio mode is currently enabled.') else: - click.echo("Studio mode is currently disabled.") + click.echo('Studio mode is currently disabled.') conn.close() async with create_task_group() as tg: @@ -90,18 +89,17 @@ async def status(ctx: click.Context): @click.pass_context async def toggle(ctx: click.Context): """Toggle studio mode.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ts = TransitionsService(conn) async def _run(): model = await ts.get_model() if model.studio_mode: await ts.disable_studio_mode() - click.echo("Studio mode disabled successfully.") + click.echo('Studio mode disabled successfully.') else: await ts.enable_studio_mode() - click.echo("Studio mode enabled successfully.") + click.echo('Studio mode enabled successfully.') conn.close() async with create_task_group() as tg: @@ -113,18 +111,17 @@ async def toggle(ctx: click.Context): @click.pass_context async def force_transition(ctx: click.Context): """Force a transition in studio mode.""" - - conn = ctx.obj["connection"] + conn = ctx.obj['connection'] ts = TransitionsService(conn) async def _run(): model = await ts.get_model() if not model.studio_mode: conn.close() - raise SlobsCliError("Studio mode is not enabled.") + raise SlobsCliError('Studio mode is not enabled.') await ts.execute_studio_mode_transition() - click.echo("Forced studio mode transition.") + click.echo('Forced studio mode transition.') conn.close() try: diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..b65fba0 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Test suite for the slobs_cli package.""" diff --git a/tests/conftest.py b/tests/conftest.py index af7e479..a4e50a8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,9 @@ +"""pytest configuration for async tests using anyio.""" + import pytest @pytest.fixture def anyio_backend(): - return "asyncio" + """Return the backend to use for async tests.""" + return 'asyncio' diff --git a/tests/setup.py b/tests/setup.py index 8a54bd8..3a886d8 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -1,3 +1,10 @@ +"""Create test scenes in Streamlabs. + +Usage: + Run this script as a standalone program to setup the test environment. + Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set. +""" + import os import anyio @@ -6,20 +13,22 @@ from pyslobs import ConnectionConfig, ScenesService, SlobsConnection async def setup(conn: SlobsConnection): + """Set up test scenes in Streamlabs OBS.""" ss = ScenesService(conn) - await ss.create_scene("slobs-test-scene-1") - await ss.create_scene("slobs-test-scene-2") - await ss.create_scene("slobs-test-scene-3") + await ss.create_scene('slobs-test-scene-1') + await ss.create_scene('slobs-test-scene-2') + await ss.create_scene('slobs-test-scene-3') conn.close() async def main(): + """Establish connection and set up scenes.""" conn = SlobsConnection( ConnectionConfig( - domain=os.environ["SLOBS_DOMAIN"], + domain=os.environ['SLOBS_DOMAIN'], port=59650, - token=os.environ["SLOBS_TOKEN"], + token=os.environ['SLOBS_TOKEN'], ) ) @@ -28,5 +37,5 @@ async def main(): tg.start_soon(setup, conn) -if __name__ == "__main__": +if __name__ == '__main__': anyio.run(main) diff --git a/tests/teardown.py b/tests/teardown.py index edfd32a..f50d1fc 100644 --- a/tests/teardown.py +++ b/tests/teardown.py @@ -1,3 +1,10 @@ +"""Remove test scenes in Streamlabs, disable streaming, recording, and replay buffer. + +Usage: + Run this script as a standalone program to tear down the test environment. + Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set. +""" + import os import anyio @@ -6,30 +13,32 @@ from pyslobs import ConnectionConfig, ScenesService, SlobsConnection, StreamingS async def cleanup(conn: SlobsConnection): + """Clean up test scenes and ensure streaming, recording, and replay buffer are stopped.""" ss = ScenesService(conn) scenes = await ss.get_scenes() for scene in scenes: - if scene.name.startswith("slobs-test-scene-"): + if scene.name.startswith('slobs-test-scene-'): await ss.remove_scene(scene.id) ss = StreamingService(conn) model = await ss.get_model() - if model.streaming_status != "offline": + if model.streaming_status != 'offline': await ss.toggle_streaming() - if model.replay_buffer_status != "offline": + if model.replay_buffer_status != 'offline': await ss.stop_replay_buffer() - if model.recording_status != "offline": + if model.recording_status != 'offline': await ss.toggle_recording() conn.close() async def main(): + """Establish connection and clean up test scenes.""" conn = SlobsConnection( ConnectionConfig( - domain=os.environ["SLOBS_DOMAIN"], + domain=os.environ['SLOBS_DOMAIN'], port=59650, - token=os.environ["SLOBS_TOKEN"], + token=os.environ['SLOBS_TOKEN'], ) ) @@ -38,5 +47,5 @@ async def main(): tg.start_soon(cleanup, conn) -if __name__ == "__main__": +if __name__ == '__main__': anyio.run(main) diff --git a/tests/test_record.py b/tests/test_record.py index cc16d24..2b15c3b 100644 --- a/tests/test_record.py +++ b/tests/test_record.py @@ -1,3 +1,5 @@ +"""Test cases for the recording commands of the slobs_cli CLI application.""" + import anyio import pytest from asyncclick.testing import CliRunner @@ -7,33 +9,35 @@ from slobs_cli import cli @pytest.mark.anyio async def test_record_start(): + """Test the start recording command.""" runner = CliRunner() - result = await runner.invoke(cli, ["record", "status"]) + result = await runner.invoke(cli, ['record', 'status']) assert result.exit_code == 0 - active = "Recording is currently active." in result.output + active = 'Recording is currently active.' in result.output - result = await runner.invoke(cli, ["record", "start"]) + result = await runner.invoke(cli, ['record', 'start']) if not active: assert result.exit_code == 0 - assert "Recording started" in result.output + assert 'Recording started' in result.output await anyio.sleep(0.2) # Allow some time for the recording to start else: assert result.exit_code != 0 - assert "Recording is already active." in result.output + assert 'Recording is already active.' in result.output @pytest.mark.anyio async def test_record_stop(): + """Test the stop recording command.""" runner = CliRunner() - result = await runner.invoke(cli, ["record", "status"]) + result = await runner.invoke(cli, ['record', 'status']) assert result.exit_code == 0 - active = "Recording is currently active." in result.output + active = 'Recording is currently active.' in result.output - result = await runner.invoke(cli, ["record", "stop"]) + result = await runner.invoke(cli, ['record', 'stop']) if active: assert result.exit_code == 0 - assert "Recording stopped" in result.output + assert 'Recording stopped' in result.output await anyio.sleep(0.2) # Allow some time for the recording to stop else: assert result.exit_code != 0 - assert "Recording is already inactive." in result.output + assert 'Recording is already inactive.' in result.output diff --git a/tests/test_replaybuffer.py b/tests/test_replaybuffer.py index 3fe589c..d08e11f 100644 --- a/tests/test_replaybuffer.py +++ b/tests/test_replaybuffer.py @@ -1,3 +1,5 @@ +"""Test cases for the replay buffer commands in slobs_cli.""" + import anyio import pytest from asyncclick.testing import CliRunner @@ -7,33 +9,35 @@ from slobs_cli import cli @pytest.mark.anyio async def test_replaybuffer_start(): + """Test the start replay buffer command.""" runner = CliRunner() - result = await runner.invoke(cli, ["replaybuffer", "status"]) + result = await runner.invoke(cli, ['replaybuffer', 'status']) assert result.exit_code == 0 - active = "Replay buffer is currently active." in result.output + active = 'Replay buffer is currently active.' in result.output - result = await runner.invoke(cli, ["replaybuffer", "start"]) + result = await runner.invoke(cli, ['replaybuffer', 'start']) if not active: assert result.exit_code == 0 - assert "Replay buffer started" in result.output + assert 'Replay buffer started' in result.output await anyio.sleep(0.2) # Allow some time for the replay buffer to start else: assert result.exit_code != 0 - assert "Replay buffer is already active." in result.output + assert 'Replay buffer is already active.' in result.output @pytest.mark.anyio async def test_replaybuffer_stop(): + """Test the stop replay buffer command.""" runner = CliRunner() - result = await runner.invoke(cli, ["replaybuffer", "status"]) + result = await runner.invoke(cli, ['replaybuffer', 'status']) assert result.exit_code == 0 - active = "Replay buffer is currently active." in result.output + active = 'Replay buffer is currently active.' in result.output - result = await runner.invoke(cli, ["replaybuffer", "stop"]) + result = await runner.invoke(cli, ['replaybuffer', 'stop']) if active: assert result.exit_code == 0 - assert "Replay buffer stopped" in result.output + assert 'Replay buffer stopped' in result.output await anyio.sleep(0.2) # Allow some time for the replay buffer to stop else: assert result.exit_code != 0 - assert "Replay buffer is already inactive." in result.output + assert 'Replay buffer is already inactive.' in result.output diff --git a/tests/test_scene.py b/tests/test_scene.py index 950335d..61d4ad4 100644 --- a/tests/test_scene.py +++ b/tests/test_scene.py @@ -1,3 +1,5 @@ +"""Test cases for scene commands in slobs_cli.""" + import pytest from asyncclick.testing import CliRunner @@ -6,20 +8,22 @@ from slobs_cli import cli @pytest.mark.anyio async def test_scene_list(): + """Test the list scenes command.""" runner = CliRunner() - result = await runner.invoke(cli, ["scene", "list"]) + result = await runner.invoke(cli, ['scene', 'list']) assert result.exit_code == 0 - assert "slobs-test-scene-1" in result.output - assert "slobs-test-scene-2" in result.output - assert "slobs-test-scene-3" in result.output + assert 'slobs-test-scene-1' in result.output + assert 'slobs-test-scene-2' in result.output + assert 'slobs-test-scene-3' in result.output @pytest.mark.anyio async def test_scene_current(): + """Test the current scene command.""" runner = CliRunner() - result = await runner.invoke(cli, ["scene", "switch", "slobs-test-scene-2"]) + result = await runner.invoke(cli, ['scene', 'switch', 'slobs-test-scene-2']) assert result.exit_code == 0 - result = await runner.invoke(cli, ["scene", "current"]) + result = await runner.invoke(cli, ['scene', 'current']) assert result.exit_code == 0 - assert "Current active scene: slobs-test-scene-2" in result.output + assert 'Current active scene: slobs-test-scene-2' in result.output diff --git a/tests/test_stream.py b/tests/test_stream.py index 1513daa..d51d53b 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -1,3 +1,5 @@ +"""Tests for the stream commands in slobs_cli.""" + import anyio import pytest from asyncclick.testing import CliRunner @@ -7,33 +9,35 @@ from slobs_cli import cli @pytest.mark.anyio async def test_stream_start(): + """Test the start stream command.""" runner = CliRunner() - result = await runner.invoke(cli, ["stream", "status"]) + result = await runner.invoke(cli, ['stream', 'status']) assert result.exit_code == 0 - active = "Stream is currently active." in result.output + active = 'Stream is currently active.' in result.output - result = await runner.invoke(cli, ["stream", "start"]) + result = await runner.invoke(cli, ['stream', 'start']) if not active: assert result.exit_code == 0 - assert "Stream started" in result.output + assert 'Stream started' in result.output await anyio.sleep(0.2) # Allow some time for the stream to start else: assert result.exit_code != 0 - assert "Stream is already active." in result.output + assert 'Stream is already active.' in result.output @pytest.mark.anyio async def test_stream_stop(): + """Test the stop stream command.""" runner = CliRunner() - result = await runner.invoke(cli, ["stream", "status"]) + result = await runner.invoke(cli, ['stream', 'status']) assert result.exit_code == 0 - active = "Stream is currently active." in result.output + active = 'Stream is currently active.' in result.output - result = await runner.invoke(cli, ["stream", "stop"]) + result = await runner.invoke(cli, ['stream', 'stop']) if active: assert result.exit_code == 0 - assert "Stream stopped" in result.output + assert 'Stream stopped' in result.output await anyio.sleep(0.2) # Allow some time for the stream to stop else: assert result.exit_code != 0 - assert "Stream is already inactive." in result.output + assert 'Stream is already inactive.' in result.output