Compare commits

...

19 Commits

Author SHA1 Message Date
3b184c6531 upd dependencies 2025-07-24 04:21:58 +01:00
8c37ce1fc0 remove typer import 2025-07-24 04:21:48 +01:00
436e4d5345 remove alias, settings 2025-07-24 04:21:40 +01:00
2ef89be184 convert virtualcam commands 2025-07-24 04:09:49 +01:00
506aff833c convert text commands 2025-07-24 04:08:07 +01:00
eb939b735c convert studiomode commands 2025-07-24 04:05:01 +01:00
bb7a468dd5 convert stream commands 2025-07-24 03:59:37 +01:00
e77627b845 convert screenshot commands 2025-07-24 03:52:50 +01:00
93b066090b fix aliases 2025-07-24 03:49:31 +01:00
1ce832dfde convert sceneitem commands 2025-07-24 03:46:11 +01:00
e8664f0117 convert scenecollection commands 2025-07-24 02:34:15 +01:00
a3dff0f739 convert replaybuffer commands 2025-07-24 02:29:58 +01:00
6da9df5ceb convert record commands 2025-07-24 02:26:18 +01:00
75fc18273e convert projector commands 2025-07-24 02:20:17 +01:00
e658819719 convert profile commands 2025-07-24 02:14:38 +01:00
4451fbf22c conver input commands 2025-07-24 02:06:05 +01:00
132b283347 convert hotkey commands 2025-07-24 01:48:55 +01:00
ae8ff20cf4 convert group commands 2025-07-24 01:39:02 +01:00
de1c604c46 update the --help message
add descriptions for filter + scene command groups

Usage now after main CLI description
2025-07-24 01:38:42 +01:00
21 changed files with 704 additions and 785 deletions

View File

@ -1,73 +0,0 @@
"""module defining a custom group class for handling command name aliases."""
import re
import typer
class RootTyperAliasGroup(typer.core.TyperGroup):
"""A custom group class to handle command name aliases for the root typer."""
def __init__(self, *args, **kwargs):
"""Initialize the AliasGroup."""
super().__init__(*args, **kwargs)
self.no_args_is_help = True
def get_command(self, ctx, cmd_name):
"""Get a command by name."""
match cmd_name:
case 'f':
cmd_name = 'filter'
case 'g':
cmd_name = 'group'
case 'hk':
cmd_name = 'hotkey'
case 'i':
cmd_name = 'input'
case 'prf':
cmd_name = 'profile'
case 'prj':
cmd_name = 'projector'
case 'rc':
cmd_name = 'record'
case 'rb':
cmd_name = 'replaybuffer'
case 'sc':
cmd_name = 'scene'
case 'scc':
cmd_name = 'scenecollection'
case 'si':
cmd_name = 'sceneitem'
case 'ss':
cmd_name = 'screenshot'
case 'st':
cmd_name = 'stream'
case 'sm':
cmd_name = 'studiomode'
case 't':
cmd_name = 'text'
case 'vc':
cmd_name = 'virtualcam'
return super().get_command(ctx, cmd_name)
class SubTyperAliasGroup(typer.core.TyperGroup):
"""A custom group class to handle command name aliases for sub typers."""
_CMD_SPLIT_P = re.compile(r' ?[,|] ?')
def __init__(self, *args, **kwargs):
"""Initialize the AliasGroup."""
super().__init__(*args, **kwargs)
self.no_args_is_help = True
def get_command(self, ctx, cmd_name):
"""Get a command by name."""
cmd_name = self._group_cmd_name(cmd_name)
return super().get_command(ctx, cmd_name)
def _group_cmd_name(self, default_name):
for cmd in self.commands.values():
if cmd.name and default_name in self._CMD_SPLIT_P.split(cmd.name):
return cmd.name
return default_name

View File

@ -19,11 +19,26 @@ app = App(
'OBS_' 'OBS_'
), # Environment variable prefix for configuration parameters ), # Environment variable prefix for configuration parameters
version=version, version=version,
usage='[bold][yellow]Usage:[/yellow] [white]obsws-cli [OPTIONS] COMMAND [ARGS]...[/white][/bold]',
) )
app.meta.group_parameters = Group('Session Parameters', sort_key=0) app.meta.group_parameters = Group('Options', sort_key=0)
for sub_app in ( for sub_app in (
'filter', 'filter',
'group',
'hotkey',
'input',
'profile',
'projector',
'record',
'replaybuffer',
'scene', 'scene',
'scenecollection',
'sceneitem',
'screenshot',
'stream',
'studiomode',
'text',
'virtualcam',
): ):
module = importlib.import_module(f'.{sub_app}', package=__package__) module = importlib.import_module(f'.{sub_app}', package=__package__)
app.command(module.app) app.command(module.app)
@ -74,7 +89,7 @@ def launcher(
Parameter(validator=setup_logging), Parameter(validator=setup_logging),
] = False, ] = False,
): ):
"""Initialize the OBS WebSocket client and return the context.""" """Command line interface for the OBS WebSocket API."""
with obsws.ReqClient( with obsws.ReqClient(
host=obs_config.host, host=obs_config.host,
port=obs_config.port, port=obs_config.port,

View File

@ -12,7 +12,7 @@ from .context import Context
from .enum import ExitCode from .enum import ExitCode
from .error import OBSWSCLIError from .error import OBSWSCLIError
app = App(name='filter') app = App(name='filter', help='Commands for managing filters in OBS sources')
@app.command(name=['list', 'ls']) @app.command(name=['list', 'ls'])

View File

@ -2,42 +2,42 @@
from typing import Annotated, Optional from typing import Annotated, Optional
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util, validate from . import console, util, validate
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
from .protocols import DataclassProtocol from .protocols import DataclassProtocol
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='group', help='Commands for managing groups in OBS scenes')
@app.callback() @app.command(name=['list', 'ls'])
def main():
"""Control groups in OBS scenes."""
@app.command('list | ls')
def list_( def list_(
ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
Optional[str], Optional[str],
typer.Argument( Argument(
show_default='The current scene', hint='Scene name to list groups for',
help='Scene name to list groups for',
), ),
] = None, ] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""List groups in a scene.""" """List groups in a scene."""
if not scene_name: if not scene_name:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name scene_name = ctx.client.get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f"Scene '{scene_name}' not found.") raise OBSWSCLIError(
raise typer.Exit(1) f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.client.get_scene_item_list(scene_name)
groups = [ groups = [
(item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled')) (item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled'))
for item in resp.scene_items for item in resp.scene_items
@ -45,20 +45,20 @@ def list_(
] ]
if not groups: if not groups:
console.out.print( raise OBSWSCLIError(
f'No groups found in scene {console.highlight(ctx, scene_name)}.' f'No groups found in scene {console.highlight(ctx, scene_name)}.',
code=ExitCode.SUCCESS,
) )
raise typer.Exit()
table = Table( table = Table(
title=f'Groups in Scene: {scene_name}', title=f'Groups in Scene: {scene_name}',
padding=(0, 2), padding=(0, 2),
border_style=ctx.obj['style'].border, border_style=ctx.style.border,
) )
columns = [ columns = [
(Text('ID', justify='center'), 'center', ctx.obj['style'].column), (Text('ID', justify='center'), 'center', ctx.style.column),
(Text('Group Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Group Name', justify='center'), 'left', ctx.style.column),
(Text('Enabled', justify='center'), 'center', None), (Text('Enabled', justify='center'), 'center', None),
] ]
for heading, justify, style in columns: for heading, justify, style in columns:
@ -87,30 +87,32 @@ def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
return group return group
@app.command('show | sh') @app.command(name=['show', 'sh'])
def show( def show(
ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Scene name the group is in'), Argument(hint='Scene name the group is in'),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to show')
], ],
group_name: Annotated[str, Argument(hint='Group name to show')],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Show a group in a scene.""" """Show a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f"Scene '{scene_name}' not found.") raise OBSWSCLIError(
raise typer.Exit(1) f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_scene_item_enabled( resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
ctx.client.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=True, enabled=True,
@ -119,29 +121,29 @@ def show(
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.') console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
@app.command('hide | h') @app.command(name=['hide', 'h'])
def hide( def hide(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
scene_name: Annotated[ group_name: Annotated[str, Argument(hint='Group name to hide')],
str, typer.Argument(..., show_default=False, help='Scene name the group is in') /,
], *,
group_name: Annotated[ ctx: Annotated[Context, Parameter(parse=False)],
str, typer.Argument(..., show_default=False, help='Group name to hide')
],
): ):
"""Hide a group in a scene.""" """Hide a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_scene_item_enabled( resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
ctx.client.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=False, enabled=False,
@ -150,30 +152,30 @@ def hide(
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.') console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle( def toggle(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
scene_name: Annotated[ group_name: Annotated[str, Argument(hint='Group name to toggle')],
str, typer.Argument(..., show_default=False, help='Scene name the group is in') /,
], *,
group_name: Annotated[ ctx: Annotated[Context, Parameter(parse=False)],
str, typer.Argument(..., show_default=False, help='Group name to toggle')
],
): ):
"""Toggle a group in a scene.""" """Toggle a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) )
if (group := _get_group(group_name, resp)) is None:
console.err.print( resp = ctx.client.get_scene_item_list(scene_name)
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].' if (group := _get_group(group_name, resp)) is None:
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
) )
raise typer.Exit(1)
new_state = not group.get('sceneItemEnabled') new_state = not group.get('sceneItemEnabled')
ctx.obj['obsws'].set_scene_item_enabled( ctx.client.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=new_state, enabled=new_state,
@ -185,29 +187,29 @@ def toggle(
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.') console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status( def status(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
scene_name: Annotated[ group_name: Annotated[str, Argument(hint='Group name to check status')],
str, typer.Argument(..., show_default=False, help='Scene name the group is in') /,
], *,
group_name: Annotated[ ctx: Annotated[Context, Parameter(parse=False)],
str, typer.Argument(..., show_default=False, help='Group name to check status')
],
): ):
"""Get the status of a group in a scene.""" """Get the status of a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
raise typer.Exit(1)
enabled = ctx.obj['obsws'].get_scene_item_enabled( resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
) )

