mirror of
https://github.com/onyx-and-iris/obsws-cli.git
synced 2026-01-11 08:27:50 +00:00
457 lines
12 KiB
Python
457 lines
12 KiB
Python
"""module containing commands for manipulating inputs."""
|
|
|
|
from typing import Annotated
|
|
|
|
import obsws_python as obsws
|
|
import typer
|
|
from rich.table import Table
|
|
from rich.text import Text
|
|
|
|
from . import console, util, validate
|
|
from .alias import SubTyperAliasGroup
|
|
|
|
# Typer sub-application that the input commands below register on.
# SubTyperAliasGroup is supplied as the command group class; presumably it is
# what resolves the 'name | alias' command names used below — confirm in .alias.
app = typer.Typer(cls=SubTyperAliasGroup)
|
|
|
|
|
|
# Group-level callback for the sub-application. It performs no work itself;
# the docstring is kept verbatim because Typer displays it as the help text.
@app.callback()
def main() -> None:
    """Control inputs in OBS."""
|
|
|
|
|
|
@app.command('create | add')
def create(
    ctx: typer.Context,
    input_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the input to create.',
            callback=validate.input_not_in_inputs,
        ),
    ],
    input_kind: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Kind of the input to create.',
            callback=validate.kind_in_input_kinds,
        ),
    ],
):
    """Create a new input."""
    # ctx.obj['obsws'] is the client object every command in this module talks to.
    client = ctx.obj['obsws']

    # The input is created as an enabled scene item in the current program scene.
    program_scene = client.get_current_program_scene().current_program_scene_name

    request = {
        'inputName': input_name,
        'inputKind': input_kind,
        'sceneItemEnabled': True,
        'sceneName': program_scene,
        'inputSettings': {},  # no settings overrides are sent
    }
    try:
        client.create_input(**request)
    except obsws.error.OBSSDKRequestError as e:
        # Report the request failure on the error console and exit with status 1.
        console.err.print(f'Failed to create input: [yellow]{e}[/yellow]')
        raise typer.Exit(1)

    name_markup = console.highlight(ctx, input_name)
    kind_markup = console.highlight(ctx, input_kind)
    console.out.print(f'Input {name_markup} of kind {kind_markup} created.')
|
|
|
|
|
|
@app.command('remove | rm')
def remove(
    ctx: typer.Context,
    input_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the input to remove.',
            callback=validate.input_in_inputs,
        ),
    ],
):
    """Remove an input."""
    # The argument callback (validate.input_in_inputs) runs before this body.
    client = ctx.obj['obsws']
    client.remove_input(name=input_name)

    removed_markup = console.highlight(ctx, input_name)
    console.out.print(f'Input {removed_markup} removed.')
|
|
|
|
|
|
@app.command('list | ls')
def list_(
    ctx: typer.Context,
    input: Annotated[bool, typer.Option(help='Filter by input type.')] = False,
    output: Annotated[bool, typer.Option(help='Filter by output type.')] = False,
    colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
    ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False,
    vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False,
    uuid: Annotated[bool, typer.Option(help='Show UUIDs of inputs.')] = False,
):
    """List all inputs."""
    client = ctx.obj['obsws']
    resp = client.get_input_list()

    # Each enabled flag contributes one substring that is matched against the
    # input kind (note the --colour flag maps to the 'color' substring).
    flag_to_kind = (
        (input, 'input'),
        (output, 'output'),
        (colour, 'color'),
        (ffmpeg, 'ffmpeg'),
        (vlc, 'vlc'),
    )
    kinds = [kind for enabled, kind in flag_to_kind if enabled]
    if not kinds:
        # No filter flag given: match against every kind the client reports.
        kinds = client.get_input_kind_list(False).input_kinds

    rows = [
        (entry.get('inputName'), entry.get('inputKind'), entry.get('inputUuid'))
        for entry in resp.inputs
        if any(kind in entry.get('inputKind') for kind in kinds)
    ]
    rows.sort(key=lambda row: row[0])  # Sort by input name

    if not rows:
        console.out.print('No inputs found.')
        raise typer.Exit()

    style = ctx.obj['style']
    table = Table(title='Inputs', padding=(0, 2), border_style=style.border)

    columns = [
        (Text('Input Name', justify='center'), 'left', style.column),
        (Text('Kind', justify='center'), 'center', style.column),
        (Text('Muted', justify='center'), 'center', None),
    ]
    if uuid:
        # The UUID column is only shown on request and always comes last.
        columns.append((Text('UUID', justify='center'), 'left', style.column))
    for heading, justify, column_style in columns:
        table.add_column(heading, justify=justify, style=column_style)

    for input_name, input_kind, input_uuid in rows:
        try:
            is_muted = client.get_input_mute(name=input_name).input_muted
            muted_cell = util.check_mark(is_muted)
        except obsws.error.OBSSDKRequestError as e:
            if e.code != 604:
                raise
            # 604: Input does not support audio
            muted_cell = 'N/A'

        cells = [input_name, util.snakecase_to_titlecase(input_kind), muted_cell]
        if uuid:
            cells.append(input_uuid)
        table.add_row(*cells)

    console.out.print(table)
