add support for toml config.

subject module added, supports callbacks.

events module added. Provides an event listener and callback trigger.

import isorted, code run through black.

toml section added to readme.

added a couple of examples.
This commit is contained in:
onyx-and-iris 2022-07-25 23:51:30 +01:00
parent 1793685be0
commit b5b69de218
14 changed files with 2719 additions and 481 deletions

4
.gitignore vendored
View File

@ -3,3 +3,7 @@ obsstudio_sdk.egg-info
dist
docs
setup.py
venv
quick.py
config.toml

View File

@ -1,4 +1,5 @@
# obs_sdk
### A Python SDK for OBS Studio WebSocket v5.0
This is a wrapper around OBS Websocket.
@ -11,11 +12,23 @@ Not all endpoints in the official documentation are implemented. But all endpoin
pip install obsstudio-sdk
```
### How to Use
* Import and start using
Required parameters are as follows:
- Load connection info from toml config. A valid `config.toml` might look like this:
```toml
[connection]
host = "localhost"
port = 4455
password = "mystrongpass"
```
It should be placed next to your `__main__.py` file.
Otherwise:
- Import and start using
Parameters are as follows:
host: obs websocket server
port: port to access server
password: obs websocket server password

View File

@ -0,0 +1,4 @@
from .events import EventsClient
from .reqs import ReqClient
__ALL__ = ["ReqClient", "EventsClient"]

View File

@ -0,0 +1,71 @@
import base64
import hashlib
import json
from pathlib import Path
from random import randint
import tomllib
import websocket
class ObsClient(object):
def __init__(self, host=None, port=None, password=None):
self.host = host
self.port = port
self.password = password
if not (self.host and self.port and self.password):
conn = self._conn_from_toml()
self.host = conn["host"]
self.port = conn["port"]
self.password = conn["password"]
self.ws = websocket.WebSocket()
self.ws.connect(f"ws://{self.host}:{self.port}")
self.server_hello = json.loads(self.ws.recv())
def _conn_from_toml(self):
filepath = Path.cwd() / "config.toml"
self._conn = dict()
with open(filepath, "rb") as f:
self._conn = tomllib.load(f)
return self._conn["connection"]
def authenticate(self):
secret = base64.b64encode(
hashlib.sha256(
(
self.password + self.server_hello["d"]["authentication"]["salt"]
).encode()
).digest()
)
auth = base64.b64encode(
hashlib.sha256(
(
secret.decode()
+ self.server_hello["d"]["authentication"]["challenge"]
).encode()
).digest()
).decode()
payload = {"op": 1, "d": {"rpcVersion": 1, "authentication": auth}}
self.ws.send(json.dumps(payload))
return self.ws.recv()
def req(self, req_type, req_data=None):
if req_data:
payload = {
"op": 6,
"d": {
"requestType": req_type,
"requestId": randint(1, 1000),
"requestData": req_data,
},
}
else:
payload = {
"op": 6,
"d": {"requestType": req_type, "requestId": randint(1, 1000)},
}
self.ws.send(json.dumps(payload))
return json.loads(self.ws.recv())

View File

@ -0,0 +1,43 @@
import json
import time
from threading import Thread
from .baseclient import ObsClient
from .subject import Callback
"""
A class to interact with obs-websocket events
defined in official github repo
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
"""
class EventsClient(object):
DELAY = 0.001
def __init__(self, **kwargs):
self.base_client = ObsClient(**kwargs)
self.base_client.authenticate()
self.callback = Callback()
self.running = True
worker = Thread(target=self.trigger, daemon=True)
worker.start()
def trigger(self):
"""
Continuously listen for events.
Triggers a callback on event received.
"""
while self.running:
self.data = json.loads(self.base_client.ws.recv())
event, data = (self.data["d"].get("eventType"), self.data["d"])
self.callback.trigger(event, data)
time.sleep(self.DELAY)
def unsubscribe(self):
"""
stop listening for events
"""
self.running = False

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,58 @@
import re
class Callback:
"""Adds support for callbacks"""
def __init__(self):
"""list of current callbacks"""
self._callbacks = list()
def to_camel_case(self, s):
s = "".join(word.title() for word in s.split("_"))
return s[2:]
def to_snake_case(self, s):
s = re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
return f"on_{s}"
def get(self) -> list:
"""returns a list of registered events"""
return [self.to_camel_case(fn.__name__) for fn in self._callbacks]
def trigger(self, event, data=None):
"""trigger callback on update"""
for fn in self._callbacks:
if fn.__name__ == self.to_snake_case(event):
if "eventData" in data:
fn(data["eventData"])
else:
fn()
def register(self, fns):
"""registers callback functions"""
try:
iter(fns)
for fn in fns:
if fn not in self._callbacks:
self._callbacks.append(fn)
except TypeError as e:
if fns not in self._callbacks:
self._callbacks.append(fns)
def deregister(self, callback):
"""deregisters a callback from _callbacks"""
try:
self._callbacks.remove(callback)
except ValueError:
print(f"Failed to remove: {callback}")
def clear(self):
"""clears the _callbacks list"""
self._callbacks.clear()

View File

@ -0,0 +1,26 @@
import obsstudio_sdk as obs
class Observer:
def __init__(self, cl):
self._cl = cl
self._cl.callback.register(
[self.on_current_program_scene_changed, self.on_exit_started]
)
print(f"Registered events: {self._cl.callback.get()}")
def on_exit_started(self):
print(f"OBS closing!")
self._cl.unsubscribe()
def on_current_program_scene_changed(self, data):
print(f"Switched to scene {data['sceneName']}")
if __name__ == "__main__":
cl = obs.EventsClient()
observer = Observer(cl)
while cmd := input("<Enter> to exit\n"):
if not cmd:
break

View File

@ -0,0 +1,19 @@
import time
import obsstudio_sdk as obs
def main():
res = cl.GetSceneList()
scenes = reversed(tuple(d["sceneName"] for d in res["d"]["responseData"]["scenes"]))
for sc in scenes:
print(f"Switching to scene {sc}")
cl.SetCurrentProgramScene(sc)
time.sleep(0.5)
if __name__ == "__main__":
cl = obs.ReqClient()
main()

View File

@ -1 +1,4 @@
from .events import EventsClient
from .reqs import ReqClient
__ALL__ = ["ReqClient", "EventsClient"]

View File

@ -1,53 +1,73 @@
import websocket
import json
import hashlib
import base64
import hashlib
import json
from pathlib import Path
from random import randint
import tomllib
import websocket
class ObsClient(object):
def __init__(self, host, port, password):
self.host = host
self.port = port
self.password = password
def __init__(self, **kwargs):
defaultkwargs = {key: None for key in ["host", "port", "password"]}
kwargs = defaultkwargs | kwargs
for attr, val in kwargs.items():
setattr(self, attr, val)
if not (self.host and self.port and self.password):
conn = self._conn_from_toml()
self.host = conn["host"]
self.port = conn["port"]
self.password = conn["password"]
self.ws = websocket.WebSocket()
self.ws.connect(f"ws://{self.host}:{self.port}")
self.server_hello = json.loads(self.ws.recv())
def _conn_from_toml(self):
filepath = Path.cwd() / "config.toml"
self._conn = dict()
with open(filepath, "rb") as f:
self._conn = tomllib.load(f)
return self._conn["connection"]
def authenticate(self):
secret = base64.b64encode(
hashlib.sha256(
(self.password + self.server_hello['d']['authentication']['salt']).encode()).digest())
(
self.password + self.server_hello["d"]["authentication"]["salt"]
).encode()
).digest()
)
auth = base64.b64encode(
hashlib.sha256(
(secret.decode() + self.server_hello['d']['authentication']['challenge']).encode()).digest()).decode()
(
secret.decode()
+ self.server_hello["d"]["authentication"]["challenge"]
).encode()
).digest()
).decode()
payload = { "op":1, "d": {
"rpcVersion": 1,
"authentication": auth}
}
payload = {"op": 1, "d": {"rpcVersion": 1, "authentication": auth}}
self.ws.send(json.dumps(payload))
return self.ws.recv()
def req(self, req_type, req_data=None):
if req_data == None:
payload = {
"op": 6,
"d": {
"requestType": req_type,
"requestId": randint(1, 1000)
}
}
else:
if req_data:
payload = {
"op": 6,
"d": {
"requestType": req_type,
"requestId": randint(1, 1000),
"requestData": req_data
"requestData": req_data,
},
}
else:
payload = {
"op": 6,
"d": {"requestType": req_type, "requestId": randint(1, 1000)},
}
self.ws.send(json.dumps(payload))
return json.loads(self.ws.recv())

58
obsstudio_sdk/callback.py Normal file
View File

@ -0,0 +1,58 @@
import re
class Callback:
"""Adds support for callbacks"""
def __init__(self):
"""list of current callbacks"""
self._callbacks = list()
def to_camel_case(self, s):
s = "".join(word.title() for word in s.split("_"))
return s[2:]
def to_snake_case(self, s):
s = re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
return f"on_{s}"
def get(self) -> list:
"""returns a list of registered events"""
return [self.to_camel_case(fn.__name__) for fn in self._callbacks]
def trigger(self, event, data=None):
"""trigger callback on update"""
for fn in self._callbacks:
if fn.__name__ == self.to_snake_case(event):
if "eventData" in data:
fn(data["eventData"])
else:
fn()
def register(self, fns):
"""registers callback functions"""
try:
iter(fns)
for fn in fns:
if fn not in self._callbacks:
self._callbacks.append(fn)
except TypeError as e:
if fns not in self._callbacks:
self._callbacks.append(fns)
def deregister(self, callback):
"""deregisters a callback from _callbacks"""
try:
self._callbacks.remove(callback)
except ValueError:
print(f"Failed to remove: {callback}")
def clear(self):
"""clears the _callbacks list"""
self._callbacks.clear()

43
obsstudio_sdk/events.py Normal file
View File

@ -0,0 +1,43 @@
import json
import time
from threading import Thread
from .baseclient import ObsClient
from .callback import Callback
"""
A class to interact with obs-websocket events
defined in official github repo
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
"""
class EventsClient(object):
DELAY = 0.001
def __init__(self, **kwargs):
self.base_client = ObsClient(**kwargs)
self.base_client.authenticate()
self.callback = Callback()
self.running = True
worker = Thread(target=self.trigger, daemon=True)
worker.start()
def trigger(self):
"""
Continuously listen for events.
Triggers a callback on event received.
"""
while self.running:
self.data = json.loads(self.base_client.ws.recv())
event, data = (self.data["d"].get("eventType"), self.data["d"])
self.callback.trigger(event, data)
time.sleep(self.DELAY)
def unsubscribe(self):
"""
stop listening for events
"""
self.running = False

File diff suppressed because it is too large Load Diff