mirror of
https://github.com/onyx-and-iris/obsws-cli.git
synced 2025-08-07 20:21:48 +00:00
Compare commits
9 Commits
1c86b1f6ef
...
c71aa82914
Author | SHA1 | Date | |
---|---|---|---|
c71aa82914 | |||
9dd5fedd92 | |||
85d2cce4c6 | |||
fdbb3ebe22 | |||
06d83ce05a | |||
df6f65eda0 | |||
0a944f1f58 | |||
057e677d90 | |||
eb686ae58e |
@ -1,4 +1,4 @@
|
|||||||
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
|
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
|
||||||
#
|
#
|
||||||
# SPDX-License-Identifier: MIT
|
# SPDX-License-Identifier: MIT
|
||||||
__version__ = "0.12.8"
|
__version__ = "0.12.10"
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
"""module containing commands for manipulating filters in scenes."""
|
"""module containing commands for manipulating filters in scenes."""
|
||||||
|
|
||||||
|
import obsws_python as obsws
|
||||||
import typer
|
import typer
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.table import Table
|
from rich.table import Table
|
||||||
@ -20,7 +21,14 @@ def main():
|
|||||||
@app.command('list | ls')
|
@app.command('list | ls')
|
||||||
def list(ctx: typer.Context, source_name: str):
|
def list(ctx: typer.Context, source_name: str):
|
||||||
"""List filters for a source."""
|
"""List filters for a source."""
|
||||||
|
try:
|
||||||
resp = ctx.obj.get_source_filter_list(source_name)
|
resp = ctx.obj.get_source_filter_list(source_name)
|
||||||
|
except obsws.error.OBSSDKError as e:
|
||||||
|
if e.code == 600:
|
||||||
|
err_console.print(f"No source was found by the name of '{source_name}'.")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
if not resp.filters:
|
if not resp.filters:
|
||||||
out_console.print(f'No filters found for source {source_name}')
|
out_console.print(f'No filters found for source {source_name}')
|
||||||
|
@ -18,6 +18,11 @@ def main():
|
|||||||
@app.command('start | s')
|
@app.command('start | s')
|
||||||
def start(ctx: typer.Context):
|
def start(ctx: typer.Context):
|
||||||
"""Start the replay buffer."""
|
"""Start the replay buffer."""
|
||||||
|
resp = ctx.obj.get_replay_buffer_status()
|
||||||
|
if resp.output_active:
|
||||||
|
err_console.print('Replay buffer is already active.')
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
ctx.obj.start_replay_buffer()
|
ctx.obj.start_replay_buffer()
|
||||||
out_console.print('Replay buffer started.')
|
out_console.print('Replay buffer started.')
|
||||||
|
|
||||||
@ -25,6 +30,11 @@ def start(ctx: typer.Context):
|
|||||||
@app.command('stop | st')
|
@app.command('stop | st')
|
||||||
def stop(ctx: typer.Context):
|
def stop(ctx: typer.Context):
|
||||||
"""Stop the replay buffer."""
|
"""Stop the replay buffer."""
|
||||||
|
resp = ctx.obj.get_replay_buffer_status()
|
||||||
|
if not resp.output_active:
|
||||||
|
err_console.print('Replay buffer is not active.')
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
ctx.obj.stop_replay_buffer()
|
ctx.obj.stop_replay_buffer()
|
||||||
out_console.print('Replay buffer stopped.')
|
out_console.print('Replay buffer stopped.')
|
||||||
|
|
||||||
|
@ -32,8 +32,11 @@ def stop(ctx: typer.Context):
|
|||||||
@app.command('toggle | tg')
|
@app.command('toggle | tg')
|
||||||
def toggle(ctx: typer.Context):
|
def toggle(ctx: typer.Context):
|
||||||
"""Toggle the virtual camera."""
|
"""Toggle the virtual camera."""
|
||||||
ctx.obj.toggle_virtual_cam()
|
resp = ctx.obj.toggle_virtual_cam()
|
||||||
out_console.print('Virtual camera toggled.')
|
if resp.output_active:
|
||||||
|
out_console.print('Virtual camera is enabled.')
|
||||||
|
else:
|
||||||
|
out_console.print('Virtual camera is disabled.')
|
||||||
|
|
||||||
|
|
||||||
@app.command('status | ss')
|
@app.command('status | ss')
|
||||||
|
@ -46,9 +46,9 @@ def pytest_sessionstart(session):
|
|||||||
|
|
||||||
session.obsws.set_current_scene_collection('test-collection')
|
session.obsws.set_current_scene_collection('test-collection')
|
||||||
|
|
||||||
session.obsws.create_scene('pytest')
|
session.obsws.create_scene('pytest_scene')
|
||||||
session.obsws.create_input(
|
session.obsws.create_input(
|
||||||
sceneName='pytest',
|
sceneName='pytest_scene',
|
||||||
inputName='pytest_input',
|
inputName='pytest_input',
|
||||||
inputKind='color_source_v3',
|
inputKind='color_source_v3',
|
||||||
inputSettings={
|
inputSettings={
|
||||||
@ -60,7 +60,7 @@ def pytest_sessionstart(session):
|
|||||||
sceneItemEnabled=True,
|
sceneItemEnabled=True,
|
||||||
)
|
)
|
||||||
session.obsws.create_input(
|
session.obsws.create_input(
|
||||||
sceneName='pytest',
|
sceneName='pytest_scene',
|
||||||
inputName='pytest_input_2',
|
inputName='pytest_input_2',
|
||||||
inputKind='color_source_v3',
|
inputKind='color_source_v3',
|
||||||
inputSettings={
|
inputSettings={
|
||||||
@ -71,11 +71,11 @@ def pytest_sessionstart(session):
|
|||||||
},
|
},
|
||||||
sceneItemEnabled=True,
|
sceneItemEnabled=True,
|
||||||
)
|
)
|
||||||
resp = session.obsws.get_scene_item_list('pytest')
|
resp = session.obsws.get_scene_item_list('pytest_scene')
|
||||||
for item in resp.scene_items:
|
for item in resp.scene_items:
|
||||||
if item['sourceName'] == 'pytest_input_2':
|
if item['sourceName'] == 'pytest_input_2':
|
||||||
session.obsws.set_scene_item_transform(
|
session.obsws.set_scene_item_transform(
|
||||||
'pytest',
|
'pytest_scene',
|
||||||
item['sceneItemId'],
|
item['sceneItemId'],
|
||||||
{
|
{
|
||||||
'rotation': 0,
|
'rotation': 0,
|
||||||
@ -83,13 +83,47 @@ def pytest_sessionstart(session):
|
|||||||
)
|
)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Create a source filter for the Mic/Aux source
|
||||||
|
session.obsws.create_source_filter(
|
||||||
|
source_name='Mic/Aux',
|
||||||
|
filter_name='pytest filter',
|
||||||
|
filter_kind='compressor_filter',
|
||||||
|
filter_settings={
|
||||||
|
'threshold': -20,
|
||||||
|
'ratio': 4,
|
||||||
|
'attack_time': 10,
|
||||||
|
'release_time': 100,
|
||||||
|
'output_gain': -3.6,
|
||||||
|
'sidechain_source': None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a source filter for the pytest scene
|
||||||
|
session.obsws.create_source_filter(
|
||||||
|
source_name='pytest_scene',
|
||||||
|
filter_name='pytest filter',
|
||||||
|
filter_kind='luma_key_filter_v2',
|
||||||
|
filter_settings={'luma_max': 0.6509},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def pytest_sessionfinish(session, exitstatus):
|
def pytest_sessionfinish(session, exitstatus):
|
||||||
"""Call after the whole test run finishes.
|
"""Call after the whole test run finishes.
|
||||||
|
|
||||||
Return the exit status to the system.
|
Return the exit status to the system.
|
||||||
"""
|
"""
|
||||||
session.obsws.remove_scene('pytest')
|
session.obsws.remove_source_filter(
|
||||||
|
source_name='Mic/Aux',
|
||||||
|
filter_name='pytest filter',
|
||||||
|
)
|
||||||
|
|
||||||
|
session.obsws.remove_source_filter(
|
||||||
|
source_name='pytest_scene',
|
||||||
|
filter_name='pytest filter',
|
||||||
|
)
|
||||||
|
|
||||||
|
session.obsws.remove_scene('pytest_scene')
|
||||||
|
|
||||||
session.obsws.set_current_scene_collection('default')
|
session.obsws.set_current_scene_collection('default')
|
||||||
|
|
||||||
resp = session.obsws.get_stream_status()
|
resp = session.obsws.get_stream_status()
|
||||||
@ -100,6 +134,10 @@ def pytest_sessionfinish(session, exitstatus):
|
|||||||
if resp.output_active:
|
if resp.output_active:
|
||||||
session.obsws.stop_record()
|
session.obsws.stop_record()
|
||||||
|
|
||||||
|
resp = session.obsws.get_replay_buffer_status()
|
||||||
|
if resp.output_active:
|
||||||
|
session.obsws.stop_replay_buffer()
|
||||||
|
|
||||||
# Close the OBS WebSocket client connection
|
# Close the OBS WebSocket client connection
|
||||||
session.obsws.disconnect()
|
session.obsws.disconnect()
|
||||||
|
|
||||||
|
30
tests/test_filter.py
Normal file
30
tests/test_filter.py
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
"""Unit tests for the filter command in the OBS WebSocket CLI."""
|
||||||
|
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from obsws_cli.app import app
|
||||||
|
|
||||||
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filter_list():
|
||||||
|
"""Test the filter list command on an audio source."""
|
||||||
|
result = runner.invoke(app, ['filter', 'list', 'Mic/Aux'])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert 'Filters for Source: Mic/Aux' in result.stdout
|
||||||
|
assert 'pytest filter' in result.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def test_filter_list_scene():
|
||||||
|
"""Test the filter list command on a scene."""
|
||||||
|
result = runner.invoke(app, ['filter', 'list', 'pytest_scene'])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert 'Filters for Source: pytest_scene' in result.stdout
|
||||||
|
assert 'pytest filter' in result.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def test_filter_list_invalid_source():
|
||||||
|
"""Test the filter list command with an invalid source."""
|
||||||
|
result = runner.invoke(app, ['filter', 'list', 'invalid_source'])
|
||||||
|
assert result.exit_code != 0
|
||||||
|
assert "No source was found by the name of 'invalid_source'" in result.stderr
|
@ -4,7 +4,7 @@ from typer.testing import CliRunner
|
|||||||
|
|
||||||
from obsws_cli.app import app
|
from obsws_cli.app import app
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
def test_group_list():
|
def test_group_list():
|
||||||
|
14
tests/test_hotkey.py
Normal file
14
tests/test_hotkey.py
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
"""Unit tests for the hotkey command in the OBS WebSocket CLI."""
|
||||||
|
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from obsws_cli.app import app
|
||||||
|
|
||||||
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_hotkey_list():
|
||||||
|
"""Test the hotkey list command."""
|
||||||
|
result = runner.invoke(app, ['hotkey', 'list'])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert 'Hotkeys' in result.stdout
|
47
tests/test_input.py
Normal file
47
tests/test_input.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
"""Unit tests for the input command in the OBS WebSocket CLI."""
|
||||||
|
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from obsws_cli.app import app
|
||||||
|
|
||||||
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_input_list():
|
||||||
|
"""Test the input list command."""
|
||||||
|
result = runner.invoke(app, ['input', 'list'])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert 'Desktop Audio' in result.stdout
|
||||||
|
assert 'Mic/Aux' in result.stdout
|
||||||
|
assert all(
|
||||||
|
item in result.stdout
|
||||||
|
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_input_list_filter_input():
|
||||||
|
"""Test the input list command with input filter."""
|
||||||
|
result = runner.invoke(app, ['input', 'list', '--input'])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert 'Desktop Audio' not in result.stdout
|
||||||
|
assert 'Mic/Aux' in result.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def test_input_list_filter_output():
|
||||||
|
"""Test the input list command with output filter."""
|
||||||
|
result = runner.invoke(app, ['input', 'list', '--output'])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert 'Desktop Audio' in result.stdout
|
||||||
|
assert 'Mic/Aux' not in result.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def test_input_list_filter_colour():
|
||||||
|
"""Test the input list command with colour filter."""
|
||||||
|
result = runner.invoke(app, ['input', 'list', '--colour'])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert all(
|
||||||
|
item in result.stdout
|
||||||
|
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
|
||||||
|
)
|
||||||
|
assert 'Desktop Audio' not in result.stdout
|
||||||
|
assert 'Mic/Aux' not in result.stdout
|
@ -6,7 +6,7 @@ from typer.testing import CliRunner
|
|||||||
|
|
||||||
from obsws_cli.app import app
|
from obsws_cli.app import app
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
def test_record_start_status_stop():
|
def test_record_start_status_stop():
|
||||||
|
52
tests/test_replaybuffer.py
Normal file
52
tests/test_replaybuffer.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
"""Unit tests for the replaybuffer command in the OBS WebSocket CLI."""
|
||||||
|
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from obsws_cli.app import app
|
||||||
|
|
||||||
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_replaybuffer_start():
|
||||||
|
"""Test the replay buffer start command."""
|
||||||
|
resp = runner.invoke(app, ['replaybuffer', 'status'])
|
||||||
|
assert resp.exit_code == 0
|
||||||
|
active = 'Replay buffer is active.' in resp.stdout
|
||||||
|
|
||||||
|
resp = runner.invoke(app, ['replaybuffer', 'start'])
|
||||||
|
if active:
|
||||||
|
assert resp.exit_code != 0
|
||||||
|
assert 'Replay buffer is already active.' in resp.stderr
|
||||||
|
else:
|
||||||
|
assert resp.exit_code == 0
|
||||||
|
assert 'Replay buffer started.' in resp.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def test_replaybuffer_stop():
|
||||||
|
"""Test the replay buffer stop command."""
|
||||||
|
resp = runner.invoke(app, ['replaybuffer', 'status'])
|
||||||
|
assert resp.exit_code == 0
|
||||||
|
active = 'Replay buffer is active.' in resp.stdout
|
||||||
|
|
||||||
|
resp = runner.invoke(app, ['replaybuffer', 'stop'])
|
||||||
|
if not active:
|
||||||
|
assert resp.exit_code != 0
|
||||||
|
assert 'Replay buffer is not active.' in resp.stderr
|
||||||
|
else:
|
||||||
|
assert resp.exit_code == 0
|
||||||
|
assert 'Replay buffer stopped.' in resp.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def test_replaybuffer_toggle():
|
||||||
|
"""Test the replay buffer toggle command."""
|
||||||
|
resp = runner.invoke(app, ['replaybuffer', 'status'])
|
||||||
|
assert resp.exit_code == 0
|
||||||
|
active = 'Replay buffer is active.' in resp.stdout
|
||||||
|
|
||||||
|
resp = runner.invoke(app, ['replaybuffer', 'toggle'])
|
||||||
|
if active:
|
||||||
|
assert resp.exit_code == 0
|
||||||
|
assert 'Replay buffer is not active.' in resp.stdout
|
||||||
|
else:
|
||||||
|
assert resp.exit_code == 0
|
||||||
|
assert 'Replay buffer is active.' in resp.stdout
|
@ -4,22 +4,22 @@ from typer.testing import CliRunner
|
|||||||
|
|
||||||
from obsws_cli.app import app
|
from obsws_cli.app import app
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
def test_scene_list():
|
def test_scene_list():
|
||||||
"""Test the scene list command."""
|
"""Test the scene list command."""
|
||||||
result = runner.invoke(app, ['scene', 'list'])
|
result = runner.invoke(app, ['scene', 'list'])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert 'pytest' in result.stdout
|
assert 'pytest_scene' in result.stdout
|
||||||
|
|
||||||
|
|
||||||
def test_scene_current():
|
def test_scene_current():
|
||||||
"""Test the scene current command."""
|
"""Test the scene current command."""
|
||||||
runner.invoke(app, ['scene', 'switch', 'pytest'])
|
runner.invoke(app, ['scene', 'switch', 'pytest_scene'])
|
||||||
result = runner.invoke(app, ['scene', 'current'])
|
result = runner.invoke(app, ['scene', 'current'])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert 'pytest' in result.stdout
|
assert 'pytest_scene' in result.stdout
|
||||||
|
|
||||||
|
|
||||||
def test_scene_switch():
|
def test_scene_switch():
|
||||||
@ -29,10 +29,10 @@ def test_scene_switch():
|
|||||||
enabled = 'Studio mode is enabled.' in result.stdout
|
enabled = 'Studio mode is enabled.' in result.stdout
|
||||||
|
|
||||||
if enabled:
|
if enabled:
|
||||||
result = runner.invoke(app, ['scene', 'switch', 'pytest', '--preview'])
|
result = runner.invoke(app, ['scene', 'switch', 'pytest_scene', '--preview'])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert 'Switched to preview scene: pytest' in result.stdout
|
assert 'Switched to preview scene: pytest_scene' in result.stdout
|
||||||
else:
|
else:
|
||||||
result = runner.invoke(app, ['scene', 'switch', 'pytest'])
|
result = runner.invoke(app, ['scene', 'switch', 'pytest_scene'])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert 'Switched to program scene: pytest' in result.stdout
|
assert 'Switched to program scene: pytest_scene' in result.stdout
|
||||||
|
@ -4,12 +4,12 @@ from typer.testing import CliRunner
|
|||||||
|
|
||||||
from obsws_cli.app import app
|
from obsws_cli.app import app
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
def test_sceneitem_list():
|
def test_sceneitem_list():
|
||||||
"""Test the sceneitem list command."""
|
"""Test the sceneitem list command."""
|
||||||
result = runner.invoke(app, ['sceneitem', 'list', 'pytest'])
|
result = runner.invoke(app, ['sceneitem', 'list', 'pytest_scene'])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert 'pytest_input' in result.stdout
|
assert 'pytest_input' in result.stdout
|
||||||
assert 'pytest_input_2' in result.stdout
|
assert 'pytest_input_2' in result.stdout
|
||||||
@ -23,11 +23,12 @@ def test_sceneitem_transform():
|
|||||||
'sceneitem',
|
'sceneitem',
|
||||||
'transform',
|
'transform',
|
||||||
'--rotation=60',
|
'--rotation=60',
|
||||||
'pytest',
|
'pytest_scene',
|
||||||
'pytest_input_2',
|
'pytest_input_2',
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert (
|
assert (
|
||||||
"Item 'pytest_input_2' in scene 'pytest' has been transformed" in result.stdout
|
"Item 'pytest_input_2' in scene 'pytest_scene' has been transformed"
|
||||||
|
in result.stdout
|
||||||
)
|
)
|
||||||
|
@ -6,7 +6,7 @@ from typer.testing import CliRunner
|
|||||||
|
|
||||||
from obsws_cli.app import app
|
from obsws_cli.app import app
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
def test_stream_start():
|
def test_stream_start():
|
||||||
|
@ -4,7 +4,7 @@ from typer.testing import CliRunner
|
|||||||
|
|
||||||
from obsws_cli.app import app
|
from obsws_cli.app import app
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
def test_studio_enable():
|
def test_studio_enable():
|
||||||
|
@ -4,7 +4,7 @@ from typer.testing import CliRunner
|
|||||||
|
|
||||||
from obsws_cli.app import app
|
from obsws_cli.app import app
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner(mix_stderr=False)
|
||||||
|
|
||||||
|
|
||||||
def test_version():
|
def test_version():
|
Loading…
x
Reference in New Issue
Block a user