add unit tests for record, replaybuffer, scene, stream

This commit is contained in:
onyx-and-iris 2025-06-10 17:01:55 +01:00
parent c8eb27d188
commit cc2eda00a5
6 changed files with 213 additions and 0 deletions

6
tests/conftest.py Normal file
View File

@ -0,0 +1,6 @@
import pytest
@pytest.fixture
def anyio_backend():
return "asyncio"

36
tests/teardown.py Normal file
View File

@ -0,0 +1,36 @@
import os
import anyio
from anyio import create_task_group
from pyslobs import ConnectionConfig, SlobsConnection, StreamingService
async def cleanup(conn: SlobsConnection):
ss = StreamingService(conn)
current_state = await ss.get_model()
if current_state.streaming_status != "offline":
await ss.toggle_streaming()
if current_state.replay_buffer_status != "offline":
await ss.stop_replay_buffer()
if current_state.recording_status != "offline":
await ss.toggle_recording()
conn.close()
async def main():
conn = SlobsConnection(
ConnectionConfig(
domain=os.environ["SLOBS_DOMAIN"],
port=59650,
token=os.environ["SLOBS_TOKEN"],
)
)
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(cleanup, conn)
if __name__ == "__main__":
anyio.run(main)

50
tests/test_record.py Normal file
View File

@ -0,0 +1,50 @@
import anyio
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_record_start():
runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"])
assert result.exit_code == 0
active = "Recording is currently active." in result.output
if not active:
result = await runner.invoke(cli, ["record", "start"])
assert result.exit_code == 0
assert "Recording started" in result.output
await anyio.sleep(1) # Allow some time for the recording to start
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["record", "start"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Recording is already active."
)
@pytest.mark.anyio
async def test_record_stop():
runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"])
assert result.exit_code == 0
active = "Recording is currently active." in result.output
if active:
result = await runner.invoke(cli, ["record", "stop"])
assert result.exit_code == 0
assert "Recording stopped" in result.output
await anyio.sleep(1) # Allow some time for the recording to stop
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["record", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Recording is already inactive."
)

View File

@ -0,0 +1,50 @@
import anyio
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_replaybuffer_start():
runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"])
assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output
if not active:
result = await runner.invoke(cli, ["replaybuffer", "start"])
assert result.exit_code == 0
assert "Replay buffer started" in result.output
await anyio.sleep(1)
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["replaybuffer", "start"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Replay buffer is already active."
)
@pytest.mark.anyio
async def test_replaybuffer_stop():
runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"])
assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output
if active:
result = await runner.invoke(cli, ["replaybuffer", "stop"])
assert result.exit_code == 0
assert "Replay buffer stopped" in result.output
await anyio.sleep(1)
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["replaybuffer", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Replay buffer is already inactive."
)

25
tests/test_scene.py Normal file
View File

@ -0,0 +1,25 @@
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_scene_list():
runner = CliRunner()
result = await runner.invoke(cli, ["scene", "list"])
assert result.exit_code == 0
assert "slobs-test-scene-1" in result.output
assert "slobs-test-scene-2" in result.output
assert "slobs-test-scene-3" in result.output
@pytest.mark.anyio
async def test_scene_current():
runner = CliRunner()
result = await runner.invoke(cli, ["scene", "switch", "slobs-test-scene-2"])
assert result.exit_code == 0
result = await runner.invoke(cli, ["scene", "current"])
assert result.exit_code == 0
assert "Current active scene: slobs-test-scene-2" in result.output

46
tests/test_stream.py Normal file
View File

@ -0,0 +1,46 @@
import anyio
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_stream_start():
runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"])
assert result.exit_code == 0
active = "Stream is currently active." in result.output
if not active:
result = await runner.invoke(cli, ["stream", "start"])
assert result.exit_code == 0
assert "Stream started" in result.output
await anyio.sleep(1) # Allow some time for the stream to start
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["stream", "start"], catch_exceptions=False
)
assert exc_info.group_contains(click.Abort, match="Stream is already active.")
@pytest.mark.anyio
async def test_stream_stop():
runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"])
assert result.exit_code == 0
active = "Stream is currently active." in result.output
if active:
result = await runner.invoke(cli, ["stream", "stop"])
assert result.exit_code == 0
assert "Stream stopped" in result.output
await anyio.sleep(1) # Allow some time for the stream to stop
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["stream", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(click.Abort, match="Stream is already inactive.")