mirror of
https://github.com/onyx-and-iris/obsws-cli.git
synced 2025-08-05 11:31:44 +00:00
convert stream commands
This commit is contained in:
parent
e77627b845
commit
bb7a468dd5
@ -35,6 +35,7 @@ for sub_app in (
|
||||
'scenecollection',
|
||||
'sceneitem',
|
||||
'screenshot',
|
||||
'stream',
|
||||
):
|
||||
module = importlib.import_module(f'.{sub_app}', package=__package__)
|
||||
app.command(module.app)
|
||||
|
@ -1,60 +1,75 @@
|
||||
"""module for controlling OBS stream functionality."""
|
||||
|
||||
import typer
|
||||
from typing import Annotated
|
||||
|
||||
from cyclopts import App, Parameter
|
||||
|
||||
from . import console
|
||||
from .alias import SubTyperAliasGroup
|
||||
from .context import Context
|
||||
from .enum import ExitCode
|
||||
from .error import OBSWSCLIError
|
||||
|
||||
app = typer.Typer(cls=SubTyperAliasGroup)
|
||||
app = App(name='stream', help='Commands for controlling OBS stream functionality.')
|
||||
|
||||
|
||||
@app.callback()
|
||||
def main():
|
||||
"""Control OBS stream functionality."""
|
||||
|
||||
|
||||
def _get_streaming_status(ctx: typer.Context) -> tuple:
|
||||
def _get_streaming_status(ctx: Context) -> tuple:
|
||||
"""Get streaming status."""
|
||||
resp = ctx.obj['obsws'].get_stream_status()
|
||||
resp = ctx.client.get_stream_status()
|
||||
return resp.output_active, resp.output_duration
|
||||
|
||||
|
||||
@app.command('start | s')
|
||||
def start(ctx: typer.Context):
|
||||
@app.command(name=['start', 's'])
|
||||
def start(
|
||||
*,
|
||||
ctx: Annotated[Context, Parameter(parse=False)],
|
||||
):
|
||||
"""Start streaming."""
|
||||
active, _ = _get_streaming_status(ctx)
|
||||
if active:
|
||||
console.err.print('Streaming is already in progress, cannot start.')
|
||||
raise typer.Exit(1)
|
||||
raise OBSWSCLIError(
|
||||
'Streaming is already in progress, cannot start.',
|
||||
code=ExitCode.ERROR,
|
||||
)
|
||||
|
||||
ctx.obj['obsws'].start_stream()
|
||||
ctx.client.start_stream()
|
||||
console.out.print('Streaming started successfully.')
|
||||
|
||||
|
||||
@app.command('stop | st')
|
||||
def stop(ctx: typer.Context):
|
||||
@app.command(name=['stop', 'st'])
|
||||
def stop(
|
||||
*,
|
||||
ctx: Annotated[Context, Parameter(parse=False)],
|
||||
):
|
||||
"""Stop streaming."""
|
||||
active, _ = _get_streaming_status(ctx)
|
||||
if not active:
|
||||
console.err.print('Streaming is not in progress, cannot stop.')
|
||||
raise typer.Exit(1)
|
||||
raise OBSWSCLIError(
|
||||
'Streaming is not in progress, cannot stop.',
|
||||
code=ExitCode.ERROR,
|
||||
)
|
||||
|
||||
ctx.obj['obsws'].stop_stream()
|
||||
ctx.client.stop_stream()
|
||||
console.out.print('Streaming stopped successfully.')
|
||||
|
||||
|
||||
@app.command('toggle | tg')
|
||||
def toggle(ctx: typer.Context):
|
||||
@app.command(name=['toggle', 'tg'])
|
||||
def toggle(
|
||||
*,
|
||||
ctx: Annotated[Context, Parameter(parse=False)],
|
||||
):
|
||||
"""Toggle streaming."""
|
||||
resp = ctx.obj['obsws'].toggle_stream()
|
||||
resp = ctx.client.toggle_stream()
|
||||
if resp.output_active:
|
||||
console.out.print('Streaming started successfully.')
|
||||
else:
|
||||
console.out.print('Streaming stopped successfully.')
|
||||
|
||||
|
||||
@app.command('status | ss')
|
||||
def status(ctx: typer.Context):
|
||||
@app.command(name=['status', 'ss'])
|
||||
def status(
|
||||
*,
|
||||
ctx: Annotated[Context, Parameter(parse=False)],
|
||||
):
|
||||
"""Get streaming status."""
|
||||
active, duration = _get_streaming_status(ctx)
|
||||
if active:
|
||||
|
Loading…
x
Reference in New Issue
Block a user