replace terminaltables with rich tables.

allow rich to handle all console output.

util.check_mark is now used to pass back colourless check/cross marks if NO_COLOR is set or --style/SLOBS_STYLE was not set.
This commit is contained in:
onyx-and-iris 2025-06-22 02:52:27 +01:00
parent 6bcdd8391c
commit c02ffac403
10 changed files with 209 additions and 118 deletions

View File

@ -3,8 +3,10 @@
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import AudioService from pyslobs import AudioService
from terminaltables3 import AsciiTable from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli from .cli import cli
from .errors import SlobsCliError from .errors import SlobsCliError
@ -25,32 +27,40 @@ async def list(ctx: click.Context, id: bool = False):
async def _run(): async def _run():
sources = await as_.get_sources() sources = await as_.get_sources()
if not sources: if not sources:
click.echo('No audio sources found.') console.out.print('No audio sources found.')
conn.close() conn.close()
return return
table_data = [ style = ctx.obj['style']
['Audio Device Name', 'ID', 'Muted'] table = Table(
if id show_header=True, header_style=style.header, border_style=style.border
else ['Audio Device Name', 'Muted'] )
if id:
columns = [
('Audio Source Name', 'left'),
('Muted', 'center'),
('ID', 'left'),
] ]
else:
columns = [
('Audio Source Name', 'left'),
('Muted', 'center'),
]
for col_name, col_justify in columns:
table.add_column(Text(col_name, justify='center'), justify=col_justify)
for source in sources: for source in sources:
model = await source.get_model() model = await source.get_model()
to_append = [click.style(model.name, fg='blue')] to_append = [Text(model.name, style=style.cell)]
to_append.append(util.check_mark(ctx, model.muted))
if id: if id:
to_append.append(model.source_id) to_append.append(Text(model.source_id, style=style.cell))
to_append.append('' if model.muted else '')
table_data.append(to_append) table.add_row(*to_append)
table = AsciiTable(table_data) console.out.print(table)
table.justify_columns = {
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
conn.close() conn.close()
@ -78,7 +88,7 @@ async def mute(ctx: click.Context, source_name: str):
raise SlobsCliError(f'Audio source "{source_name}" not found.') raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(True) await source.set_muted(True)
click.echo(f'{source_name} muted successfully.') console.out.print(f'{console.highlight(ctx, source_name)} muted successfully.')
conn.close() conn.close()
try: try:
@ -109,7 +119,9 @@ async def unmute(ctx: click.Context, source_name: str):
raise SlobsCliError(f'Audio source "{source_name}" not found.') raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(False) await source.set_muted(False)
click.echo(f'{source_name} unmuted successfully.') console.out.print(
f'{console.highlight(ctx, source_name)} unmuted successfully.'
)
conn.close() conn.close()
try: try:
@ -136,10 +148,14 @@ async def toggle(ctx: click.Context, source_name: str):
if model.name.lower() == source_name.lower(): if model.name.lower() == source_name.lower():
if model.muted: if model.muted:
await source.set_muted(False) await source.set_muted(False)
click.echo(f'{source_name} unmuted successfully.') console.out.print(
f'{console.highlight(ctx, source_name)} unmuted successfully.'
)
else: else:
await source.set_muted(True) await source.set_muted(True)
click.echo(f'{source_name} muted successfully.') console.out.print(
f'{console.highlight(ctx, source_name)} muted successfully.'
)
conn.close() conn.close()
break break
else: # If no source by the given name was found else: # If no source by the given name was found
@ -168,8 +184,8 @@ async def status(ctx: click.Context, source_name: str):
for source in sources: for source in sources:
model = await source.get_model() model = await source.get_model()
if model.name.lower() == source_name.lower(): if model.name.lower() == source_name.lower():
click.echo( console.out.print(
f'"{source_name}" is {"muted" if model.muted else "unmuted"}.' f'{console.highlight(ctx, source_name)} is {"muted" if model.muted else "unmuted"}.'
) )
conn.close() conn.close()
return return

21
src/slobs_cli/console.py Normal file
View File

@ -0,0 +1,21 @@
"""module for console output handling."""
import asyncclick as click
from rich.console import Console
out = Console()
err = Console(stderr=True, style='bold red')
def highlight(ctx: click.Context, text: str) -> str:
"""Highlight text for console output."""
if ctx.obj['style'].name == 'no_colour':
return text
return f'[{ctx.obj["style"].highlight}]{text}[/{ctx.obj["style"].highlight}]'
def warning(ctx: click.Context, text: str) -> str:
"""Format warning text for console output."""
if ctx.obj['style'].name == 'no_colour':
return text
return f'[magenta]{text}[/magenta]'

View File

@ -4,6 +4,8 @@ import json
import asyncclick as click import asyncclick as click
from . import console
class SlobsCliError(click.ClickException): class SlobsCliError(click.ClickException):
"""Base class for all Slobs CLI errors.""" """Base class for all Slobs CLI errors."""
@ -14,8 +16,8 @@ class SlobsCliError(click.ClickException):
self.exit_code = 1 self.exit_code = 1
def show(self): def show(self):
"""Display the error message in red.""" """Display the error message in red and write to stderr."""
click.secho(f'Error: {self.message}', fg='red', err=True) console.err.print(f'Error: {self.message}')
class SlobsCliProtocolError(SlobsCliError): class SlobsCliProtocolError(SlobsCliError):
@ -36,10 +38,8 @@ class SlobsCliProtocolError(SlobsCliError):
"""Display the protocol error message in red.""" """Display the protocol error message in red."""
match self.protocol_code: match self.protocol_code:
case -32600: case -32600:
click.secho( console.err.print(
'Oops! Looks like we hit a rate limit for this command. Please try again later.', 'Oops! Looks like we hit a rate limit for this command. Please try again later.'
fg='red',
err=True,
) )
case _: case _:
# Fall back to the base error display for unknown protocol codes # Fall back to the base error display for unknown protocol codes

View File

@ -4,6 +4,7 @@ import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
from . import console
from .cli import cli from .cli import cli
from .errors import SlobsCliError from .errors import SlobsCliError
@ -29,7 +30,7 @@ async def start(ctx: click.Context):
raise SlobsCliError('Recording is already active.') raise SlobsCliError('Recording is already active.')
await ss.toggle_recording() await ss.toggle_recording()
click.echo('Recording started.') console.out.print('Recording started.')
conn.close() conn.close()
@ -58,7 +59,7 @@ async def stop(ctx: click.Context):
raise SlobsCliError('Recording is already inactive.') raise SlobsCliError('Recording is already inactive.')
await ss.toggle_recording() await ss.toggle_recording()
click.echo('Recording stopped.') console.out.print('Recording stopped.')
conn.close() conn.close()
@ -83,9 +84,9 @@ async def status(ctx: click.Context):
active = model.recording_status != 'offline' active = model.recording_status != 'offline'
if active: if active:
click.echo('Recording is currently active.') console.out.print('Recording is currently active.')
else: else:
click.echo('Recording is currently inactive.') console.out.print('Recording is currently inactive.')
conn.close() conn.close()
@ -107,9 +108,9 @@ async def toggle(ctx: click.Context):
await ss.toggle_recording() await ss.toggle_recording()
if active: if active:
click.echo('Recording stopped.') console.out.print('Recording stopped.')
else: else:
click.echo('Recording started.') console.out.print('Recording started.')
conn.close() conn.close()

View File

@ -4,6 +4,7 @@ import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
from . import console
from .cli import cli from .cli import cli
from .errors import SlobsCliError from .errors import SlobsCliError
@ -29,7 +30,7 @@ async def start(ctx: click.Context):
raise SlobsCliError('Replay buffer is already active.') raise SlobsCliError('Replay buffer is already active.')
await ss.start_replay_buffer() await ss.start_replay_buffer()
click.echo('Replay buffer started.') console.out.print('Replay buffer started.')
conn.close() conn.close()
try: try:
@ -57,7 +58,7 @@ async def stop(ctx: click.Context):
raise SlobsCliError('Replay buffer is already inactive.') raise SlobsCliError('Replay buffer is already inactive.')
await ss.stop_replay_buffer() await ss.stop_replay_buffer()
click.echo('Replay buffer stopped.') console.out.print('Replay buffer stopped.')
conn.close() conn.close()
try: try:
@ -80,9 +81,9 @@ async def status(ctx: click.Context):
model = await ss.get_model() model = await ss.get_model()
active = model.replay_buffer_status != 'offline' active = model.replay_buffer_status != 'offline'
if active: if active:
click.echo('Replay buffer is currently active.') console.out.print('Replay buffer is currently active.')
else: else:
click.echo('Replay buffer is currently inactive.') console.out.print('Replay buffer is currently inactive.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -99,7 +100,7 @@ async def save(ctx: click.Context):
async def _run(): async def _run():
await ss.save_replay() await ss.save_replay()
click.echo('Replay buffer saved.') console.out.print('Replay buffer saved.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:

View File

@ -3,8 +3,10 @@
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import ProtocolError, ScenesService, TransitionsService from pyslobs import ProtocolError, ScenesService, TransitionsService
from terminaltables3 import AsciiTable from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli from .cli import cli
from .errors import SlobsCliError, SlobsCliProtocolError from .errors import SlobsCliError, SlobsCliProtocolError
@ -25,34 +27,45 @@ async def list(ctx: click.Context, id: bool = False):
async def _run(): async def _run():
scenes = await ss.get_scenes() scenes = await ss.get_scenes()
if not scenes: if not scenes:
click.echo('No scenes found.') console.out.print('No scenes found.')
conn.close() conn.close()
return return
active_scene = await ss.active_scene() active_scene = await ss.active_scene()
table_data = [ style = ctx.obj['style']
['Scene Name', 'ID', 'Active'] if id else ['Scene Name', 'Active'] table = Table(
] show_header=True,
for scene in scenes: header_style=style.header,
if scene.id == active_scene.id: border_style=style.border,
to_append = [click.style(scene.name, fg='green')] )
else:
to_append = [click.style(scene.name, fg='blue')]
if id: if id:
to_append.append(scene.id) columns = [
if scene.id == active_scene.id: ('Scene Name', 'left'),
to_append.append('') ('Active', 'center'),
('ID', 'left'),
]
else:
columns = [
('Scene Name', 'left'),
('Active', 'center'),
]
table_data.append(to_append) for col_name, col_justify in columns:
table.add_column(Text(col_name, justify='center'), justify=col_justify)
table = AsciiTable(table_data) for scene in scenes:
table.justify_columns = { to_append = [Text(scene.name, style=style.cell)]
0: 'left', to_append.append(
1: 'left' if id else 'center', util.check_mark(ctx, scene.id == active_scene.id, empty_if_false=True)
2: 'center' if id else None, )
} if id:
click.echo(table.table) to_append.append(Text(scene.id, style=style.cell))
table.add_row(*to_append)
console.out.print(table)
conn.close() conn.close()
@ -76,9 +89,9 @@ async def current(ctx: click.Context, id: bool = False):
async def _run(): async def _run():
active_scene = await ss.active_scene() active_scene = await ss.active_scene()
click.echo( console.out.print(
f'Current active scene: {click.style(active_scene.name, fg="green")} ' f'Current active scene: {console.highlight(ctx, active_scene.name)} '
f'{f"(ID: {active_scene.id})" if id else ""}' f'{f"(ID: {console.highlight(ctx, active_scene.id)})" if id else ""}'
) )
conn.close() conn.close()
@ -118,18 +131,21 @@ async def switch(
if model.studio_mode: if model.studio_mode:
await ss.make_scene_active(scene.id) await ss.make_scene_active(scene.id)
if preview: if preview:
click.echo( console.out.print(
f'Switched to preview scene: {click.style(scene.name, fg="blue")} ' f'Switched to preview scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {scene.id})." if id else ""}' f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
) )
else: else:
click.echo( console.out.print(
f'Switched to scene: {click.style(scene.name, fg="blue")} ' f'Switched to scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {scene.id})." if id else ""}' f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
)
console.err.print(
console.warning(
ctx,
'Warning: You are in studio mode. The scene switch is not active yet.\n'
'use `slobs-cli studiomode force-transition` to activate the scene switch.',
) )
await ts.execute_studio_mode_transition()
click.echo(
'Executed studio mode transition to make the scene active.'
) )
else: else:
if preview: if preview:
@ -139,9 +155,9 @@ async def switch(
) )
await ss.make_scene_active(scene.id) await ss.make_scene_active(scene.id)
click.echo( console.out.print(
f'Switched to scene: {click.style(scene.name, fg="blue")} ' f'Switched to scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {scene.id})." if id else ""}' f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
) )
conn.close() conn.close()

View File

@ -3,8 +3,10 @@
import asyncclick as click import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import ISceneCollectionCreateOptions, SceneCollectionsService from pyslobs import ISceneCollectionCreateOptions, SceneCollectionsService
from terminaltables3 import AsciiTable from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli from .cli import cli
from .errors import SlobsCliError from .errors import SlobsCliError
@ -25,35 +27,46 @@ async def list(ctx: click.Context, id: bool):
async def _run(): async def _run():
collections = await scs.collections() collections = await scs.collections()
if not collections: if not collections:
click.echo('No scene collections found.') console.out.print('No scene collections found.')
conn.close() conn.close()
return return
active_collection = await scs.active_collection() active_collection = await scs.active_collection()
table_data = [ style = ctx.obj['style']
['Scene Collection Name', 'ID', 'Active'] table = Table(
if id show_header=True,
else ['Scene Collection Name', 'Active'] header_style=style.header,
] border_style=style.border,
for collection in collections: )
if collection.id == active_collection.id:
to_append = [click.style(collection.name, fg='green')]
else:
to_append = [click.style(collection.name, fg='blue')]
if id:
to_append.append(collection.id)
if collection.id == active_collection.id:
to_append.append('')
table_data.append(to_append)
table = AsciiTable(table_data) if id:
table.justify_columns = { columns = [
0: 'left', ('Scene Collection Name', 'left'),
1: 'left' if id else 'center', ('Active', 'center'),
2: 'center' if id else None, ('ID', 'left'),
} ]
click.echo(table.table) else:
columns = [
('Scene Collection Name', 'left'),
('Active', 'center'),
]
for col_name, col_justify in columns:
table.add_column(Text(col_name, justify='center'), justify=col_justify)
for collection in collections:
to_append = [Text(collection.name, style=style.cell)]
to_append.append(
util.check_mark(
ctx, collection.id == active_collection.id, empty_if_false=True
)
)
if id:
to_append.append(Text(collection.id, style=style.cell))
table.add_row(*to_append)
console.out.print(table)
conn.close() conn.close()
@ -80,7 +93,9 @@ async def load(ctx: click.Context, scenecollection_name: str):
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.') raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.load(collection.id) await scs.load(collection.id)
click.echo(f'Scene collection "{scenecollection_name}" loaded successfully.') console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} loaded successfully.'
)
conn.close() conn.close()
try: try:
@ -102,7 +117,9 @@ async def create(ctx: click.Context, scenecollection_name: str):
async def _run(): async def _run():
await scs.create(ISceneCollectionCreateOptions(scenecollection_name)) await scs.create(ISceneCollectionCreateOptions(scenecollection_name))
click.echo(f'Scene collection "{scenecollection_name}" created successfully.') console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} created successfully.'
)
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -128,7 +145,9 @@ async def delete(ctx: click.Context, scenecollection_name: str):
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.') raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.delete(collection.id) await scs.delete(collection.id)
click.echo(f'Scene collection "{scenecollection_name}" deleted successfully.') console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} deleted successfully.'
)
conn.close() conn.close()
try: try:
@ -159,8 +178,8 @@ async def rename(ctx: click.Context, scenecollection_name: str, new_name: str):
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.') raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.rename(new_name, collection.id) await scs.rename(new_name, collection.id)
click.echo( console.out.print(
f'Scene collection "{scenecollection_name}" renamed to "{new_name}".' f'Scene collection {console.highlight(scenecollection_name)} renamed to {console.highlight(new_name)}.'
) )
conn.close() conn.close()

