9 Commits

Author SHA1 Message Date
5f7b62a0e0 add missing cached_property 2026-03-07 14:29:32 +00:00
d1bcbfed6f minor bump 2026-03-07 14:24:44 +00:00
ab80bbf226 use host kwarg/env var in examples 2026-03-07 14:23:29 +00:00
ad58852a77 improve efficiency with cached properties and struct.unpack 2026-03-07 14:22:25 +00:00
5363584940 improve to_bytes efficiency with struct.pack 2026-03-07 14:20:31 +00:00
9f43ee18d3 add more enums so we can remove some of the constants
rename some of the packet classes

patch bump
2026-03-07 00:03:46 +00:00
3cde874a3c remove unnecessary assignment 2026-03-03 20:03:09 +00:00
3d01321be3 separate ping from pong
this separates concerns and allows the pong_timeout to strictly handle timeouts for pongs.

patch bump
2026-03-03 19:47:15 +00:00
2dd52a7258 move ping timeout logic into decorator
patch bump
2026-03-03 18:21:14 +00:00
13 changed files with 388 additions and 444 deletions

View File

@@ -103,7 +103,7 @@ class App(tk.Tk):
def main():
KIND_ID = 'banana'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

View File

@@ -94,7 +94,7 @@ class Observer:
def main():
KIND_ID = 'potato'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

View File

@@ -25,7 +25,7 @@ class App:
def main():
KIND_ID = 'banana'
conn = {
'ip': os.environ.get('VBANCMD_IP', 'localhost'),
'host': os.environ.get('VBANCMD_HOST', 'localhost'),
'port': int(os.environ.get('VBANCMD_PORT', 6980)),
'streamname': os.environ.get('VBANCMD_STREAMNAME', 'Command1'),
}

View File

@@ -1,6 +1,6 @@
[project]
name = "vban-cmd"
version = "2.9.4"
version = "2.10.0"
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
license = { text = "MIT" }

68
uv.lock generated
View File

