mirror of
https://github.com/onyx-and-iris/obsws-cli.git
synced 2026-01-11 16:37:49 +00:00
extend input command group
This commit is contained in:
parent
7a73ec35f6
commit
dceafba065
@ -18,6 +18,69 @@ def main():
|
||||
"""Control inputs in OBS."""
|
||||
|
||||
|
||||
@app.command('create | add')
|
||||
def create(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the input to create.',
|
||||
callback=validate.input_not_in_inputs,
|
||||
),
|
||||
],
|
||||
input_kind: Annotated[
|
||||
str,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Kind of the input to create.',
|
||||
callback=validate.kind_in_input_kinds,
|
||||
),
|
||||
],
|
||||
):
|
||||
"""Create a new input."""
|
||||
current_scene = (
|
||||
ctx.obj['obsws'].get_current_program_scene().current_program_scene_name
|
||||
)
|
||||
try:
|
||||
ctx.obj['obsws'].create_input(
|
||||
inputName=input_name,
|
||||
inputKind=input_kind,
|
||||
sceneItemEnabled=True,
|
||||
sceneName=current_scene,
|
||||
inputSettings={},
|
||||
)
|
||||
except obsws.error.OBSSDKRequestError as e:
|
||||
console.err.print(f'Failed to create input: [yellow]{e}[/yellow]')
|
||||
raise typer.Exit(1)
|
||||
|
||||
console.out.print(
|
||||
f'Input {console.highlight(ctx, input_name)} of kind '
|
||||
f'{console.highlight(ctx, input_kind)} created.',
|
||||
)
|
||||
|
||||
|
||||
@app.command('remove | rm')
|
||||
def remove(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the input to remove.',
|
||||
callback=validate.input_in_inputs,
|
||||
),
|
||||
],
|
||||
):
|
||||
"""Remove an input."""
|
||||
ctx.obj['obsws'].remove_input(name=input_name)
|
||||
|
||||
console.out.print(f'Input {console.highlight(ctx, input_name)} removed.')
|
||||
|
||||
|
||||
@app.command('list | ls')
|
||||
def list_(
|
||||
ctx: typer.Context,
|
||||
@ -105,18 +168,47 @@ def list_(
|
||||
console.out.print(table)
|
||||
|
||||
|
||||
@app.command('list-kinds | ls-k')
|
||||
def list_kinds(
|
||||
ctx: typer.Context,
|
||||
):
|
||||
"""List all input kinds."""
|
||||
resp = ctx.obj['obsws'].get_input_kind_list(False)
|
||||
kinds = sorted(resp.input_kinds)
|
||||
|
||||
if not kinds:
|
||||
console.out.print('No input kinds found.')
|
||||
raise typer.Exit()
|
||||
|
||||
table = Table(
|
||||
title='Input Kinds', padding=(0, 2), border_style=ctx.obj['style'].border
|
||||
)
|
||||
table.add_column(
|
||||
Text('Input Kind', justify='center'),
|
||||
justify='left',
|
||||
style=ctx.obj['style'].column,
|
||||
)
|
||||
|
||||
for kind in kinds:
|
||||
table.add_row(util.snakecase_to_titlecase(kind))
|
||||
|
||||
console.out.print(table)
|
||||
|
||||
|
||||
@app.command('mute | m')
|
||||
def mute(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str, typer.Argument(..., show_default=False, help='Name of the input to mute.')
|
||||
str,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the input to mute.',
|
||||
callback=validate.input_in_inputs,
|
||||
),
|
||||
],
|
||||
):
|
||||
"""Mute an input."""
|
||||
if not validate.input_in_inputs(ctx, input_name):
|
||||
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
|
||||
raise typer.Exit(1)
|
||||
|
||||
ctx.obj['obsws'].set_input_mute(
|
||||
name=input_name,
|
||||
muted=True,
|
||||
@ -130,14 +222,15 @@ def unmute(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str,
|
||||
typer.Argument(..., show_default=False, help='Name of the input to unmute.'),
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the input to unmute.',
|
||||
callback=validate.input_in_inputs,
|
||||
),
|
||||
],
|
||||
):
|
||||
"""Unmute an input."""
|
||||
if not validate.input_in_inputs(ctx, input_name):
|
||||
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
|
||||
raise typer.Exit(1)
|
||||
|
||||
ctx.obj['obsws'].set_input_mute(
|
||||
name=input_name,
|
||||
muted=False,
|
||||
@ -151,14 +244,15 @@ def toggle(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str,
|
||||
typer.Argument(..., show_default=False, help='Name of the input to toggle.'),
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the input to toggle.',
|
||||
callback=validate.input_in_inputs,
|
||||
),
|
||||
],
|
||||
):
|
||||
"""Toggle an input."""
|
||||
if not validate.input_in_inputs(ctx, input_name):
|
||||
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
|
||||
raise typer.Exit(1)
|
||||
|
||||
resp = ctx.obj['obsws'].get_input_mute(name=input_name)
|
||||
new_state = not resp.input_muted
|
||||
|
||||
@ -175,3 +269,188 @@ def toggle(
|
||||
console.out.print(
|
||||
f'Input {console.highlight(ctx, input_name)} unmuted.',
|
||||
)
|
||||
|
||||
|
||||
@app.command('volume | vol')
|
||||
def volume(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the input to set volume for.',
|
||||
callback=validate.input_in_inputs,
|
||||
),
|
||||
],
|
||||
volume: Annotated[
|
||||
float,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Volume level to set (-90 to 0).',
|
||||
min=-90,
|
||||
max=0,
|
||||
),
|
||||
],
|
||||
):
|
||||
"""Set the volume of an input."""
|
||||
ctx.obj['obsws'].set_input_volume(
|
||||
name=input_name,
|
||||
vol_db=volume,
|
||||
)
|
||||
|
||||
console.out.print(
|
||||
f'Input {console.highlight(ctx, input_name)} volume set to {console.highlight(ctx, volume)}.',
|
||||
)
|
||||
|
||||
|
||||
@app.command('show | s')
|
||||
def show(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the input to show.',
|
||||
callback=validate.input_in_inputs,
|
||||
),
|
||||
],
|
||||
verbose: Annotated[
|
||||
bool, typer.Option(help='List all available input devices.')
|
||||
] = False,
|
||||
):
|
||||
"""Show information for an input in the current scene."""
|
||||
input_list = ctx.obj['obsws'].get_input_list()
|
||||
for input_ in input_list.inputs:
|
||||
if input_['inputName'] == input_name:
|
||||
input_kind = input_['inputKind']
|
||||
break
|
||||
|
||||
for prop in ['device', 'device_id']:
|
||||
try:
|
||||
device_id = (
|
||||
ctx.obj['obsws']
|
||||
.get_input_settings(
|
||||
name=input_name,
|
||||
)
|
||||
.input_settings.get(prop)
|
||||
)
|
||||
if device_id:
|
||||
break
|
||||
except obsws.error.OBSSDKRequestError:
|
||||
continue
|
||||
else:
|
||||
device_id = '(N/A)'
|
||||
|
||||
for device in (
|
||||
ctx.obj['obsws']
|
||||
.get_input_properties_list_property_items(
|
||||
input_name=input_name,
|
||||
prop_name=prop,
|
||||
)
|
||||
.property_items
|
||||
):
|
||||
if device.get('itemValue') == device_id:
|
||||
device_id = device.get('itemName')
|
||||
break
|
||||
|
||||
table = Table(
|
||||
title='Input Information', padding=(0, 2), border_style=ctx.obj['style'].border
|
||||
)
|
||||
columns = [
|
||||
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
|
||||
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
|
||||
(Text('Device', justify='center'), 'left', ctx.obj['style'].column),
|
||||
]
|
||||
for heading, justify, style in columns:
|
||||
table.add_column(heading, justify=justify, style=style)
|
||||
table.add_row(
|
||||
input_name,
|
||||
util.snakecase_to_titlecase(input_kind),
|
||||
device_id,
|
||||
)
|
||||
|
||||
console.out.print(table)
|
||||
|
||||
if verbose:
|
||||
resp = ctx.obj['obsws'].get_input_properties_list_property_items(
|
||||
input_name=input_name,
|
||||
prop_name=prop,
|
||||
)
|
||||
table = Table(
|
||||
title='Devices',
|
||||
padding=(0, 2),
|
||||
border_style=ctx.obj['style'].border,
|
||||
)
|
||||
columns = [
|
||||
(Text('Name', justify='center'), 'left', ctx.obj['style'].column),
|
||||
]
|
||||
for heading, justify, style in columns:
|
||||
table.add_column(heading, justify=justify, style=style)
|
||||
for i, item in enumerate(resp.property_items):
|
||||
table.add_row(
|
||||
item.get('itemName'),
|
||||
style='' if i % 2 == 0 else 'dim',
|
||||
)
|
||||
|
||||
console.out.print(table)
|
||||
|
||||
|
||||
@app.command('update | upd')
|
||||
def update(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the input to update.',
|
||||
callback=validate.input_in_inputs,
|
||||
),
|
||||
],
|
||||
device_name: Annotated[
|
||||
str,
|
||||
typer.Argument(
|
||||
...,
|
||||
show_default=False,
|
||||
help='Name of the device to set for the input.',
|
||||
),
|
||||
],
|
||||
):
|
||||
"""Update a setting for an input."""
|
||||
device_id = None
|
||||
for prop in ['device', 'device_id']:
|
||||
try:
|
||||
for device in (
|
||||
ctx.obj['obsws']
|
||||
.get_input_properties_list_property_items(
|
||||
input_name=input_name,
|
||||
prop_name=prop,
|
||||
)
|
||||
.property_items
|
||||
):
|
||||
if device.get('itemName') == device_name:
|
||||
device_id = device.get('itemValue')
|
||||
break
|
||||
except obsws.error.OBSSDKRequestError:
|
||||
continue
|
||||
if device_id:
|
||||
break
|
||||
|
||||
if not device_id:
|
||||
console.err.print(
|
||||
f'Failed to find device ID for device '
|
||||
f'{console.highlight(ctx, device_name)}.',
|
||||
)
|
||||
raise typer.Exit(1)
|
||||
|
||||
ctx.obj['obsws'].set_input_settings(
|
||||
name=input_name, settings={prop: device_id}, overlay=True
|
||||
)
|
||||
|
||||
console.out.print(
|
||||
f'Input {console.highlight(ctx, input_name)} updated to use device '
|
||||
f'{console.highlight(ctx, device_name)}.',
|
||||
)
|
||||
|
||||
@ -18,13 +18,14 @@ def main():
|
||||
@app.command('current | get')
|
||||
def current(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[str, typer.Argument(help='Name of the text input to get.')],
|
||||
input_name: Annotated[
|
||||
str,
|
||||
typer.Argument(
|
||||
help='Name of the text input to get.', callback=validate.input_in_inputs
|
||||
),
|
||||
],
|
||||
):
|
||||
"""Get the current text for a text input."""
|
||||
if not validate.input_in_inputs(ctx, input_name):
|
||||
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
|
||||
raise typer.Exit(1)
|
||||
|
||||
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
|
||||
if not resp.input_kind.startswith('text_'):
|
||||
console.err.print(
|
||||
@ -44,7 +45,10 @@ def current(
|
||||
def update(
|
||||
ctx: typer.Context,
|
||||
input_name: Annotated[
|
||||
str, typer.Argument(help='Name of the text input to update.')
|
||||
str,
|
||||
typer.Argument(
|
||||
help='Name of the text input to update.', callback=validate.input_in_inputs
|
||||
),
|
||||
],
|
||||
new_text: Annotated[
|
||||
Optional[str],
|
||||
@ -54,10 +58,6 @@ def update(
|
||||
] = None,
|
||||
):
|
||||
"""Update the text of a text input."""
|
||||
if not validate.input_in_inputs(ctx, input_name):
|
||||
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
|
||||
raise typer.Exit(1)
|
||||
|
||||
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
|
||||
if not resp.input_kind.startswith('text_'):
|
||||
console.err.print(
|
||||
|
||||
@ -2,14 +2,28 @@
|
||||
|
||||
import typer
|
||||
|
||||
from . import console
|
||||
|
||||
# type alias for an option that is skipped when the command is run
|
||||
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
|
||||
|
||||
|
||||
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
|
||||
"""Check if an input is in the input list."""
|
||||
inputs = ctx.obj['obsws'].get_input_list().inputs
|
||||
return any(input_.get('inputName') == input_name for input_ in inputs)
|
||||
"""Ensure the given input exists in the list of inputs."""
|
||||
resp = ctx.obj['obsws'].get_input_list()
|
||||
if not any(input.get('inputName') == input_name for input in resp.inputs):
|
||||
console.err.print(f'Input [yellow]{input_name}[/yellow] does not exist.')
|
||||
raise typer.Exit(1)
|
||||
return input_name
|
||||
|
||||
|
||||
def input_not_in_inputs(ctx: typer.Context, input_name: str) -> bool:
|
||||
"""Ensure an input does not already exist in the list of inputs."""
|
||||
resp = ctx.obj['obsws'].get_input_list()
|
||||
if any(input.get('inputName') == input_name for input in resp.inputs):
|
||||
console.err.print(f'Input [yellow]{input_name}[/yellow] already exists.')
|
||||
raise typer.Exit(1)
|
||||
return input_name
|
||||
|
||||
|
||||
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool:
|
||||
@ -52,3 +66,12 @@ def monitor_exists(ctx: typer.Context, monitor_index: int) -> bool:
|
||||
"""Check if a monitor exists."""
|
||||
resp = ctx.obj['obsws'].get_monitor_list()
|
||||
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors)
|
||||
|
||||
|
||||
def kind_in_input_kinds(ctx: typer.Context, input_kind: str) -> str:
|
||||
"""Check if an input kind is valid."""
|
||||
resp = ctx.obj['obsws'].get_input_kind_list(False)
|
||||
if not any(kind == input_kind for kind in resp.input_kinds):
|
||||
console.err.print(f'Input kind [yellow]{input_kind}[/yellow] not found.')
|
||||
raise typer.Exit(1)
|
||||
return input_kind
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user