Source code for k1lib.k1ui.main

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""`k1ui <https://github.com/157239n/k1ui>`_ is another project made in Java that
aims to record and manipulate the screen, keyboard and mouse. The interface to
that project on its own is clunky, and this module is the Python interface to
ease its use.

Not quite developed yet tho, because I'm lazy."""
import k1lib, numpy as np, asyncio, time, inspect, json, threading, dill, math, base64, os, random, warnings
k1 = k1lib; cli = k1.cli; from k1lib.cli import *; knn = k1.knn; Cbs = k1.Cbs; viz = k1.viz; websockets = k1.dep.websockets
nn = k1.dep("torch.nn", url="https://pytorch.org/"); optim = k1.dep("torch.optim", url="https://pytorch.org/")
PIL = k1.dep.PIL; k1.dep.graphviz; requests = k1.dep.requests; tf = k1.dep("torchvision.transforms", url="https://pytorch.org/")
try: import torch; hasTorch = True
except: torch = k1.dep.torch; hasTorch = False
try: import torchvision; hasTv = True
except: hasTv = False
from typing import Callable, List, Iterator, Tuple, Union, Dict; from collections import defaultdict, deque; from functools import lru_cache
mpl = k1lib.dep.mpl; plt = k1lib.dep.plt
__all__ = ["get", "WsSession", "selectArea", "record", "execute", "Recording",
           "Track", "CharTrack", "WordTrack", "ContourTrack", "ClickTrack", "WheelTrack", "StreamTrack",
           "distNet", "TrainScreen"]
k1lib.settings.add("k1ui", k1.Settings().add("server", k1.Settings().add("http", "http://localhost:9511", "normal http server").add("ws", "ws://localhost:9512", "websocket server"), "server urls"), "docs related to k1ui java library");
settings = k1lib.settings.k1ui
settings.add("draw", k1.Settings(), "drawing settings")
settings.draw.add("trackHeight", 30, "Track's height in Recording visualization")
settings.draw.add("pad", 10, "Padding between tracks");
def get(path):
    """Sends a get request to the Java server. Example::

        k1ui.get("mouse/200/300") # move mouse to (200, 300)"""
    return requests.get(f"{settings.server.http}/{path}", timeout=60*10).text
def post(path, jsObj):
    """Sends a post request to the Java server. Example::

        k1ui.post("mouse/200/300") # move mouse to (200, 300)"""
    return requests.post(f"{settings.server.http}/{path}", json=jsObj, timeout=60*10).text
portAutoInc = k1.AutoIncrement(9520)
class WsSession:
    def __init__(self, eventCb:Callable[["WsSession", dict], None], mainThreadCb:Callable[["WsSession"], None]):
        """Creates a websocket connection with the server, with some callback functions.
        The callback functions (most are async, btw) will be passed a WebSocket object
        as the first argument. You can use it to send messages like this::

            # this will send a signal to the server to close the session
            sess.ws.send(json.dumps({"type": "close"}))
            # this will send a signal to the server requesting the current screenshot. Result will be deposited into eventCb
            sess.ws.send(json.dumps({"type": "screenshot"}))
            # this will execute a single event
            sess.ws.send(json.dumps({"type": "execute", "event": {"type": "keyTyped", "javaKeyCode": 0, ...}}))

        Complete, minimum example::

            events = []
            async def eventCb(sess, event): events.append(event)
            async def mainThreadCb(sess):
                sess.stream(300) # starts a stream with output screen width of 300px
                await asyncio.sleep(2)
                await sess.ws.send(json.dumps({"type": "execute", "event": {"type": "keyPressed", "javaKeyCode": 65, "timestamp": 0}}))
                await sess.ws.send(json.dumps({"type": "execute", "event": {"type": "keyReleased", "javaKeyCode": 65, "timestamp": 0}}))
                await asyncio.sleep(10); sess.close()
            await k1ui.WsSession(eventCb, mainThreadCb).run()

        This code communicates with the server continuously for 12 seconds, capturing
        all events in the meantime and saving them into the ``events`` list. It starts
        up a UDP stream to capture screenshots continuously, and after 2 seconds, it
        sends 2 events to the server, trying to type the letter "A". Finally, it waits
        for another 10 seconds and then terminates the connection.

        This interface is quite low-level, and is the basis for all other
        functionalities. Some of them include:

        * :meth:`record`: records a session
        * :meth:`execute`: executes a list of events

        :param eventCb: (async) will be called whenever there's a new event
        :param mainThreadCb: (async) will be called after setting up everything"""
        self.ws = None; self.eventCb = eventCb; self.mainThreadCb = mainThreadCb
        if not inspect.iscoroutinefunction(eventCb): raise Exception("eventCb has to be an async function")
        if not inspect.iscoroutinefunction(mainThreadCb): raise Exception("mainThreadCb has to be an async function")
        self.closed = False; self.streams = {} # width -> [width, locks, port]
    async def _listenLoop(self):
        while True:
            res = await self.ws.recv() | cli.aS(json.loads); _type = res["type"]
            if _type == "close": break # python sends a close signal to java, then java sends a close signal back as an acknowledgement
            if _type == "screenshot": await self.eventCb(self, {"type": "screenshot", "bytes": base64.b64decode(res["screenshot"]), "timestamp": int(time.time()*1000)})
            if _type == "newEvent": await self.eventCb(self, res["event"])
    async def _pingLoop(self):
        while True:
            if self.closed: break
            try: await self.ws.send({"type": "ping"} | cli.aS(json.dumps)); await asyncio.sleep(1)
            except: break
    async def _streamLoop(self, width, locks, port):
        import cv2; streamRefresh = 100 # refreshes the udp stream after this many seconds, so that it doesn't hang
        def threadLoop(lock, port):
            with lock, k1.captureStdout(False, True):
                get(f"startStream/{width}/{port}"); cap = cv2.VideoCapture(f'udp://0.0.0.0:{port}', cv2.CAP_FFMPEG); beginTime = time.time()
                while (cap.isOpened()):
                    if self.closed: break
                    res, frame = cap.read()
                    if not res: break
                    self.loop.create_task(self.eventCb(self, {"type": "stream", "width": width, "frame": frame[:,:,::-1], "timestamp": int(time.time()*1000)}))
                    if time.time() - beginTime > streamRefresh + 10: break # there will be a short time where 2 udp streams dump events simultaneously
                cap.release(); get(f"stopStream/{port}")
        ports = [port, port + 100]; sel = 0
        while not self.closed:
            threading.Thread(target=threadLoop, args=(locks[sel], ports[sel])).start()
            await asyncio.sleep(streamRefresh); sel = 1-sel
    def stream(self, width):
        """Starts a stream with a particular output width. The lower the width, the higher the fps and vice versa"""
        if width in self.streams: raise Exception(f"Can't start stream with width {width}. Just use the existing stream.")
        port = portAutoInc()
        self.streams[width] = [width, [threading.Lock(), threading.Lock()], port]; import cv2 # import placed here so that users see an error message if cv2 is not installed
        asyncio.create_task(self._streamLoop(*self.streams[width]))
    async def run(self):
        """Connects with the Java server, sets things up and runs ``mainThreadCb``"""
        async with websockets.connect(settings.server.ws, max_size=1_000_000_000) as ws:
            self.ws = ws; self.loop = asyncio.get_event_loop()
            _listenLoop = asyncio.create_task(self._listenLoop())
            _pingLoop = asyncio.create_task(self._pingLoop())
            try: await self.mainThreadCb(self)
            except asyncio.CancelledError: self.close()
            await _listenLoop
    def close(self):
        """Closes the connection with the Java server"""
        if self.closed: print("Already closed"); return
        self.closed = True; asyncio.create_task(self.ws.send({"type": "close"} | cli.aS(json.dumps)))
        for width, locks, port in self.streams.values():
            with locks[0]: # make sure all locks are freed. Also important to have the 2 locks nested in each other, in case everything aligns just right to evade this mechanism
                with locks[1]: pass
    async def execute(self, events):
        """Executes a series of events"""
        events = events | sortF(op()["timestamp"]) | aS(list)
        deltaT = int(time.time()*1000) - events[0]["timestamp"]
        for e in events | apply(lambda x: {**x, "timestamp": x["timestamp"]+deltaT}):
            st = e["timestamp"]/1000 - time.time()
            if st > 0: await asyncio.sleep(st)
            await self.ws.send(json.dumps({"type": "execute", "event": e}))
def selectArea(x, y, w, h):
    """Selects an area on the screen to focus into"""
    return get(f"selectArea/{x}/{y}/{w}/{h}")
async def record(t=None, keyCode=None, streamWidth=300, f=iden()):
    """Records activities. Examples::

        events = await k1ui.record(t=5)       # records for 5 seconds
        events = await k1ui.record(keyCode=5) # records until the key with code 5 is released
        events = await k1ui.record()          # records until an interrupt signal is sent to the process

    Note: these examples only work in jupyter notebooks. For regular Python processes,
    check out the official Python docs (https://docs.python.org/3/library/asyncio-task.html)

    :param t: record duration
    :param keyCode: key that stops the recording
    :param streamWidth: if truthy, opens the UDP stream and captures screenshots at this width
    :param f: extra event post-processing function"""
    events = []
    async def eventCb(sess, event):
        res = f(event)
        if res is not None: events.append(res)
        if event["type"] == "keyReleased" and event["keyCode"] == keyCode: sess.close()
    async def mainThreadCb(sess):
        if streamWidth: sess.stream(streamWidth)
        if t is not None: await asyncio.sleep(t); sess.close()
        else: await asyncio.sleep(1e9)
    await WsSession(eventCb, mainThreadCb).run(); return events
async def execute(events:List[dict]):
    """Executes some events"""
    async def eventCb(sess, event): pass
    async def mainThreadCb(sess): await sess.execute(events); sess.close()
    await WsSession(eventCb, mainThreadCb).run()
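# An illustrative sketch (not part of the library) tying `record` and `execute`
# together; it assumes a running k1ui Java server at the configured URLs:
async def _demoRecordReplay():
    events = await record(t=5) # capture 5 seconds of keyboard/mouse/screen events
    await execute(events)      # replay them; WsSession.execute re-bases the timestamps to "now"
# in a notebook: `await _demoRecordReplay()`; in a plain process: `asyncio.run(_demoRecordReplay())`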
uuid = k1.AutoIncrement(random.randint(0, int(1e9)), prefix="k1ui-")
def escapeHtml(s): return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
class Recording:
    def __init__(self, events):
        self.uuid = uuid(); self._tracks = []
        if len(events) == 0: return # shortcut to initialize using cloned tracks rather than events
        events = events | sortF(op()["timestamp"]) | deref()
        self._tracks.extend(ContourTrack.parse(events))
        self._tracks.extend(CharTrack.parse(events))
        self._tracks.extend(ClickTrack.parse(events))
        self._tracks.extend(WheelTrack.parse(events))
        self._tracks.extend(StreamTrack.parse(events))
        self._tracks = self._tracks | filt(op()) | apply(lambda x: x._rec(self)) | deref()
        self._resetTimes(); self._resetDis()
    def _resetTimes(self): self.startTime, self.endTime = self._tracks | op().timeUnix().all() | joinStreams() | filt(op()) | toMin() & toMax(); return self
    def _resetDis(self): self.dis1, self.dis2 = (self.startTime+self.endTime)/2 | aS(lambda x: [x-self.duration*0.53, x+self.duration*0.53]); return self # display times
    @property
    def duration(self): return self.endTime - self.startTime
    def addTracks(self, *tracks) -> "Recording":
        """Adds tracks to the Recording"""
        if not isinstance(tracks[0], Track) and len(tracks) == 1: tracks = tracks[0]
        self._tracks.extend(tracks | apply(lambda tr: tr._rec(self))); self._resetTimes(); self._resetDis(); return self
    def removeTracks(self, *tracks) -> "Recording":
        """Removes tracks from the Recording"""
        if not isinstance(tracks[0], Track) and len(tracks) == 1: tracks = tracks[0]
        tracks | apply(self._tracks.remove) | ignore(); self._resetTimes(); self._resetDis(); return self
    def _normTime(self, t=None, default=None): return default if t is None else t + self.startTime
    def zoom(self, t1=None, t2=None):
        """Zooms into a particular time range. If either bound is not specified, it
        defaults to the start/end of all events.

        :param t1: time values are relative to the recording's start time"""
        _dis1 = self.dis1; t1 = _dis1 if t1 is None else t1 + self.startTime
        _dis2 = self.dis2; t2 = _dis2 if t2 is None else t2 + self.startTime
        delta = t2-t1; t1 -= delta*0.03; t2 += delta*0.03; self.dis1 = t1; self.dis2 = t2
        html = self._repr_html_(); self.dis1 = _dis1; self.dis2 = _dis2; return k1lib.viz.Html(html)
    def sel(self, t1=None, t2=None, klass=None) -> List["Track"]:
        """Selects a subset of tracks using several filters. For selecting time,
        assume we have a track layout like this (x and y mark t1 and t2)::

            # |-1--|    |-2-|
            #   |---3---|
            #  x       y

        Then tracks 1 and 3 are selected. Time values are relative to the recording's start time

        :param t1: choose tracks that happen after this time
        :param t2: choose tracks that happen before this time
        :param klass: choose a specific track class"""
        tracks = self._tracks
        if klass: tracks = tracks | instanceOf(klass)
        if t1 is not None or t2 is not None:
            t1 = self._normTime(t1, self.startTime); t2 = self._normTime(t2, self.endTime)
            tracks = tracks | apply(lambda o: [o.startTime or 0, o.endTime, o]) | ~filt(op()[1]<t1) | ~filt(op()[0]>=t2) | cut(2)
        return tracks | aS(list)
    def sel1(self, **kwargs) -> "Track":
        """Like :meth:`sel`, but this time gets the first element only."""
        return self.sel(**kwargs) | item()
    def time0(self) -> List[float]:
        """Start and end recording times. Start time is zero"""
        return [0, self.endTime - self.startTime]
    def timeUnix(self) -> List[float]:
        """Start and end recording times. Both are absolute unix times"""
        return [self.startTime, self.endTime]
    def events(self) -> List[dict]:
        """Reconstructs events from the Recording's internal data. The reconstruction is lossy though::

            events = ... # events recorded
            r = k1ui.Recording(events)
            assert r.events() != events # this is the lossy part. Don't expect the produced events to match the originals exactly"""
        return self._tracks | op().events().all() | joinStreams() | sortF(op()["timestamp"]) | deref(igT=False)
    def copy(self) -> "Recording":
        """Creates a clone of this recording"""
        return Recording([]).addTracks(self._tracks | op().copy().all())._resetDis()
    def _repr_html_(self): return self | aS(createTrackss) | aS(drawTrackss)
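# An illustrative sketch of Recording's query interface, assuming the built-in
# sample recording (see `Recording.sample` further down). All time arguments are
# seconds relative to the recording's start:
def _demoRecordingQueries():
    r = Recording.sample()
    charTracks = r.sel(klass=CharTrack)  # every key press/release track
    early = r.sel(t1=0, t2=3)            # every track overlapping the first 3 seconds
    contour = r.sel1(klass=ContourTrack) # the first mouse-trajectory track only
    return r.zoom(1, 4)                  # k1lib.viz.Html zoomed into seconds 1-4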
class Track:
    def __init__(self, startTime, endTime):
        """Time values are absolute unix times."""
        self.recording = None; self.startTime = startTime if startTime else None; self.endTime = endTime; self.uuid = uuid()
    def time0(self) -> List[float]:
        """Start and end track times. Times are relative to the track's own start time"""
        return [0, self.endTime - self.startTime]
    def time0Rec(self) -> List[float]:
        """Start and end track times. Times are relative to the recording's start time"""
        return [self.startTime-self.recording.startTime if self.startTime else None, self.endTime-self.recording.startTime]
    def timeUnix(self) -> List[float]:
        """Start and end track times. Times are absolute unix times"""
        return [self.startTime, self.endTime]
    def concurrent(self) -> List["Track"]:
        """Grabs all tracks that are concurrent with this track"""
        return self.recording.sel(*self.time0Rec())
    def _rec(self, recording): self.recording = recording; return self # inject dependency
    def _tooltip(self, ctx): return ""
    def _displayTimes(self): # shortcut func for displaying in __repr__
        s = f"{self.startTime-self.recording.startTime:.2f}s" if self.startTime else None
        e = f"{self.endTime-self.recording.startTime:.2f}s"; return f"time ({s}->{e})"
    def events(self) -> List[dict]:
        """Reconstructs events from the Track's internal data, to be implemented by subclasses."""
        return NotImplemented
    def copy(self):
        """Creates a clone of this Track, to be implemented by subclasses"""
        return NotImplemented
    def move(self, deltaTime):
        """Moves the entire track left or right; subclasses extend this to also shift their internal data.

        :param deltaTime: if negative, move left by this number of seconds, else move right"""
        self.startTime += deltaTime; self.endTime += deltaTime; self.recording._resetTimes(); self.recording._resetDis()
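# An illustrative sketch of the 3 time coordinate systems every Track supports
# (the values here are made up for illustration):
def _demoTrackTimes(tr:Track):
    tr.timeUnix() # e.g. [1650000004.1, 1650000004.3], absolute unix times
    tr.time0()    # [0, 0.2], relative to the track's own start
    tr.time0Rec() # e.g. [4.1, 4.3], relative to the recording's start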
class CharTrack(Track):
    def __init__(self, keyText:str, keyCode:int, mods:List[bool], times:List[float]):
        """Representing 1 key being pressed and released.

        :param keyText: text to display to the user, like "Enter"
        :param keyCode: event's "javaKeyCode"
        :param mods: list of 3 booleans: whether ctrl, shift or alt is pressed"""
        super().__init__(*times); self.keyText = keyText; self.keyCode = keyCode; self.mods = mods
    @staticmethod
    def parse(events) -> List["CharTrack"]:
        stacks = {} # keyCode -> event
        def process(e):
            _type, keyText, keyCode, mods, timestamp = e
            if _type == "keyPressed":
                if keyCode in stacks and stacks[keyCode]: # same key pressed twice without being released first (e.g. key repeat)
                    a = stacks[keyCode]; stacks[keyCode] = e
                    return [a, [_type, keyText, keyCode, mods, timestamp - 0.001]]
                stacks[keyCode] = e
            if _type == "keyReleased":
                a = stacks[keyCode] if keyCode in stacks and stacks[keyCode] else None
                stacks[keyCode] = None; return [a, e]
        def makeTrack(x, y):
            if x is None: x = [0, y[1], y[2], y[3], None]
            return CharTrack(x[1], x[2], x[3], [x[4], y[4]])
        return events | filt(op()["type"].startswith("key")) | filt(op()["type"] != "keyTyped") | apply(lambda x: [x["type"], x["keyText"], x["javaKeyCode"], [x["ctrl"], x["shift"], x["alt"]], x["timestamp"]/1000]) | apply(process) | filt(op()) | ~apply(makeTrack) | deref()
    def _tooltip(self, ctx): return escapeHtml(self.__repr__())
    def __repr__(self): return f"<CharTrack {self._displayTimes()} keyText ({self.keyText})>"
    def events(self):
        d = []; t1, t2 = self.timeUnix() # doesn't care about mods, because each mod will already have a separate CharTrack, so we don't have to repeat them
        if t1: d.append({"type": "keyPressed", "keyText": self.keyText, "javaKeyCode": self.keyCode, "timestamp": int(t1*1000)})
        if t2: d.append({"type": "keyReleased", "keyText": self.keyText, "javaKeyCode": self.keyCode, "timestamp": int(t2*1000)})
        return d
    def copy(self): return CharTrack(self.keyText, self.keyCode, self.mods, self.timeUnix())
    def move(self, deltaTime):
        if self.startTime: self.startTime += deltaTime
        self.endTime += deltaTime; self.recording._resetTimes()
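# An illustrative sketch of what `CharTrack.parse` does: a keyPressed/keyReleased
# pair with the same javaKeyCode collapses into a single CharTrack. The event
# dicts below are hypothetical, but use the exact fields `parse` reads:
_exampleKeyEvents = [
    {"type": "keyPressed",  "keyText": "A", "javaKeyCode": 65, "ctrl": False, "shift": False, "alt": False, "timestamp": 1000},
    {"type": "keyReleased", "keyText": "A", "javaKeyCode": 65, "ctrl": False, "shift": False, "alt": False, "timestamp": 1200},
]
# CharTrack.parse(_exampleKeyEvents) -> [<CharTrack spanning 1.0s -> 1.2s, keyText "A">]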
def _ord2(x):
    y = x | apply(ord) | deref()
    x2y = [x, y] | toDict(False)
    y2x = [y, x] | toDict(False)
    return [x, y, x2y, y2x]
_upper, _upperCs, _upperD1, _upperD2 = _ord2("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
_lower, _lowerCs, _lowerD1, _lowerD2 = _ord2("abcdefghijklmnopqrstuvwxyz")
_num, _numCs, _numD1, _numD2 = _ord2("1234567890")
_puncLower, _puncLowerCs, _puncLowerD1, _puncLowerD2 = _ord2("[];',./`-=\\")
_puncUpper, _puncUpperCs, _puncUpperD1, _puncUpperD2 = _ord2("{}:\"<>?~_+|")
# maps from numbers like 12345 to punctuation like !@#$%
_numPunc, _numPuncCs, _numPuncD1, _numPuncD2 = _ord2("!@#$%^&*()")
_numPuncMap1 = [_numPuncCs, _numCs] | toDict(False); _numPuncMap2 = [_numCs, _numPuncCs] | toDict(False)
_punc, _puncCs, _puncD1, _puncD2 = _ord2(_puncLower + _puncUpper + _numPunc + " ")
# maps from lower case punctuation like ;',./ into upper case like :"<>?
_puncMap = [_puncLower, _puncUpper] | toDict(False); _puncMapCs = [_puncLowerCs, _puncUpperCs] | toDict(False)
_puncMap2 = [_puncUpper, _puncLower] | toDict(False)
def _inferText(code:int, mods) -> str:
    if mods[0] or mods[2]: return None
    shift = mods[1]
    if shift:
        if code in _upperCs: return _upperD2[code]
        if code in _lowerCs: return _lowerD2[code].upper()
        if code in _numCs: return _numPuncD2[_numPuncMap2[code]]
        if code in _puncLowerCs: return _puncUpperD2[_puncMapCs[code]]
        if code in _puncCs: return _puncD2[code]
        return None
    else:
        if code in _upperCs: return _upperD2[code].lower()
        if code in _lowerCs: return _lowerD2[code]
        if code in _numCs: return _numD2[code]
        if code in _puncCs: return _puncD2[code]
        return None
def _isUpper(x:str) -> bool: return x in _upper or x in _puncUpper or x in _numPunc
def _canon(x:str) -> Union[int, str]: # returns the canonical key to be pressed
    if x in _num: return _numD1[x]
    if x in _upper: return _upperD1[x]
    if x in _lower: return _upperD1[x.upper()]
    if x in _puncLower: return x
    if x in _puncUpper: return _puncMap2[x]
    if x in _numPunc: return _numPuncMap1[_numPuncD1[x]]
    if x in _punc: return x
    return None
def _textToKeys(text:str): # opposite of _inferText
    cap = False; d = []; sk = 16 # shift key
    for c in text:
        _cap = _isUpper(c)
        if _cap and not cap: d.append(["down", sk]); cap = True # change to upper
        elif not _cap and cap: d.append(["up", sk]); cap = False # change to lower
        d.append(["down", _canon(c)]); d.append(["up", _canon(c)])
    if cap: d.append(["up", sk])
    return d
def _getTextBlocks(charTracks:List["CharTrack"]): # gets potential collections of CharTracks that form text
    es = charTracks | filt(op().startTime) | sortF(op().startTime) | apply(lambda x: [_inferText(x.keyCode, x.mods), x]) | aS(list)
    d = []; _d = []; inBlock = False
    for c, obj in es:
        if c is None and inBlock: d.append(_d); inBlock = False # ends a block
        elif c is not None and not inBlock: _d = []; inBlock = True # starts a new block
        if inBlock: _d.append([c, obj])
    if inBlock: d.append(_d)
    return d | apply(transpose() | join("") + iden())
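# An illustrative sketch of `_textToKeys`, the inverse of `_inferText`. Typing
# "Hi" holds Shift (key code 16) around "H" (code 72), then taps "i" (code 73):
def _demoTextToKeys():
    assert _textToKeys("Hi") == [["down", 16], ["down", 72], ["up", 72], ["up", 16], ["down", 73], ["up", 73]]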
class WordTrack(Track):
    def __init__(self, text, times:List[float]):
        """Representing normal text input. This is not created from events directly.
        Rather, it's created by scanning over CharTracks and merging them together"""
        super().__init__(*times); self.text = text
    def _tooltip(self, ctx): return escapeHtml(self.__repr__())
    def __repr__(self): return f"<WordTrack {self._displayTimes()} text ({self.text})>"
    def events(self):
        es = _textToKeys(self.text); d = []; ts = np.linspace(*self.timeUnix(), len(es))
        for t, (_type, code) in zip(ts, es):
            _type = "keyPressed" if _type == "down" else "keyReleased"; t = int(t*1000)
            if isinstance(code, str): d.append({"type": _type, "text": code, "timestamp": t})
            else: d.append({"type": _type, "javaKeyCode": code, "timestamp": t})
        return d
    def copy(self): return WordTrack(self.text, self.timeUnix())
@k1.patch(Recording)
def formWords(self) -> Recording:
    """Tries to merge nearby CharTracks together when it looks like the user is
    trying to type something. Assuming the user types "a", then "b", then "c",
    this should detect the intent of typing "abc" and replace the 3 CharTracks
    with a single WordTrack. Example::

        # example recording, run in a notebook cell to see the interactive interface
        r = k1ui.Recording.sample(); r
        # run in another notebook cell and compare the difference
        r.formWords()"""
    for word, charTracks in _getTextBlocks(self.sel(klass=CharTrack)):
        if len(word) <= 0: continue
        ts = charTracks | op().timeUnix().all() | joinStreams() | toMin() & toMax() | deref()
        self.removeTracks(charTracks); self.addTracks(WordTrack(word, ts))
        self.removeTracks(self.sel(*ts | apply(op()-self.startTime), klass=CharTrack) | filt(op().keyCode == 16)) # removing shift CharTracks
    return self
class ContourTrack(Track): # mouse movements
    def __init__(self, coords):
        """Representing a mouse trajectory ("mouseMoved" events).

        :param coords: numpy array with shape (#events, [x, y, unix time])"""
        super().__init__(*coords | cut(2) | toMin() & toMax()); self.coords = coords; self._cachedImg = None
    @staticmethod
    def parse(events) -> List["ContourTrack"]:
        coords = events | filt(lambda x: x["type"] == "mouseMoved" or x["type"] == "mouseDragged") | apply(lambda x: [x["x"], x["y"], x["timestamp"]/1000]) | deref() | aS(np.array)
        return [] if coords | shape(0) == 0 else [ContourTrack(coords)]
    def _img(self):
        if self._cachedImg: return self._cachedImg
        x, y, t = self.coords | transpose(); c = mpl.cm.rainbow(t - t[0] | aS(lambda x: x/x[-1])); plt.scatter(x, y, None, c, ".")
        plt.colorbar(mpl.cm.ScalarMappable(norm=mpl.colors.Normalize(*self.time0Rec()), cmap=mpl.cm.rainbow)).ax.set_title("Time (s)")
        plt.title("ContourTrack"); plt.grid(True); plt.tight_layout(); self._cachedImg = plt.gcf() | toImg(); return self._cachedImg
    def __repr__(self): return f"<ContourTrack {self._displayTimes()} n ({self.coords.shape[0]})>"
    def _tooltip(self, ctx):
        return f"""<div><div style="margin-bottom:10px">{escapeHtml(self.__repr__())}</div>{self._imgHtml()}</div>"""
    def _imgHtml(self): return f"""<img src="data:image/png;base64,{self._img() | toBytes(dataType="png") | aS(base64.b64encode) | op().decode()}" alt="Mouse trajectory" />"""
    def _repr_html_(self): return f"""<!-- k1ui.ContourTrack --><div>{self._imgHtml()}</div>"""
    def events(self): return self.coords | ~apply(lambda x, y, t: {"type": "mouseMoved", "x": x, "y": y, "timestamp": int(t*1000)}) | deref()
    def copy(self): return ContourTrack(np.copy(self.coords))
    def move(self, deltaTime): self.coords[:,2] += deltaTime; super().move(deltaTime)
class ClickTrack(Track): # mouse down, then up
    def __init__(self, coords:np.ndarray, times:List[float]):
        """Representing a mouse pressed and released event"""
        super().__init__(*times); self.coords = coords # coords = [[x1, y1, button1], [x2, y2, button2]]
    @staticmethod
    def parse(events) -> List["ClickTrack"]:
        tracks = []; pressedEvents = defaultdict(lambda: None) # haha, get it?
        def process(e):
            _type, x, y, button, t = e
            pe = pressedEvents[button]
            if _type == "mousePressed":
                if pe: raise Exception("Strange case. Why would there be a pending press when the mouse has just been pressed?")
                pressedEvents[button] = e
            if _type == "mouseReleased":
                if pe: tracks.append(ClickTrack(np.array([pe[1:4], e[1:4]]), [pe[4], e[4]])); pressedEvents[button] = None
                else: warnings.warn("Strange case. Why would the mouse be released right at the start? Not strange enough to warrant an exception though")
        events | filt(lambda x: x["type"] == "mousePressed" or x["type"] == "mouseReleased") | apply(lambda x: [x["type"], x["x"], x["y"], x["button"], x["timestamp"]/1000]) | apply(process) | deref()
        return tracks
    def isClick(self, threshold=1):
        """Whether this ClickTrack represents a single click.

        :param threshold: if the Manhattan distance between start and end is at most this amount, declare it a single click"""
        return abs(self.coords[0] - self.coords[1]).sum() <= threshold
    def __repr__(self): return f"<ClickTrack {self._displayTimes()} coords ({self.coords[0]} -> {self.coords[1]})>"
    def _tooltip(self, ctx): return escapeHtml(f"{self}")
    def events(self):
        xy1, xy2 = self.coords; t1, t2 = self.timeUnix()
        return [{"type": "mousePressed", "x": xy1[0], "y": xy1[1], "button": xy1[2], "timestamp": int(t1*1000)},
                {"type": "mouseReleased", "x": xy2[0], "y": xy2[1], "button": xy2[2], "timestamp": int(t2*1000)}]
    def copy(self): return ClickTrack(self.coords | deref(), self.timeUnix())
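# An illustrative sketch of `ClickTrack.isClick`: coords hold [x, y, button] for
# the press and the release, so a 1px slip still counts as a click under the
# default Manhattan-distance threshold:
def _demoIsClick():
    ct = ClickTrack(np.array([[100, 100, 1], [100, 101, 1]]), [0.0, 0.1])
    assert ct.isClick()                # |dx| + |dy| + |dbutton| = 1 <= 1
    assert not ct.isClick(threshold=0)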
class WheelTrack(Track):
    def __init__(self, coords:np.ndarray, times:List[float]):
        """Representing mouse wheel moved events"""
        super().__init__(*times); self.coords = coords
    @staticmethod
    def parse(events) -> List["WheelTrack"]:
        d = []; _d = []; lastTime = 0
        for rot, t in events | filt(op()["type"] == "mouseWheelMoved") | apply(lambda x: [x["wheelRotation"], x["timestamp"]/1000]):
            if t > lastTime + 2: d.append(_d); _d = []
            _d.append([rot, t]); lastTime = t
        d.append(_d); return d | filt(lambda x: len(x)) | apply(aS(np.array) & (cut(1) | rows(0, -1)) | ~aS(WheelTrack)) | aS(list)
    def __repr__(self): return f"<WheelTrack {self._displayTimes()} rotations (sum {self.coords[:,0].sum()}, {self.coords[:,0] | apply(lambda x: '+' if x > 0.5 else '0') | join('')})>"
    def _tooltip(self, ctx): return escapeHtml(f"{self}")
    def events(self):
        rs = self.coords[:,0]; ts = np.linspace(*self.timeUnix(), self.coords.shape[0])
        return [rs, ts] | transpose() | ~apply(lambda rot, t: {"type": "mouseWheelMoved", "wheelRotation": rot, "timestamp": int(t*1000)})
    def copy(self): return WheelTrack(self.coords, self.timeUnix())
class StreamTrack(Track):
    def __init__(self, frames:np.ndarray, times:np.ndarray):
        """Representing screenshots from the UDP stream"""
        super().__init__(times[0], times[-1]); self.frames = frames; self.times = times; self.aspect = self.frames.shape[2]/self.frames.shape[1]
    @staticmethod
    def parse(events) -> List["StreamTrack"]:
        events = events | filt(op()["type"] == "stream") | aS(list)
        if len(events) == 0: return []
        return [StreamTrack(*events | apply(lambda x: [x["frame"], x["timestamp"]/1000]) | transpose() | apply(np.array))]
    def __repr__(self): return f"<StreamTrack {self._displayTimes()} #frames ({self.frames.shape[0]}) resolution {self.frames.shape[1:3][::-1]}>"
    def _frames(self, n, f=iden()): return [self.frames, self.times] | transpose() | insertIdColumn(True, False) | f | aS(list) | aS(lambda x: x | batched(len(x)//n)) | item().all()
    def _carousel(self): return self._frames(36) | cut(0) | toImg().all() | batched(9) | plotImgs(3, self.aspect, 3, im=True).all() | aS(k1.viz.Carousel)
    def _tooltip(self, ctx):
        metaId = ctx.metaId; streamId = autoId(); f = filt(ctx.dis1<op()<ctx.dis2, 1)
        data = self._frames(40, f) | apply(toImg() | aS(k1.viz.HtmlImage, style="width:800px") | aS(lambda x: x._repr_html_()), 0) | deref() | aS(json.dumps)
        ctx.scriptTags[streamId] = f"""
            data_{streamId} = {data};
            meta_{metaId}.cbs[{streamId}] = (x) => {{
                const stream_{streamId} = document.querySelector("#stream_{streamId}");
                const streamText_{streamId} = document.querySelector("#streamText_{streamId}");
                if (!stream_{streamId}) return;
                const fT = x/800*{ctx.dis2-ctx.dis1}+{ctx.dis1}; // frame time
                let minT = Infinity; let minIm = null; let minI = null;
                for (const [imE, t, i] of data_{streamId}) {{
                    const dT = Math.abs(fT-t);
                    if (dT < minT) {{ minIm = imE; minT = dT; minI = i; }}
                    else break;
                }}
                stream_{streamId}.innerHTML = minIm;
                streamText_{streamId}.innerHTML = "frame: " + minI;
            }};"""
        return f"""<div>{escapeHtml(str(self))}
            <div style="position:relative">
                <div id="stream_{streamId}"></div>
                <div id="streamText_{streamId}" style="position:absolute;top:8px;left:12px;padding:4px 8px;background-color:white;border-radius:12px"></div>
            </div>
        </div>"""
    def _repr_html_(self): return f"""<div>{escapeHtml(str(self))}<div>{self._carousel()._repr_html_()}</div></div>"""
    def events(self): return []
    def copy(self): return StreamTrack(np.copy(self.frames), np.copy(self.times))
    def move(self, deltaTime): self.times += deltaTime; super().move(deltaTime)
def createTrackss(rec:Recording):
    dis1 = rec.dis1; dis2 = rec.dis2; delta = dis2-dis1
    def process(f=iden()):
        trackss = []
        for nTrack in rec._tracks | f | apply(lambda x: [max(x.startTime or 0, dis1+delta*0.01), min(x.endTime, dis2-delta*0.01), x]) | filt(op()>dis1, 1) | filt(op()<dis2, 0) | deref(): # nTrack for "new track"
            cTracks = None # "chosen tracks"
            for eTracks in trackss: # "existing tracks"
                if eTracks["tracks"][-1][1] < nTrack[0]: cTracks = eTracks; break # can fit
            if cTracks: cTracks["tracks"].append(nTrack)
            else: trackss.append({"tracks": [nTrack], "type": nTrack[2].__class__.__name__.split(".")[-1]})
        return trackss
    trackss = [
        *process(instanceOf(CharTrack)),
        *process(instanceOf(WordTrack)),
        *process(instanceOf(ContourTrack)),
        *process(instanceOf(ClickTrack)),
        *process(instanceOf(WheelTrack)),
        *process(instanceOf(StreamTrack))
    ]
    return [trackss, rec]
autoId = k1.AutoIncrement(random.randint(0, int(1e9)))
def drawTrackss(obj) -> "html":
    h = settings.draw.trackHeight; pad = settings.draw.pad; trackss, rec = obj; sidebarW = 120 # sidebar width
    infoId = autoId(); metaId = autoId(); timeId = autoId(); timeLId = autoId(); sketchId = autoId(); sketchLId = autoId()
    ctx = k1.Object.fromDict({"id2Tt": {}, "dis1": rec.dis1, "dis2": rec.dis2, "metaId": metaId, "scriptTags": {}})
    children = enumerate(trackss) | permute(1, 0) | ~apply(drawTracks, ctx=ctx) | join("")
    trackNames = trackss | op()["type"].all() | insertIdColumn() | ~apply(lambda i, x: f"<div style='position:absolute;top:{pad+(pad+h)*i}px;left:12px;height:{h}px;text-align:center;line-height:{h}px'><div>{x}s</div></div>") | join("")
    st0 = rec.dis1 - rec.startTime; et0 = rec.dis2 - rec.startTime; ticks0 = k1.ticks(st0, et0) # 0-based
    ticksP = (ticks0+rec.startTime-rec.dis1)/(rec.dis2-rec.dis1)*800 # pixel scale
    ticks = [ticks0, ticksP] | transpose() | filt(op()>0, 1) | filt(op()<800, 1) | ~apply(lambda x, y: f"<div style='position:absolute;width:1px;height:10px;background-color:black;left:{y}px;bottom:4px'></div> <div style='position:absolute;left:{y-8}px;top:0px'>{x}</div>") | join("")
    sketchH = (pad+h)*len(trackss)+pad; extraScripts = "\n".join(ctx.scriptTags.values())
    return f"""
<div style="display:flex;flex-direction:column;align-items:flex-start">
    <div style="display:flex;flex-direction:row">
        <div style="width:{sidebarW}px;padding-right:10px;display:flex;justify-content:center;align-items:center"><div>Time (s)</div></div>
        <div id="time_{timeId}" style="background-color:red;height:{h}px;position:relative;height:34px">
            {ticks}
            <div id="timeL_{timeLId}" style="position:absolute;top:0px;background-color:white;border:1px solid black;border-radius:8px;padding:0px 8px">&nbsp;&nbsp;</div>
        </div>
    </div>
    <div style="display:flex;flex-direction:row">
        <div style="width:{sidebarW}px;padding-right:10px;position:relative">{trackNames}</div>
        <div id="sketch_{sketchId}" style="width:{800}px;height:{sketchH}px;background-color:grey;position:relative">
            <div id="sketchL_{sketchLId}" style="position:absolute;width:1px;height:{sketchH}px;background-color:black;top:0px"></div>
            {children}
        </div>
    </div>
    <div id="info_{infoId}" style="min-height:30px;display:flex;flex-direction:column;justify-content:center;align-items:flex-start;padding:4px 12px"></div>
</div>
<script>
    id2Tt = {ctx.id2Tt | aS(json.dumps)}
    info_{infoId} = document.querySelector("#info_{infoId}");
    time_{timeId} = document.querySelector("#time_{timeId}");
    sketch_{sketchId} = document.querySelector("#sketch_{sketchId}");
    sketchL_{sketchLId} = document.querySelector("#sketchL_{sketchLId}");
    timeL_{timeLId} = document.querySelector("#timeL_{timeLId}");
    meta_{metaId} = {{x: 0, y: 0, cbs: {{}}}};
    for (const [k, v] of Object.entries(id2Tt)) {{
        let elem = document.querySelector(`#track_${{k}}`);
        elem.onmouseover = () => {{ info_{infoId}.innerHTML = atob(v[0]); elem.style.backgroundColor = "red"; }};
        elem.onmouseout = () => {{ info_{infoId}.innerHTML = ""; elem.style.backgroundColor = "white"; }};
    }}
    sketch_{sketchId}.onmousemove = (event) => {{
        const x = event.pageX-sketch_{sketchId}.getBoundingClientRect().x;
        meta_{metaId}.x = x; sketchL_{sketchLId}.style.left = x + "px";
        timeL_{timeLId}.style.left = (x-timeL_{timeLId}.getBoundingClientRect().width/2) + "px";
        timeL_{timeLId}.innerHTML = Number(x/800*{et0-st0}+{st0}).toFixed(2) + "s";
        for (const cb of Object.values(meta_{metaId}.cbs)) cb(x);
    }}
    {extraScripts}
</script>"""
def drawTracks(tracks, rowId, ctx) -> "html": return tracks["tracks"] | apply(drawTrack, rowId=rowId, ctx=ctx) | join("")
def drawTrack(track, rowId, ctx) -> "html":
    h = settings.draw.trackHeight; pad = settings.draw.pad; st, et, obj = track
    x1 = (st-ctx.dis1)/(ctx.dis2-ctx.dis1)*800; x2 = (et-ctx.dis1)/(ctx.dis2-ctx.dis1)*800
    y = rowId*(h+pad)+pad; w = x2-x1; trackId = autoId()
    tooltip = obj._tooltip(ctx).encode() | aS(base64.b64encode) | op().decode()
    ctx.id2Tt[trackId] = [tooltip, x1, x2, y]
    return f"""<div id="track_{trackId}" style="top:{y}px;left:{x1}px;width:{w}px;height:{h}px;background-color:white;position:absolute"></div>"""
basePath = os.path.dirname(inspect.getabsfile(k1lib)) + os.sep + "k1ui" + os.sep
@k1.patch(Recording, static=True)
def sampleEvents() -> List[dict]:
    """Grabs the built-in example events. The result is really long, so beware,
    as it can crash your notebook if you try to display it."""
    mouseE, keyE = cat(f"{basePath}mouseKey.pth", False) | aS(dill.loads)
    deltaT = keyE()[0]["timestamp"] - mouseE()[0]["timestamp"]
    ev = [*mouseE() | apply(lambda x: {**x, "timestamp": x["timestamp"]+deltaT}), *keyE()]
    try: # the local machine has the k1ui screen file, but it won't be bundled with the library, because it's like 80MB!
        screenE = cat("screen.pth", False) | aS(dill.loads)
        deltaT = keyE()[0]["timestamp"] - screenE()[0]["timestamp"]
        return [*screenE() | apply(lambda x: {**x, "timestamp": x["timestamp"]+deltaT}), *ev]
    except: return ev
@k1.patch(Recording, static=True)
def sample() -> Recording:
    """Creates a Recording from :meth:`sampleEvents`"""
    return Recording(Recording.sampleEvents())
@k1.patch(ContourTrack)
def split(self, times:List[float]):
    """Splits this contour track by multiple timestamps relative to the recording's start time. Example::

        r = k1ui.Recording.sample()
        r.sel1(klass=k1ui.ContourTrack).split([5])"""
    rec = self.recording; c = self.coords; i = 0; x = 0; y = 0; d = []; cps = np.array(times) + rec.startTime
    while True:
        if cps[i] > c[y,2]: y += 1
        else:
            if y > x: d.append(c[x:y])
            x = y; i += 1
        if y >= len(c): d.append(c[x:y]); break
        if i >= len(cps): d.append(c[x:]); break
    rec.removeTracks(self)
    rec.addTracks(d | apply(ContourTrack))
@k1.patch(ContourTrack)
def splitClick(self, clickTracks:List["ClickTrack"]=None):
    """Splits this contour track by click events. Essentially, the click events
    chop this contour into multiple segments. Example::

        r = k1ui.Recording.sample()
        r.sel1(klass=k1ui.ContourTrack).splitClick()

    :param clickTracks: if not specified, use all ClickTracks from the recording"""
    rec = self.recording
    if clickTracks is None: clickTracks = rec.sel(*self.time0Rec()) | instanceOf(ClickTrack)
    self.split(clickTracks | ~filt(op().isClick(-1)) | op().timeUnix().all() | joinStreams() | sort(None) | apply(op()-rec.startTime) | deref())
@k1.patch(Recording)
def addTime(self, t:float, duration:float) -> Recording:
    """Inserts a specific duration at a specific point in time. More clearly, this transforms this::

        # |-1--|     |-2-|
        #    |---3---|
        #      ^ insert duration=3 here

    Into this::

        # |-1--|        |-2-|
        #    |---3------|

    Tracks that partly overlap with the range will have their start/end times
    modified, and potentially have some of their internal data modified:

    - Tracks where only the start and end times are modified: Char, Word, Click, Wheel
    - Tracks where the internal data is also modified: Contour, Stream

    :param t: where to insert the duration, relative to the Recording's start time
    :param duration: how long (in seconds) to insert"""
    at = self.sel(t,t); after = self.sel(t) # tracks at, or after, the specified time
    unix = t + self.startTime
    for track in at: after.remove(track)
    for track in at:
        track.endTime += duration
        if isinstance(track, ContourTrack):
            c = track.coords; idx = (c[:,2] > unix).argmax(); track._cachedImg = None
            if c[idx,2] > unix: c[idx:,2] += duration # index is valid
        if isinstance(track, StreamTrack):
            c = track.times; idx = (c > unix).argmax()
            if c[idx] > unix: c[idx:] += duration # index is valid
    for track in after:
        track.startTime += duration; track.endTime += duration
        if isinstance(track, ContourTrack): track.coords[:,2] += duration
        if isinstance(track, StreamTrack): track.times += duration
    self.endTime += duration; self._resetDis(); return self
@k1.patch(Recording)
def removeTime(self, t1:float, t2:float) -> Recording:
    """Deletes time from t1 to t2 (relative to the Recording's start time). All
    tracks lying completely inside this range will be deleted. More clearly, it
    transforms this::

        # |-1--|     |-2-|   |-3-|
        #    |---4---|     |-5-|
        #      ^           ^ delete between these carets

    Into this::

        # |-1--|  |-3-|
        #    |-4-||5-|

    Tracks that partly overlap with the range will have their start/end times
    modified, and potentially have some of their internal data deleted:

    - Tracks where only the start and end times are modified: Char, Word, Click, Wheel
    - Tracks where the internal data is also modified: Contour, Stream"""
    duration = t2 - t1; t1U = t1 + self.startTime; t2U = t2 + self.startTime
    self.removeTracks(self.sel(t1, t2) | filt(op().startTime >= t1U) | filt(op().endTime < t2U)) # removing everything that's completely inside
    overlap = self.sel(t1, t2) | aS(list); after = self.sel(t2) | filt(op().startTime >= t2U) | aS(list)
    for track in overlap: # handling overhangs
        if isinstance(track, ContourTrack):
            c = track.coords; idx1 = (c[:,2] > t1U).argmax(); idx2 = (c[:,2] > t2U).argmax()
            if c[idx2,2] <= t2U: idx2 = len(c) # special case if idx2 is not valid
            a = c[:idx1]; b = c[idx2:]; b[:,2] -= duration
            track.coords = np.concatenate([a, b]); track._cachedImg = None
        if isinstance(track, StreamTrack):
            c = track.times; idx1 = (c > t1U).argmax(); idx2 = (c > t2U).argmax()
            if c[idx2] <= t2U: idx2 = len(c) # special case if idx2 is not valid
            track.times = np.concatenate([track.times[:idx1], track.times[idx2:]-duration])
            track.frames = np.concatenate([track.frames[:idx1], track.frames[idx2:]])
        track.endTime = max(t1U, track.endTime - duration); track.startTime = min(t1U, track.startTime)
    for track in after:
        if isinstance(track, ContourTrack): track.coords[:,2] -= duration
        if isinstance(track, StreamTrack): track.times -= duration
        track.startTime -= duration; track.endTime -= duration
    self._resetTimes(); self._resetDis(); return self
def _move(cs, e1, e2):
    det = e1[0]*e2[1] - e1[1]*e2[0]; dot = e1@e2; angle = math.atan2(det, dot)
    s = math.sin(angle); c = math.cos(angle); rot = np.array([[c, -s], [s, c]])
    scale = (e2**2).sum()**0.5/(e1**2).sum()**0.5; return (rot @ cs.T)*scale | transpose()
@k1.patch(ContourTrack)
def movePoint(self, x, y, start=True):
    """Moves the contour's start/end to another location, smoothly scaling all intermediary points along.

    :param start: if True, move the start point, else move the end point"""
    c = self.coords; e2 = np.array([x, y])
    if start: s = c[-1,:2]; e1 = c[0,:2] - s
    else: s = c[0,:2]; e1 = c[-1,:2] - s
    e2 = e2 - s; c[:,:2] = _move(c[:,:2]-s, e1, e2)+s
@k1.patch(Track)
def nextTrack(self) -> Track:
    """Grabs the next track (ordered by start time) in the recording"""
    return self.recording._tracks | filt(op().startTime) | filt(op().startTime > (self.startTime or 0)) | sortF(op().startTime) | item()
@k1.patch(Recording)
def refine(self, enabled:List[int]=[1,1,0]) -> Recording:
    """Performs sensible default operations to refine the Recording. This currently includes:

    - (0) Forming words from nearby CharTracks
    - (1) Splitting ContourTracks into multiple smaller tracks using click events
    - (2) Removing open-ended CharTracks.
      Basically, CharTracks that don't have a begin or end time.

    :param enabled: list of integers matching the features above; 1 to turn a feature on, 0 to turn it off"""
    if enabled[0]: self.formWords()
    if enabled[1]: self.sel(klass=ContourTrack) | op().splitClick().all() | ignore()
    if enabled[2]: self.removeTracks(self.sel(klass=CharTrack) | ~filt(op().startTime))
    return self
def convBlock(inC, outC, kernel=3, stride=2, padding=1):
    return torch.nn.Sequential(torch.nn.Conv2d(inC, outC, kernel, stride, padding), torch.nn.ReLU(), torch.nn.BatchNorm2d(outC))
if hasTorch:
    class skipBlock(torch.nn.Module):
        def __init__(self, inC):
            super().__init__(); self.conv1 = convBlock(inC, inC, stride=1)
            self.conv2 = convBlock(inC, inC*2)
        def forward(self, x): return ((x | self.conv1) + x) | self.conv2
    class Net(torch.nn.Module):
        def __init__(self, skips:int=5):
            super().__init__()
            self.skips = torch.nn.Sequential(convBlock(3, 8), *[skipBlock(8*2**i) for i in range(skips)])
            self.avgPool = torch.nn.AdaptiveAvgPool2d([1, 1]); self.lin1 = knn.LinBlock(8 * 2**skips, 50)
            self.lin2 = torch.nn.Linear(50, 10); self.softmax = torch.nn.Softmax(dim=1)
            self.distThreshold = torch.nn.Parameter(torch.tensor(-0.5)); self.sigmoid = torch.nn.Sigmoid()
            self.headOnly = True
        def forward(self, x):
            x = x | self.skips | self.avgPool | op().squeeze() | self.lin1
            return x if self.headOnly else x | self.lin2
            # unreachable pairwise-distance head below, kept from an earlier revision:
            x = ((x[None] - x[:,None])**2).sum(dim=-1)
            x = (x + 1e-7)**0.5 + self.distThreshold | self.sigmoid
            return x
def distNet() -> "torch.nn.Module":
    """Grabs a pretrained network that might be useful in distinguishing between screens. Example::

        net = k1ui.distNet()
        net(torch.randn(16, 3, 192, 192)) # returns tensor of shape (16, 10)"""
    net = Net(); net.load_state_dict(cat(f"{basePath}256.model.state_dict.pth", False) | aS(dill.loads))
    net.parameters() | op().requires_grad_(False).all() | ignore(); net.eval(); return net
def discardTransients(it, col=None, countThres=7, regular=False): # value must stay consistent for `countThres` consecutive rows, then output the result once
    lastRow = None; lastE = None
    yielded = False; count = 0
    for row in it:
        e = row[col] if col else row
        if e == lastE: count += 1
        else: count = 0; lastE = e; lastRow = row; yielded = False
        if count > countThres-2 and not yielded: yielded = True; yield lastRow
        elif regular: yield None
class Buffer:
    def __init__(self): self.l = deque()
    def append(self, x): self.l.append(x)
    def __iter__(self): return self # so that pull-based consumers like discardTransients can iterate over it
    def __next__(self): return self.l.popleft()
if hasTorch and hasTv:
    np2Tensor = toImg() | aS(tf.Resize([192, 192])) | toTensor()
    class MLP(nn.Module):
        def __init__(self, nClasses, **kwargs):
            super().__init__(); self.l1 = knn.LinBlock(50, nClasses); self.l2 = nn.Linear(nClasses, nClasses)
        def forward(self, xb): return xb | self.l1 | self.l2
whatever = object()
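# An illustrative sketch of `discardTransients`' debouncing: a value has to stay
# unchanged for `countThres` consecutive rows before it's emitted, and it's only
# emitted once per stable run:
def _demoDiscardTransients():
    assert list(discardTransients(["a"]*7 + ["b"]*2)) == ["a"]      # "b" never stabilizes
    assert list(discardTransients(["a"]*7 + ["b"]*7)) == ["a", "b"]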
class TrainScreen:
    data: List[Tuple[int, str]]
    """Core dataset of TrainScreen. Essentially just a list of (frameId, screen name)"""
    def __init__(self, r:Recording):
        """Creates a screen training system that will train a small neural network to
        recognize different screens using a small amount of feedback from the user.
        Overview of how it's supposed to be used. Setting up::

            r = k1ui.Recording(await k1ui.record(30)) # records everything for 30 seconds and creates a recording out of it
            ts = k1ui.TrainScreen(r) # creates the TrainScreen object
            r # run this in a cell to display the recording, including StreamTrack
            ts.addRule("home", "settings", "home") # adds expected screen transition dynamics (home -> settings -> home)

        Training with the user's feedback::

            ts.registerFrames({"home": [100, 590, 4000, 4503], "settings": [1200, 2438]}) # labels some frames of the recording. Network will train for ~6 seconds
            next(ts) # displays the 20 images that confuse the network the most
            ts.register({"home": [2, 6], "settings": [1, 16]}) # labels some frames from the last line. Notice the frame numbers are much smaller and are <20
            next(ts); ts.register({}); next(ts); ts.register({}) # repeat the last 2 lines a few times (3-5 times is probably good enough for ~7 screens)

        Evaluating the performance::

            ts.graphs() # displays 2 graphs: the network's prediction graph and the actual rule graph. Best way to judge performance
            ts.l.Accuracy.plot() # actual accuracy metric while training. The network could have bad accuracy here while still constructing a perfect graph, so don't rely much on this

        Using the model::

            ts.predict(torch.randn(2, 3, 192, 192) | k1ui.distNet()) # returns a list of ints. Use the ts.idx2Name dict to convert them to screen names

        Saving the model::

            ts | aS(dill.dumps) | file("ts.pth")

        .. warning::

            This won't actually save the associated recording, because recordings are
            very heavy objects (several GB). It is expected that you manually manage
            the lifecycle of the recording."""
        self.r = r; self.data = [] # [(frame id, screen name)]
        self._aspect = self.frames | item() | op().shape[:2] | ~aS(lambda x, y: y/x)
        self._distNet = distNet(); self._rules = set(); self._trainParams = {"joinAlpha": 0, "epochs": 300}
        self._lastScreenName = None; self._screenDump = Buffer(); self._screenTransients = discardTransients(self._screenDump, regular=True)
    @property
    def _coldStart(self): return len(self.data) == 0 # whether there's any data at all to work with
    def _coldGuard(self):
        if self._coldStart: raise Exception("TrainScreen has not started yet. Run `next(ts)`, then choose a few frames using `ts.register()` to access this functionality")
    def _learner(self):
        self._coldGuard(); l = k1.Learner(); l.data = self._dataF()
        l.model = MLP(len(self.name2Idx))
        l.opt = optim.AdamW([l.model.parameters(), self._distNet.parameters()] | joinStreams(), lr=3e-3)
        l.cbs.add(Cbs.LossCrossEntropy()); l.css = "none"
        l.ConfusionMatrix.categories = deref()(self.name2Idx.items()) | sort(1) | cut(0) | deref()
        l.cbs.remove("AccuracyTop5", "AccF0"); return l
    def train(self, restart=True):
        """Trains the network for a while (300 epochs, ~6 seconds). Will be called
        automatically when you register new frames with the system.

        :param restart: whether to restart the small network or not"""
        if restart: self.l = self._learner()
        self.l.run(self._trainParams["epochs"])
    def trainParams(self, joinAlpha:float=None, epochs:int=None):
        """Sets training parameters.

        :param joinAlpha: (default 0) alpha used in the joinStreamsRandom component for each
            screen category. Read more at :class:`~k1lib.cli.structural.joinStreamsRandom`
        :param epochs: (default 300) number of epochs for each training session"""
        if joinAlpha: self._trainParams["joinAlpha"] = joinAlpha
        if epochs: self._trainParams["epochs"] = epochs
    @property
    def frames(self) -> np.ndarray:
        """Grabs the frames from the first :class:`StreamTrack` of the :class:`Recording`"""
        return self.r.sel1(klass=StreamTrack).frames
    @property
    @lru_cache
    def feats(self) -> List[np.ndarray]:
        """Gets the feature array of shape (N, 10) by passing the frames through
        :meth:`distNet`. This returns a list of arrays, not a giant stacked array, for
        memory performance"""
        self._coldGuard(); print("Converting all frames to features using `distNet`..."); a = k1.AutoIncrement()
        res = self.frames | tee(lambda x: f"{a()}/{len(self)}").crt() | np2Tensor.all() | batched(16, True) | apply(aS(list) | aS(torch.stack) | aS(self._distNet)) | joinStreams() | aS(list)
        print(); return res
    def __len__(self): return len(self.frames)
    def _randomConsidering(self): return range(len(self)) | splitW(1, 1, 1, 1, 1) | apply(randomize(None, 42) | head(4)) | joinStreams() | aS(list)
    def __next__(self) -> "PIL.Image.Image": # shows frames for the user to label
        if self._coldStart: self._considering = self._randomConsidering()
        else:
            a = self.transitionScreens(False) | randomize(None, 42) | cut(0) | aS(iter)
            b = self._randomConsidering() | aS(iter); c = self._midBoundaryConsidering() | aS(iter)
            self._considering = [a, a, b, c, c, c] | apply(wrapList() | insert(yieldT | repeat(), False) | joinStreams() | randomize()) | joinStreamsRandom() | head(20) | deref()
        return self._considering | lookup(self.frames) | insertIdColumn(begin=False) | plotImgs(5, self._aspect-0.2, im=True)
    def _refreshIdx(self): self.idx2Name, self.name2Idx = self.data | cut(1) | aS(set) | insertIdColumn() | toDict() & (permute(1, 0) | toDict()) | deref()
[docs] def register(self, d): # TrainScreen
        """Tells the object which images previously displayed by :meth:`TrainScreen.__next__`
        are associated with which screen name. Example::

            next(ts) # displays the images out to a notebook cell
            ts.register({"home": [3, 4, 7], "settings": [5, 19, 2], "monkeys": [15, 11], "guns": []})

        This will also quickly (around 6 seconds) train a small neural network on all
        available frames based on the new information you provided.

        See also: :meth:`registerFrames`""" # TrainScreen
        self.data = [self.data, deref()(d.items()) | apply(repeat(), 0) | transpose().all() | joinStreams() | permute(1, 0) | lookup(self._considering, 0)] | joinStreams() | sort(0) | unique(0) | deref() # TrainScreen
        self._refreshIdx(); self.train() # TrainScreen
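    # Sketch of how the dict is interpreted (indices are illustrative): the small numbers
    # are slots in the grid that `next(ts)` just displayed, and they're translated into
    # absolute frame ids through `self._considering` before being merged into `ts.data`:
    #
    #     next(ts)                      # say the grid shows frames [120, 43, 999, ...]
    #     ts.register({"home": [0, 2]}) # labels absolute frames 120 and 999 as "home"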
[docs] def registerFrames(self, data:Dict[str, List[int]]): # TrainScreen
        """Tells the object which frames should have which labels. Example::

            ts.registerFrames({"home": [328, 609], "settings": [12029], "monkeys": [1238]})

        This differs from :meth:`register` in that the frame id here is the absolute frame
        index in the recording, while in :meth:`register`, it's the index of the image
        displayed by :meth:`TrainScreen.__next__`.""" # TrainScreen
        self.data = [self.data, deref()(data.items()) | apply(repeat(), 0) | transpose().all() | joinStreams() | permute(1, 0)] | joinStreams() | sort(0) | unique(0) | deref(); self._refreshIdx(); self.train() # TrainScreen
[docs] def addRule(self, *screenNames:List[str]) -> "TrainScreen": # TrainScreen
        """Adds a screen transition rule. Let's say that the transition dynamic looks like this:

        .. code-block:: text

            home <---> settings <---> account
                          ^
                          |
                          v
                      shortcuts

        You can represent it like this::

            ts.addRule("home", "settings", "account", "settings", "home")
            ts.addRule("settings", "shortcuts", "settings")""" # TrainScreen
        screenNames | window(2) | apply(tuple) | apply(self._rules.add) | ignore(); return self # TrainScreen
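    # Sketch: each call just slides a window of size 2 over the names, so the two calls in
    # the docstring populate `ts._rules` with these directed edges:
    #
    #     {("home", "settings"), ("settings", "account"), ("account", "settings"),
    #      ("settings", "home"), ("settings", "shortcuts"), ("shortcuts", "settings")}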
[docs] def transitionScreens(self, obeyRule:bool=whatever) -> List[Tuple[int, str]]: # TrainScreen
        """Gets the list of screens (list of (frameId, screen name) tuples) that the
        network deems to be transitions between screen states.

        :param obeyRule: if not specified, don't filter. If True, returns only transitions
            that are part of the rules specified via :meth:`addRule`; if False, only the
            ones that aren't""" # TrainScreen
        self._coldGuard() # TrainScreen
        with torch.no_grad(): transitions = self.predict(self.feats) | insertIdColumn() | aS(discardTransients, 1) | window(2, True, None) | filt(~aS(lambda x, y: not y or x[1] != y[1])) | cut(0) | lookup(self.idx2Name, 1) | deref() # TrainScreen
        if obeyRule is whatever: return transitions # TrainScreen
        f = inSet(self._rules, 1) if obeyRule else ~inSet(self._rules, 1) # TrainScreen
        return transitions | window(2) | apply(transpose() | iden() + aS(tuple)) | f | transpose().all() | joinStreams() | unique(0) | deref() # TrainScreen
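    # Usage sketch with illustrative output; each entry marks the frame at which the
    # network thinks the screen changed state:
    #
    #     ts.transitionScreens()      # e.g. [(0, "home"), (241, "settings"), (530, "home")]
    #     ts.transitionScreens(True)  # only transitions allowed by the addRule() rules
    #     ts.transitionScreens(False) # only transitions that violate those rules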
[docs] def newEvent(self, sess:WsSession, event:dict): # handles live events coming from a WsSession # TrainScreen
        if event["type"] == "stream": # classify the incoming frame, then emit a "screenName" event # TrainScreen
            with torch.no_grad(): # TrainScreen
                name = event["frame"] | np2Tensor | op().reshape(-1, 3, 192, 192) | aS(self._distNet) | op().view(1, -1)\
                    | self.l.model | op().argmax().item() | aS(lambda x: self.idx2Name[x]) # TrainScreen
            sess.loop.create_task(sess.eventCb(sess, {"type": "screenName", "name": name})) # TrainScreen
        if event["type"] == "screenName": # discard transient names, then emit a "screenTransition" event # TrainScreen
            self._screenDump.append(event["name"]); res = next(self._screenTransients) # TrainScreen
            if res: # TrainScreen
                sess.loop.create_task(sess.eventCb(sess, {"type": "screenTransition", "transition": (self._lastScreenName, res)})) # TrainScreen
                self._lastScreenName = res # TrainScreen
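    # Hypothetical wiring sketch, reusing the WsSession callback pattern: forward live
    # server events into a trained TrainScreen so it emits "screenName"/"screenTransition"
    # events back through eventCb:
    #
    #     async def eventCb(sess, event):
    #         ts.newEvent(sess, event)
    #         if event["type"] == "screenTransition": print(event["transition"])
    #     async def mainThreadCb(sess):
    #         sess.stream(300); await asyncio.sleep(30); sess.close()
    #     await k1ui.WsSession(eventCb, mainThreadCb).run()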
[docs] def predict(self, feats:"torch.Tensor") -> List[int]: # TrainScreen
        """Using the built-in network, tries to predict the screen name for a bunch of
        features of shape (N, 10). Example::

            r = ...; ts = k1ui.TrainScreen(r); next(ts)
            ts.register({"bg": [9, 10, 11, 12, 17, 19], "docs": [5, 6, 7, 8, 0, 1, 4], "jupyter": [2, 3]})

            # returns list of 2 integers
            ts.predict(torch.randn(2, 3, 192, 192) | aS(k1ui.distNet()))""" # TrainScreen
        self._coldGuard(); return feats | batched(128, True) | apply(aS(list) | aS(torch.stack) | aS(self.l.model) | op().argmax(1).numpy()) | joinStreams() # TrainScreen
[docs] def transitionGraph(self) -> "graphviz.graphs.Digraph": # TrainScreen
        """Gets a screen transition graph of the entire recording. Edges are labeled with
        the number of times each transition occurs. See also: :meth:`graphs`""" # TrainScreen
        g = k1.digraph(); self.transitionScreens() | cut(1) | window(2) | apply(tuple) | count() | cut(0, 1) | ~apply(lambda c, xy: g(*xy, label=f" {c}")) | ignore(); return g # TrainScreen
[docs] def ruleGraph(self) -> "graphviz.graphs.Digraph": # TrainScreen
        """Gets a screen transition graph based on the specified rules. Rules are added
        using :meth:`addRule`. See also: :meth:`graphs`""" # TrainScreen
        g = k1.digraph(); self._rules | ~apply(g) | ignore(); return g # TrainScreen
[docs] def graphs(self) -> viz.Carousel: # TrainScreen
        """Combines both graphs from :meth:`transitionGraph` and :meth:`ruleGraph`""" # TrainScreen
        return [self.transitionGraph(), self.ruleGraph()] | toImg().all() | aS(viz.Carousel) # TrainScreen
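    # Notebook usage sketch: the returned viz.Carousel displays directly in a notebook
    # cell, letting you flip between the observed transition graph (edge labels are
    # occurrence counts) and the declared rule graph to spot mismatches:
    #
    #     ts.addRule("home", "settings", "home"); ts.graphs()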
[docs] def labeledData(self) -> viz.Carousel: # TrainScreen
        """Visualizes up to 5 random labeled frames for each screen name""" # TrainScreen
        return self.data | groupBy(1) | apply(randomize(None) | head(5) | lookup(self.frames, 0)) | batched(5) | plotImgs(5, self._aspect-0.2, table=True, im=True).all() | aS(viz.Carousel) # TrainScreen
    def __getstate__(self): # drops the recording (not restored on load) and the live-stream state (rebuilt in __setstate__) # TrainScreen
        d = dict(self.__dict__); del d["r"]; del d["_lastScreenName"]; del d["_screenDump"]; del d["_screenTransients"]; return d # TrainScreen
    def __setstate__(self, d): # TrainScreen
        self.__dict__.update(d); self._lastScreenName = None; self._screenDump = Buffer() # TrainScreen
        self._screenTransients = discardTransients(self._screenDump, regular=True) # TrainScreen
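    # Serialization sketch (an assumption based on __getstate__/__setstate__ above): the
    # recording itself is not pickled, so reattach it manually after loading if you need
    # frame-related methods again:
    #
    #     blob = dill.dumps(ts); ts2 = dill.loads(blob)
    #     ts2.r = r # hypothetical reattachment; __getstate__ drops the "r" attribute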
[docs] def correctRatio(self): # TrainScreen
        """Ratio between the number of screens that are part of a valid transition and the
        ones that aren't. Just a quick metric to see how well the network is doing. The
        higher the number, the better""" # TrainScreen
        return len(self.transitionScreens(True))/len(self.transitionScreens(False)) # TrainScreen
def fillIn(n, states): # fillIn
    iS = 0 # index of states # fillIn
    state = None; nextI, nextS = states[iS] # fillIn
    for i in range(n): # fillIn
        if i >= nextI: # fillIn
            iS += 1; state = nextS # fillIn
            if iS < len(states): nextI, nextS = states[iS] # fillIn
        yield [i, state] # fillIn
def blocks(it): # blocks
    lastIs = []; lastE = None # blocks
    for i, e in it: # blocks
        if e != lastE: # blocks
            if lastE is not None: yield min(lastIs), max(lastIs), lastE # blocks
            lastIs = []; lastE = e # blocks
        lastIs.append(i) # blocks
    yield min(lastIs), max(lastIs), lastE # blocks
@k1.patch(TrainScreen) # blocks
def _dataF(self, bs=64): # _dataF
    self._coldGuard(); v1 = fillIn(len(self), self.data) | filt(op(), 1) # old version, unused. A bit more liberal than v2, and will accidentally auto-label wrongly from time to time # _dataF
    v2 = blocks(self.data) | ~apply(lambda x, y, z: [range(x, y+1), z | repeat()] | transpose()) | joinStreams() # _dataF
    js = deref() | aS(lambda xs: xs | apply(repeatFrom() | randomize()) | joinStreamsRandom(self._trainParams["joinAlpha"], xs | apply(len) | deref())) # joinStreams # _dataF
    return v2 | randomize(None) | groupBy(1) | filt(lambda x: len(x) > 1) | splitW().all() | transpose()\
        | apply(js | lookup(self.feats, 0) | lookup(self.name2Idx, 1) | batched(bs)\
            | apply(transpose() | (aS(list) | aS(torch.stack)) + toTensor(int)) # _dataF
        ) | stagger.tv(1024/bs) | aS(list) # _dataF
def midBounds(it): # grabs data samples that are in between blocks, aka the really confusing cases in between transitions, so that the user can guide the network effectively # midBounds
    lastI = 0; lastE = None # midBounds
    for i, e in it: # midBounds
        if e != lastE: yield (i + lastI)//2, i-lastI, lastE; lastE = e # midBounds
        lastI = i # midBounds
@k1.patch(TrainScreen) # midBounds
def _midBoundaryConsidering(self): return midBounds(self.data) | ~head(1) | ~sort(1) | cut(0) # _midBoundaryConsidering
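# Worked sketch of the three helpers above on tiny hand-made label data
# ((frameId, screenName) pairs, the same shape as TrainScreen.data):
#
#     list(fillIn(7, [(2, "home"), (5, "settings")]))
#     # [[0, None], [1, None], [2, 'home'], [3, 'home'], [4, 'home'], [5, 'settings'], [6, 'settings']]
#     list(blocks([[0, 'a'], [1, 'a'], [3, 'a'], [4, 'b']])) # [(0, 3, 'a'), (4, 4, 'b')]
#     list(midBounds([[0, 'a'], [5, 'a'], [9, 'b']]))        # [(0, 0, None), (7, 4, 'a')]
#     # for midBounds: midpoint 7 = (5+9)//2, gap 4 = 9-5, and 'a' is the label right before the change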