@@ -1,68 +0,0 @@
version = 1
revision = 3
requires-python = ">=3.10"
[[package]]
name = "tomli"
version = "2.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/82/30/31573e9457673ab10aa432461bee537ce6cef177667deca369efb79df071/tomli-2.4.0.tar.gz", hash = "sha256:aa89c3f6c277dd275d8e243ad24f3b5e701491a860d5121f2cdd399fbb31fc9c", size = 17477, upload-time = "2026-01-11T11:22:38.165Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3c/d9/3dc2289e1f3b32eb19b9785b6a006b28ee99acb37d1d47f78d4c10e28bf8/tomli-2.4.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:b5ef256a3fd497d4973c11bf142e9ed78b150d36f5773f1ca6088c230ffc5867", size = 153663, upload-time = "2026-01-11T11:21:45.27Z" },
{ url = "https://files.pythonhosted.org/packages/51/32/ef9f6845e6b9ca392cd3f64f9ec185cc6f09f0a2df3db08cbe8809d1d435/tomli-2.4.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:5572e41282d5268eb09a697c89a7bee84fae66511f87533a6f88bd2f7b652da9", size = 148469, upload-time = "2026-01-11T11:21:46.873Z" },
{ url = "https://files.pythonhosted.org/packages/d6/c2/506e44cce89a8b1b1e047d64bd495c22c9f71f21e05f380f1a950dd9c217/tomli-2.4.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:551e321c6ba03b55676970b47cb1b73f14a0a4dce6a3e1a9458fd6d921d72e95", size = 236039, upload-time = "2026-01-11T11:21:48.503Z" },
{ url = "https://files.pythonhosted.org/packages/b3/40/e1b65986dbc861b7e986e8ec394598187fa8aee85b1650b01dd925ca0be8/tomli-2.4.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5e3f639a7a8f10069d0e15408c0b96a2a828cfdec6fca05296ebcdcc28ca7c76", size = 243007, upload-time = "2026-01-11T11:21:49.456Z" },
{ url = "https://files.pythonhosted.org/packages/9c/6f/6e39ce66b58a5b7ae572a0f4352ff40c71e8573633deda43f6a379d56b3e/tomli-2.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1b168f2731796b045128c45982d3a4874057626da0e2ef1fdd722848b741361d", size = 240875, upload-time = "2026-01-11T11:21:50.755Z" },
{ url = "https://files.pythonhosted.org/packages/aa/ad/cb089cb190487caa80204d503c7fd0f4d443f90b95cf4ef5cf5aa0f439b0/tomli-2.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:133e93646ec4300d651839d382d63edff11d8978be23da4cc106f5a18b7d0576", size = 246271, upload-time = "2026-01-11T11:21:51.81Z" },
{ url = "https://files.pythonhosted.org/packages/0b/63/69125220e47fd7a3a27fd0de0c6398c89432fec41bc739823bcc66506af6/tomli-2.4.0-cp311-cp311-win32.whl", hash = "sha256:b6c78bdf37764092d369722d9946cb65b8767bfa4110f902a1b2542d8d173c8a", size = 96770, upload-time = "2026-01-11T11:21:52.647Z" },
{ url = "https://files.pythonhosted.org/packages/1e/0d/a22bb6c83f83386b0008425a6cd1fa1c14b5f3dd4bad05e98cf3dbbf4a64/tomli-2.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:d3d1654e11d724760cdb37a3d7691f0be9db5fbdaef59c9f532aabf87006dbaa", size = 107626, upload-time = "2026-01-11T11:21:53.459Z" },
{ url = "https://files.pythonhosted.org/packages/2f/6d/77be674a3485e75cacbf2ddba2b146911477bd887dda9d8c9dfb2f15e871/tomli-2.4.0-cp311-cp311-win_arm64.whl", hash = "sha256:cae9c19ed12d4e8f3ebf46d1a75090e4c0dc16271c5bce1c833ac168f08fb614", size = 94842, upload-time = "2026-01-11T11:21:54.831Z" },
{ url = "https://files.pythonhosted.org/packages/3c/43/7389a1869f2f26dba52404e1ef13b4784b6b37dac93bac53457e3ff24ca3/tomli-2.4.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:920b1de295e72887bafa3ad9f7a792f811847d57ea6b1215154030cf131f16b1", size = 154894, upload-time = "2026-01-11T11:21:56.07Z" },
{ url = "https://files.pythonhosted.org/packages/e9/05/2f9bf110b5294132b2edf13fe6ca6ae456204f3d749f623307cbb7a946f2/tomli-2.4.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:7d6d9a4aee98fac3eab4952ad1d73aee87359452d1c086b5ceb43ed02ddb16b8", size = 149053, upload-time = "2026-01-11T11:21:57.467Z" },
{ url = "https://files.pythonhosted.org/packages/e8/41/1eda3ca1abc6f6154a8db4d714a4d35c4ad90adc0bcf700657291593fbf3/tomli-2.4.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:36b9d05b51e65b254ea6c2585b59d2c4cb91c8a3d91d0ed0f17591a29aaea54a", size = 243481, upload-time = "2026-01-11T11:21:58.661Z" },
{ url = "https://files.pythonhosted.org/packages/d2/6d/02ff5ab6c8868b41e7d4b987ce2b5f6a51d3335a70aa144edd999e055a01/tomli-2.4.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1c8a885b370751837c029ef9bc014f27d80840e48bac415f3412e6593bbc18c1", size = 251720, upload-time = "2026-01-11T11:22:00.178Z" },
{ url = "https://files.pythonhosted.org/packages/7b/57/0405c59a909c45d5b6f146107c6d997825aa87568b042042f7a9c0afed34/tomli-2.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8768715ffc41f0008abe25d808c20c3d990f42b6e2e58305d5da280ae7d1fa3b", size = 247014, upload-time = "2026-01-11T11:22:01.238Z" },
{ url = "https://files.pythonhosted.org/packages/2c/0e/2e37568edd944b4165735687cbaf2fe3648129e440c26d02223672ee0630/tomli-2.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:7b438885858efd5be02a9a133caf5812b8776ee0c969fea02c45e8e3f296ba51", size = 251820, upload-time = "2026-01-11T11:22:02.727Z" },
{ url = "https://files.pythonhosted.org/packages/5a/1c/ee3b707fdac82aeeb92d1a113f803cf6d0f37bdca0849cb489553e1f417a/tomli-2.4.0-cp312-cp312-win32.whl", hash = "sha256:0408e3de5ec77cc7f81960c362543cbbd91ef883e3138e81b729fc3eea5b9729", size = 97712, upload-time = "2026-01-11T11:22:03.777Z" },
{ url = "https://files.pythonhosted.org/packages/69/13/c07a9177d0b3bab7913299b9278845fc6eaaca14a02667c6be0b0a2270c8/tomli-2.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:685306e2cc7da35be4ee914fd34ab801a6acacb061b6a7abca922aaf9ad368da", size = 108296, upload-time = "2026-01-11T11:22:04.86Z" },
{ url = "https://files.pythonhosted.org/packages/18/27/e267a60bbeeee343bcc279bb9e8fbed0cbe224bc7b2a3dc2975f22809a09/tomli-2.4.0-cp312-cp312-win_arm64.whl", hash = "sha256:5aa48d7c2356055feef06a43611fc401a07337d5b006be13a30f6c58f869e3c3", size = 94553, upload-time = "2026-01-11T11:22:05.854Z" },
{ url = "https://files.pythonhosted.org/packages/34/91/7f65f9809f2936e1f4ce6268ae1903074563603b2a2bd969ebbda802744f/tomli-2.4.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:84d081fbc252d1b6a982e1870660e7330fb8f90f676f6e78b052ad4e64714bf0", size = 154915, upload-time = "2026-01-11T11:22:06.703Z" },
{ url = "https://files.pythonhosted.org/packages/20/aa/64dd73a5a849c2e8f216b755599c511badde80e91e9bc2271baa7b2cdbb1/tomli-2.4.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:9a08144fa4cba33db5255f9b74f0b89888622109bd2776148f2597447f92a94e", size = 149038, upload-time = "2026-01-11T11:22:07.56Z" },
{ url = "https://files.pythonhosted.org/packages/9e/8a/6d38870bd3d52c8d1505ce054469a73f73a0fe62c0eaf5dddf61447e32fa/tomli-2.4.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c73add4bb52a206fd0c0723432db123c0c75c280cbd67174dd9d2db228ebb1b4", size = 242245, upload-time = "2026-01-11T11:22:08.344Z" },
{ url = "https://files.pythonhosted.org/packages/59/bb/8002fadefb64ab2669e5b977df3f5e444febea60e717e755b38bb7c41029/tomli-2.4.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1fb2945cbe303b1419e2706e711b7113da57b7db31ee378d08712d678a34e51e", size = 250335, upload-time = "2026-01-11T11:22:09.951Z" },
{ url = "https://files.pythonhosted.org/packages/a5/3d/4cdb6f791682b2ea916af2de96121b3cb1284d7c203d97d92d6003e91c8d/tomli-2.4.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:bbb1b10aa643d973366dc2cb1ad94f99c1726a02343d43cbc011edbfac579e7c", size = 245962, upload-time = "2026-01-11T11:22:11.27Z" },
{ url = "https://files.pythonhosted.org/packages/f2/4a/5f25789f9a460bd858ba9756ff52d0830d825b458e13f754952dd15fb7bb/tomli-2.4.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:4cbcb367d44a1f0c2be408758b43e1ffb5308abe0ea222897d6bfc8e8281ef2f", size = 250396, upload-time = "2026-01-11T11:22:12.325Z" },
{ url = "https://files.pythonhosted.org/packages/aa/2f/b73a36fea58dfa08e8b3a268750e6853a6aac2a349241a905ebd86f3047a/tomli-2.4.0-cp313-cp313-win32.whl", hash = "sha256:7d49c66a7d5e56ac959cb6fc583aff0651094ec071ba9ad43df785abc2320d86", size = 97530, upload-time = "2026-01-11T11:22:13.865Z" },
{ url = "https://files.pythonhosted.org/packages/3b/af/ca18c134b5d75de7e8dc551c5234eaba2e8e951f6b30139599b53de9c187/tomli-2.4.0-cp313-cp313-win_amd64.whl", hash = "sha256:3cf226acb51d8f1c394c1b310e0e0e61fecdd7adcb78d01e294ac297dd2e7f87", size = 108227, upload-time = "2026-01-11T11:22:15.224Z" },
{ url = "https://files.pythonhosted.org/packages/22/c3/b386b832f209fee8073c8138ec50f27b4460db2fdae9ffe022df89a57f9b/tomli-2.4.0-cp313-cp313-win_arm64.whl", hash = "sha256:d20b797a5c1ad80c516e41bc1fb0443ddb5006e9aaa7bda2d71978346aeb9132", size = 94748, upload-time = "2026-01-11T11:22:16.009Z" },
{ url = "https://files.pythonhosted.org/packages/f3/c4/84047a97eb1004418bc10bdbcfebda209fca6338002eba2dc27cc6d13563/tomli-2.4.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:26ab906a1eb794cd4e103691daa23d95c6919cc2fa9160000ac02370cc9dd3f6", size = 154725, upload-time = "2026-01-11T11:22:17.269Z" },
{ url = "https://files.pythonhosted.org/packages/a8/5d/d39038e646060b9d76274078cddf146ced86dc2b9e8bbf737ad5983609a0/tomli-2.4.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:20cedb4ee43278bc4f2fee6cb50daec836959aadaf948db5172e776dd3d993fc", size = 148901, upload-time = "2026-01-11T11:22:18.287Z" },
{ url = "https://files.pythonhosted.org/packages/73/e5/383be1724cb30f4ce44983d249645684a48c435e1cd4f8b5cded8a816d3c/tomli-2.4.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:39b0b5d1b6dd03684b3fb276407ebed7090bbec989fa55838c98560c01113b66", size = 243375, upload-time = "2026-01-11T11:22:19.154Z" },
{ url = "https://files.pythonhosted.org/packages/31/f0/bea80c17971c8d16d3cc109dc3585b0f2ce1036b5f4a8a183789023574f2/tomli-2.4.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a26d7ff68dfdb9f87a016ecfd1e1c2bacbe3108f4e0f8bcd2228ef9a766c787d", size = 250639, upload-time = "2026-01-11T11:22:20.168Z" },
{ url = "https://files.pythonhosted.org/packages/2c/8f/2853c36abbb7608e3f945d8a74e32ed3a74ee3a1f468f1ffc7d1cb3abba6/tomli-2.4.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:20ffd184fb1df76a66e34bd1b36b4a4641bd2b82954befa32fe8163e79f1a702", size = 246897, upload-time = "2026-01-11T11:22:21.544Z" },
{ url = "https://files.pythonhosted.org/packages/49/f0/6c05e3196ed5337b9fe7ea003e95fd3819a840b7a0f2bf5a408ef1dad8ed/tomli-2.4.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:75c2f8bbddf170e8effc98f5e9084a8751f8174ea6ccf4fca5398436e0320bc8", size = 254697, upload-time = "2026-01-11T11:22:23.058Z" },
{ url = "https://files.pythonhosted.org/packages/f3/f5/2922ef29c9f2951883525def7429967fc4d8208494e5ab524234f06b688b/tomli-2.4.0-cp314-cp314-win32.whl", hash = "sha256:31d556d079d72db7c584c0627ff3a24c5d3fb4f730221d3444f3efb1b2514776", size = 98567, upload-time = "2026-01-11T11:22:24.033Z" },
{ url = "https://files.pythonhosted.org/packages/7b/31/22b52e2e06dd2a5fdbc3ee73226d763b184ff21fc24e20316a44ccc4d96b/tomli-2.4.0-cp314-cp314-win_amd64.whl", hash = "sha256:43e685b9b2341681907759cf3a04e14d7104b3580f808cfde1dfdb60ada85475", size = 108556, upload-time = "2026-01-11T11:22:25.378Z" },
{ url = "https://files.pythonhosted.org/packages/48/3d/5058dff3255a3d01b705413f64f4306a141a8fd7a251e5a495e3f192a998/tomli-2.4.0-cp314-cp314-win_arm64.whl", hash = "sha256:3d895d56bd3f82ddd6faaff993c275efc2ff38e52322ea264122d72729dca2b2", size = 96014, upload-time = "2026-01-11T11:22:26.138Z" },
{ url = "https://files.pythonhosted.org/packages/b8/4e/75dab8586e268424202d3a1997ef6014919c941b50642a1682df43204c22/tomli-2.4.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:5b5807f3999fb66776dbce568cc9a828544244a8eb84b84b9bafc080c99597b9", size = 163339, upload-time = "2026-01-11T11:22:27.143Z" },
{ url = "https://files.pythonhosted.org/packages/06/e3/b904d9ab1016829a776d97f163f183a48be6a4deb87304d1e0116a349519/tomli-2.4.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:c084ad935abe686bd9c898e62a02a19abfc9760b5a79bc29644463eaf2840cb0", size = 159490, upload-time = "2026-01-11T11:22:28.399Z" },
{ url = "https://files.pythonhosted.org/packages/e3/5a/fc3622c8b1ad823e8ea98a35e3c632ee316d48f66f80f9708ceb4f2a0322/tomli-2.4.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0f2e3955efea4d1cfbcb87bc321e00dc08d2bcb737fd1d5e398af111d86db5df", size = 269398, upload-time = "2026-01-11T11:22:29.345Z" },
{ url = "https://files.pythonhosted.org/packages/fd/33/62bd6152c8bdd4c305ad9faca48f51d3acb2df1f8791b1477d46ff86e7f8/tomli-2.4.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0e0fe8a0b8312acf3a88077a0802565cb09ee34107813bba1c7cd591fa6cfc8d", size = 276515, upload-time = "2026-01-11T11:22:30.327Z" },
{ url = "https://files.pythonhosted.org/packages/4b/ff/ae53619499f5235ee4211e62a8d7982ba9e439a0fb4f2f351a93d67c1dd2/tomli-2.4.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:413540dce94673591859c4c6f794dfeaa845e98bf35d72ed59636f869ef9f86f", size = 273806, upload-time = "2026-01-11T11:22:32.56Z" },
{ url = "https://files.pythonhosted.org/packages/47/71/cbca7787fa68d4d0a9f7072821980b39fbb1b6faeb5f5cf02f4a5559fa28/tomli-2.4.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:0dc56fef0e2c1c470aeac5b6ca8cc7b640bb93e92d9803ddaf9ea03e198f5b0b", size = 281340, upload-time = "2026-01-11T11:22:33.505Z" },
{ url = "https://files.pythonhosted.org/packages/f5/00/d595c120963ad42474cf6ee7771ad0d0e8a49d0f01e29576ee9195d9ecdf/tomli-2.4.0-cp314-cp314t-win32.whl", hash = "sha256:d878f2a6707cc9d53a1be1414bbb419e629c3d6e67f69230217bb663e76b5087", size = 108106, upload-time = "2026-01-11T11:22:34.451Z" },
{ url = "https://files.pythonhosted.org/packages/de/69/9aa0c6a505c2f80e519b43764f8b4ba93b5a0bbd2d9a9de6e2b24271b9a5/tomli-2.4.0-cp314-cp314t-win_amd64.whl", hash = "sha256:2add28aacc7425117ff6364fe9e06a183bb0251b03f986df0e78e974047571fd", size = 120504, upload-time = "2026-01-11T11:22:35.764Z" },
{ url = "https://files.pythonhosted.org/packages/b3/9f/f1668c281c58cfae01482f7114a4b88d345e4c140386241a1a24dcc9e7bc/tomli-2.4.0-cp314-cp314t-win_arm64.whl", hash = "sha256:2b1e3b80e1d5e52e40e9b924ec43d81570f0e7d09d11081b797bc4692765a3d4", size = 99561, upload-time = "2026-01-11T11:22:36.624Z" },
{ url = "https://files.pythonhosted.org/packages/23/d1/136eb2cb77520a31e1f64cbae9d33ec6df0d78bdf4160398e86eec8a8754/tomli-2.4.0-py3-none-any.whl", hash = "sha256:1f776e7d669ebceb01dee46484485f43a4048746235e683bcdffacdf1fb4785a", size = 14477, upload-time = "2026-01-11T11:22:37.446Z" },
]
[[package]]
name = "vban-cmd"
version = "2.9.1"
source = { editable = "." }
dependencies = [
{ name = "tomli", marker = "python_full_version < '3.11'" },
]
[package.metadata]
requires-dist = [{ name = "tomli", marker = "python_full_version < '3.11'", specifier = ">=2.0.1,<3.0" }]

