mirror of
https://github.com/onyx-and-iris/slobs-cli.git
synced 2025-06-27 15:20:24 +01:00
upd tests to check exit_code and err output instead of exceptions
This commit is contained in:
parent
d33c209d7c
commit
00273ff461
@ -1,5 +1,4 @@
|
|||||||
import anyio
|
import anyio
|
||||||
import asyncclick as click
|
|
||||||
import pytest
|
import pytest
|
||||||
from asyncclick.testing import CliRunner
|
from asyncclick.testing import CliRunner
|
||||||
|
|
||||||
@ -13,19 +12,14 @@ async def test_record_start():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Recording is currently active." in result.output
|
active = "Recording is currently active." in result.output
|
||||||
|
|
||||||
if not active:
|
|
||||||
result = await runner.invoke(cli, ["record", "start"])
|
result = await runner.invoke(cli, ["record", "start"])
|
||||||
|
if not active:
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Recording started" in result.output
|
assert "Recording started" in result.output
|
||||||
await anyio.sleep(1) # Allow some time for the recording to start
|
await anyio.sleep(0.2) # Allow some time for the recording to start
|
||||||
else:
|
else:
|
||||||
with pytest.raises(ExceptionGroup) as exc_info:
|
assert result.exit_code != 0
|
||||||
result = await runner.invoke(
|
assert "Recording is already active." in result.output
|
||||||
cli, ["record", "start"], catch_exceptions=False
|
|
||||||
)
|
|
||||||
assert exc_info.group_contains(
|
|
||||||
click.Abort, match="Recording is already active."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
@ -35,16 +29,11 @@ async def test_record_stop():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Recording is currently active." in result.output
|
active = "Recording is currently active." in result.output
|
||||||
|
|
||||||
if active:
|
|
||||||
result = await runner.invoke(cli, ["record", "stop"])
|
result = await runner.invoke(cli, ["record", "stop"])
|
||||||
|
if active:
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Recording stopped" in result.output
|
assert "Recording stopped" in result.output
|
||||||
await anyio.sleep(1) # Allow some time for the recording to stop
|
await anyio.sleep(0.2) # Allow some time for the recording to stop
|
||||||
else:
|
else:
|
||||||
with pytest.raises(ExceptionGroup) as exc_info:
|
assert result.exit_code != 0
|
||||||
result = await runner.invoke(
|
assert "Recording is already inactive." in result.output
|
||||||
cli, ["record", "stop"], catch_exceptions=False
|
|
||||||
)
|
|
||||||
assert exc_info.group_contains(
|
|
||||||
click.Abort, match="Recording is already inactive."
|
|
||||||
)
|
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
import anyio
|
import anyio
|
||||||
import asyncclick as click
|
|
||||||
import pytest
|
import pytest
|
||||||
from asyncclick.testing import CliRunner
|
from asyncclick.testing import CliRunner
|
||||||
|
|
||||||
@ -13,19 +12,14 @@ async def test_replaybuffer_start():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Replay buffer is currently active." in result.output
|
active = "Replay buffer is currently active." in result.output
|
||||||
|
|
||||||
if not active:
|
|
||||||
result = await runner.invoke(cli, ["replaybuffer", "start"])
|
result = await runner.invoke(cli, ["replaybuffer", "start"])
|
||||||
|
if not active:
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Replay buffer started" in result.output
|
assert "Replay buffer started" in result.output
|
||||||
await anyio.sleep(1)
|
await anyio.sleep(0.2) # Allow some time for the replay buffer to start
|
||||||
else:
|
else:
|
||||||
with pytest.raises(ExceptionGroup) as exc_info:
|
assert result.exit_code != 0
|
||||||
result = await runner.invoke(
|
assert "Replay buffer is already active." in result.output
|
||||||
cli, ["replaybuffer", "start"], catch_exceptions=False
|
|
||||||
)
|
|
||||||
assert exc_info.group_contains(
|
|
||||||
click.Abort, match="Replay buffer is already active."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
@ -35,16 +29,11 @@ async def test_replaybuffer_stop():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Replay buffer is currently active." in result.output
|
active = "Replay buffer is currently active." in result.output
|
||||||
|
|
||||||
if active:
|
|
||||||
result = await runner.invoke(cli, ["replaybuffer", "stop"])
|
result = await runner.invoke(cli, ["replaybuffer", "stop"])
|
||||||
|
if active:
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Replay buffer stopped" in result.output
|
assert "Replay buffer stopped" in result.output
|
||||||
await anyio.sleep(1)
|
await anyio.sleep(0.2) # Allow some time for the replay buffer to stop
|
||||||
else:
|
else:
|
||||||
with pytest.raises(ExceptionGroup) as exc_info:
|
assert result.exit_code != 0
|
||||||
result = await runner.invoke(
|
assert "Replay buffer is already inactive." in result.output
|
||||||
cli, ["replaybuffer", "stop"], catch_exceptions=False
|
|
||||||
)
|
|
||||||
assert exc_info.group_contains(
|
|
||||||
click.Abort, match="Replay buffer is already inactive."
|
|
||||||
)
|
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
import anyio
|
import anyio
|
||||||
import asyncclick as click
|
|
||||||
import pytest
|
import pytest
|
||||||
from asyncclick.testing import CliRunner
|
from asyncclick.testing import CliRunner
|
||||||
|
|
||||||
@ -13,17 +12,14 @@ async def test_stream_start():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Stream is currently active." in result.output
|
active = "Stream is currently active." in result.output
|
||||||
|
|
||||||
if not active:
|
|
||||||
result = await runner.invoke(cli, ["stream", "start"])
|
result = await runner.invoke(cli, ["stream", "start"])
|
||||||
|
if not active:
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Stream started" in result.output
|
assert "Stream started" in result.output
|
||||||
await anyio.sleep(1) # Allow some time for the stream to start
|
await anyio.sleep(0.2) # Allow some time for the stream to start
|
||||||
else:
|
else:
|
||||||
with pytest.raises(ExceptionGroup) as exc_info:
|
assert result.exit_code != 0
|
||||||
result = await runner.invoke(
|
assert "Stream is already active." in result.output
|
||||||
cli, ["stream", "start"], catch_exceptions=False
|
|
||||||
)
|
|
||||||
assert exc_info.group_contains(click.Abort, match="Stream is already active.")
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
@ -33,14 +29,11 @@ async def test_stream_stop():
|
|||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
active = "Stream is currently active." in result.output
|
active = "Stream is currently active." in result.output
|
||||||
|
|
||||||
if active:
|
|
||||||
result = await runner.invoke(cli, ["stream", "stop"])
|
result = await runner.invoke(cli, ["stream", "stop"])
|
||||||
|
if active:
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert "Stream stopped" in result.output
|
assert "Stream stopped" in result.output
|
||||||
await anyio.sleep(1) # Allow some time for the stream to stop
|
await anyio.sleep(0.2) # Allow some time for the stream to stop
|
||||||
else:
|
else:
|
||||||
with pytest.raises(ExceptionGroup) as exc_info:
|
assert result.exit_code != 0
|
||||||
result = await runner.invoke(
|
assert "Stream is already inactive." in result.output
|
||||||
cli, ["stream", "stop"], catch_exceptions=False
|
|
||||||
)
|
|
||||||
assert exc_info.group_contains(click.Abort, match="Stream is already inactive.")
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user