Source code for k1lib.kws

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""Reliable websocket client and server handle functions"""
import k1lib, math, numpy as np, random, base64, json, time; import k1lib.cli as cli; from typing import List, Iterator
from collections import defaultdict, deque
websockets = k1lib.dep.websockets; asyncio = k1lib.dep("asyncio")
__all__ = ["serverHandle", "serverSend", "serverClose", "WsClient"]
async def serverHandle(ws: "websockets.WebSocketServerProtocol", msg:str) -> "bytes | str | None":
    """Unwraps a raw frame sent by :class:`WsClient` into the user's message.
Example::

    async def handle_client(ws: "websockets.WebSocketServerProtocol"):
        try:
            # Continuously listen for messages from the client
            async for raw in ws:
                msg = await kws.serverHandle(ws, raw)
                if msg: # can be None
                    print(f"Received message: {msg}")
                    await kws.serverSend(ws, f"modified msg ({msg})")
        except websockets.exceptions.ConnectionClosed:
            print(f"Client at {ws.remote_address} disconnected")

    # Create a WebSocket server
    asyncio.get_event_loop().run_until_complete(websockets.serve(handle_client, "localhost", 8765))
    asyncio.get_event_loop().run_forever()

Every raw frame is a json string carrying some metadata next to the actual
payload. This function strips that metadata away, so if the client does
``ws.send("abc")``, then the value returned here is "abc".

Ping frames are echoed straight back to the sender (waiting at most 0.3s for
the echo to go out) and yield None, as does any frame that is not of type "msg".

:param ws: connection the raw frame arrived on, used to echo pings
:param msg: raw json frame received from the client
:return: decoded bytes if the frame's dataType is "bytes", the payload as-is
    for other "msg" frames, None for everything else"""
    packet = json.loads(msg)
    kind = packet["type"]
    if kind == "ping":
        # echo the exact frame back so the client can refresh its liveness timestamp
        await asyncio.wait_for(ws.send(msg), 0.3)
        return None
    if kind != "msg":
        return None
    if packet["dataType"] == "bytes":
        return base64.b64decode(packet["msg"])
    return packet["msg"]
async def serverSend(ws: "websockets.WebSocketServerProtocol", msg:"any", retries=3, timeout=3):
    """Msg send wrapper. Basically adds some metadata to the message.
Also resends if it errors out for some reason. See :meth:`serverHandle`

``bytes``/``bytearray`` payloads are base64-encoded and tagged with
dataType "bytes", which is the encoding :class:`WsClient`'s receive loop
decodes. Everything else is sent as-is with dataType "str" and must be
json-serializable.

:param ws: connection to send through
:param msg: payload to deliver to the client
:param retries: maximum number of send attempts
:param timeout: seconds to wait for each individual attempt
:return: whatever ``ws.send`` returns on success. None if the payload can't
    be serialized or if every attempt failed"""
    if isinstance(msg, (bytes, bytearray)):
        dataType = "bytes"; payload = base64.b64encode(msg).decode()
    else:
        dataType = "str"; payload = msg
    # Serialize once, outside the retry loop: a payload that can't be encoded
    # will never succeed, so retrying (and sleeping) for it is pointless
    try: packet = json.dumps({"type": "msg", "dataType": dataType, "msg": payload})
    except (TypeError, ValueError): return None
    for attempt in range(retries):
        try: return await asyncio.wait_for(ws.send(packet), timeout)
        except asyncio.CancelledError: raise # never swallow task cancellation
        except Exception:
            # back off before the next attempt, but don't stall after the last one
            if attempt < retries - 1: await asyncio.sleep(0.5)
    return None
async def serverClose(ws: "websockets.WebSocketServerProtocol"):
    """Tells the client to shut down, then closes the connection.

Sends a ``{"type": "close"}`` frame, which makes :class:`WsClient` stop its
background tasks instead of trying to reconnect, and then closes the socket.
See :meth:`serverHandle`

:param ws: connection to close
:raises Exception: whatever ``ws.send`` raises if the close frame can't be
    sent. The socket close is still attempted in that case"""
    try: await ws.send(json.dumps({"type": "close"}))
    finally:
        # close() is a coroutine, it does nothing unless it's awaited
        try: await ws.close()
        except asyncio.CancelledError: raise # never swallow task cancellation
        except Exception: pass # best effort, the socket may already be gone
def log(s):
    """No-op logging hook, called by the :class:`WsClient` background loops
when they exit. Does nothing with the message by default.

:param s: message to log"""
    # To log stuff while testing in dev, swap the body for the line below
    # requests.get(f"https://logs.mlexps.com/{s}")
    return None
class WsClient:
    def __init__(self, url:str):
        """WebSocket client that works with servers built on :meth:`serverHandle`