View File

@@ -1,6 +1,36 @@
from enum import Flag
class SubProtocols(Flag):
"""Sub Protocols - Bit flags that can be combined"""
AUDIO = 0x00
SERIAL = 0x20
TEXT = 0x40
SERVICE = 0x60
MASK = 0xE0
class ServiceTypes(Flag):
"""Service Types - Bit flags that can be combined"""
PING = 0
PONG = 0
CHATUTF8 = 1
RTPACKETREGISTER = 32
RTPACKET = 33
REQUESTREPLY = 0x02 # A Matrix reply
FNCT_REPLY = 0x80 # An RTPacket reply
class StreamTypes(Flag):
"""Stream Types - Bit flags that can be combined"""
ASCII = 0x00
UTF8 = 0x10
WCHAR = 0x20
class ChannelModes(Flag):
"""Channel Modes - Bit flags that can be combined"""

View File

@@ -1,29 +1,106 @@
import struct
from dataclasses import dataclass
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_PING = 0
VBAN_SERVICE_PONG = 0 # PONG uses same service type as PING
VBAN_SERVICE_MASK = 0xE0
VBAN_PROTOCOL_MASK = 0xE0
VBAN_SERVICE_REQUESTREPLY = 0x02
VBAN_SERVICE_FNCT_REPLY = 0x02
from .enums import ServiceTypes, StreamTypes, SubProtocols
PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass
class VbanPacket:
"""Represents the header of an incoming VBAN data packet"""
class VbanPingHeader:
"""Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PING.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
return struct.pack(
'<4s4B16sI',
header.vban,
header.format_sr,
header.format_nbs,
header.format_nbc,
header.format_bit,
header.streamname,
header.framecounter,
)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = ServiceTypes.PONG.value
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != ServiceTypes.PONG.value:
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != ServiceTypes.PONG.value:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRTPacket:
"""Represents the header of an incoming RTPacket"""
nbs: NBS
_kind: KindMapClass
@@ -53,8 +130,8 @@ class VbanPacket:
@dataclass
class VbanSubscribeHeader:
"""Represents the header of a subscription packet"""
class VbanRTSubscribeHeader:
"""Represents the header of an RT subscription packet"""
nbs: NBS = NBS.zero
name: str = 'Register-RTP'
@@ -66,7 +143,7 @@ class VbanSubscribeHeader:
@property
def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
return SubProtocols.SERVICE.value.to_bytes(1, 'little')
@property
def format_nbs(self) -> bytes:
@@ -74,7 +151,7 @@ class VbanSubscribeHeader:
@property
def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
return ServiceTypes.RTPACKETREGISTER.value.to_bytes(1, 'little')
@property
def format_bit(self) -> bytes:
@@ -88,15 +165,76 @@ class VbanSubscribeHeader:
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
return struct.pack(
'<4s4B16sI',
header.vban,
header.format_sr[0],
header.format_nbs[0],
header.format_nbc[0],
header.format_bit[0],
header.streamname,
framecounter,
)
@dataclass
class VbanRTRequestHeader:
"""Represents the header of an RT request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (self.bps_index | SubProtocols.TEXT.value).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (StreamTypes.UTF8.value).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode()[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
return struct.pack(
'<4s4B16sI',
header.vban,
header.sr[0],
header.nbs[0],
header.nbc[0],
header.bit[0],
header.streamname,
header.framecounter,
)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()
def _parse_vban_service_header(data: bytes) -> dict:
@@ -113,8 +251,8 @@ def _parse_vban_service_header(data: bytes) -> dict:
format_bit = data[7]
# Verify this is a service protocol packet
protocol = format_sr & VBAN_PROTOCOL_MASK
if protocol != VBAN_PROTOCOL_SERVICE:
protocol = format_sr & SubProtocols.MASK.value
if protocol != SubProtocols.SERVICE.value:
raise ValueError(f'Not a service protocol packet: {protocol:02x}')
# Extract stream name and frame counter
@@ -132,13 +270,13 @@ def _parse_vban_service_header(data: bytes) -> dict:
@dataclass
class VbanResponseHeader:
"""Represents the header of a response packet"""
class VbanRTResponseHeader:
"""Represents the header of an RT response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_nbc: int = ServiceTypes.RTPACKET.value
format_bit: int = 0
framecounter: int = 0
@@ -156,7 +294,7 @@ class VbanResponseHeader:
parsed = _parse_vban_service_header(data)
# Validate this is an RTPacket response
if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET:
if parsed['format_nbc'] != ServiceTypes.RTPACKET.value:
raise ValueError(
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}'
)
@@ -169,9 +307,9 @@ class VbanMatrixResponseHeader:
"""Represents the header of a matrix response packet"""
name: str = 'Request Reply'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = VBAN_SERVICE_FNCT_REPLY
format_nbc: int = VBAN_SERVICE_REQUESTREPLY
format_sr: int = SubProtocols.SERVICE.value
format_nbs: int = ServiceTypes.FNCT_REPLY.value
format_nbc: int = ServiceTypes.REQUESTREPLY.value
format_bit: int = 0
framecounter: int = 0
@@ -189,7 +327,7 @@ class VbanMatrixResponseHeader:
parsed = _parse_vban_service_header(data)
# Validate this is a service reply packet
if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY:
if parsed['format_nbs'] != ServiceTypes.FNCT_REPLY.value:
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
return cls(**parsed)
@@ -209,148 +347,3 @@ class VbanMatrixResponseHeader:
header = cls.from_bytes(data)
payload = cls.extract_payload(data)
return header, payload
@dataclass
class VbanPingHeader:
"""Represents the header of a PING packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PING
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(cls, framecounter: int = 0) -> bytes:
"""Creates the PING header bytes only."""
header = cls(framecounter=framecounter)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr.to_bytes(1, 'little'))
data.extend(header.format_nbs.to_bytes(1, 'little'))
data.extend(header.format_nbc.to_bytes(1, 'little'))
data.extend(header.format_bit.to_bytes(1, 'little'))
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanPongHeader:
"""Represents the header of a PONG response packet"""
name: str = 'PING0'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_PONG
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a PONG response packet from bytes."""
parsed = _parse_vban_service_header(data)
# PONG responses use the same service type as PING (0x00)
# and are identified by having payload data
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
return cls(**parsed)
@classmethod
def is_pong_response(cls, data: bytes) -> bool:
"""Check if packet is a PONG response by analyzing the actual response format."""
try:
parsed = _parse_vban_service_header(data)
# Validate this is a service protocol packet with PING/PONG service type
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
return False
if parsed['name'] not in ['PING0', 'VBAN Service']:
return False
# PONG should have payload data (same size as PING)
return len(data) >= PINGPONG_PACKET_SIZE
except (ValueError, Exception):
return False
@dataclass
class VbanRequestHeader:
"""Represents the header of a request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode()[:16].ljust(16, b'\x00')
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()