|
|
|
|
|
|
@app.command('list-kinds | ls-k')
def list_kinds(
    ctx: typer.Context,
):
    """List all input kinds."""
    kinds = list(ctx.obj['obsws'].get_input_kind_list(False).input_kinds)
    kinds.sort()

    if not kinds:
        console.out.print('No input kinds found.')
        raise typer.Exit()

    style = ctx.obj['style']
    table = Table(title='Input Kinds', padding=(0, 2), border_style=style.border)
    heading = Text('Input Kind', justify='center')
    table.add_column(heading, justify='left', style=style.column)

    # One row per kind, with the kind name passed through the title-case helper.
    for kind in kinds:
        table.add_row(util.snakecase_to_titlecase(kind))

    console.out.print(table)
|
|
|
|
|
|
@app.command('mute | m')
def mute(
    ctx: typer.Context,
    input_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the input to mute.',
            callback=validate.input_in_inputs,
        ),
    ],
):
    """Mute an input."""
    client = ctx.obj['obsws']
    client.set_input_mute(name=input_name, muted=True)

    name_markup = console.highlight(ctx, input_name)
    console.out.print(f'Input {name_markup} muted.')
|
|
|
|
|
|
@app.command('unmute | um')
def unmute(
    ctx: typer.Context,
    input_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the input to unmute.',
            callback=validate.input_in_inputs,
        ),
    ],
):
    """Unmute an input."""
    client = ctx.obj['obsws']
    client.set_input_mute(name=input_name, muted=False)

    name_markup = console.highlight(ctx, input_name)
    console.out.print(f'Input {name_markup} unmuted.')
|
|
|
|
|
|
@app.command('toggle | tg')
def toggle(
    ctx: typer.Context,
    input_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the input to toggle.',
            callback=validate.input_in_inputs,
        ),
    ],
):
    """Toggle an input."""
    client = ctx.obj['obsws']

    # Read the current mute state and write back its negation.
    currently_muted = client.get_input_mute(name=input_name).input_muted
    new_state = not currently_muted
    client.set_input_mute(name=input_name, muted=new_state)

    name_markup = console.highlight(ctx, input_name)
    message = (
        f'Input {name_markup} muted.' if new_state else f'Input {name_markup} unmuted.'
    )
    console.out.print(message)
|
|
|
|
|
|
@app.command('volume | vol')
def volume(
    ctx: typer.Context,
    input_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the input to set volume for.',
            callback=validate.input_in_inputs,
        ),
    ],
    volume: Annotated[
        float,
        typer.Argument(
            ...,
            show_default=False,
            help='Volume level to set (-90 to 0).',
            min=-90,
            max=0,
        ),
    ],
):
    """Set the volume of an input."""
    # The value is forwarded as vol_db; the argument bounds (-90..0) are
    # enforced by the typer.Argument declaration above.
    client = ctx.obj['obsws']
    client.set_input_volume(name=input_name, vol_db=volume)

    name_markup = console.highlight(ctx, input_name)
    level_markup = console.highlight(ctx, volume)
    console.out.print(f'Input {name_markup} volume set to {level_markup}.')
