From cc2eda00a59c8b3c368730c408fcf3b18f4b5708 Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Tue, 10 Jun 2025 17:01:55 +0100 Subject: [PATCH] add unit tests for record, replaybuffer, scene, stream --- tests/conftest.py | 6 +++++ tests/teardown.py | 36 +++++++++++++++++++++++++++ tests/test_record.py | 50 ++++++++++++++++++++++++++++++++++++++ tests/test_replaybuffer.py | 50 ++++++++++++++++++++++++++++++++++++++ tests/test_scene.py | 25 +++++++++++++++++++ tests/test_stream.py | 46 +++++++++++++++++++++++++++++++++++ 6 files changed, 213 insertions(+) create mode 100644 tests/conftest.py create mode 100644 tests/teardown.py create mode 100644 tests/test_record.py create mode 100644 tests/test_replaybuffer.py create mode 100644 tests/test_scene.py create mode 100644 tests/test_stream.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..af7e479 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,6 @@ +import pytest + + +@pytest.fixture +def anyio_backend(): + return "asyncio" diff --git a/tests/teardown.py b/tests/teardown.py new file mode 100644 index 0000000..ba6f72a --- /dev/null +++ b/tests/teardown.py @@ -0,0 +1,36 @@ +import os + +import anyio +from anyio import create_task_group +from pyslobs import ConnectionConfig, SlobsConnection, StreamingService + + +async def cleanup(conn: SlobsConnection): + ss = StreamingService(conn) + current_state = await ss.get_model() + if current_state.streaming_status != "offline": + await ss.toggle_streaming() + if current_state.replay_buffer_status != "offline": + await ss.stop_replay_buffer() + if current_state.recording_status != "offline": + await ss.toggle_recording() + + conn.close() + + +async def main(): + conn = SlobsConnection( + ConnectionConfig( + domain=os.environ["SLOBS_DOMAIN"], + port=59650, + token=os.environ["SLOBS_TOKEN"], + ) + ) + + async with create_task_group() as tg: + tg.start_soon(conn.background_processing) + tg.start_soon(cleanup, conn) + + +if __name__ == "__main__": + anyio.run(main) diff --git a/tests/test_record.py b/tests/test_record.py new file mode 100644 index 0000000..80c5de9 --- /dev/null +++ b/tests/test_record.py @@ -0,0 +1,50 @@ +import anyio +import asyncclick as click +import pytest +from asyncclick.testing import CliRunner + +from slobs_cli import cli + + +@pytest.mark.anyio +async def test_record_start(): + runner = CliRunner() + result = await runner.invoke(cli, ["record", "status"]) + assert result.exit_code == 0 + active = "Recording is currently active." in result.output + + if not active: + result = await runner.invoke(cli, ["record", "start"]) + assert result.exit_code == 0 + assert "Recording started" in result.output + await anyio.sleep(1) # Allow some time for the recording to start + else: + with pytest.raises(ExceptionGroup) as exc_info: + result = await runner.invoke( + cli, ["record", "start"], catch_exceptions=False + ) + assert exc_info.group_contains( + click.Abort, match="Recording is already active." + ) + + +@pytest.mark.anyio +async def test_record_stop(): + runner = CliRunner() + result = await runner.invoke(cli, ["record", "status"]) + assert result.exit_code == 0 + active = "Recording is currently active." in result.output + + if active: + result = await runner.invoke(cli, ["record", "stop"]) + assert result.exit_code == 0 + assert "Recording stopped" in result.output + await anyio.sleep(1) # Allow some time for the recording to stop + else: + with pytest.raises(ExceptionGroup) as exc_info: + result = await runner.invoke( + cli, ["record", "stop"], catch_exceptions=False + ) + assert exc_info.group_contains( + click.Abort, match="Recording is already inactive." + ) diff --git a/tests/test_replaybuffer.py b/tests/test_replaybuffer.py new file mode 100644 index 0000000..3123e78 --- /dev/null +++ b/tests/test_replaybuffer.py @@ -0,0 +1,50 @@ +import anyio +import asyncclick as click +import pytest +from asyncclick.testing import CliRunner + +from slobs_cli import cli + + +@pytest.mark.anyio +async def test_replaybuffer_start(): + runner = CliRunner() + result = await runner.invoke(cli, ["replaybuffer", "status"]) + assert result.exit_code == 0 + active = "Replay buffer is currently active." in result.output + + if not active: + result = await runner.invoke(cli, ["replaybuffer", "start"]) + assert result.exit_code == 0 + assert "Replay buffer started" in result.output + await anyio.sleep(1) + else: + with pytest.raises(ExceptionGroup) as exc_info: + result = await runner.invoke( + cli, ["replaybuffer", "start"], catch_exceptions=False + ) + assert exc_info.group_contains( + click.Abort, match="Replay buffer is already active." + ) + + +@pytest.mark.anyio +async def test_replaybuffer_stop(): + runner = CliRunner() + result = await runner.invoke(cli, ["replaybuffer", "status"]) + assert result.exit_code == 0 + active = "Replay buffer is currently active." in result.output + + if active: + result = await runner.invoke(cli, ["replaybuffer", "stop"]) + assert result.exit_code == 0 + assert "Replay buffer stopped" in result.output + await anyio.sleep(1) + else: + with pytest.raises(ExceptionGroup) as exc_info: + result = await runner.invoke( + cli, ["replaybuffer", "stop"], catch_exceptions=False + ) + assert exc_info.group_contains( + click.Abort, match="Replay buffer is already inactive." + ) diff --git a/tests/test_scene.py b/tests/test_scene.py new file mode 100644 index 0000000..950335d --- /dev/null +++ b/tests/test_scene.py @@ -0,0 +1,25 @@ +import pytest +from asyncclick.testing import CliRunner + +from slobs_cli import cli + + +@pytest.mark.anyio +async def test_scene_list(): + runner = CliRunner() + result = await runner.invoke(cli, ["scene", "list"]) + assert result.exit_code == 0 + assert "slobs-test-scene-1" in result.output + assert "slobs-test-scene-2" in result.output + assert "slobs-test-scene-3" in result.output + + +@pytest.mark.anyio +async def test_scene_current(): + runner = CliRunner() + result = await runner.invoke(cli, ["scene", "switch", "slobs-test-scene-2"]) + assert result.exit_code == 0 + + result = await runner.invoke(cli, ["scene", "current"]) + assert result.exit_code == 0 + assert "Current active scene: slobs-test-scene-2" in result.output diff --git a/tests/test_stream.py b/tests/test_stream.py new file mode 100644 index 0000000..9093f2e --- /dev/null +++ b/tests/test_stream.py @@ -0,0 +1,46 @@ +import anyio +import asyncclick as click +import pytest +from asyncclick.testing import CliRunner + +from slobs_cli import cli + + +@pytest.mark.anyio +async def test_stream_start(): + runner = CliRunner() + result = await runner.invoke(cli, ["stream", "status"]) + assert result.exit_code == 0 + active = "Stream is currently active." in result.output + + if not active: + result = await runner.invoke(cli, ["stream", "start"]) + assert result.exit_code == 0 + assert "Stream started" in result.output + await anyio.sleep(1) # Allow some time for the stream to start + else: + with pytest.raises(ExceptionGroup) as exc_info: + result = await runner.invoke( + cli, ["stream", "start"], catch_exceptions=False + ) + assert exc_info.group_contains(click.Abort, match="Stream is already active.") + + +@pytest.mark.anyio +async def test_stream_stop(): + runner = CliRunner() + result = await runner.invoke(cli, ["stream", "status"]) + assert result.exit_code == 0 + active = "Stream is currently active." in result.output + + if active: + result = await runner.invoke(cli, ["stream", "stop"]) + assert result.exit_code == 0 + assert "Stream stopped" in result.output + await anyio.sleep(1) # Allow some time for the stream to stop + else: + with pytest.raises(ExceptionGroup) as exc_info: + result = await runner.invoke( + cli, ["stream", "stop"], catch_exceptions=False + ) + assert exc_info.group_contains(click.Abort, match="Stream is already inactive.")