obsws-python/obsstudio_sdk/baseclient.py
onyx-and-iris 2a3a86c277 EventsClient renamed to EventClient
remove getter, setter for send.

add persistent data unit test

add hotkey example

default event sub now 0. explicitly define subs in event class.
now subs can be set as kwarg
2022-07-27 19:39:33 +01:00

87 lines
2.4 KiB
Python

import base64
import hashlib
import json
import time
from enum import IntEnum
from pathlib import Path
from random import randint
import tomllib
import websocket
class ObsClient(object):
DELAY = 0.001
def __init__(self, **kwargs):
defaultkwargs = {key: None for key in ["host", "port", "password"]}
defaultkwargs["subs"] = 0
kwargs = defaultkwargs | kwargs
for attr, val in kwargs.items():
setattr(self, attr, val)
if not (self.host and self.port and self.password):
conn = self._conn_from_toml()
self.host = conn["host"]
self.port = conn["port"]
self.password = conn["password"]
self.ws = websocket.WebSocket()
self.ws.connect(f"ws://{self.host}:{self.port}")
self.server_hello = json.loads(self.ws.recv())
def _conn_from_toml(self):
filepath = Path.cwd() / "config.toml"
self._conn = dict()
with open(filepath, "rb") as f:
self._conn = tomllib.load(f)
return self._conn["connection"]
def authenticate(self):
secret = base64.b64encode(
hashlib.sha256(
(
self.password + self.server_hello["d"]["authentication"]["salt"]
).encode()
).digest()
)
auth = base64.b64encode(
hashlib.sha256(
(
secret.decode()
+ self.server_hello["d"]["authentication"]["challenge"]
).encode()
).digest()
).decode()
payload = {
"op": 1,
"d": {
"rpcVersion": 1,
"authentication": auth,
"eventSubscriptions": self.subs,
},
}
self.ws.send(json.dumps(payload))
return self.ws.recv()
def req(self, req_type, req_data=None):
if req_data:
payload = {
"op": 6,
"d": {
"requestType": req_type,
"requestId": randint(1, 1000),
"requestData": req_data,
},
}
else:
payload = {
"op": 6,
"d": {"requestType": req_type, "requestId": randint(1, 1000)},
}
self.ws.send(json.dumps(payload))
response = json.loads(self.ws.recv())
return response["d"]