diff --git a/README.md b/README.md index 39bf41e..feec44a 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,10 @@ # tinyoscquery -A very simple, work in progress, OSCQuery library for python. -**THIS IS VERY MUCH A WORK IN PROGRESS** Very little of OSCQuery is actually implemented right now, just the bare minimum to advertise that a server exists. +A very simple, work-in-progress OSCQuery library for Python. + +**Built on the original [cyberkitsune/tinyoscquery](https://github.com/cyberkitsune/tinyoscquery).** This fork adds same-port HTTP+WebSocket (per the [OSCQuery proposal](https://github.com/Vidvox/OSCQueryProposal)), LISTEN/IGNORE support, and binary OSC streaming for live value updates. + +**THIS IS VERY MUCH A WORK IN PROGRESS** — only a subset of OSCQuery is implemented (advertising, HTTP oscjson, optional WebSocket streaming). ## Installation 1. Clone this repo @@ -72,11 +75,37 @@ for service_info in browser.get_discovered_oscquery(): +## Updating node values + +After advertising endpoints, you can update their values so that HTTP GET requests return the latest value: + +```python +from tinyoscquery.queryservice import OSCQueryService + +oscqs = OSCQueryService("Test-Service", 9020, 9020) +oscqs.advertise_endpoint("/control/knob1", 0) +oscqs.advertise_endpoint("/control/knob2", 0) + +# Update by path (value is visible on next HTTP query) +oscqs.update_value("/control/knob1", 255) +oscqs.update_value("/control/knob2", 128) + +# Or get the node and mutate (same effect for HTTP) +node = oscqs.get_node("/control/knob1") +if node is not None: + node.value[0] = 100 +``` + +- **`get_node(path)`** — Returns the `OSCQueryNode` at `path`, or `None`. You can then set `node.value[0] = x` (or `node.value = [x]`). +- **`update_value(path, value)`** — Finds the node at `path`, sets its value (single value or list). Returns `True` if the node was found and updated. +- **WebSocket (live value updates)** — If `aiohttp` is installed, HTTP and WebSocket run on the **same port** (OSCQuery spec). HOST_INFO includes `ws_ip` and `ws_port` (same as HTTP). Clients connect to `ws://host:httpPort/`, send `{"COMMAND": "LISTEN", "DATA": "/path"}` to subscribe, and receive **binary OSC** packets when `update_value(path, value)` is called. If `aiohttp` is not installed but `websockets` is, a separate WebSocket server runs on `wsPort` (default `httpPort + 1`) and pushes JSON `{"path": path, "VALUE": value}` to all connected clients. +- **`set_push_target(host, port)`** — When set, each `update_value(path, value)` also sends an OSC message to `host:port`. Requires `pip install python-osc`. Pass `None, None` to disable. + ## Project To-Do - [x] Advertise osc and oscjson on zeroconfig - [x] Provide a basic oscjson server with a root node and HOST_INFO - [X] Add a mechanism to advertise OSC nodes -- [ ] Add a mechanism to update OSC nodes with new values +- [x] Add a mechanism to update OSC nodes with new values - [X] Add apis and tools to query other OSC services on the network - [ ] Add more documentation - [ ] Finalize API design \ No newline at end of file diff --git a/Requirements.txt b/Requirements.txt index 89072d6..592159f 100644 --- a/Requirements.txt +++ b/Requirements.txt @@ -1,2 +1,8 @@ zeroconf==0.39.1 -requests \ No newline at end of file +requests +# Optional: for set_push_target (OSC) and WebSocket binary OSC. No-op if not installed. +python-osc +# Optional: combined HTTP+WebSocket on same port (OSCQuery spec). Falls back to separate WS port if not installed. +aiohttp +# Optional: WebSocket on separate port when aiohttp not used. No-op if not installed. +websockets \ No newline at end of file diff --git a/tinyoscquery/queryservice.py b/tinyoscquery/queryservice.py index edc22ca..95cdd3b 100644 --- a/tinyoscquery/queryservice.py +++ b/tinyoscquery/queryservice.py @@ -1,13 +1,52 @@ from zeroconf import ServiceInfo, Zeroconf from http.server import SimpleHTTPRequestHandler, HTTPServer from .shared.node import OSCQueryNode, OSCHostInfo, OSCAccess -import json, threading +import asyncio +import json +import threading + +try: + from pythonosc.udp_client import SimpleUDPClient + from pythonosc.osc_message_builder import OscMessageBuilder + _HAS_PYTHON_OSC = True +except ImportError: + _HAS_PYTHON_OSC = False + +try: + import aiohttp + from aiohttp import web + _HAS_AIOHTTP = True +except ImportError: + _HAS_AIOHTTP = False + +try: + import websockets + _HAS_WEBSOCKETS = True +except ImportError: + _HAS_WEBSOCKETS = False + + +def _osc_message_bytes(path, values): + """Build raw OSC packet bytes for path and value list. Returns None if python-osc not available.""" + if not _HAS_PYTHON_OSC: + return None + try: + builder = OscMessageBuilder(address=path) + for v in values: + builder.add_arg(v) + msg = builder.build() + return getattr(msg, "dgram", getattr(msg, "_dgram", None)) + except Exception: + return None class OSCQueryService(object): """ A class providing an OSCQuery service. Automatically sets up a oscjson http server and advertises the oscjson server and osc server on zeroconf. + If aiohttp is installed, HTTP and WebSocket run on the same port (OSCQuery spec: same IP/port). + Otherwise falls back to threading HTTP server and optional WebSocket on a separate port. + Attributes ---------- serverName : str @@ -18,22 +57,49 @@ class OSCQueryService(object): Desired UDP port number for the osc server """ - def __init__(self, serverName, httpPort, oscPort, oscIp="127.0.0.1") -> None: + def __init__(self, serverName, httpPort, oscPort, oscIp="127.0.0.1", wsPort=None) -> None: self.serverName = serverName self.httpPort = httpPort self.oscPort = oscPort self.oscIp = oscIp + # Same port as HTTP when using aiohttp (OSCQuery spec); else separate port + self.wsPort = httpPort if _HAS_AIOHTTP else (wsPort if wsPort is not None else (httpPort + 1)) + self._push_host = None + self._push_port = None + self._osc_client = None + self._ws_connections = set() + self._ws_listen_paths = {} # ws -> set of paths this client LISTENs to + self._ws_loop = None + self._ws_server = None + self._use_aiohttp = _HAS_AIOHTTP + extensions = {"ACCESS": True, "CLIPMODE": False, "RANGE": True, "TYPE": True, "VALUE": True} + if _HAS_AIOHTTP: + extensions["LISTEN"] = True + extensions["PATH_CHANGED"] = True self.root_node = OSCQueryNode("/", description="root node") - self.host_info = OSCHostInfo(serverName, {"ACCESS":True,"CLIPMODE":False,"RANGE":True,"TYPE":True,"VALUE":True}, - self.oscIp, self.oscPort, "UDP") + self.host_info = OSCHostInfo( + serverName, extensions, + self.oscIp, self.oscPort, "UDP", ws_ip=self.oscIp, ws_port=self.wsPort + ) self._zeroconf = Zeroconf() self._startOSCQueryService() self._advertiseOSCService() - self.http_server = OSCQueryHTTPServer(self.root_node, self.host_info, ('', self.httpPort), OSCQueryHTTPHandler) - self.http_thread = threading.Thread(target=self._startHTTPServer) - self.http_thread.start() + if _HAS_AIOHTTP: + self.http_server = None + self._aiohttp_runner = None + self._aiohttp_site = None + self._aiohttp_loop = asyncio.new_event_loop() + self._aio_thread = threading.Thread(target=self._run_aiohttp_server, daemon=True) + self._aio_thread.start() + else: + self.http_server = OSCQueryHTTPServer(self.root_node, self.host_info, ('', self.httpPort), OSCQueryHTTPHandler) + self.http_thread = threading.Thread(target=self._startHTTPServer, daemon=True) + self.http_thread.start() + if _HAS_WEBSOCKETS and not _HAS_AIOHTTP: + self._ws_thread = threading.Thread(target=self._run_ws_server, daemon=True) + self._ws_thread.start() def __del__(self): self._zeroconf.unregister_all_services() @@ -41,6 +107,179 @@ def __del__(self): def add_node(self, node): self.root_node.add_child_node(node) + def get_node(self, path): + """Return the OSCQueryNode at the given path, or None if not found.""" + path = path.split("?")[0] if path else path + return self.root_node.find_subnode(path) + + def set_push_target(self, host, port): + """ + Set the host:port to send OSC messages to when update_value() is called. + Clients (e.g. Chataigne) can listen for OSC on this port to get live value updates. + Pass None, None to disable push. + """ + self._push_host = host + self._push_port = port + self._osc_client = None + + def update_value(self, path, value): + """ + Update the value of the node at the given path. + Value can be a single value or a list (for multi-argument nodes). + Returns True if the node was found and updated, False otherwise. + The updated value is immediately visible to HTTP GET requests. + If set_push_target(host, port) was set and python-osc is installed, also sends an OSC message. + If WebSocket (same port or separate) is in use, streams value to clients that LISTEN to this path (binary OSC when same port). + """ + node = self.get_node(path) + if node is None: + return False + if not isinstance(value, list): + node.value = [value] + osc_args = [value] + else: + node.value = list(value) + osc_args = list(value) + if self._push_host is not None and self._push_port is not None and _HAS_PYTHON_OSC: + try: + if self._osc_client is None: + self._osc_client = SimpleUDPClient(self._push_host, self._push_port) + self._osc_client.send_message(path, osc_args[0] if len(osc_args) == 1 else osc_args) + except Exception: + pass + if self._use_aiohttp and self._aiohttp_loop is not None and self._ws_connections: + osc_bytes = _osc_message_bytes(path, node.value) + if osc_bytes: + asyncio.run_coroutine_threadsafe( + self._ws_broadcast_osc(path, osc_bytes), self._aiohttp_loop + ) + elif _HAS_WEBSOCKETS and self._ws_loop is not None and self._ws_connections: + try: + msg = json.dumps({"path": path, "VALUE": node.value}) + asyncio.run_coroutine_threadsafe(self._ws_broadcast(msg), self._ws_loop) + except Exception: + pass + return True + + async def _ws_broadcast_osc(self, path, osc_bytes): + """Send binary OSC packet to every WebSocket client that LISTENs to this path.""" + disconnected = [] + for ws in list(self._ws_connections): + paths = self._ws_listen_paths.get(ws) + if paths and path in paths: + try: + await ws.send_bytes(osc_bytes) + except Exception: + disconnected.append(ws) + for ws in disconnected: + self._ws_connections.discard(ws) + self._ws_listen_paths.pop(ws, None) + + async def _ws_broadcast(self, message): + if not self._ws_connections: + return + disconnected = set() + for ws in self._ws_connections: + try: + await ws.send(message) + except Exception: + disconnected.add(ws) + for ws in disconnected: + self._ws_connections.discard(ws) + self._ws_listen_paths.pop(ws, None) + + def _run_aiohttp_server(self): + if not _HAS_AIOHTTP: + return + app = web.Application() + root_node = self.root_node + host_info = self.host_info + + async def handle_get(request): + path_str = request.path if request.path else "/" + if path_str != "/": + path_str = "/" + path_str.lstrip("/") # ensure leading / + if "HOST_INFO" in (request.query_string or ""): + return web.json_response(json.loads(host_info.to_json()), headers={"Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache"}) + node = root_node.find_subnode(path_str) + if node is None: + return web.Response(text="OSC Path not found", status=404, content_type="text/json") + return web.json_response(json.loads(node.to_json()), headers={"Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache"}) + + async def handle_ws(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + self._ws_connections.add(ws) + self._ws_listen_paths[ws] = set() + try: + async for msg in ws: + if msg.type == web.WSMsgType.TEXT: + try: + obj = json.loads(msg.data) + cmd = (obj.get("COMMAND") or "").strip().upper() + data = obj.get("DATA") + if cmd == "LISTEN" and isinstance(data, str): + self._ws_listen_paths[ws].add(data) + elif cmd == "IGNORE" and isinstance(data, str): + self._ws_listen_paths[ws].discard(data) + except (json.JSONDecodeError, TypeError): + pass + finally: + self._ws_connections.discard(ws) + self._ws_listen_paths.pop(ws, None) + return ws + + async def root_or_ws(request): + if request.headers.get("Upgrade", "").lower() == "websocket": + return await handle_ws(request) + return await handle_get(request) + + app.router.add_get("/", root_or_ws) + app.router.add_get("/{path:.*}", handle_get) + + async def start(): + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, "", self.httpPort) + await site.start() + self._aiohttp_runner = runner + self._aiohttp_site = site + + self._aiohttp_loop.run_until_complete(start()) + self._aiohttp_loop.run_forever() + + def _run_ws_server(self): + if not _HAS_WEBSOCKETS: + return + async def handler(websocket): + self._ws_connections.add(websocket) + self._ws_listen_paths[websocket] = set() + try: + async for msg in websocket: + try: + raw = msg.data if isinstance(msg, object) and hasattr(msg, "data") else msg + if isinstance(raw, bytes): + raw = raw.decode("utf-8", errors="replace") + obj = json.loads(raw) + cmd = (obj.get("COMMAND") or "").strip().upper() + data = obj.get("DATA") + if cmd == "LISTEN" and isinstance(data, str): + self._ws_listen_paths[websocket].add(data) + elif cmd == "IGNORE" and isinstance(data, str): + self._ws_listen_paths[websocket].discard(data) + except (json.JSONDecodeError, TypeError): + pass + finally: + self._ws_connections.discard(websocket) + self._ws_listen_paths.pop(websocket, None) + await websocket.wait_closed() + self._ws_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._ws_loop) + self._ws_server = self._ws_loop.run_until_complete( + websockets.serve(handler, "", self.wsPort, ping_interval=20, ping_timeout=20) + ) + self._ws_loop.run_forever() + def advertise_endpoint(self, address, value=None, access=OSCAccess.READWRITE_VALUE): new_node = OSCQueryNode(full_path=address, access=access) if value is not None: @@ -85,7 +324,9 @@ def do_GET(self) -> None: self.end_headers() self.wfile.write(bytes(str(self.server.host_info.to_json()), 'utf-8')) return - node = self.server.root_node.find_subnode(self.path) + # Strip query string so /ring/X?foo=bar still finds /ring/X + request_path = self.path.split("?")[0] + node = self.server.root_node.find_subnode(request_path) if node is None: self.send_response(404) self.send_header("Content-type", "text/json") @@ -94,6 +335,8 @@ def do_GET(self) -> None: else: self.send_response(200) self.send_header("Content-type", "text/json") + self.send_header("Cache-Control", "no-store, no-cache, must-revalidate") + self.send_header("Pragma", "no-cache") self.end_headers() self.wfile.write(bytes(str(node.to_json()), 'utf-8'))