mirror of
https://github.com/onyx-and-iris/obsws-python.git
synced 2025-01-18 03:20:47 +00:00
refreshed ignored files
This commit is contained in:
parent
5399b66e45
commit
d37cda9976
5
.gitignore
vendored
5
.gitignore
vendored
@ -1,3 +1,8 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
|
@ -1,4 +0,0 @@
|
||||
from .events import EventsClient
|
||||
from .reqs import ReqClient
|
||||
|
||||
__ALL__ = ["ReqClient", "EventsClient"]
|
@ -1,71 +0,0 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
from pathlib import Path
|
||||
from random import randint
|
||||
|
||||
import tomllib
|
||||
import websocket
|
||||
|
||||
|
||||
class ObsClient(object):
|
||||
def __init__(self, host=None, port=None, password=None):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.password = password
|
||||
if not (self.host and self.port and self.password):
|
||||
conn = self._conn_from_toml()
|
||||
self.host = conn["host"]
|
||||
self.port = conn["port"]
|
||||
self.password = conn["password"]
|
||||
self.ws = websocket.WebSocket()
|
||||
self.ws.connect(f"ws://{self.host}:{self.port}")
|
||||
self.server_hello = json.loads(self.ws.recv())
|
||||
|
||||
def _conn_from_toml(self):
|
||||
filepath = Path.cwd() / "config.toml"
|
||||
self._conn = dict()
|
||||
with open(filepath, "rb") as f:
|
||||
self._conn = tomllib.load(f)
|
||||
return self._conn["connection"]
|
||||
|
||||
def authenticate(self):
|
||||
secret = base64.b64encode(
|
||||
hashlib.sha256(
|
||||
(
|
||||
self.password + self.server_hello["d"]["authentication"]["salt"]
|
||||
).encode()
|
||||
).digest()
|
||||
)
|
||||
|
||||
auth = base64.b64encode(
|
||||
hashlib.sha256(
|
||||
(
|
||||
secret.decode()
|
||||
+ self.server_hello["d"]["authentication"]["challenge"]
|
||||
).encode()
|
||||
).digest()
|
||||
).decode()
|
||||
|
||||
payload = {"op": 1, "d": {"rpcVersion": 1, "authentication": auth}}
|
||||
|
||||
self.ws.send(json.dumps(payload))
|
||||
return self.ws.recv()
|
||||
|
||||
def req(self, req_type, req_data=None):
|
||||
if req_data:
|
||||
payload = {
|
||||
"op": 6,
|
||||
"d": {
|
||||
"requestType": req_type,
|
||||
"requestId": randint(1, 1000),
|
||||
"requestData": req_data,
|
||||
},
|
||||
}
|
||||
else:
|
||||
payload = {
|
||||
"op": 6,
|
||||
"d": {"requestType": req_type, "requestId": randint(1, 1000)},
|
||||
}
|
||||
self.ws.send(json.dumps(payload))
|
||||
return json.loads(self.ws.recv())
|
@ -1,43 +0,0 @@
|
||||
import json
|
||||
import time
|
||||
from threading import Thread
|
||||
|
||||
from .baseclient import ObsClient
|
||||
from .subject import Callback
|
||||
|
||||
"""
|
||||
A class to interact with obs-websocket events
|
||||
defined in official github repo
|
||||
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
|
||||
"""
|
||||
|
||||
|
||||
class EventsClient(object):
|
||||
DELAY = 0.001
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.base_client = ObsClient(**kwargs)
|
||||
self.base_client.authenticate()
|
||||
self.callback = Callback()
|
||||
|
||||
self.running = True
|
||||
worker = Thread(target=self.trigger, daemon=True)
|
||||
worker.start()
|
||||
|
||||
def trigger(self):
|
||||
"""
|
||||
Continuously listen for events.
|
||||
|
||||
Triggers a callback on event received.
|
||||
"""
|
||||
while self.running:
|
||||
self.data = json.loads(self.base_client.ws.recv())
|
||||
event, data = (self.data["d"].get("eventType"), self.data["d"])
|
||||
self.callback.trigger(event, data)
|
||||
time.sleep(self.DELAY)
|
||||
|
||||
def unsubscribe(self):
|
||||
"""
|
||||
stop listening for events
|
||||
"""
|
||||
self.running = False
|
File diff suppressed because it is too large
Load Diff
@ -1,58 +0,0 @@
|
||||
import re
|
||||
|
||||
|
||||
class Callback:
|
||||
"""Adds support for callbacks"""
|
||||
|
||||
def __init__(self):
|
||||
"""list of current callbacks"""
|
||||
|
||||
self._callbacks = list()
|
||||
|
||||
def to_camel_case(self, s):
|
||||
s = "".join(word.title() for word in s.split("_"))
|
||||
return s[2:]
|
||||
|
||||
def to_snake_case(self, s):
|
||||
s = re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
|
||||
return f"on_{s}"
|
||||
|
||||
def get(self) -> list:
|
||||
"""returns a list of registered events"""
|
||||
|
||||
return [self.to_camel_case(fn.__name__) for fn in self._callbacks]
|
||||
|
||||
def trigger(self, event, data=None):
|
||||
"""trigger callback on update"""
|
||||
|
||||
for fn in self._callbacks:
|
||||
if fn.__name__ == self.to_snake_case(event):
|
||||
if "eventData" in data:
|
||||
fn(data["eventData"])
|
||||
else:
|
||||
fn()
|
||||
|
||||
def register(self, fns):
|
||||
"""registers callback functions"""
|
||||
|
||||
try:
|
||||
iter(fns)
|
||||
for fn in fns:
|
||||
if fn not in self._callbacks:
|
||||
self._callbacks.append(fn)
|
||||
except TypeError as e:
|
||||
if fns not in self._callbacks:
|
||||
self._callbacks.append(fns)
|
||||
|
||||
def deregister(self, callback):
|
||||
"""deregisters a callback from _callbacks"""
|
||||
|
||||
try:
|
||||
self._callbacks.remove(callback)
|
||||
except ValueError:
|
||||
print(f"Failed to remove: {callback}")
|
||||
|
||||
def clear(self):
|
||||
"""clears the _callbacks list"""
|
||||
|
||||
self._callbacks.clear()
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue
Block a user