Merge pull request #2 from onyx-and-iris/main

Support for events + other changes.
This commit is contained in:
Adem 2022-07-29 15:06:06 +03:00 committed by GitHub
commit eda5ee66e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1163 additions and 758 deletions

54
.gitignore vendored
View File

@ -1,5 +1,49 @@
__pycache__ # Byte-compiled / optimized / DLL files
obsstudio_sdk.egg-info __pycache__/
dist *.py[cod]
docs *$py.class
setup.py
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Test/config
quick.py
config.toml

View File

@ -1,9 +1,13 @@
# obs_sdk # A Python SDK for OBS Studio WebSocket v5.0
### A Python SDK for OBS Studio WebSocket v5.0
This is a wrapper around OBS Websocket. This is a wrapper around OBS Websocket.
Not all endpoints in the official documentation are implemented. But all endpoints in the Requests section is implemented. You can find the relevant document using below link. Not all endpoints in the official documentation are implemented.
[obs-websocket github page](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#Requests)
## Requirements
- [OBS Studio](https://obsproject.com/)
- [OBS Websocket v5 Plugin](https://github.com/obsproject/obs-websocket/releases/tag/5.0.0)
- Python 3.11 or greater
### How to install using pip ### How to install using pip
@ -11,27 +15,39 @@ Not all endpoints in the official documentation are implemented. But all endpoin
pip install obsstudio-sdk pip install obsstudio-sdk
``` ```
### How to Use ### How to Use
* Import and start using Load connection info from toml config. A valid `config.toml` might look like this:
Required parameters are as follows:
host: obs websocket server
port: port to access server
password: obs websocket server password
``` ```toml
>>>from obsstudio_sdk.reqs import ReqClient [connection]
>>> host = "localhost"
>>>client = ReqClient('192.168.1.1', 4444, 'somepassword') port = 4455
password = "mystrongpass"
``` ```
Now you can make calls to OBS It should be placed next to your `__main__.py` file.
Example: Toggle the mute state of your Mic input #### Otherwise:
Import and start using, keyword arguments are as follows:
- `host`: obs websocket server
- `port`: port to access server
- `password`: obs websocket server password
Example `__main__.py`
```python
import obsstudio_sdk as obs
# pass conn info if not in config.toml
cl = obs.ReqClient(host='localhost', port=4455, password='mystrongpass')
# Toggle the mute state of your Mic input
cl.toggle_input_mute('Mic/Aux')
``` ```
>>>cl.ToggleInputMute('Mic/Aux')
>>>
``` ### Official Documentation
- [OBS Websocket SDK](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#obs-websocket-501-protocol)

View File

@ -0,0 +1,41 @@
import obsstudio_sdk as obs
class Observer:
def __init__(self, cl):
self._cl = cl
self._cl.callback.register(
[
self.on_current_program_scene_changed,
self.on_scene_created,
self.on_input_mute_state_changed,
self.on_exit_started,
]
)
print(f"Registered events: {self._cl.callback.get()}")
def on_current_program_scene_changed(self, data):
"""The current program scene has changed."""
print(f"Switched to scene {data.scene_name}")
def on_scene_created(self, data):
"""A new scene has been created."""
print(f"scene {data.scene_name} has been created")
def on_input_mute_state_changed(self, data):
"""An input's mute state has changed."""
print(f"{data.input_name} mute toggled")
def on_exit_started(self, data):
"""OBS has begun the shutdown process."""
print(f"OBS closing!")
self._cl.unsubscribe()
if __name__ == "__main__":
cl = obs.EventClient()
observer = Observer(cl)
while cmd := input("<Enter> to exit\n"):
if not cmd:
break

View File

@ -0,0 +1,44 @@
import inspect
import keyboard
import obsstudio_sdk as obs
class Observer:
def __init__(self, cl):
self._cl = cl
self._cl.callback.register(self.on_current_program_scene_changed)
print(f"Registered events: {self._cl.callback.get()}")
@property
def event_identifier(self):
return inspect.stack()[1].function
def on_current_program_scene_changed(self, data):
"""The current program scene has changed."""
print(f"{self.event_identifier}: {data.scene_name}")
def version():
resp = req_cl.get_version()
print(
f"Running OBS version:{resp.obs_version} with websocket version:{resp.obs_web_socket_version}"
)
def set_scene(scene, *args):
req_cl.set_current_program_scene(scene)
if __name__ == "__main__":
req_cl = obs.ReqClient()
ev_cl = obs.EventClient()
observer = Observer(ev_cl)
keyboard.add_hotkey("0", version)
keyboard.add_hotkey("1", set_scene, args=("START",))
keyboard.add_hotkey("2", set_scene, args=("BRB",))
keyboard.add_hotkey("3", set_scene, args=("END",))
print("press ctrl+enter to quit")
keyboard.wait("ctrl+enter")

View File

@ -0,0 +1,19 @@
import time
import obsstudio_sdk as obs
def main():
resp = cl.get_scene_list()
scenes = reversed(tuple(di.get("sceneName") for di in resp.scenes))
for sc in scenes:
print(f"Switching to scene {sc}")
cl.set_current_program_scene(sc)
time.sleep(0.5)
if __name__ == "__main__":
cl = obs.ReqClient()
main()

View File

@ -1 +1,4 @@
from .events import EventClient
from .reqs import ReqClient
__ALL__ = ["ReqClient", "EventsClient"]

View File

@ -1,53 +1,86 @@
import websocket
import json
import hashlib
import base64 import base64
import hashlib
import json
from pathlib import Path
from random import randint from random import randint
class ObsClient(object): import tomllib
def __init__(self, host, port, password): import websocket
self.host = host
self.port = port
self.password = password class ObsClient:
DELAY = 0.001
def __init__(self, **kwargs):
defaultkwargs = {
**{key: None for key in ["host", "port", "password"]},
"subs": 0,
}
kwargs = defaultkwargs | kwargs
for attr, val in kwargs.items():
setattr(self, attr, val)
if not (self.host and self.port and self.password):
conn = self._conn_from_toml()
self.host = conn["host"]
self.port = conn["port"]
self.password = conn["password"]
self.ws = websocket.WebSocket() self.ws = websocket.WebSocket()
self.ws.connect(f"ws://{self.host}:{self.port}") self.ws.connect(f"ws://{self.host}:{self.port}")
self.server_hello = json.loads(self.ws.recv()) self.server_hello = json.loads(self.ws.recv())
def _conn_from_toml(self):
filepath = Path.cwd() / "config.toml"
self._conn = dict()
with open(filepath, "rb") as f:
self._conn = tomllib.load(f)
return self._conn["connection"]
def authenticate(self): def authenticate(self):
secret = base64.b64encode( secret = base64.b64encode(
hashlib.sha256( hashlib.sha256(
(self.password + self.server_hello['d']['authentication']['salt']).encode()).digest()) (
self.password + self.server_hello["d"]["authentication"]["salt"]
).encode()
).digest()
)
auth = base64.b64encode( auth = base64.b64encode(
hashlib.sha256( hashlib.sha256(
(secret.decode() + self.server_hello['d']['authentication']['challenge']).encode()).digest()).decode() (
secret.decode()
+ self.server_hello["d"]["authentication"]["challenge"]
).encode()
).digest()
).decode()
payload = { "op":1, "d": { payload = {
"rpcVersion": 1, "op": 1,
"authentication": auth} "d": {
} "rpcVersion": 1,
"authentication": auth,
"eventSubscriptions": self.subs,
},
}
self.ws.send(json.dumps(payload)) self.ws.send(json.dumps(payload))
return self.ws.recv() return self.ws.recv()
def req(self, req_type, req_data=None): def req(self, req_type, req_data=None):
if req_data == None: if req_data:
payload = {
"op": 6,
"d": {
"requestType": req_type,
"requestId": randint(1, 1000)
}
}
else:
payload = { payload = {
"op": 6, "op": 6,
"d": { "d": {
"requestType": req_type, "requestType": req_type,
"requestId": randint(1, 1000), "requestId": randint(1, 1000),
"requestData": req_data "requestData": req_data,
} },
}
else:
payload = {
"op": 6,
"d": {"requestType": req_type, "requestId": randint(1, 1000)},
} }
self.ws.send(json.dumps(payload)) self.ws.send(json.dumps(payload))
return json.loads(self.ws.recv()) response = json.loads(self.ws.recv())
return response["d"]

53
obsstudio_sdk/callback.py Normal file
View File

@ -0,0 +1,53 @@
from typing import Callable, Iterable, Union
from .util import as_dataclass, to_camel_case, to_snake_case
class Callback:
"""Adds support for callbacks"""
def __init__(self):
"""list of current callbacks"""
self._callbacks = list()
def get(self) -> list:
"""returns a list of registered events"""
return [to_camel_case(fn.__name__[2:]) for fn in self._callbacks]
def trigger(self, event, data):
"""trigger callback on update"""
for fn in self._callbacks:
if fn.__name__ == f"on_{to_snake_case(event)}":
fn(as_dataclass(event, data))
def register(self, fns: Union[Iterable, Callable]):
"""registers callback functions"""
try:
iterator = iter(fns)
for fn in iterator:
if fn not in self._callbacks:
self._callbacks.append(fn)
except TypeError as e:
if fns not in self._callbacks:
self._callbacks.append(fns)
def deregister(self, fns: Union[Iterable, Callable]):
"""deregisters a callback from _callbacks"""
try:
iterator = iter(fns)
for fn in iterator:
if fn in self._callbacks:
self._callbacks.remove(fn)
except TypeError as e:
if fns in self._callbacks:
self._callbacks.remove(fns)
def clear(self):
"""clears the _callbacks list"""
self._callbacks.clear()

4
obsstudio_sdk/error.py Normal file
View File

@ -0,0 +1,4 @@
class OBSSDKError(Exception):
"""general errors"""
pass

71
obsstudio_sdk/events.py Normal file
View File

@ -0,0 +1,71 @@
import json
import time
from enum import IntEnum
from threading import Thread
from .baseclient import ObsClient
from .callback import Callback
"""
A class to interact with obs-websocket events
defined in official github repo
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
"""
Subs = IntEnum(
"Subs",
"general config scenes inputs transitions filters outputs sceneitems mediainputs vendors ui",
start=0,
)
class EventClient:
DELAY = 0.001
def __init__(self, **kwargs):
defaultkwargs = {
"subs": (
(1 << Subs.general)
| (1 << Subs.config)
| (1 << Subs.scenes)
| (1 << Subs.inputs)
| (1 << Subs.transitions)
| (1 << Subs.filters)
| (1 << Subs.outputs)
| (1 << Subs.sceneitems)
| (1 << Subs.mediainputs)
| (1 << Subs.vendors)
| (1 << Subs.ui)
)
}
kwargs = defaultkwargs | kwargs
self.base_client = ObsClient(**kwargs)
self.base_client.authenticate()
self.callback = Callback()
self.subscribe()
def subscribe(self):
worker = Thread(target=self.trigger, daemon=True)
worker.start()
def trigger(self):
"""
Continuously listen for events.
Triggers a callback on event received.
"""
self.running = True
while self.running:
self.data = json.loads(self.base_client.ws.recv())
event, data = (
self.data["d"].get("eventType"),
self.data["d"].get("eventData"),
)
self.callback.trigger(event, data)
time.sleep(self.DELAY)
def unsubscribe(self):
"""
stop listening for events
"""
self.running = False

File diff suppressed because it is too large Load Diff

26
obsstudio_sdk/util.py Normal file
View File

@ -0,0 +1,26 @@
import re
from dataclasses import dataclass
def to_camel_case(s):
return "".join(word.title() for word in s.split("_"))
def to_snake_case(s):
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
def as_dataclass(identifier, data):
def attrs():
return list(to_snake_case(k) for k in data.keys())
return dataclass(
type(
f"{identifier}Dataclass",
(),
{
"attrs": attrs,
**{to_snake_case(k): v for k, v in data.items()},
},
)
)

11
tests/__init__.py Normal file
View File

@ -0,0 +1,11 @@
import obsstudio_sdk as obs
req_cl = obs.ReqClient()
def setup_module():
pass
def teardown_module():
req_cl.base_client.ws.close()

27
tests/test_attrs.py Normal file
View File

@ -0,0 +1,27 @@
import pytest
from tests import req_cl
class TestAttrs:
__test__ = True
def test_get_version_attrs(self):
resp = req_cl.get_version()
assert resp.attrs() == [
"available_requests",
"obs_version",
"obs_web_socket_version",
"platform",
"platform_description",
"rpc_version",
"supported_image_formats",
]
def test_get_current_program_scene_attrs(self):
resp = req_cl.get_current_program_scene()
assert resp.attrs() == ["current_program_scene_name"]
def test_get_transition_kind_list_attrs(self):
resp = req_cl.get_transition_kind_list()
assert resp.attrs() == ["transition_kinds"]

59
tests/test_callback.py Normal file
View File

@ -0,0 +1,59 @@
import pytest
from obsstudio_sdk.callback import Callback
class TestCallbacks:
__test__ = True
@classmethod
def setup_class(cls):
cls.callback = Callback()
@pytest.fixture(autouse=True)
def wraps_tests(self):
yield
self.callback.clear()
def test_register_callback(self):
def on_callback_method():
pass
self.callback.register(on_callback_method)
assert self.callback.get() == ["CallbackMethod"]
def test_register_callbacks(self):
def on_callback_method_one():
pass
def on_callback_method_two():
pass
self.callback.register((on_callback_method_one, on_callback_method_two))
assert self.callback.get() == ["CallbackMethodOne", "CallbackMethodTwo"]
def test_deregister_callback(self):
def on_callback_method_one():
pass
def on_callback_method_two():
pass
self.callback.register((on_callback_method_one, on_callback_method_two))
self.callback.deregister(on_callback_method_one)
assert self.callback.get() == ["CallbackMethodTwo"]
def test_deregister_callbacks(self):
def on_callback_method_one():
pass
def on_callback_method_two():
pass
def on_callback_method_three():
pass
self.callback.register(
(on_callback_method_one, on_callback_method_two, on_callback_method_three)
)
self.callback.deregister((on_callback_method_two, on_callback_method_three))
assert self.callback.get() == ["CallbackMethodOne"]

118
tests/test_request.py Normal file
View File

@ -0,0 +1,118 @@
import pytest
from tests import req_cl
class TestRequests:
__test__ = True
def test_get_version(self):
resp = req_cl.get_version()
assert hasattr(resp, "obs_version")
assert hasattr(resp, "obs_web_socket_version")
@pytest.mark.parametrize(
"scene",
[
("START"),
("BRB"),
("END"),
],
)
def test_current_program_scene(self, scene):
req_cl.set_current_program_scene(scene)
resp = req_cl.get_current_program_scene()
assert resp.current_program_scene_name == scene
@pytest.mark.parametrize(
"state",
[
(False),
(True),
],
)
def test_studio_mode_enabled(self, state):
req_cl.set_studio_mode_enabled(state)
resp = req_cl.get_studio_mode_enabled()
assert resp.studio_mode_enabled == state
def test_get_hot_key_list(self):
resp = req_cl.get_hot_key_list()
hotkey_list = [
"OBSBasic.StartStreaming",
"OBSBasic.StopStreaming",
"OBSBasic.ForceStopStreaming",
"OBSBasic.StartRecording",
"OBSBasic.StopRecording",
"OBSBasic.PauseRecording",
"OBSBasic.UnpauseRecording",
"OBSBasic.StartReplayBuffer",
"OBSBasic.StopReplayBuffer",
"OBSBasic.StartVirtualCam",
"OBSBasic.StopVirtualCam",
"OBSBasic.EnablePreview",
"OBSBasic.DisablePreview",
"OBSBasic.ShowContextBar",
"OBSBasic.HideContextBar",
"OBSBasic.TogglePreviewProgram",
"OBSBasic.Transition",
"OBSBasic.ResetStats",
"OBSBasic.Screenshot",
"OBSBasic.SelectedSourceScreenshot",
"libobs.mute",
"libobs.unmute",
"libobs.push-to-mute",
"libobs.push-to-talk",
"libobs.mute",
"libobs.unmute",
"libobs.push-to-mute",
"libobs.push-to-talk",
"OBSBasic.SelectScene",
"OBSBasic.SelectScene",
"OBSBasic.SelectScene",
"OBSBasic.SelectScene",
"libobs.show_scene_item.Colour Source 2",
"libobs.hide_scene_item.Colour Source 2",
"libobs.show_scene_item.Colour Source 3",
"libobs.hide_scene_item.Colour Source 3",
"libobs.show_scene_item.Colour Source",
"libobs.hide_scene_item.Colour Source",
"OBSBasic.QuickTransition.1",
"OBSBasic.QuickTransition.2",
"OBSBasic.QuickTransition.3",
]
assert all(x in resp.hotkeys for x in hotkey_list)
@pytest.mark.parametrize(
"name,data",
[
("val1", 3),
("val2", "hello"),
],
)
def test_persistent_data(self, name, data):
req_cl.set_persistent_data("OBS_WEBSOCKET_DATA_REALM_PROFILE", name, data)
resp = req_cl.get_persistent_data("OBS_WEBSOCKET_DATA_REALM_PROFILE", name)
assert resp.slot_value == data
def test_profile_list(self):
req_cl.create_profile("test")
resp = req_cl.get_profile_list()
assert "test" in resp.profiles
req_cl.remove_profile("test")
resp = req_cl.get_profile_list()
assert "test" not in resp.profiles
def test_source_filter(self):
req_cl.create_source_filter("START", "test", "color_key_filter_v2")
resp = req_cl.get_source_filter_list("START")
assert resp.filters == [
{
"filterEnabled": True,
"filterIndex": 0,
"filterKind": "color_key_filter_v2",
"filterName": "test",
"filterSettings": {},
}
]
req_cl.remove_source_filter("START", "test")