View File

@ -2,37 +2,33 @@
from typing import Annotated from typing import Annotated
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='hotkey', help='Commands for managing hotkeys in OBS')
@app.callback() @app.command(name=['list', 'ls'])
def main():
"""Control hotkeys in OBS."""
@app.command('list | ls')
def list_( def list_(
ctx: typer.Context, *,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""List all hotkeys.""" """List all hotkeys."""
resp = ctx.obj['obsws'].get_hotkey_list() resp = ctx.client.get_hotkey_list()
table = Table( table = Table(
title='Hotkeys', title='Hotkeys',
padding=(0, 2), padding=(0, 2),
border_style=ctx.obj['style'].border, border_style=ctx.style.border,
) )
table.add_column( table.add_column(
Text('Hotkey Name', justify='center'), Text('Hotkey Name', justify='center'),
justify='left', justify='left',
style=ctx.obj['style'].column, style=ctx.style.column,
) )
for i, hotkey in enumerate(resp.hotkeys): for i, hotkey in enumerate(resp.hotkeys):
@ -41,40 +37,40 @@ def list_(
console.out.print(table) console.out.print(table)
@app.command('trigger | tr') @app.command(name=['trigger', 'tr'])
def trigger( def trigger(
ctx: typer.Context, hotkey: Annotated[str, Argument(hint='The hotkey to trigger')],
hotkey: Annotated[ /,
str, typer.Argument(..., show_default=False, help='The hotkey to trigger') *,
], ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Trigger a hotkey by name.""" """Trigger a hotkey by name."""
ctx.obj['obsws'].trigger_hotkey_by_name(hotkey) ctx.client.trigger_hotkey_by_name(hotkey)
@app.command('trigger-sequence | trs') @app.command(name=['trigger-sequence', 'trs'])
def trigger_sequence( def trigger_sequence(
ctx: typer.Context,
key_id: Annotated[ key_id: Annotated[
str, str,
typer.Argument( Argument(
..., hint='The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info',
show_default=False,
help='The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info',
), ),
], ],
/,
shift: Annotated[ shift: Annotated[
bool, typer.Option(..., help='Press shift when triggering the hotkey') bool, Parameter(help='Press shift when triggering the hotkey')
] = False, ] = False,
ctrl: Annotated[ ctrl: Annotated[
bool, typer.Option(..., help='Press control when triggering the hotkey') bool, Parameter(help='Press control when triggering the hotkey')
] = False, ] = False,
alt: Annotated[ alt: Annotated[
bool, typer.Option(..., help='Press alt when triggering the hotkey') bool, Parameter(help='Press alt when triggering the hotkey')
] = False, ] = False,
cmd: Annotated[ cmd: Annotated[
bool, typer.Option(..., help='Press cmd when triggering the hotkey') bool, Parameter(help='Press cmd when triggering the hotkey')
] = False, ] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Trigger a hotkey by sequence.""" """Trigger a hotkey by sequence."""
ctx.obj['obsws'].trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd) ctx.client.trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd)

View File

@ -3,33 +3,31 @@
from typing import Annotated from typing import Annotated
import obsws_python as obsws import obsws_python as obsws
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util, validate from . import console, util, validate
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='input', help='Commands for managing inputs in OBS')
@app.callback() @app.command(name=['list', 'ls'])
def main():
"""Control inputs in OBS."""
@app.command('list | ls')
def list_( def list_(
ctx: typer.Context, input: Annotated[bool, Parameter(help='Filter by input type.')] = False,
input: Annotated[bool, typer.Option(help='Filter by input type.')] = False, output: Annotated[bool, Parameter(help='Filter by output type.')] = False,
output: Annotated[bool, typer.Option(help='Filter by output type.')] = False, colour: Annotated[bool, Parameter(help='Filter by colour source type.')] = False,
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False, ffmpeg: Annotated[bool, Parameter(help='Filter by ffmpeg source type.')] = False,
ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False, vlc: Annotated[bool, Parameter(help='Filter by VLC source type.')] = False,
vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False, uuid: Annotated[bool, Parameter(help='Show UUIDs of inputs.')] = False,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of inputs.')] = False, *,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""List all inputs.""" """List all inputs."""
resp = ctx.obj['obsws'].get_input_list() resp = ctx.client.get_input_list()
kinds = [] kinds = []
if input: if input:
@ -43,7 +41,7 @@ def list_(
if vlc: if vlc:
kinds.append('vlc') kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]): if not any([input, output, colour, ffmpeg, vlc]):
kinds = ctx.obj['obsws'].get_input_kind_list(False).input_kinds kinds = ctx.client.get_input_kind_list(False).input_kinds
inputs = sorted( inputs = sorted(
( (
@ -57,21 +55,20 @@ def list_(
) )
if not inputs: if not inputs:
console.out.print('No inputs found.') raise OBSWSCLIError('No inputs found.', code=ExitCode.SUCCESS)
raise typer.Exit()
table = Table(title='Inputs', padding=(0, 2), border_style=ctx.obj['style'].border) table = Table(title='Inputs', padding=(0, 2), border_style=ctx.style.border)
if uuid: if uuid:
columns = [ columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Input Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column), (Text('Kind', justify='center'), 'center', ctx.style.column),
(Text('Muted', justify='center'), 'center', None), (Text('Muted', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column), (Text('UUID', justify='center'), 'left', ctx.style.column),
] ]
else: else:
columns = [ columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Input Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column), (Text('Kind', justify='center'), 'center', ctx.style.column),
(Text('Muted', justify='center'), 'center', None), (Text('Muted', justify='center'), 'center', None),
] ]
for heading, justify, style in columns: for heading, justify, style in columns:
@ -80,7 +77,7 @@ def list_(
for input_name, input_kind, input_uuid in inputs: for input_name, input_kind, input_uuid in inputs:
input_mark = '' input_mark = ''
try: try:
input_muted = ctx.obj['obsws'].get_input_mute(name=input_name).input_muted input_muted = ctx.client.get_input_mute(name=input_name).input_muted
input_mark = util.check_mark(input_muted) input_mark = util.check_mark(input_muted)
except obsws.error.OBSSDKRequestError as e: except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio if e.code == 604: # Input does not support audio
@ -105,19 +102,21 @@ def list_(
console.out.print(table) console.out.print(table)
@app.command('mute | m') @app.command(name=['mute', 'm'])
def mute( def mute(
ctx: typer.Context, input_name: Annotated[str, Argument(hint='Name of the input to mute.')],
input_name: Annotated[ /,
str, typer.Argument(..., show_default=False, help='Name of the input to mute.') *,
], ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Mute an input.""" """Mute an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.obj['obsws'].set_input_mute( ctx.client.set_input_mute(
name=input_name, name=input_name,
muted=True, muted=True,
) )
@ -125,20 +124,21 @@ def mute(
console.out.print(f'Input {console.highlight(ctx, input_name)} muted.') console.out.print(f'Input {console.highlight(ctx, input_name)} muted.')
@app.command('unmute | um') @app.command(name=['unmute', 'um'])
def unmute( def unmute(
ctx: typer.Context, input_name: Annotated[str, Argument(hint='Name of the input to unmute.')],
input_name: Annotated[ /,
str, *,
typer.Argument(..., show_default=False, help='Name of the input to unmute.'), ctx: Annotated[Context, Parameter(parse=False)],
],
): ):
"""Unmute an input.""" """Unmute an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.obj['obsws'].set_input_mute( ctx.client.set_input_mute(
name=input_name, name=input_name,
muted=False, muted=False,
) )
@ -146,23 +146,27 @@ def unmute(
console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.') console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.')
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle( def toggle(
ctx: typer.Context,
input_name: Annotated[ input_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Name of the input to toggle.'), Argument(hint='Name of the input to toggle.'),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Toggle an input.""" """Toggle an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj['obsws'].get_input_mute(name=input_name) resp = ctx.client.get_input_mute(name=input_name)
new_state = not resp.input_muted new_state = not resp.input_muted
ctx.obj['obsws'].set_input_mute( ctx.client.set_input_mute(
name=input_name, name=input_name,
muted=new_state, muted=new_state,
) )

View File

@ -2,31 +2,29 @@
from typing import Annotated from typing import Annotated
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util, validate from . import console, util, validate
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='profile', help='Commands for managing profiles in OBS')
@app.callback() @app.command(name=['list', 'ls'])
def main(): def list_(
"""Control profiles in OBS.""" *,
ctx: Annotated[Context, Parameter(parse=False)],
):
@app.command('list | ls')
def list_(ctx: typer.Context):
"""List profiles.""" """List profiles."""
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.client.get_profile_list()
table = Table( table = Table(title='Profiles', padding=(0, 2), border_style=ctx.style.border)
title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border
)
columns = [ columns = [
(Text('Profile Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Profile Name', justify='center'), 'left', ctx.style.column),
(Text('Current', justify='center'), 'center', None), (Text('Current', justify='center'), 'center', None),
] ]
for heading, justify, style in columns: for heading, justify, style in columns:
@ -43,70 +41,85 @@ def list_(ctx: typer.Context):
console.out.print(table) console.out.print(table)
@app.command('current | get') @app.command(name=['current', 'get'])
def current(ctx: typer.Context): def current(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current profile.""" """Get the current profile."""
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.client.get_profile_list()
console.out.print( console.out.print(
f'Current profile: {console.highlight(ctx, resp.current_profile_name)}' f'Current profile: {console.highlight(ctx, resp.current_profile_name)}'
) )
@app.command('switch | set') @app.command(name=['switch', 'set'])
def switch( def switch(
ctx: typer.Context,
profile_name: Annotated[ profile_name: Annotated[
str, str,
typer.Argument( Argument(hint='Name of the profile to switch to'),
..., show_default=False, help='Name of the profile to switch to'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Switch to a profile.""" """Switch to a profile."""
if not validate.profile_exists(ctx, profile_name): if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.') console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1) raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] not found.',
resp = ctx.obj['obsws'].get_profile_list() code=ExitCode.ERROR,
if resp.current_profile_name == profile_name:
console.err.print(
f'Profile [yellow]{profile_name}[/yellow] is already the current profile.'
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_current_profile(profile_name) resp = ctx.client.get_profile_list()
if resp.current_profile_name == profile_name:
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] is already the current profile.',
code=ExitCode.ERROR,
)
ctx.client.set_current_profile(profile_name)
console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.') console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.')
@app.command('create | new') @app.command(name=['create', 'new'])
def create( def create(
ctx: typer.Context,
profile_name: Annotated[ profile_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Name of the profile to create.'), Argument(hint='Name of the profile to create.'),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Create a new profile.""" """Create a new profile."""
if validate.profile_exists(ctx, profile_name): if validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] already exists.') raise OBSWSCLIError(
raise typer.Exit(1) f'Profile [yellow]{profile_name}[/yellow] already exists.',
code=ExitCode.ERROR,
)
ctx.obj['obsws'].create_profile(profile_name) ctx.client.create_profile(profile_name)
console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.') console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.')
@app.command('remove | rm') @app.command(name=['remove', 'rm'])
def remove( def remove(
ctx: typer.Context,
profile_name: Annotated[ profile_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Name of the profile to remove.'), Argument(hint='Name of the profile to remove.'),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Remove a profile.""" """Remove a profile."""
if not validate.profile_exists(ctx, profile_name): if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.') console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1) raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.obj['obsws'].remove_profile(profile_name) ctx.client.remove_profile(profile_name)
console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.') console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.')

View File

@ -2,25 +2,25 @@
from typing import Annotated from typing import Annotated
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='projector', help='Commands for managing projectors in OBS')
@app.callback() @app.command(name=['list-monitors', 'ls-m'])
def main(): def list_monitors(
"""Control projectors in OBS.""" *,
ctx: Annotated[Context, Parameter(parse=False)],
):
@app.command('list-monitors | ls-m')
def list_monitors(ctx: typer.Context):
"""List available monitors.""" """List available monitors."""
resp = ctx.obj['obsws'].get_monitor_list() resp = ctx.client.get_monitor_list()
if not resp.monitors: if not resp.monitors:
console.out.print('No monitors found.') console.out.print('No monitors found.')
@ -34,13 +34,13 @@ def list_monitors(ctx: typer.Context):
table = Table( table = Table(
title='Available Monitors', title='Available Monitors',
padding=(0, 2), padding=(0, 2),
border_style=ctx.obj['style'].border, border_style=ctx.style.border,
) )
table.add_column( table.add_column(
Text('Index', justify='center'), justify='center', style=ctx.obj['style'].column Text('Index', justify='center'), justify='center', style=ctx.style.column
) )
table.add_column( table.add_column(
Text('Name', justify='center'), justify='left', style=ctx.obj['style'].column Text('Name', justify='center'), justify='left', style=ctx.style.column
) )
for index, monitor in monitors: for index, monitor in monitors:
@ -49,29 +49,30 @@ def list_monitors(ctx: typer.Context):
console.out.print(table) console.out.print(table)
@app.command('open | o') @app.command(name=['open', 'o'])
def open( def open(
ctx: typer.Context,
monitor_index: Annotated[
int,
typer.Option(help='Index of the monitor to open the projector on.'),
] = 0,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(
show_default='The current scene', hint='Name of the source to project.',
help='Name of the source to project.',
), ),
] = '', ] = '',
/,
monitor_index: Annotated[
int,
Parameter(help='Index of the monitor to open the projector on.'),
] = 0,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Open a fullscreen projector for a source on a specific monitor.""" """Open a fullscreen projector for a source on a specific monitor."""
if not source_name: if not source_name:
source_name = ctx.obj['obsws'].get_current_program_scene().scene_name source_name = ctx.client.get_current_program_scene().scene_name
monitors = ctx.obj['obsws'].get_monitor_list().monitors monitors = ctx.client.get_monitor_list().monitors
for monitor in monitors: for monitor in monitors:
if monitor['monitorIndex'] == monitor_index: if monitor['monitorIndex'] == monitor_index:
ctx.obj['obsws'].open_source_projector( ctx.client.open_source_projector(
source_name=source_name, source_name=source_name,
monitor_index=monitor_index, monitor_index=monitor_index,
) )
@ -82,8 +83,8 @@ def open(
break break
else: else:
console.err.print( raise OBSWSCLIError(
f'Monitor with index [yellow]{monitor_index}[/yellow] not found. ' f'Monitor with index [yellow]{monitor_index}[/yellow] not found. '
f'Use [yellow]obsws-cli projector ls-m[/yellow] to see available monitors.' f'Use [yellow]obsws-cli projector ls-m[/yellow] to see available monitors.',
ExitCode.ERROR,
) )
raise typer.Exit(code=1)

View File

@ -3,67 +3,75 @@
from pathlib import Path from pathlib import Path
from typing import Annotated, Optional from typing import Annotated, Optional
import typer from cyclopts import App, Argument, Parameter
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='record', help='Commands for controlling OBS recording functionality.')
@app.callback() def _get_recording_status(ctx: Context) -> tuple:
def main():
"""Control OBS recording functionality."""
def _get_recording_status(ctx: typer.Context) -> tuple:
"""Get recording status.""" """Get recording status."""
resp = ctx.obj['obsws'].get_record_status() resp = ctx.client.get_record_status()
return resp.output_active, resp.output_paused return resp.output_active, resp.output_paused
@app.command('start | s') @app.command(name=['start', 's'])
def start(ctx: typer.Context): def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start recording.""" """Start recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if active: if active:
err_msg = 'Recording is already in progress, cannot start.' err_msg = 'Recording is already in progress, cannot start.'
if paused: if paused:
err_msg += ' Try resuming it.' err_msg += ' Try resuming it.'
raise OBSWSCLIError(err_msg, ExitCode.ERROR)
console.err.print(err_msg) ctx.client.start_record()
raise typer.Exit(1)
ctx.obj['obsws'].start_record()
console.out.print('Recording started successfully.') console.out.print('Recording started successfully.')
@app.command('stop | st') @app.command(name=['stop', 'st'])
def stop(ctx: typer.Context): def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop recording.""" """Stop recording."""
active, _ = _get_recording_status(ctx) active, _ = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot stop.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is not in progress, cannot stop.', ExitCode.ERROR
)
resp = ctx.obj['obsws'].stop_record() resp = ctx.client.stop_record()
console.out.print( console.out.print(
f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}' f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}'
) )
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle(ctx: typer.Context): def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle recording.""" """Toggle recording."""
resp = ctx.obj['obsws'].toggle_record() resp = ctx.client.toggle_record()
if resp.output_active: if resp.output_active:
console.out.print('Recording started successfully.') console.out.print('Recording started successfully.')
else: else:
console.out.print('Recording stopped successfully.') console.out.print('Recording stopped successfully.')
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status(ctx: typer.Context): def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get recording status.""" """Get recording status."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if active: if active:
@ -75,98 +83,114 @@ def status(ctx: typer.Context):
console.out.print('Recording is not in progress.') console.out.print('Recording is not in progress.')
@app.command('resume | r') @app.command(name=['resume', 'r'])
def resume(ctx: typer.Context): def resume(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Resume recording.""" """Resume recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot resume.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is not in progress, cannot resume.', ExitCode.ERROR
)
if not paused: if not paused:
console.err.print('Recording is in progress but not paused, cannot resume.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is in progress but not paused, cannot resume.', ExitCode.ERROR
)
ctx.obj['obsws'].resume_record() ctx.client.resume_record()
console.out.print('Recording resumed successfully.') console.out.print('Recording resumed successfully.')
@app.command('pause | p') @app.command(name=['pause', 'p'])
def pause(ctx: typer.Context): def pause(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Pause recording.""" """Pause recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot pause.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is not in progress, cannot pause.', ExitCode.ERROR
)
if paused: if paused:
console.err.print('Recording is in progress but already paused, cannot pause.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is in progress but already paused, cannot pause.', ExitCode.ERROR
)
ctx.obj['obsws'].pause_record() ctx.client.pause_record()
console.out.print('Recording paused successfully.') console.out.print('Recording paused successfully.')
@app.command('directory | d') @app.command(name=['directory', 'd'])
def directory( def directory(
ctx: typer.Context,
record_directory: Annotated[ record_directory: Annotated[
Optional[Path], Optional[Path],
# Since the CLI and OBS may be running on different platforms, # Since the CLI and OBS may be running on different platforms,
# we won't validate the path here. # we won't validate the path here.
typer.Argument( Argument(
file_okay=False, hint='Directory to set for recording.',
dir_okay=True,
help='Directory to set for recording.',
), ),
] = None, ] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Get or set the recording directory.""" """Get or set the recording directory."""
if record_directory is not None: if record_directory is not None:
ctx.obj['obsws'].set_record_directory(str(record_directory)) ctx.client.set_record_directory(str(record_directory))
console.out.print( console.out.print(
f'Recording directory updated to: {console.highlight(ctx, record_directory)}' f'Recording directory updated to: {console.highlight(ctx, record_directory)}'
) )
else: else:
resp = ctx.obj['obsws'].get_record_directory() resp = ctx.client.get_record_directory()
console.out.print( console.out.print(
f'Recording directory: {console.highlight(ctx, resp.record_directory)}' f'Recording directory: {console.highlight(ctx, resp.record_directory)}'
) )
@app.command('split | sp') @app.command(name=['split', 'sp'])
def split(ctx: typer.Context): def split(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Split the current recording.""" """Split the current recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot split.') console.err.print('Recording is not in progress, cannot split.')
raise typer.Exit(1) raise OBSWSCLIError(
'Recording is not in progress, cannot split.', ExitCode.ERROR
)
if paused: if paused:
console.err.print('Recording is paused, cannot split.') raise OBSWSCLIError('Recording is paused, cannot split.', ExitCode.ERROR)
raise typer.Exit(1)
ctx.obj['obsws'].split_record_file() ctx.client.split_record_file()
console.out.print('Recording split successfully.') console.out.print('Recording split successfully.')
@app.command('chapter | ch') @app.command(name=['chapter', 'ch'])
def chapter( def chapter(
ctx: typer.Context,
chapter_name: Annotated[ chapter_name: Annotated[
Optional[str], Optional[str],
typer.Argument( Argument(
help='Name of the chapter to create.', hint='Name of the chapter to create.',
), ),
] = None, ] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Create a chapter in the current recording.""" """Create a chapter in the current recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
console.err.print('Recording is not in progress, cannot create chapter.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is not in progress, cannot create chapter.', ExitCode.ERROR
)
if paused: if paused:
console.err.print('Recording is paused, cannot create chapter.') raise OBSWSCLIError(
raise typer.Exit(1) 'Recording is paused, cannot create chapter.', ExitCode.ERROR
)
ctx.obj['obsws'].create_record_chapter(chapter_name) ctx.client.create_record_chapter(chapter_name)
console.out.print( console.out.print(
f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.' f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.'
) )

View File

@ -1,64 +1,78 @@
"""module containing commands for manipulating the replay buffer in OBS.""" """module containing commands for manipulating the replay buffer in OBS."""
import typer from typing import Annotated
from cyclopts import App, Parameter
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(
name='replaybuffer', help='Commands for controlling the replay buffer in OBS.'
)
@app.callback() @app.command(name=['start', 's'])
def main(): def start(
"""Control profiles in OBS.""" *,
ctx: Annotated[Context, Parameter(parse=False)],
):
@app.command('start | s')
def start(ctx: typer.Context):
"""Start the replay buffer.""" """Start the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status() resp = ctx.client.get_replay_buffer_status()
if resp.output_active: if resp.output_active:
console.err.print('Replay buffer is already active.') raise OBSWSCLIError('Replay buffer is already active.', ExitCode.ERROR)
raise typer.Exit(1)
ctx.obj['obsws'].start_replay_buffer() ctx.client.start_replay_buffer()
console.out.print('Replay buffer started.') console.out.print('Replay buffer started.')
@app.command('stop | st') @app.command(name=['stop', 'st'])
def stop(ctx: typer.Context): def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the replay buffer.""" """Stop the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status() resp = ctx.client.get_replay_buffer_status()
if not resp.output_active: if not resp.output_active:
console.err.print('Replay buffer is not active.') raise OBSWSCLIError('Replay buffer is not active.', ExitCode.ERROR)
raise typer.Exit(1)
ctx.obj['obsws'].stop_replay_buffer() ctx.client.stop_replay_buffer()
console.out.print('Replay buffer stopped.') console.out.print('Replay buffer stopped.')
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle(ctx: typer.Context): def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the replay buffer.""" """Toggle the replay buffer."""
resp = ctx.obj['obsws'].toggle_replay_buffer() resp = ctx.client.toggle_replay_buffer()
if resp.output_active: if resp.output_active:
console.out.print('Replay buffer is active.') console.out.print('Replay buffer is active.')
else: else:
console.out.print('Replay buffer is not active.') console.out.print('Replay buffer is not active.')
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status(ctx: typer.Context): def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the replay buffer.""" """Get the status of the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status() resp = ctx.client.get_replay_buffer_status()
if resp.output_active: if resp.output_active:
console.out.print('Replay buffer is active.') console.out.print('Replay buffer is active.')
else: else:
console.out.print('Replay buffer is not active.') console.out.print('Replay buffer is not active.')
@app.command('save | sv') @app.command(name=['save', 'sv'])
def save(ctx: typer.Context): def save(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Save the replay buffer.""" """Save the replay buffer."""
ctx.obj['obsws'].save_replay_buffer() ctx.client.save_replay_buffer()
console.out.print('Replay buffer saved.') console.out.print('Replay buffer saved.')

View File

@ -11,7 +11,7 @@ from .context import Context
from .enum import ExitCode from .enum import ExitCode
from .error import OBSWSCLIError from .error import OBSWSCLIError
app = App(name='scene') app = App(name='scene', help='Commands for managing OBS scenes')
@app.command(name=['list', 'ls']) @app.command(name=['list', 'ls'])

View File

@ -2,33 +2,33 @@
from typing import Annotated from typing import Annotated
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from . import console, validate from . import console, validate
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(
name='scenecollection', help='Commands for controlling scene collections in OBS.'
)
@app.callback() @app.command(name=['list', 'ls'])
def main(): def list_(
"""Control scene collections in OBS.""" *,
ctx: Annotated[Context, Parameter(parse=False)],
):
@app.command('list | ls')
def list_(ctx: typer.Context):
"""List all scene collections.""" """List all scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.client.get_scene_collection_list()
table = Table( table = Table(
title='Scene Collections', title='Scene Collections',
padding=(0, 2), padding=(0, 2),
border_style=ctx.obj['style'].border, border_style=ctx.style.border,
)
table.add_column(
'Scene Collection Name', justify='left', style=ctx.obj['style'].column
) )
table.add_column('Scene Collection Name', justify='left', style=ctx.style.column)
for scene_collection_name in resp.scene_collections: for scene_collection_name in resp.scene_collections:
table.add_row(scene_collection_name) table.add_row(scene_collection_name)
@ -36,59 +36,66 @@ def list_(ctx: typer.Context):
console.out.print(table) console.out.print(table)
@app.command('current | get') @app.command(name=['current', 'get'])
def current(ctx: typer.Context): def current(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current scene collection.""" """Get the current scene collection."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.client.get_scene_collection_list()
console.out.print( console.out.print(
f'Current scene collection: {console.highlight(ctx, resp.current_scene_collection_name)}' f'Current scene collection: {console.highlight(ctx, resp.current_scene_collection_name)}'
) )
@app.command('switch | set') @app.command(name=['switch', 'set'])
def switch( def switch(
ctx: typer.Context,
scene_collection_name: Annotated[ scene_collection_name: Annotated[
str, typer.Argument(..., help='Name of the scene collection to switch to') str, Argument(hint='Name of the scene collection to switch to')
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Switch to a scene collection.""" """Switch to a scene collection."""
if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name): if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
console.err.print( raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.' f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
) )
raise typer.Exit(1)
current_scene_collection = ( current_scene_collection = (
ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name ctx.client.get_scene_collection_list().current_scene_collection_name
) )
if scene_collection_name == current_scene_collection: if scene_collection_name == current_scene_collection:
console.err.print( raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.' f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.',
exit_code=ExitCode.ERROR,
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_current_scene_collection(scene_collection_name) ctx.client.set_current_scene_collection(scene_collection_name)
console.out.print( console.out.print(
f'Switched to scene collection {console.highlight(ctx, scene_collection_name)}.' f'Switched to scene collection {console.highlight(ctx, scene_collection_name)}.'
) )
@app.command('create | new') @app.command(name=['create', 'new'])
def create( def create(
ctx: typer.Context,
scene_collection_name: Annotated[ scene_collection_name: Annotated[
str, typer.Argument(..., help='Name of the scene collection to create') str, Argument(hint='Name of the scene collection to create')
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Create a new scene collection.""" """Create a new scene collection."""
if validate.scene_collection_in_scene_collections(ctx, scene_collection_name): if validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
console.err.print( raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.' f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.',
exit_code=ExitCode.ERROR,
) )
raise typer.Exit(1)
ctx.obj['obsws'].create_scene_collection(scene_collection_name) ctx.client.create_scene_collection(scene_collection_name)
console.out.print( console.out.print(
f'Created scene collection {console.highlight(ctx, scene_collection_name)}.' f'Created scene collection {console.highlight(ctx, scene_collection_name)}.'
) )

View File

@ -2,41 +2,42 @@
from typing import Annotated, Optional from typing import Annotated, Optional
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from . import console, util, validate from . import console, util, validate
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='sceneitem', help='Commands for controlling scene items in OBS.')
@app.callback() @app.command(name=['list', 'ls'])
def main():
"""Control items in OBS scenes."""
@app.command('list | ls')
def list_( def list_(
ctx: typer.Context,
scene_name: Annotated[ scene_name: Annotated[
Optional[str], Optional[str],
typer.Argument( Argument(
show_default='The current scene', hint='Scene name to list items for',
help='Scene name to list items for',
), ),
] = None, ] = None,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scene items')] = False, /,
uuid: Annotated[bool, Parameter(help='Show UUIDs of scene items')] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""List all items in a scene.""" """List all items in a scene."""
if not scene_name: if not scene_name:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name scene_name = ctx.client.get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.') console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
)
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.client.get_scene_item_list(scene_name)
items = sorted( items = sorted(
( (
( (
@ -52,10 +53,10 @@ def list_(
) )
if not items: if not items:
console.out.print( raise OBSWSCLIError(
f'No items found in scene {console.highlight(ctx, scene_name)}.' f'No items found in scene [yellow]{scene_name}[/yellow].',
exit_code=ExitCode.SUCCESS,
) )
raise typer.Exit()
table = Table( table = Table(
title=f'Items in Scene: {scene_name}', title=f'Items in Scene: {scene_name}',
@ -138,36 +139,39 @@ def list_(
def _validate_sources( def _validate_sources(
ctx: typer.Context, ctx: Context,
scene_name: str, scene_name: str,
item_name: str, item_name: str,
group: Optional[str] = None, group: Optional[str] = None,
) -> bool: ):
"""Validate the scene name and item name.""" """Validate the scene name and item name."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.') raise OBSWSCLIError(
return False f'Scene [yellow]{scene_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
)
if group: if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group): if not validate.item_in_scene_item_list(ctx, scene_name, group):
console.err.print( raise OBSWSCLIError(
f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].' f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
exit_code=ExitCode.ERROR,
) )
return False
else: else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name): if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
console.err.print( raise OBSWSCLIError(
f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow]. Is the item in a group? ' f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow]. Is the item in a group? '
f'If so use the [yellow]--group[/yellow] option to specify the parent group.\n' f'If so use the [yellow]--group[/yellow] option to specify the parent group.\n'
'Use [yellow]obsws-cli sceneitem ls[/yellow] for a list of items in the scene.' 'Use [yellow]obsws-cli sceneitem ls[/yellow] for a list of items in the scene.',
exit_code=ExitCode.ERROR,
) )
return False
return True
def _get_scene_name_and_item_id( def _get_scene_name_and_item_id(
ctx: typer.Context, scene_name: str, item_name: str, group: Optional[str] = None ctx: Context,
scene_name: str,
item_name: str,
group: Optional[str] = None,
): ):
"""Get the scene name and item ID for the given scene and item name.""" """Get the scene name and item ID for the given scene and item name."""
if group: if group:
@ -178,10 +182,10 @@ def _get_scene_name_and_item_id(
scene_item_id = item.get('sceneItemId') scene_item_id = item.get('sceneItemId')
break break
else: else:
console.err.print( raise OBSWSCLIError(
f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].' f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].',
exit_code=ExitCode.ERROR,
) )
raise typer.Exit(1)
else: else:
resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name) resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name)
scene_item_id = resp.scene_item_id scene_item_id = resp.scene_item_id
@ -189,21 +193,20 @@ def _get_scene_name_and_item_id(
return scene_name, scene_item_id return scene_name, scene_item_id
@app.command('show | sh') @app.command(name=['show', 'sh'])
def show( def show(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[ item_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Item name to show in the scene'), Argument(hint='Item name to show in the scene'),
], ],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None, /,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Show an item in a scene.""" """Show an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group): _validate_sources(ctx, scene_name, item_name, group)
raise typer.Exit(1)
old_scene_name = scene_name old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id( scene_name, scene_item_id = _get_scene_name_and_item_id(
@ -231,21 +234,20 @@ def show(
) )
@app.command('hide | h') @app.command(name=['hide', 'h'])
def hide( def hide(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[ item_name: Annotated[
str, str,
typer.Argument(..., show_default=False, help='Item name to hide in the scene'), Argument(hint='Item name to hide in the scene'),
], ],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None, /,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Hide an item in a scene.""" """Hide an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group): _validate_sources(ctx, scene_name, item_name, group)
raise typer.Exit(1)
old_scene_name = scene_name old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id( scene_name, scene_item_id = _get_scene_name_and_item_id(
@ -272,36 +274,30 @@ def hide(
) )
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle( def toggle(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
scene_name: Annotated[ item_name: Annotated[str, Argument(hint='Item name to toggle in the scene')],
str, typer.Argument(..., show_default=False, help='Scene name the item is in') /,
], group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
item_name: Annotated[ *,
str, ctx: Annotated[Context, Parameter(parse=False)],
typer.Argument(
..., show_default=False, help='Item name to toggle in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
): ):
"""Toggle an item in a scene.""" """Toggle an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group): _validate_sources(ctx, scene_name, item_name, group)
raise typer.Exit(1)
old_scene_name = scene_name old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id( scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group ctx, scene_name, item_name, group
) )
enabled = ctx.obj['obsws'].get_scene_item_enabled( enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
) )
new_state = not enabled.scene_item_enabled new_state = not enabled.scene_item_enabled
ctx.obj['obsws'].set_scene_item_enabled( ctx.client.set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
enabled=new_state, enabled=new_state,
@ -333,30 +329,26 @@ def toggle(
) )
@app.command('visible | v') @app.command(name=['visible', 'v'])
def visible( def visible(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[ item_name: Annotated[
str, str, Argument(hint='Item name to check visibility in the scene')
typer.Argument(
..., show_default=False, help='Item name to check visibility in the scene'
),
], ],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None, /,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Check if an item in a scene is visible.""" """Check if an item in a scene is visible."""
if not _validate_sources(ctx, scene_name, item_name, group): _validate_sources(ctx, scene_name, item_name, group)
raise typer.Exit(1)
old_scene_name = scene_name old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id( scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group ctx, scene_name, item_name, group
) )
enabled = ctx.obj['obsws'].get_scene_item_enabled( enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
) )
@ -377,68 +369,62 @@ def visible(
) )
@app.command('transform | t') @app.command(name=['transform', 't'])
def transform( def transform(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
scene_name: Annotated[ item_name: Annotated[str, Argument(hint='Item name to transform in the scene')],
str, typer.Argument(..., show_default=False, help='Scene name the item is in') /,
], group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to transform in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
alignment: Annotated[ alignment: Annotated[
Optional[int], typer.Option(help='Alignment of the item in the scene') Optional[int], Parameter(help='Alignment of the item in the scene')
] = None, ] = None,
bounds_alignment: Annotated[ bounds_alignment: Annotated[
Optional[int], typer.Option(help='Bounds alignment of the item in the scene') Optional[int], Parameter(help='Bounds alignment of the item in the scene')
] = None, ] = None,
bounds_height: Annotated[ bounds_height: Annotated[
Optional[float], typer.Option(help='Height of the item in the scene') Optional[float], Parameter(help='Height of the item in the scene')
] = None, ] = None,
bounds_type: Annotated[ bounds_type: Annotated[
Optional[str], typer.Option(help='Type of bounds for the item in the scene') Optional[str], Parameter(help='Type of bounds for the item in the scene')
] = None, ] = None,
bounds_width: Annotated[ bounds_width: Annotated[
Optional[float], typer.Option(help='Width of the item in the scene') Optional[float], Parameter(help='Width of the item in the scene')
] = None, ] = None,
crop_to_bounds: Annotated[ crop_to_bounds: Annotated[
Optional[bool], typer.Option(help='Crop the item to the bounds') Optional[bool], Parameter(help='Crop the item to the bounds')
] = None, ] = None,
crop_bottom: Annotated[ crop_bottom: Annotated[
Optional[float], typer.Option(help='Bottom crop of the item in the scene') Optional[float], Parameter(help='Bottom crop of the item in the scene')
] = None, ] = None,
crop_left: Annotated[ crop_left: Annotated[
Optional[float], typer.Option(help='Left crop of the item in the scene') Optional[float], Parameter(help='Left crop of the item in the scene')
] = None, ] = None,
crop_right: Annotated[ crop_right: Annotated[
Optional[float], typer.Option(help='Right crop of the item in the scene') Optional[float], Parameter(help='Right crop of the item in the scene')
] = None, ] = None,
crop_top: Annotated[ crop_top: Annotated[
Optional[float], typer.Option(help='Top crop of the item in the scene') Optional[float], Parameter(help='Top crop of the item in the scene')
] = None, ] = None,
position_x: Annotated[ position_x: Annotated[
Optional[float], typer.Option(help='X position of the item in the scene') Optional[float], Parameter(help='X position of the item in the scene')
] = None, ] = None,
position_y: Annotated[ position_y: Annotated[
Optional[float], typer.Option(help='Y position of the item in the scene') Optional[float], Parameter(help='Y position of the item in the scene')
] = None, ] = None,
rotation: Annotated[ rotation: Annotated[
Optional[float], typer.Option(help='Rotation of the item in the scene') Optional[float], Parameter(help='Rotation of the item in the scene')
] = None, ] = None,
scale_x: Annotated[ scale_x: Annotated[
Optional[float], typer.Option(help='X scale of the item in the scene') Optional[float], Parameter(help='X scale of the item in the scene')
] = None, ] = None,
scale_y: Annotated[ scale_y: Annotated[
Optional[float], typer.Option(help='Y scale of the item in the scene') Optional[float], Parameter(help='Y scale of the item in the scene')
] = None, ] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Set the transform of an item in a scene.""" """Set the transform of an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group): _validate_sources(ctx, scene_name, item_name, group)
raise typer.Exit(1)
old_scene_name = scene_name old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id( scene_name, scene_item_id = _get_scene_name_and_item_id(
@ -478,10 +464,12 @@ def transform(
transform['scaleY'] = scale_y transform['scaleY'] = scale_y
if not transform: if not transform:
console.err.print('No transform options provided.') raise OBSWSCLIError(
raise typer.Exit(1) 'No transform options provided. Use at least one of the transform options.',
exit_code=ExitCode.ERROR,
)
transform = ctx.obj['obsws'].set_scene_item_transform( transform = ctx.client.set_scene_item_transform(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
transform=transform, transform=transform,

View File

@ -4,66 +4,57 @@ from pathlib import Path
from typing import Annotated from typing import Annotated
import obsws_python as obsws import obsws_python as obsws
import typer from cyclopts import App, Argument, Parameter
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='screenshot', help='Commands for taking screenshots using OBS.')
@app.callback() @app.command(name=['save', 'sv'])
def main():
"""Take screenshots using OBS."""
@app.command('save | sv')
def save( def save(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(
..., hint='Name of the source to take a screenshot of.',
show_default=False,
help='Name of the source to take a screenshot of.',
), ),
], ],
output_path: Annotated[ output_path: Annotated[
Path, Path,
# Since the CLI and OBS may be running on different platforms, # Since the CLI and OBS may be running on different platforms,
# we won't validate the path here. # we won't validate the path here.
typer.Argument( Argument(
..., hint='Path to save the screenshot (must include file name and extension).',
show_default=False,
file_okay=True,
dir_okay=False,
help='Path to save the screenshot (must include file name and extension).',
), ),
], ],
/,
width: Annotated[ width: Annotated[
float, float,
typer.Option( Parameter(
help='Width of the screenshot.', help='Width of the screenshot.',
), ),
] = 1920, ] = 1920,
height: Annotated[ height: Annotated[
float, float,
typer.Option( Parameter(
help='Height of the screenshot.', help='Height of the screenshot.',
), ),
] = 1080, ] = 1080,
quality: Annotated[ quality: Annotated[
float, float,
typer.Option( Parameter(
min=-1,
max=100,
help='Quality of the screenshot.', help='Quality of the screenshot.',
), ),
] = -1, ] = -1,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Take a screenshot and save it to a file.""" """Take a screenshot and save it to a file."""
try: try:
ctx.obj['obsws'].save_source_screenshot( ctx.client.save_source_screenshot(
name=source_name, name=source_name,
img_format=output_path.suffix.lstrip('.').lower(), img_format=output_path.suffix.lstrip('.').lower(),
file_path=str(output_path), file_path=str(output_path),
@ -74,16 +65,16 @@ def save(
except obsws.error.OBSSDKRequestError as e: except obsws.error.OBSSDKRequestError as e:
match e.code: match e.code:
case 403: case 403:
console.err.print( raise OBSWSCLIError(
'The [yellow]image format[/yellow] (file extension) must be included in the file name, ' 'The [yellow]image format[/yellow] (file extension) must be included in the file name, '
"for example: '/path/to/screenshot.png'.", "for example: '/path/to/screenshot.png'.",
code=ExitCode.ERROR,
) )
raise typer.Exit(1)
case 600: case 600:
console.err.print( raise OBSWSCLIError(
f'No source was found by the name of [yellow]{source_name}[/yellow]' 'No source was found by the name of [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
) )
raise typer.Exit(1)
case _: case _:
raise raise

View File

@ -1,80 +0,0 @@
"""module for settings management for obsws-cli."""
from collections import UserDict
from pathlib import Path
from dotenv import dotenv_values
SettingsValue = str | int
class Settings(UserDict):
"""A class to manage settings for obsws-cli.
This class extends UserDict to provide a dictionary-like interface for settings.
It loads settings from environment variables and .env files.
The settings are expected to be in uppercase and should start with 'OBS_'.
Example:
-------
settings = Settings()
host = settings['OBS_HOST']
settings['OBS_PORT'] = 4455
"""
PREFIX = 'OBS_'
def __init__(self, *args, **kwargs):
"""Initialize the Settings object."""
kwargs.update(
{
**dotenv_values('.env'),
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
}
)
super().__init__(*args, **kwargs)
def __getitem__(self, key: str) -> SettingsValue:
"""Get a setting value by key."""
key = key.upper()
if not key.startswith(Settings.PREFIX):
key = f'{Settings.PREFIX}{key}'
return self.data[key]
def __setitem__(self, key: str, value: SettingsValue):
"""Set a setting value by key."""
key = key.upper()
if not key.startswith(Settings.PREFIX):
key = f'{Settings.PREFIX}{key}'
self.data[key] = value
_settings = Settings(
OBS_HOST='localhost',
OBS_PORT=4455,
OBS_PASSWORD='',
OBS_TIMEOUT=5,
OBS_DEBUG=False,
OBS_STYLE='disabled',
OBS_STYLE_NO_BORDER=False,
)
def get(key: str) -> SettingsValue:
"""Get a setting value by key.
Args:
----
key (str): The key of the setting to retrieve.
Returns:
-------
The value of the setting.
Raises:
------
KeyError: If the key does not exist in the settings.
"""
return _settings[key]

View File

@ -1,60 +1,75 @@
"""module for controlling OBS stream functionality.""" """module for controlling OBS stream functionality."""
import typer from typing import Annotated
from cyclopts import App, Parameter
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='stream', help='Commands for controlling OBS stream functionality.')
@app.callback() def _get_streaming_status(ctx: Context) -> tuple:
def main():
"""Control OBS stream functionality."""
def _get_streaming_status(ctx: typer.Context) -> tuple:
"""Get streaming status.""" """Get streaming status."""
resp = ctx.obj['obsws'].get_stream_status() resp = ctx.client.get_stream_status()
return resp.output_active, resp.output_duration return resp.output_active, resp.output_duration
@app.command('start | s') @app.command(name=['start', 's'])
def start(ctx: typer.Context): def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start streaming.""" """Start streaming."""
active, _ = _get_streaming_status(ctx) active, _ = _get_streaming_status(ctx)
if active: if active:
console.err.print('Streaming is already in progress, cannot start.') raise OBSWSCLIError(
raise typer.Exit(1) 'Streaming is already in progress, cannot start.',
code=ExitCode.ERROR,
)
ctx.obj['obsws'].start_stream() ctx.client.start_stream()
console.out.print('Streaming started successfully.') console.out.print('Streaming started successfully.')
@app.command('stop | st') @app.command(name=['stop', 'st'])
def stop(ctx: typer.Context): def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop streaming.""" """Stop streaming."""
active, _ = _get_streaming_status(ctx) active, _ = _get_streaming_status(ctx)
if not active: if not active:
console.err.print('Streaming is not in progress, cannot stop.') raise OBSWSCLIError(
raise typer.Exit(1) 'Streaming is not in progress, cannot stop.',
code=ExitCode.ERROR,
)
ctx.obj['obsws'].stop_stream() ctx.client.stop_stream()
console.out.print('Streaming stopped successfully.') console.out.print('Streaming stopped successfully.')
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle(ctx: typer.Context): def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle streaming.""" """Toggle streaming."""
resp = ctx.obj['obsws'].toggle_stream() resp = ctx.client.toggle_stream()
if resp.output_active: if resp.output_active:
console.out.print('Streaming started successfully.') console.out.print('Streaming started successfully.')
else: else:
console.out.print('Streaming stopped successfully.') console.out.print('Streaming stopped successfully.')
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status(ctx: typer.Context): def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get streaming status.""" """Get streaming status."""
active, duration = _get_streaming_status(ctx) active, duration = _get_streaming_status(ctx)
if active: if active:

View File

@ -1,48 +1,57 @@
"""module containing commands for manipulating studio mode in OBS.""" """module containing commands for manipulating studio mode in OBS."""
import typer from typing import Annotated
from cyclopts import App, Parameter
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='studiomode', help='Commands for controlling studio mode in OBS.')
@app.callback() @app.command(name=['enable', 'on'])
def main(): def enable(
"""Control studio mode in OBS.""" *,
ctx: Annotated[Context, Parameter(parse=False)],
):
@app.command('enable | on')
def enable(ctx: typer.Context):
"""Enable studio mode.""" """Enable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(True) ctx.obj['obsws'].set_studio_mode_enabled(True)
console.out.print('Studio mode has been enabled.') console.out.print('Studio mode has been enabled.')
@app.command('disable | off') @app.command(name=['disable', 'off'])
def disable(ctx: typer.Context): def disable(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Disable studio mode.""" """Disable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(False) ctx.client.set_studio_mode_enabled(False)
console.out.print('Studio mode has been disabled.') console.out.print('Studio mode has been disabled.')
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle(ctx: typer.Context): def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle studio mode.""" """Toggle studio mode."""
resp = ctx.obj['obsws'].get_studio_mode_enabled() resp = ctx.client.get_studio_mode_enabled()
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
ctx.obj['obsws'].set_studio_mode_enabled(False) ctx.client.set_studio_mode_enabled(False)
console.out.print('Studio mode is now disabled.') console.out.print('Studio mode is now disabled.')
else: else:
ctx.obj['obsws'].set_studio_mode_enabled(True) ctx.client.set_studio_mode_enabled(True)
console.out.print('Studio mode is now enabled.') console.out.print('Studio mode is now enabled.')
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status(ctx: typer.Context): def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of studio mode.""" """Get the status of studio mode."""
resp = ctx.obj['obsws'].get_studio_mode_enabled() resp = ctx.client.get_studio_mode_enabled()
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
console.out.print('Studio mode is enabled.') console.out.print('Studio mode is enabled.')
else: else:

View File

@ -2,35 +2,34 @@
from typing import Annotated, Optional from typing import Annotated, Optional
import typer from cyclopts import App, Argument, Parameter
from . import console, validate from . import console, validate
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='text', help='Commands for controlling text inputs in OBS.')
@app.callback() @app.command(name=['current', 'get'])
def main():
"""Control text inputs in OBS."""
@app.command('current | get')
def current( def current(
ctx: typer.Context, input_name: Annotated[str, Argument(hint='Name of the text input to get.')],
input_name: Annotated[str, typer.Argument(help='Name of the text input to get.')], *,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Get the current text for a text input.""" """Get the current text for a text input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Input [yellow]{input_name}[/yellow] not found.', code=ExitCode.ERROR
)
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'): resp = ctx.client.get_input_settings(name=input_name)
console.err.print( if not resp.input_kind.startswith('text_'):
f'Input [yellow]{input_name}[/yellow] is not a text input.', raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
code=ExitCode.ERROR,
) )
raise typer.Exit(1)
current_text = resp.input_settings.get('text', '') current_text = resp.input_settings.get('text', '')
if not current_text: if not current_text:
@ -40,32 +39,31 @@ def current(
) )
@app.command('update | set') @app.command(name=['update', 'set'])
def update( def update(
ctx: typer.Context, input_name: Annotated[str, Argument(hint='Name of the text input to update.')],
input_name: Annotated[
str, typer.Argument(help='Name of the text input to update.')
],
new_text: Annotated[ new_text: Annotated[
Optional[str], Optional[str],
typer.Argument( Argument(hint='The new text to set for the input.'),
help='The new text to set for the input.',
),
] = None, ] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Update the text of a text input.""" """Update the text of a text input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Input [yellow]{input_name}[/yellow] not found.', code=ExitCode.ERROR
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings( resp = ctx.client.get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
code=ExitCode.ERROR,
)
ctx.client.set_input_settings(
name=input_name, name=input_name,
settings={'text': new_text}, settings={'text': new_text},
overlay=True, overlay=True,

View File

@ -1,12 +1,7 @@
"""module containing validation functions.""" """module containing validation functions."""
import typer
from .context import Context from .context import Context
# type alias for an option that is skipped when the command is run
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
def input_in_inputs(ctx: Context, input_name: str) -> bool: def input_in_inputs(ctx: Context, input_name: str) -> bool:
"""Check if an input is in the input list.""" """Check if an input is in the input list."""

View File

@ -1,46 +1,51 @@
"""module containing commands for manipulating virtual camera in OBS.""" """module containing commands for manipulating virtual camera in OBS."""
import typer from typing import Annotated
from cyclopts import App, Parameter
from . import console from . import console
from .alias import SubTyperAliasGroup from .context import Context
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='virtualcam', help='Commands for controlling the virtual camera in OBS.')
@app.callback() @app.command(name=['start', 's'])
def main(): def start(
"""Control virtual camera in OBS.""" ctx: Annotated[Context, Parameter(parse=False)],
):
@app.command('start | s')
def start(ctx: typer.Context):
"""Start the virtual camera.""" """Start the virtual camera."""
ctx.obj['obsws'].start_virtual_cam() ctx.client.start_virtual_cam()
console.out.print('Virtual camera started.') console.out.print('Virtual camera started.')
@app.command('stop | p') @app.command(name=['stop', 'p'])
def stop(ctx: typer.Context): def stop(
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the virtual camera.""" """Stop the virtual camera."""
ctx.obj['obsws'].stop_virtual_cam() ctx.client.stop_virtual_cam()
console.out.print('Virtual camera stopped.') console.out.print('Virtual camera stopped.')
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle(ctx: typer.Context): def toggle(
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the virtual camera.""" """Toggle the virtual camera."""
resp = ctx.obj['obsws'].toggle_virtual_cam() resp = ctx.client.toggle_virtual_cam()
if resp.output_active: if resp.output_active:
console.out.print('Virtual camera is enabled.') console.out.print('Virtual camera is enabled.')
else: else:
console.out.print('Virtual camera is disabled.') console.out.print('Virtual camera is disabled.')
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status(ctx: typer.Context): def status(
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the virtual camera.""" """Get the status of the virtual camera."""
resp = ctx.obj['obsws'].get_virtual_cam_status() resp = ctx.client.get_virtual_cam_status()
if resp.output_active: if resp.output_active:
console.out.print('Virtual camera is enabled.') console.out.print('Virtual camera is enabled.')
else: else:

View File

@ -21,12 +21,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: Python :: Implementation :: PyPy",
] ]
dependencies = [ dependencies = ["cyclopts>=3.22.2", "obsws-python>=1.8.0"]
"cyclopts>=3.22.2",
"typer>=0.16.0",
"obsws-python>=1.8.0",
"python-dotenv>=1.1.0",
]
[project.urls] [project.urls]