|
|
|
|
|
|
@app.command('show | s')
def show(
    ctx: typer.Context,
    input_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the input to show.',
            callback=validate.input_in_inputs,
        ),
    ],
    verbose: Annotated[
        bool, typer.Option(help='List all available input devices.')
    ] = False,
):
    """Show information for an input in the current scene."""
    client = ctx.obj['obsws']
    style = ctx.obj['style']

    # Look the kind up in the input list. A default is supplied so the name is
    # always bound, even if the input is no longer listed by the time we ask.
    input_kind = next(
        (
            entry.get('inputKind')
            for entry in client.get_input_list().inputs
            if entry.get('inputName') == input_name
        ),
        None,
    )

    # The settings response does not depend on which property is probed, so
    # request it once instead of once per candidate property.
    try:
        settings = client.get_input_settings(name=input_name).input_settings
    except obsws.error.OBSSDKRequestError:
        settings = {}

    # The device may be stored under either key; the first non-empty one wins.
    # When neither is set, prop is left at the last candidate ('device_id').
    device_id = '(N/A)'
    for prop in ('device', 'device_id'):
        configured = settings.get(prop)
        if configured:
            device_id = configured
            break

    # Fetch the selectable devices once and reuse them for --verbose. The
    # request is guarded the same way the update command guards it: a failure
    # (presumably an input without such a list property) yields no devices
    # instead of an unhandled exception.
    try:
        device_items = client.get_input_properties_list_property_items(
            input_name=input_name,
            prop_name=prop,
        ).property_items
    except obsws.error.OBSSDKRequestError:
        device_items = []

    # Swap the stored device value for its display name when one matches.
    for device in device_items:
        if device.get('itemValue') == device_id:
            device_id = device.get('itemName')
            break

    table = Table(
        title='Input Information', padding=(0, 2), border_style=style.border
    )
    columns = [
        (Text('Input Name', justify='center'), 'left', style.column),
        (Text('Kind', justify='center'), 'left', style.column),
        (Text('Device', justify='center'), 'left', style.column),
    ]
    for heading, justify, column_style in columns:
        table.add_column(heading, justify=justify, style=column_style)
    table.add_row(
        input_name,
        util.snakecase_to_titlecase(input_kind) if input_kind else '(N/A)',
        device_id,
    )

    console.out.print(table)

    if verbose:
        table = Table(
            title='Devices',
            padding=(0, 2),
            border_style=style.border,
        )
        table.add_column(
            Text('Name', justify='center'), justify='left', style=style.column
        )
        # Alternate plain and dim rows for readability.
        for i, item in enumerate(device_items):
            table.add_row(
                item.get('itemName'),
                style='' if i % 2 == 0 else 'dim',
            )

        console.out.print(table)
|
|
|
|
|
|
@app.command('update | upd')
def update(
    ctx: typer.Context,
    input_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the input to update.',
            callback=validate.input_in_inputs,
        ),
    ],
    device_name: Annotated[
        str,
        typer.Argument(
            ...,
            show_default=False,
            help='Name of the device to set for the input.',
        ),
    ],
):
    """Update a setting for an input."""
    client = ctx.obj['obsws']

    # Probe each candidate property for a list item whose display name equals
    # device_name. `prop` is deliberately left bound after the loop: it names
    # the property the matching device was found under.
    device_id = None
    for prop in ('device', 'device_id'):
        try:
            items = client.get_input_properties_list_property_items(
                input_name=input_name,
                prop_name=prop,
            ).property_items
            # First match wins; with no match the previous value is kept.
            device_id = next(
                (
                    item.get('itemValue')
                    for item in items
                    if item.get('itemName') == device_name
                ),
                device_id,
            )
        except obsws.error.OBSSDKRequestError:
            # The request failed for this property; try the next candidate.
            continue
        if device_id:
            break

    if not device_id:
        device_markup = console.highlight(ctx, device_name)
        console.err.print(f'Failed to find device ID for device {device_markup}.')
        raise typer.Exit(1)

    # overlay=True is passed through so only the given key is sent as a change.
    client.set_input_settings(
        name=input_name,
        settings={prop: device_id},
        overlay=True,
    )

    name_markup = console.highlight(ctx, input_name)
    device_markup = console.highlight(ctx, device_name)
    console.out.print(
        f'Input {name_markup} updated to use device {device_markup}.'
    )
|