and :meth:`serverSend`. This features automatic pings and will attempt to
reconnect whenever the network fails. Example::

    async def main():
        async with kws.WsClient("ws://localhost:8765") as ws:
            while True:
                msg = await aioconsole.ainput("Enter a message to send (or 'exit' to quit): ")
                if msg.lower() == "exit": break
                await ws.send(msg)
                print(f"Received response: {await ws.recv()}")
    asyncio.get_event_loop().run_until_complete(main())

See :meth:`serverHandle` for a ws server example

Constructing the client immediately schedules 3 background tasks on the
current event loop: one sending a ping every second, one receiving frames
into an internal queue, and a watchdog that (re)connects whenever there's no
live connection.

:param url: websocket server url, like 'ws://localhost:8765'"""
        self.url = url; self.wsCon = None
        self.ws = None # set by the watchdog once a connection is established
        self.savedMsgs = deque(); self.lastSeen = 0 # unix timestamp
        self._wsReady = False # for some reason `if self.ws` does not work, so have to use this flag bit
        eventLoop = asyncio.get_event_loop()

        async def _send(s): # returns whether successful or not
            try:
                await asyncio.wait_for(self.ws.send(s), 0.1)
                return True
            except asyncio.CancelledError: raise # let task cancellation through
            except Exception: return False
        self._send = _send

        async def ping():
            try:
                clock = k1lib.AutoIncrement() # each call supplies the next ping's payload
                while True:
                    if self._wsReady: await self._send(json.dumps({"type": "ping", "msg": clock()}))
                    await asyncio.sleep(1)
            except Exception as e: log(f"ping exited: {e}")

        async def recvLoop():
            try:
                while True:
                    if not self._wsReady:
                        await asyncio.sleep(0.01); continue
                    try: res = await asyncio.wait_for(self.ws.recv(), 0.001) # if there're messages, then don't wait for a long time
                    except asyncio.CancelledError: raise # let task cancellation through
                    except Exception: await asyncio.sleep(0.01); continue # no messages, so sleep for a "long" time
                    msg = json.loads(res)
                    if msg["type"] == "ping": self.lastSeen = time.time()
                    elif msg["type"] == "msg":
                        self.lastSeen = time.time()
                        self.savedMsgs.append(msg["msg"] if msg["dataType"] == "str" else base64.b64decode(msg["msg"]))
                    elif msg["type"] == "close":
                        # server asked us to shut down: stop everything and end this loop
                        await self.__aexit__(None, None, None)
                        return
            except Exception as e: log(f"recv exited: {e}")

        async def watchdog():
            try:
                while True:
                    if not self._wsReady or not self.alive:
                        if self._wsReady: # connection went stale, tear it down first
                            try: await asyncio.wait_for(self.wsCon.__aexit__(None, None, None), 0.01)
                            except asyncio.CancelledError: raise # let task cancellation through
                            except Exception: pass
                            # from here on there's no usable connection until the reconnect below
                            # succeeds, so a failed reconnect must not leave the flag set
                            self._wsReady = False
                        try:
                            self.wsCon = websockets.connect(url)
                            self.ws = await self.wsCon.__aenter__()
                            self._wsReady = True; self.lastSeen = time.time()
                        except asyncio.CancelledError: raise # let task cancellation through
                        except Exception: pass # server unreachable, try again on the next round
                    await asyncio.sleep(1)
            except Exception as e: log(f"watchdog exited: {e}")

        self.t1 = eventLoop.create_task(ping())
        self.t2 = eventLoop.create_task(recvLoop())
        self.t3 = eventLoop.create_task(watchdog())

    @property
    def alive(self):
        """Whether anything has been received from the server in the last 3 seconds"""
        return time.time()-self.lastSeen < 3

    async def __aenter__(self): return self

    async def __aexit__(self, *_):
        """Stops the background tasks and closes the connection, if there is one"""
        # The receive loop calls this itself when the server sends a "close" frame.
        # Cancelling the task that's currently running would interrupt the
        # connection close below, so that one is skipped
        current = asyncio.current_task()
        for task in (self.t1, self.t2, self.t3):
            if task is not current: task.cancel()
        if self._wsReady:
            self._wsReady = False
            return await self.wsCon.__aexit__(*_)

    async def send(self, msg:"str|bytes"):
        """Send data to server. If server offline, then will hang until
server is online again. Won't resolve until message has been sent

:param msg: bytes are base64-encoded and tagged "bytes", strings are tagged
    "str", anything else is tagged "obj" and has to be json-serializable"""
        if isinstance(msg, bytes): dataType = "bytes"; msg = base64.b64encode(msg).decode()
        elif isinstance(msg, str): dataType = "str"
        else: dataType = "obj"
        s = json.dumps({"type": "msg", "dataType": dataType, "msg": msg})
        while True:
            if await self._send(s): break
            await asyncio.sleep(0.1)

    async def recv(self):
        """Receive data from server. If no data received or server offline,
then hangs until some data is received

:return: oldest message that has not been consumed yet"""
        while not self.savedMsgs: await asyncio.sleep(0.01)
        return self.savedMsgs.popleft()