View File

@ -4,6 +4,7 @@ import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import StreamingService from pyslobs import StreamingService
from . import console
from .cli import cli from .cli import cli
from .errors import SlobsCliError from .errors import SlobsCliError
@ -29,7 +30,7 @@ async def start(ctx: click.Context):
raise SlobsCliError('Stream is already active.') raise SlobsCliError('Stream is already active.')
await ss.toggle_streaming() await ss.toggle_streaming()
click.echo('Stream started.') console.out.print('Stream started.')
conn.close() conn.close()
try: try:
@ -57,7 +58,7 @@ async def stop(ctx: click.Context):
raise SlobsCliError('Stream is already inactive.') raise SlobsCliError('Stream is already inactive.')
await ss.toggle_streaming() await ss.toggle_streaming()
click.echo('Stream stopped.') console.out.print('Stream stopped.')
conn.close() conn.close()
try: try:
@ -81,9 +82,9 @@ async def status(ctx: click.Context):
active = model.streaming_status != 'offline' active = model.streaming_status != 'offline'
if active: if active:
click.echo('Stream is currently active.') console.out.print('Stream is currently active.')
else: else:
click.echo('Stream is currently inactive.') console.out.print('Stream is currently inactive.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -104,9 +105,9 @@ async def toggle(ctx: click.Context):
await ss.toggle_streaming() await ss.toggle_streaming()
if active: if active:
click.echo('Stream stopped.') console.out.print('Stream stopped.')
else: else:
click.echo('Stream started.') console.out.print('Stream started.')
conn.close() conn.close()

View File

@ -4,6 +4,7 @@ import asyncclick as click
from anyio import create_task_group from anyio import create_task_group
from pyslobs import TransitionsService from pyslobs import TransitionsService
from . import console
from .cli import cli from .cli import cli
from .errors import SlobsCliError from .errors import SlobsCliError
@ -27,7 +28,7 @@ async def enable(ctx: click.Context):
raise SlobsCliError('Studio mode is already enabled.') raise SlobsCliError('Studio mode is already enabled.')
await ts.enable_studio_mode() await ts.enable_studio_mode()
click.echo('Studio mode enabled successfully.') console.out.print('Studio mode enabled successfully.')
conn.close() conn.close()
try: try:
@ -53,7 +54,7 @@ async def disable(ctx: click.Context):
raise SlobsCliError('Studio mode is already disabled.') raise SlobsCliError('Studio mode is already disabled.')
await ts.disable_studio_mode() await ts.disable_studio_mode()
click.echo('Studio mode disabled successfully.') console.out.print('Studio mode disabled successfully.')
conn.close() conn.close()
try: try:
@ -75,9 +76,9 @@ async def status(ctx: click.Context):
async def _run(): async def _run():
model = await ts.get_model() model = await ts.get_model()
if model.studio_mode: if model.studio_mode:
click.echo('Studio mode is currently enabled.') console.out.print('Studio mode is currently enabled.')
else: else:
click.echo('Studio mode is currently disabled.') console.out.print('Studio mode is currently disabled.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -96,10 +97,10 @@ async def toggle(ctx: click.Context):
model = await ts.get_model() model = await ts.get_model()
if model.studio_mode: if model.studio_mode:
await ts.disable_studio_mode() await ts.disable_studio_mode()
click.echo('Studio mode disabled successfully.') console.out.print('Studio mode disabled successfully.')
else: else:
await ts.enable_studio_mode() await ts.enable_studio_mode()
click.echo('Studio mode enabled successfully.') console.out.print('Studio mode enabled successfully.')
conn.close() conn.close()
async with create_task_group() as tg: async with create_task_group() as tg:
@ -121,7 +122,7 @@ async def force_transition(ctx: click.Context):
raise SlobsCliError('Studio mode is not enabled.') raise SlobsCliError('Studio mode is not enabled.')
await ts.execute_studio_mode_transition() await ts.execute_studio_mode_transition()
click.echo('Forced studio mode transition.') console.out.print('Forced studio mode transition.')
conn.close() conn.close()
try: try:

15
src/slobs_cli/util.py Normal file
View File

@ -0,0 +1,15 @@
"""module containing utility functions for Slobs CLI."""
import os
import asyncclick as click
def check_mark(ctx: click.Context, value: bool, empty_if_false: bool = False) -> str:
"""Return a check mark or cross mark based on the boolean value."""
if empty_if_false and not value:
return ''
if os.getenv('NO_COLOR', '') != '' or ctx.obj['style'].name == 'no_colour':
return '' if value else ''
return '' if value else ''