mirror of
https://github.com/onyx-and-iris/slobs-cli.git
synced 2025-06-27 07:10:24 +01:00
add scenecollection command group
minor bump
This commit is contained in:
parent
519db1b46e
commit
6e95e4d670
@ -1,3 +1,3 @@
|
||||
"""module for package metadata."""
|
||||
|
||||
__version__ = '0.8.4'
|
||||
__version__ = '0.9.0'
|
||||
|
@ -5,7 +5,17 @@ from .cli import cli
|
||||
from .record import record
|
||||
from .replaybuffer import replaybuffer
|
||||
from .scene import scene
|
||||
from .scenecollection import scenecollection
|
||||
from .stream import stream
|
||||
from .studiomode import studiomode
|
||||
|
||||
__all__ = ['cli', 'scene', 'stream', 'record', 'audio', 'replaybuffer', 'studiomode']
|
||||
__all__ = [
|
||||
'cli',
|
||||
'scene',
|
||||
'stream',
|
||||
'record',
|
||||
'audio',
|
||||
'replaybuffer',
|
||||
'studiomode',
|
||||
'scenecollection',
|
||||
]
|
||||
|
@ -105,11 +105,10 @@ async def toggle(ctx: click.Context):
|
||||
model = await ss.get_model()
|
||||
active = model.recording_status != 'offline'
|
||||
|
||||
await ss.toggle_recording()
|
||||
if active:
|
||||
await ss.toggle_recording()
|
||||
click.echo('Recording stopped.')
|
||||
else:
|
||||
await ss.toggle_recording()
|
||||
click.echo('Recording started.')
|
||||
|
||||
conn.close()
|
||||
|
@ -41,7 +41,8 @@ async def list(ctx: click.Context, id: bool = False):
|
||||
to_append = [click.style(scene.name, fg='blue')]
|
||||
if id:
|
||||
to_append.append(scene.id)
|
||||
to_append.append('✅' if scene.id == active_scene.id else '')
|
||||
if scene.id == active_scene.id:
|
||||
to_append.append('✅')
|
||||
|
||||
table_data.append(to_append)
|
||||
|
||||
|
173
src/slobs_cli/scenecollection.py
Normal file
173
src/slobs_cli/scenecollection.py
Normal file
@ -0,0 +1,173 @@
|
||||
"""module for scene collection management in SLOBS CLI."""
|
||||
|
||||
import asyncclick as click
|
||||
from anyio import create_task_group
|
||||
from pyslobs import ISceneCollectionCreateOptions, SceneCollectionsService
|
||||
from terminaltables3 import AsciiTable
|
||||
|
||||
from .cli import cli
|
||||
from .errors import SlobsCliError
|
||||
|
||||
|
||||
@cli.group()
|
||||
def scenecollection():
|
||||
"""Manage scene collections in Slobs CLI."""
|
||||
|
||||
|
||||
@scenecollection.command()
|
||||
@click.option('--id', is_flag=True, help='Include scene collection IDs in the output.')
|
||||
@click.pass_context
|
||||
async def list(ctx: click.Context, id: bool):
|
||||
"""List all scene collections."""
|
||||
conn = ctx.obj['connection']
|
||||
scs = SceneCollectionsService(conn)
|
||||
|
||||
async def _run():
|
||||
collections = await scs.collections()
|
||||
if not collections:
|
||||
click.echo('No scene collections found.')
|
||||
conn.close()
|
||||
return
|
||||
|
||||
active_collection = await scs.active_collection()
|
||||
|
||||
table_data = [
|
||||
['Scene Collection Name', 'ID', 'Active']
|
||||
if id
|
||||
else ['Scene Collection Name', 'Active']
|
||||
]
|
||||
for collection in collections:
|
||||
if collection.id == active_collection.id:
|
||||
to_append = [click.style(collection.name, fg='green')]
|
||||
else:
|
||||
to_append = [click.style(collection.name, fg='blue')]
|
||||
if id:
|
||||
to_append.append(collection.id)
|
||||
if collection.id == active_collection.id:
|
||||
to_append.append('✅')
|
||||
table_data.append(to_append)
|
||||
|
||||
table = AsciiTable(table_data)
|
||||
table.justify_columns = {
|
||||
0: 'left',
|
||||
1: 'left' if id else 'center',
|
||||
2: 'center' if id else None,
|
||||
}
|
||||
click.echo(table.table)
|
||||
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
|
||||
|
||||
@scenecollection.command()
|
||||
@click.argument('scenecollection_name', required=True)
|
||||
@click.pass_context
|
||||
async def load(ctx: click.Context, scenecollection_name: str):
|
||||
"""Load a scene collection by name."""
|
||||
conn = ctx.obj['connection']
|
||||
scs = SceneCollectionsService(conn)
|
||||
|
||||
async def _run():
|
||||
collections = await scs.collections()
|
||||
for collection in collections:
|
||||
if collection.name == scenecollection_name:
|
||||
break
|
||||
else:
|
||||
conn.close()
|
||||
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
|
||||
|
||||
await scs.load(collection.id)
|
||||
click.echo(f'Scene collection "{scenecollection_name}" loaded successfully.')
|
||||
conn.close()
|
||||
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@scenecollection.command()
|
||||
@click.argument('scenecollection_name', required=True)
|
||||
@click.pass_context
|
||||
async def create(ctx: click.Context, scenecollection_name: str):
|
||||
"""Create a new scene collection."""
|
||||
conn = ctx.obj['connection']
|
||||
scs = SceneCollectionsService(conn)
|
||||
|
||||
async def _run():
|
||||
await scs.create(ISceneCollectionCreateOptions(scenecollection_name))
|
||||
click.echo(f'Scene collection "{scenecollection_name}" created successfully.')
|
||||
conn.close()
|
||||
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
|
||||
|
||||
@scenecollection.command()
|
||||
@click.argument('scenecollection_name', required=True)
|
||||
@click.pass_context
|
||||
async def delete(ctx: click.Context, scenecollection_name: str):
|
||||
"""Delete a scene collection by name."""
|
||||
conn = ctx.obj['connection']
|
||||
scs = SceneCollectionsService(conn)
|
||||
|
||||
async def _run():
|
||||
collections = await scs.collections()
|
||||
for collection in collections:
|
||||
if collection.name == scenecollection_name:
|
||||
break
|
||||
else:
|
||||
conn.close()
|
||||
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
|
||||
|
||||
await scs.delete(collection.id)
|
||||
click.echo(f'Scene collection "{scenecollection_name}" deleted successfully.')
|
||||
conn.close()
|
||||
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
||||
|
||||
|
||||
@scenecollection.command()
|
||||
@click.argument('scenecollection_name', required=True)
|
||||
@click.argument('new_name', required=True)
|
||||
@click.pass_context
|
||||
async def rename(ctx: click.Context, scenecollection_name: str, new_name: str):
|
||||
"""Rename a scene collection."""
|
||||
conn = ctx.obj['connection']
|
||||
scs = SceneCollectionsService(conn)
|
||||
|
||||
async def _run():
|
||||
collections = await scs.collections()
|
||||
for collection in collections:
|
||||
if collection.name == scenecollection_name:
|
||||
break
|
||||
else:
|
||||
conn.close()
|
||||
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
|
||||
|
||||
await scs.rename(new_name, collection.id)
|
||||
click.echo(
|
||||
f'Scene collection "{scenecollection_name}" renamed to "{new_name}".'
|
||||
)
|
||||
conn.close()
|
||||
|
||||
try:
|
||||
async with create_task_group() as tg:
|
||||
tg.start_soon(conn.background_processing)
|
||||
tg.start_soon(_run)
|
||||
except* SlobsCliError as excgroup:
|
||||
for e in excgroup.exceptions:
|
||||
raise e
|
Loading…
x
Reference in New Issue
Block a user