# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""Reliable websocket client and server handle functions"""
import k1lib, math, numpy as np, random, base64, json, time; import k1lib.cli as cli; from typing import List, Iterator
from collections import defaultdict, deque
websockets = k1lib.dep.websockets; asyncio = k1lib.dep("asyncio")
__all__ = ["serverHandle", "serverSend", "serverClose", "WsClient"]
[docs]async def serverHandle(ws: "websockets.WebSocketServerProtocol", msg:str) -> "bytes | str | None":
"""Tiny server handle addon function.
Example::
async def handle_client(ws: "websockets.WebSocketServerProtocol"):
try:
# Continuously listen for messages from the client
async for raw in ws:
msg = await kws.serverHandle(ws, raw)
if msg: # can be None
print(f"Received message: {msg}")
await kws.serverSend(ws, f"modified msg ({msg})")
except websockets.exceptions.ConnectionClosed: print(f"Client at {ws.remote_address} disconnected")
# Create a WebSocket server
asyncio.get_event_loop().run_until_complete(websockets.serve(handle_client, "localhost", 8765))
asyncio.get_event_loop().run_forever()
Essentially, this function will convert the raw message received (json
string with extra metadata) into your intended message sent from
:class:`WsClient`.
So, if you do `ws.send("abc")` on the client side, then msg variable will
be "abc" on the server side."""
obj = json.loads(msg)
if obj["type"] == "ping": await asyncio.wait_for(ws.send(msg), 0.3); return
if obj["type"] == "msg": return base64.b64decode(obj["msg"]) if obj["dataType"] == "bytes" else obj["msg"]
[docs]async def serverSend(ws: "websockets.WebSocketServerProtocol", msg:"any", retries=3, timeout=3):
"""Msg send wrapper. Basically adds some metadata to the message. Also resends
if it errors out for some reason. See :meth:`serverHandle`"""
attempts = 0
while True:
attempts += 1
if attempts > retries: break
try: return await asyncio.wait_for(ws.send(json.dumps({"type": "msg", "dataType": "str", "msg": msg})), timeout)
except: await asyncio.sleep(0.5)
[docs]async def serverClose(ws: "websockets.WebSocketServerProtocol"):
"""Msg send wrapper. Basically adds some metadata to the message. See :meth:`serverHandle`"""
await ws.send(json.dumps({"type": "close"}))
try: ws.close()
except: pass
def log(s): pass # uncomment line below to log stuff while testing in dev # log
# def log(s): requests.get(f"https://logs.mlexps.com/{s}") # log
[docs]class WsClient: # WsClient
[docs] def __init__(self, url:str): # WsClient
"""WebSocket client that works with :class:`WsServer` class.
This features automatic pings and will attempt to reconnect whenever
the network fails. Example::
async def main():
async with kws.WsClient("ws://localhost:8765") as ws:
while True:
msg = await aioconsole.ainput("Enter a message to send (or 'exit' to quit): ")
if msg.lower() == "exit": break
await ws.send(msg)
print(f"Received response: {await ws.recv()}")
asyncio.get_event_loop().run_until_complete(main())
See :meth:`serverHandle` for a ws server example
:param url: websocket server url, like 'ws://localhost:8765'""" # WsClient
self.url = url; self.wsCon = None # WsClient
self.savedMsgs = deque(); self.lastSeen = 0 # unix timestamp # WsClient
self._wsReady = False # for some reason `if self.ws` does not work, so have to use this flag bit # WsClient
eventLoop = asyncio.get_event_loop() # WsClient
async def _send(s): # returns whether successful or not # WsClient
try: await asyncio.wait_for(self.ws.send(s), 0.1); return True # WsClient
except: return False # WsClient
self._send = _send # WsClient
async def ping(): # WsClient
try: # WsClient
clock = k1lib.AutoIncrement() # WsClient
while True: # WsClient
if self._wsReady: await self._send(json.dumps({"type": "ping", "msg": clock()})) # WsClient
await asyncio.sleep(1) # WsClient
except Exception as e: log(f"ping exited: {e}") # WsClient
async def recvLoop(): # WsClient
try: # WsClient
while True: # WsClient
if self._wsReady: # WsClient
try: res = await asyncio.wait_for(self.ws.recv(), 0.001) # if there're messages, then don't wait for a long time # WsClient
except: await asyncio.sleep(0.01); continue # no messages, so sleep for a "long" time # WsClient
msg = json.loads(res) # WsClient
if msg["type"] == "ping": self.lastSeen = time.time() # WsClient
elif msg["type"] == "msg": # WsClient
self.lastSeen = time.time() # WsClient
self.savedMsgs.append(msg["msg"] if msg["dataType"] == "str" else base64.b64decode(msg["msg"])) # WsClient
elif msg["type"] == "close": await self.__aexit__(None, None, None) # WsClient
else: await asyncio.sleep(0.01) # WsClient
except Exception as e: log(f"recv exited: {e}") # WsClient
async def watchdog(): # WsClient
try: # WsClient
while True: # WsClient
if not self._wsReady or not self.alive: # WsClient
if self._wsReady: # WsClient
try: await asyncio.wait_for(self.wsCon.__aexit__(None, None, None), 0.01) # WsClient
except: pass # WsClient
try: # WsClient
self.wsCon = websockets.connect(url) # WsClient
self.ws = await self.wsCon.__aenter__() # WsClient
self._wsReady = True; self.lastSeen = time.time() # WsClient
except: pass # WsClient
await asyncio.sleep(1) # WsClient
except Exception as e: log(f"watchdog exited: {e}") # WsClient
self.t1 = eventLoop.create_task(ping()) # WsClient
self.t2 = eventLoop.create_task(recvLoop()) # WsClient
self.t3 = eventLoop.create_task(watchdog()) # WsClient
@property # WsClient
def alive(self): return time.time()-self.lastSeen < 3 # WsClient
async def __aenter__(self): return self # WsClient
async def __aexit__(self, *_): # WsClient
self.t1.cancel(); self.t2.cancel(); self.t3.cancel() # WsClient
if self._wsReady: return await self.wsCon.__aexit__(*_) # WsClient
[docs] async def send(self, msg:"str|bytes"): # WsClient
"""Send data to server. If server offline, then will hang until
server is online again. Won't resolve until message has been sent""" # WsClient
if isinstance(msg, bytes): dataType = "bytes"; msg = base64.b64encode(msg).decode() # WsClient
elif isinstance(msg, str): dataType = "str" # WsClient
else: dataType = "obj" # WsClient
s = json.dumps({"type": "msg", "dataType": dataType, "msg": msg}) # WsClient
while True: # WsClient
if await self._send(s): break # WsClient
await asyncio.sleep(0.1) # WsClient
[docs] async def recv(self): # WsClient
"""Receive data from server. If no data received or server offline,
then hangs until some data is received""" # WsClient
while len(self.savedMsgs) == 0: await asyncio.sleep(0.01) # WsClient
return self.savedMsgs.popleft() # WsClient