View File

@@ -1,4 +1,6 @@
import struct
from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple
from vban_cmd.enums import NBS
@@ -6,7 +8,7 @@ from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp
from .enums import ChannelModes
from .headers import VbanPacket
from .headers import VbanRTPacket
class Levels(NamedTuple):
@@ -21,6 +23,13 @@ class ChannelState:
# Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little')
@classmethod
def from_int(cls, state_int: int):
"""Create ChannelState directly from integer for efficiency"""
instance = cls.__new__(cls)
instance._state = state_int
return instance
def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode"""
return (self._state & mode_value) != 0
@@ -96,8 +105,8 @@ class Labels(NamedTuple):
@dataclass
class VbanPacketNBS0(VbanPacket):
"""Represents the body of a VBAN data packet with ident:0"""
class VbanRTPacketNBS0(VbanRTPacket):
"""Represents the body of a VBAN RTPacket with ident:0"""
_inputLeveldB100: bytes
_outputLeveldB100: bytes
@@ -147,32 +156,17 @@ class VbanPacketNBS0(VbanPacket):
def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed"""
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return (
self._stripState != other._stripState
or self._busState != other._busState
or self_gains != other_gains
or self._stripGaindB100Layer1 != other._stripGaindB100Layer1
or self._stripGaindB100Layer2 != other._stripGaindB100Layer2
or self._stripGaindB100Layer3 != other._stripGaindB100Layer3
or self._stripGaindB100Layer4 != other._stripGaindB100Layer4
or self._stripGaindB100Layer5 != other._stripGaindB100Layer5
or self._stripGaindB100Layer6 != other._stripGaindB100Layer6
or self._stripGaindB100Layer7 != other._stripGaindB100Layer7
or self._stripGaindB100Layer8 != other._stripGaindB100Layer8
or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60
@@ -186,77 +180,54 @@ class VbanPacketNBS0(VbanPacket):
)
return any(self._strip_comp) or any(self._bus_comp)
@property
@cached_property
def strip_levels(self) -> tuple[float, ...]:
"""Returns strip levels in dB"""
return tuple(
round(
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._inputLeveldB100), 2)
)[: self._kind.num_strip_levels]
strip_raw = struct.unpack('<34h', self._inputLeveldB100)
return tuple(round(val * 0.01, 1) for val in strip_raw)[
: self._kind.num_strip_levels
]
@property
@cached_property
def bus_levels(self) -> tuple[float, ...]:
"""Returns bus levels in dB"""
return tuple(
round(
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._outputLeveldB100), 2)
)[: self._kind.num_bus_levels]
bus_raw = struct.unpack('<64h', self._outputLeveldB100)
return tuple(round(val * 0.01, 1) for val in bus_raw)[
: self._kind.num_bus_levels
]
@property
def levels(self) -> Levels:
"""Returns strip and bus levels as a namedtuple"""
return Levels(strip=self.strip_levels, bus=self.bus_levels)
@property
@cached_property
def states(self) -> States:
"""returns States object with processed strip and bus channel states"""
strip_states = struct.unpack('<8I', self._stripState)
bus_states = struct.unpack('<8I', self._busState)
return States(
strip=tuple(
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
strip=tuple(ChannelState.from_int(state) for state in strip_states),
bus=tuple(ChannelState.from_int(state) for state in bus_states),
)
@property
@cached_property
def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples"""
return tuple(
tuple(
round(
int.from_bytes(
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
'little',
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
layer_data = []
for layer in range(1, 9):
layer_bytes = getattr(self, f'_stripGaindB100Layer{layer}')
layer_raw = struct.unpack('<8h', layer_bytes)
layer_data.append(tuple(round(val * 0.01, 2) for val in layer_raw))
return tuple(layer_data)
@property
@cached_property
def busgain(self) -> tuple:
"""returns tuple of bus gains"""
return tuple(
round(
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
bus_gain_raw = struct.unpack('<8h', self._busGaindB100)
return tuple(round(val * 0.01, 2) for val in bus_gain_raw)
@property
@cached_property
def labels(self) -> Labels:
"""returns Labels namedtuple of strip and bus labels"""

View File

@@ -1,11 +1,12 @@
import struct
from dataclasses import dataclass
from functools import cached_property
from typing import NamedTuple
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from .headers import VbanPacket
from .headers import VbanRTPacket
VMPARAMSTRIP_SIZE = 174
@@ -193,11 +194,15 @@ class VbanVMParamStrip:
_Pitch_formant_high=data[172:174],
)
@property
@cached_property
def mode(self) -> int:
return int.from_bytes(self._mode, 'little')
@property
@cached_property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@cached_property
def audibility(self) -> Audibility:
return Audibility(
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
@@ -206,7 +211,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
)
@property
@cached_property
def positions(self) -> Positions:
return Positions(
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
@@ -217,7 +222,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
)
@property
@cached_property
def eqgains(self) -> EqGains:
return EqGains(
*[
@@ -230,7 +235,7 @@ class VbanVMParamStrip:
]
)
@property
@cached_property
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
return tuple(
ParametricEQSettings(
@@ -243,7 +248,7 @@ class VbanVMParamStrip:
for i in range(6)
)
@property
@cached_property
def sends(self) -> Sends:
return Sends(
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
@@ -252,11 +257,7 @@ class VbanVMParamStrip:
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
)
@property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@property
@cached_property
def compressor(self) -> CompressorSettings:
return CompressorSettings(
gain_in=round(
@@ -276,7 +277,7 @@ class VbanVMParamStrip:
),
)
@property
@cached_property
def gate(self) -> GateSettings:
return GateSettings(
threshold_in=round(
@@ -295,7 +296,7 @@ class VbanVMParamStrip:
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
)
@property
@cached_property
def denoiser(self) -> DenoiserSettings:
return DenoiserSettings(
threshold=round(
@@ -303,7 +304,7 @@ class VbanVMParamStrip:
)
)
@property
@cached_property
def pitch(self) -> PitchSettings:
return PitchSettings(
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
@@ -327,8 +328,8 @@ class VbanVMParamStrip:
@dataclass
class VbanPacketNBS1(VbanPacket):
"""Represents the body of a VBAN data packet with ident:1"""
class VbanRTPacketNBS1(VbanRTPacket):
"""Represents the body of a VBAN RTPacket with ident:1"""
strips: tuple[VbanVMParamStrip, ...]

View File

@@ -1,3 +1,4 @@
import struct
from dataclasses import dataclass
from enum import Enum
@@ -65,30 +66,31 @@ class VbanPing0Payload:
"""Convert payload to bytes"""
payload = cls()
data = bytearray()
data.extend(payload.bit_type.to_bytes(4, 'little'))
data.extend(payload.bit_feature.to_bytes(4, 'little'))
data.extend(payload.bit_feature_ex.to_bytes(4, 'little'))
data.extend(payload.preferred_rate.to_bytes(4, 'little'))
data.extend(payload.min_rate.to_bytes(4, 'little'))
data.extend(payload.max_rate.to_bytes(4, 'little'))
data.extend(payload.color_rgb.to_bytes(4, 'little'))
data.extend(payload.version)
data.extend(payload.gps_position)
data.extend(payload.user_position)
data.extend(payload.lang_code)
data.extend(payload.reserved)
data.extend(payload.reserved_ex)
data.extend(payload.distant_ip)
data.extend(payload.distant_port.to_bytes(2, 'little'))
data.extend(payload.distant_reserved.to_bytes(2, 'little'))
data.extend(payload.device_name)
data.extend(payload.manufacturer_name)
data.extend(payload.application_name)
data.extend(payload.host_name)
data.extend(payload.user_name)
data.extend(payload.user_comment)
return bytes(data)
return struct.pack(
'<7I4s8s8s8s8s64s32s2H64s64s64s64s128s128s',
payload.bit_type,
payload.bit_feature,
payload.bit_feature_ex,
payload.preferred_rate,
payload.min_rate,
payload.max_rate,
payload.color_rgb,
payload.version,
payload.gps_position,
payload.user_position,
payload.lang_code,
payload.reserved,
payload.reserved_ex,
payload.distant_ip,
payload.distant_port,
payload.distant_reserved,
payload.device_name,
payload.manufacturer_name,
payload.application_name,
payload.host_name,
payload.user_name,
payload.user_comment,
)
@classmethod
def create_packet(cls, framecounter: int) -> bytes:

View File

@@ -1,6 +1,9 @@
import socket
import time
from typing import Iterator
from .error import VBANCMDConnectionError
def ratelimit(func):
"""ratelimit decorator for {VbanCmd}.sendtext, to prevent flooding the network with script requests."""
@@ -18,6 +21,43 @@ def ratelimit(func):
return wrapper
def pong_timeout(func):
"""pong_timeout decorator for {VbanCmd}._handle_pong, to handle timeout logic and socket management."""
def wrapper(self, timeout: float = None):
if timeout is None:
timeout = min(self.timeout, 3.0)
original_timeout = self.sock.gettimeout()
self.sock.settimeout(0.5)
try:
start_time = time.time()
response_count = 0
while time.time() - start_time < timeout:
try:
response_count += 1
if func(self):
return
except socket.timeout:
continue
self.logger.debug(
f'PING timeout after {timeout}s, received {response_count} non-PONG packets'
)
raise VBANCMDConnectionError(
f'PING timeout: No response from {self.host}:{self.port} after {timeout}s'
)
finally:
self.sock.settimeout(original_timeout)
return wrapper
def cache_bool(func, param):
"""Check cache for a bool prop"""

View File

@@ -13,11 +13,11 @@ from .event import Event
from .packet.headers import (
VbanMatrixResponseHeader,
VbanPongHeader,
VbanRequestHeader,
VbanRTRequestHeader,
)
from .packet.ping0 import VbanPing0Payload, VbanServerType
from .subject import Subject
from .util import bump_framecounter, deep_merge, ratelimit
from .util import bump_framecounter, deep_merge, pong_timeout, ratelimit
from .worker import Producer, Subscriber, Updater
logger = logging.getLogger(__name__)
@@ -59,7 +59,6 @@ class VbanCmd(abc.ABC):
@abc.abstractmethod
def __str__(self):
"""Ensure subclasses override str magic method"""
pass
def _conn_from_toml(self) -> dict:
try:
@@ -97,6 +96,7 @@ class VbanCmd(abc.ABC):
If the server is detected as Matrix, RT listeners will be disabled for compatibility.
"""
self._ping()
self._handle_pong()
if not self.disable_rt_listeners:
self.event.info()
@@ -138,72 +138,48 @@ class VbanCmd(abc.ABC):
self._framecounter = bump_framecounter(self._framecounter)
return current
def _ping(self, timeout: float = None) -> None:
"""Send a PING packet and wait for PONG response to verify connectivity."""
if timeout is None:
timeout = min(self.timeout, 3.0)
ping_packet = VbanPing0Payload.create_packet(self._get_next_framecounter())
original_timeout = self.sock.gettimeout()
self.sock.settimeout(0.5)
def _ping(self):
"""Initiates the PING/PONG handshake with the VBAN server."""
try:
self.sock.sendto(ping_packet, (socket.gethostbyname(self.host), self.port))
self.sock.sendto(
VbanPing0Payload.create_packet(self._get_next_framecounter()),
(socket.gethostbyname(self.host), self.port),
)
self.logger.debug(f'PING sent to {self.host}:{self.port}')
start_time = time.time()
response_count = 0
while time.time() - start_time < timeout:
try:
data, addr = self.sock.recvfrom(2048)
response_count += 1
self.logger.debug(
f'Received packet #{response_count} from {addr}: {len(data)} bytes'
)
self.logger.debug(
f'Response header: {data[: min(32, len(data))].hex()}'
)
if VbanPongHeader.is_pong_response(data):
self.logger.debug(
f'PONG received from {addr}, connectivity confirmed'
)
server_type = VbanPing0Payload.detect_server_type(data)
self._handle_server_type(server_type)
return # Exit after successful PONG response
else:
if len(data) >= 8:
if data[:4] == b'VBAN':
protocol = data[4] & 0xE0
nbc = data[6]
self.logger.debug(
f'Non-PONG VBAN packet: protocol=0x{protocol:02x}, nbc=0x{nbc:02x}'
)
else:
self.logger.debug('Non-VBAN packet received')
except socket.timeout:
continue
self.logger.debug(
f'PING timeout after {timeout}s, received {response_count} non-PONG packets'
)
raise VBANCMDConnectionError(
f'PING timeout: No response from {self.host}:{self.port} after {timeout}s'
)
except socket.gaierror as e:
raise VBANCMDConnectionError(
f'Unable to resolve hostname {self.host}'
) from e
except Exception as e:
raise VBANCMDConnectionError(f'PING failed: {e}') from e
finally:
self.sock.settimeout(original_timeout)
@pong_timeout
def _handle_pong(self) -> bool:
"""Handles incoming packets during the PING/PONG handshake, looking for a valid PONG response to confirm connectivity and detect server type.
Returns True if a valid PONG is received, False otherwise."""
data, addr = self.sock.recvfrom(2048)
if VbanPongHeader.is_pong_response(data):
self.logger.debug(f'PONG received from {addr}, connectivity confirmed')
server_type = VbanPing0Payload.detect_server_type(data)
self._handle_server_type(server_type)
return True
else:
if len(data) >= 8:
if data[:4] == b'VBAN':
protocol = data[4] & 0xE0
nbc = data[6]
self.logger.debug(
f'Non-PONG VBAN packet: protocol=0x{protocol:02x}, nbc=0x{nbc:02x}'
)
else:
self.logger.debug('Non-VBAN packet received')
return False
def _handle_server_type(self, server_type: VbanServerType) -> None:
"""Handle the detected server type by adjusting settings accordingly."""
@@ -225,7 +201,7 @@ class VbanCmd(abc.ABC):
def _send_request(self, payload: str) -> None:
"""Sends a request packet over the network and bumps the framecounter."""
self.sock.sendto(
VbanRequestHeader.encode_with_payload(
VbanRTRequestHeader.encode_with_payload(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
@@ -249,7 +225,7 @@ class VbanCmd(abc.ABC):
if self.disable_rt_listeners and script.endswith(('?', '?;')):
try:
data, _ = self.sock.recvfrom(2048)
payload = VbanMatrixResponseHeader.extract_payload(data)
return VbanMatrixResponseHeader.extract_payload(data)
except ValueError as e:
self.logger.warning(f'Error extracting matrix response: {e}')
except TimeoutError as e:
@@ -257,7 +233,6 @@ class VbanCmd(abc.ABC):
raise VBANCMDConnectionError(
f'Timeout waiting for response from {self.host}:{self.port}'
) from e
return payload
@property
def type(self) -> str:

View File

@@ -6,12 +6,12 @@ from .enums import NBS
from .error import VBANCMDConnectionError
from .packet.headers import (
HEADER_SIZE,
VbanPacket,
VbanResponseHeader,
VbanSubscribeHeader,
VbanRTPacket,
VbanRTResponseHeader,
VbanRTSubscribeHeader,
)
from .packet.nbs0 import VbanPacketNBS0
from .packet.nbs1 import VbanPacketNBS1
from .packet.nbs0 import VbanRTPacketNBS0
from .packet.nbs1 import VbanRTPacketNBS1
logger = logging.getLogger(__name__)
@@ -28,7 +28,7 @@ class Subscriber(threading.Thread):
def run(self):
while not self.stopped():
for nbs in NBS:
sub_packet = VbanSubscribeHeader().to_bytes(
sub_packet = VbanRTSubscribeHeader().to_bytes(
nbs, self._remote._get_next_framecounter()
)
self._remote.sock.sendto(
@@ -66,7 +66,7 @@ class Producer(threading.Thread):
self._remote.cache['bus_level'],
) = self._remote.public_packets[NBS.zero].levels
def _get_rt(self) -> VbanPacket:
def _get_rt(self) -> VbanRTPacket:
"""Attempt to fetch data packet until a valid one found"""
while True:
try:
@@ -80,19 +80,19 @@ class Producer(threading.Thread):
) from e
try:
header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
header = VbanRTResponseHeader.from_bytes(data[:HEADER_SIZE])
except ValueError as e:
self.logger.debug(f'Error parsing response packet: {e}')
continue
match header.format_nbs:
case NBS.zero:
return VbanPacketNBS0.from_bytes(
return VbanRTPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data
)
case NBS.one:
return VbanPacketNBS1.from_bytes(
return VbanRTPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data
)