# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""`k1ui <https://github.com/157239n/k1ui>`_ is another project made in Java that
aims to record and manipulate the screen, keyboard and mouse. The interface to
that project on its own is clunky, and this module is the Python interface to
ease its use.
Not quite developed yet though, because I'm lazy."""
import k1lib, numpy as np, asyncio, time, inspect, json, threading, dill, math, base64, os, random, warnings
k1 = k1lib; cli = k1.cli; from k1lib.cli import *; knn = k1.knn; Cbs = k1.Cbs; viz = k1.viz; websockets = k1.dep.websockets
nn = k1.dep("torch.nn", url="https://pytorch.org/"); optim = k1.dep("torch.optim", url="https://pytorch.org/")
PIL = k1.dep.PIL; k1.dep.graphviz; requests = k1.dep.requests; tf = k1.dep("torchvision.transforms", url="https://pytorch.org/")
try: import torch; hasTorch = True
except: torch = k1.dep.torch; hasTorch = False
try: import torchvision; hasTv = True
except: hasTv = False
from typing import Callable, List, Iterator, Tuple, Union, Dict; from collections import defaultdict, deque; from functools import lru_cache
mpl = k1lib.dep.mpl; plt = k1lib.dep.plt
__all__ = ["get", "WsSession", "selectArea", "record", "execute", "Recording",
"Track", "CharTrack", "WordTrack", "ContourTrack", "ClickTrack", "WheelTrack", "StreamTrack",
"distNet", "TrainScreen"]
k1lib.settings.add("k1ui", k1.Settings().add("server", k1.Settings().add("http", "http://localhost:9511", "normal http server").add("ws", "ws://localhost:9512", "websocket server"), "server urls"), "docs related to k1ui java library");
settings = k1lib.settings.k1ui
settings.add("draw", k1.Settings(), "drawing settings")
settings.draw.add("trackHeight", 30, "Track's height in Recording visualization")
settings.draw.add("pad", 10, "Padding between tracks");
def get(path): # get
"""Sends a get request to the Java server.
Example::
k1ui.get("mouse/200/300") # move mouse to (200, 300)""" # get
return requests.get(f"{settings.server.http}/{path}", timeout=60*10).text # get
def post(path, jsObj): # post
"""Sends a post request to the Java server.
Example::
k1ui.post("mouse/200/300") # move mouse to (200, 300)""" # post
return requests.post(f"{settings.server.http}/{path}", json=jsObj, timeout=60*10).text # post
portAutoInc = k1.AutoIncrement(9520) # post
class WsSession: # WsSession
    def __init__(self, eventCb:Callable[["WsSession", dict], None], mainThreadCb:Callable[["WsSession"], None]): # WsSession
"""Creates a websocket connection with the server, with some callback functions
The callback functions (most are async btw) will be passed a WebSocket object
as the first argument. You can use it to send messages like this::
# this will send a signal to the server to close the session
sess.ws.send(json.dumps({"type": "close"}))
# this will send a signal to the server requesting the current screenshot. Result will be deposited into eventCb
sess.ws.send(json.dumps({"type": "screenshot"}))
# this will execute a single event
sess.ws.send(json.dumps({"type": "execute", "event": {"type": "keyTyped", "javaKeyCode": 0, ...}}))
Complete, minimal example::
events = []
async def eventCb(sess, event): events.append(event)
async def mainThreadCb(sess):
sess.stream(300) # starts a stream with output screen width of 300px
await asyncio.sleep(2)
await sess.ws.send(json.dumps({"type": "execute", "event": {"type": "keyPressed", "javaKeyCode": 65, "timestamp": 0}}))
await sess.ws.send(json.dumps({"type": "execute", "event": {"type": "keyReleased", "javaKeyCode": 65, "timestamp": 0}}))
await asyncio.sleep(10); sess.close()
await k1ui.WsSession(eventCb, mainThreadCb).run()
This code communicates with the server continuously for 12 seconds, capturing
all events in the meantime and saving them into the ``events`` list. It starts
up a UDP stream to capture screenshots continuously, and after 2 seconds, it
sends 2 events to the server, trying to type the letter "A". Finally, it waits
for another 10 seconds and then terminates the connection.
This interface is quite low-level, and is the basis for all other functionalities.
Some of them include:
* :meth:`record`: recording a session
* :meth:`execute`: executes a list of events
:param eventCb: (async) will be called whenever there's a new event
:param mainThreadCb: (async) will be called after setting up everything""" # WsSession
self.ws = None; self.eventCb = eventCb; self.mainThreadCb = mainThreadCb # WsSession
if not inspect.iscoroutinefunction(eventCb): raise Exception(f"eventCb has to be an async function") # WsSession
if not inspect.iscoroutinefunction(mainThreadCb): raise Exception(f"mainThreadCb has to be an async function") # WsSession
self.closed = False; self.streams = {} # width -> [width, lock, port] # WsSession
async def _listenLoop(self): # WsSession
while True: # WsSession
res = await self.ws.recv() | cli.aS(json.loads); _type = res["type"] # WsSession
if _type == "close": break # python sends close signal to java, java then sends a close signal back, as an acknowledgement # WsSession
if _type == "screenshot": await self.eventCb(self, {"type": "screenshot", "bytes": base64.b64decode(res["screenshot"]), "timestamp": int(time.time()*1000)}) # WsSession
if _type == "newEvent": await self.eventCb(self, res["event"]) # WsSession
async def _pingLoop(self): # WsSession
while True: # WsSession
if self.closed: break # WsSession
try: await self.ws.send({"type": "ping"} | cli.aS(json.dumps)); await asyncio.sleep(1) # WsSession
except: break # WsSession
async def _streamLoop(self, width, locks, port): # WsSession
import cv2; streamRefresh = 100 # refreshes udp stream after this many seconds, so that it doesn't hang # WsSession
def threadLoop(lock, port): # WsSession
with lock, k1.captureStdout(False, True): # WsSession
get(f"startStream/{width}/{port}"); cap = cv2.VideoCapture(f'udp://0.0.0.0:{port}', cv2.CAP_FFMPEG); beginTime = time.time() # WsSession
while (cap.isOpened()): # WsSession
if self.closed: break # WsSession
res, frame = cap.read() # WsSession
if not res: break # WsSession
self.loop.create_task(self.eventCb(self, {"type": "stream", "width": width, "frame": frame[:,:,::-1], "timestamp": int(time.time()*1000)})) # WsSession
if time.time() - beginTime > streamRefresh + 10: break # there will be a short time where 2 udp streams dump events simultaneously # WsSession
cap.release(); get(f"stopStream/{port}") # WsSession
ports = [port, port + 100]; sel = 0 # WsSession
while not self.closed: # WsSession
threading.Thread(target=threadLoop, args=(locks[sel], ports[sel])).start() # WsSession
await asyncio.sleep(streamRefresh); sel = 1-sel # WsSession
    def stream(self, width): # WsSession
"""Starts a stream with a particular output width. The lower the width, the higher the fps and vice versa""" # WsSession
if width in self.streams: raise Exception(f"Can't start stream with width {width}. Just use the existing stream.") # WsSession
port = portAutoInc() # WsSession
self.streams[width] = [width, [threading.Lock(), threading.Lock()], port]; import cv2 # placed here so that users can see the error message if cv2 is not installed # WsSession
asyncio.create_task(self._streamLoop(*self.streams[width])) # WsSession
    async def run(self): # WsSession
"""Connects with Java server, set things up and runs ``mainThreadCb``""" # WsSession
async with websockets.connect(settings.server.ws, max_size=1_000_000_000) as ws: # WsSession
self.ws = ws; self.loop = asyncio.get_event_loop() # WsSession
_listenLoop = asyncio.create_task(self._listenLoop()) # WsSession
_pingLoop = asyncio.create_task(self._pingLoop()); # WsSession
try: await self.mainThreadCb(self) # WsSession
except asyncio.CancelledError: self.close() # WsSession
await _listenLoop # WsSession
    def close(self): # WsSession
"""Closes the connection with the Java server""" # WsSession
if self.closed: print("Already closed"); return # WsSession
self.closed = True; asyncio.create_task(self.ws.send({"type": "close"} | cli.aS(json.dumps))) # WsSession
for width, locks, port in self.streams.values(): # WsSession
with locks[0]: # make sure all locks are freed. Also important to have the 2 locks nested in each other, in case everything aligns just right to evade this mechanism # WsSession
with locks[1]: pass # WsSession
    async def execute(self, events): # WsSession
"""Executes a series of events""" # WsSession
events = events | sortF(op()["timestamp"]) | aS(list) # WsSession
deltaT = int(time.time()*1000) - events[0]["timestamp"] # WsSession
for e in events | apply(lambda x: {**x, "timestamp": x["timestamp"]+deltaT}): # WsSession
st = e["timestamp"]/1000 - time.time() # WsSession
if st > 0: await asyncio.sleep(st) # WsSession
await self.ws.send(json.dumps({"type": "execute", "event": e})) # WsSession
def selectArea(x, y, w, h): # selectArea
"""Selects an area on the screen to focus into""" # selectArea
return get(f"selectArea/{x}/{y}/{w}/{h}") # selectArea
async def record(t=None, keyCode=None, streamWidth=300, f=iden()): # selectArea
"""Records activities.
Examples::
events = await k1ui.record(t=5) # records for 5 seconds
events = await k1ui.record(keyCode=5) # records until "Escape" is pressed
events = await k1ui.record() # records until interrupt signal is sent to the process
Note: these examples only work in jupyter notebooks. For regular Python processes,
check out the official Python docs (https://docs.python.org/3/library/asyncio-task.html)
:param t: record duration
:param keyCode: key that stops the recording
:param streamWidth: if specified, opens the UDP stream and captures screenshots at this width. Pass in None to disable
:param f: extra event post-processing function""" # selectArea
events = [] # selectArea
async def eventCb(sess, event): # selectArea
res = f(event) # selectArea
if res is not None: events.append(res) # selectArea
if event["type"] == "keyReleased" and event["keyCode"] == keyCode: sess.close() # selectArea
async def mainThreadCb(sess): # selectArea
if streamWidth: sess.stream(streamWidth) # selectArea
if t is not None: await asyncio.sleep(t); sess.close() # selectArea
else: await asyncio.sleep(1e9) # selectArea
await WsSession(eventCb, mainThreadCb).run(); return events # selectArea
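# Sketch of the `f` post-processing hook above (illustrative): since eventCb only
# keeps events where f(event) is not None, a plain lambda can filter out the
# high-volume "stream" frames and keep just keyboard/mouse events
async def _exampleRecordFiltered(): # hypothetical helper, for illustration only
    return await record(t=5, f=lambda e: None if e["type"] == "stream" else e)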
async def execute(events:List[dict]): # selectArea
"""Executes some events""" # selectArea
async def eventCb(sess, event): pass # selectArea
async def mainThreadCb(sess): await sess.execute(events); sess.close() # selectArea
await WsSession(eventCb, mainThreadCb).run() # selectArea
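# Sketch tying `record` and `execute` together (illustrative, assumes a notebook
# with a live server): capture 5 seconds of activity, then replay it. `execute`
# re-bases the timestamps, so the relative timing between events is preserved
async def _exampleRecordReplay(): # hypothetical helper, for illustration only
    events = await record(t=5) # capture keyboard/mouse/stream events for 5s
    await execute(events) # replay them against the server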
uuid = k1.AutoIncrement(random.randint(0, int(1e9)), prefix="k1ui-") # selectArea
def escapeHtml(s): return s.replace("&", "&").replace("<", "<").replace(">", ">") # escapeHtml
class Recording: # Recording
def __init__(self, events): # Recording
self.uuid = uuid(); self._tracks = [] # Recording
if len(events) == 0: return # shortcut to initialize using cloned tracks rather than events # Recording
events = events | sortF(op()["timestamp"]) | deref() # Recording
self._tracks.extend(ContourTrack.parse(events)) # Recording
self._tracks.extend(CharTrack.parse(events)) # Recording
self._tracks.extend(ClickTrack.parse(events)) # Recording
self._tracks.extend(WheelTrack.parse(events)) # Recording
self._tracks.extend(StreamTrack.parse(events)) # Recording
self._tracks = self._tracks | filt(op()) | apply(lambda x: x._rec(self)) | deref() # Recording
self._resetTimes(); self._resetDis() # Recording
def _resetTimes(self): self.startTime, self.endTime = self._tracks | op().timeUnix().all() | joinStreams() | filt(op()) | toMin() & toMax(); return self # Recording
def _resetDis(self): self.dis1, self.dis2 = (self.startTime+self.endTime)/2 | aS(lambda x: [x-self.duration*0.53, x+self.duration*0.53]); return self # display times # Recording
@property # Recording
def duration(self): return self.endTime - self.startTime # Recording
    def addTracks(self, *tracks) -> "Recording": # Recording
"""Adds tracks to the Recording""" # Recording
if not isinstance(tracks[0], Track) and len(tracks) == 1: tracks = tracks[0] # Recording
self._tracks.extend(tracks | apply(lambda tr: tr._rec(self))); self._resetTimes(); self._resetDis(); return self # Recording
    def removeTracks(self, *tracks) -> "Recording": # Recording
"""Removes tracks from the Recording""" # Recording
if not isinstance(tracks[0], Track) and len(tracks) == 1: tracks = tracks[0] # Recording
tracks | apply(self._tracks.remove) | ignore(); self._resetTimes(); self._resetDis(); return self # Recording
def _normTime(self, t=None, default=None): return default if t is None else t + self.startTime # Recording
    def zoom(self, t1=None, t2=None): # Recording
"""Zooms into a particular time range. If either bounds are not
specified, they will default to the start and end of all events.
:param t1: time values are relative to the recording's start time""" # Recording
_dis1 = self.dis1; t1 = _dis1 if t1 is None else t1 + self.startTime # Recording
_dis2 = self.dis2; t2 = _dis2 if t2 is None else t2 + self.startTime # Recording
delta = t2-t1; t1-=delta*0.03; t2+=delta*0.03; self.dis1 = t1; self.dis2 = t2 # Recording
html = self._repr_html_(); self.dis1 = _dis1; self.dis2 = _dis2; return k1lib.viz.Html(html) # Recording
    def sel(self, t1=None, t2=None, klass=None) -> List["Track"]: # Recording
"""Selects a subset of tracks using several filters.
For selecting time, assuming we have a track that looks like
this (x, y are t1, t2)::
# |-1--| |-2-|
# |---3---|
# x y
Then, tracks 1 and 3 are selected. Time values are relative to
recording's start time
:param t1: choose tracks that happen after this time
:param t2: choose tracks that happen before this time
:param klass: choose specific track class""" # Recording
tracks = self._tracks # Recording
if klass: tracks = tracks | instanceOf(klass) # Recording
if t1 is not None or t2 is not None: # Recording
t1 = self._normTime(t1, self.startTime); t2 = self._normTime(t2, self.endTime) # Recording
tracks = tracks | apply(lambda o: [o.startTime or 0, o.endTime, o]) | ~filt(op()[1]<t1) | ~filt(op()[0]>=t2) | cut(2) # Recording
return tracks | aS(list) # Recording
    def sel1(self, **kwargs) -> List["Track"]: # Recording
"""Like :meth:`sel`, but this time gets the first element only.""" # Recording
return self.sel(**kwargs) | item() # Recording
    def time0(self) -> List[float]: # Recording
"""Start and end recording times. Start time is zero""" # Recording
return [0, self.endTime - self.startTime] # Recording
    def timeUnix(self) -> List[float]: # Recording
"""Start and end recording times. Both are absolute unix times""" # Recording
return [self.startTime, self.endTime] # Recording
    def events(self) -> List[dict]: # Recording
"""Reconstructs events from the Recording's internal data.
The events are lossy though::
events = ... # events recorded
r = k1ui.Recording(events)
assert r.events() != events # this is the lossy part. Don't expect the produced events to match the originals exactly""" # Recording
return self._tracks | op().events().all() | joinStreams() | sortF(op()["timestamp"]) | deref(igT=False) # Recording
    def copy(self) -> "Recording": # Recording
"""Creates a clone of this recording""" # Recording
return Recording([]).addTracks(self._tracks | op().copy().all())._resetDis() # Recording
def _repr_html_(self): return self | aS(createTrackss) | aS(drawTrackss) # Recording
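# Sketch of inspecting a Recording (illustrative, assumes a notebook context):
def _exampleInspectRecording(): # hypothetical helper, for illustration only
    r = Recording.sample() # built-in sample recording, patched in further down
    first3s = r.sel(t1=0, t2=3) # tracks overlapping the first 3 seconds
    chars = r.sel(klass=CharTrack) # all keyboard tracks
    return r.zoom(0, 3), first3s, chars # zoomed-in visualization + the selections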
class Track: # Track
    def __init__(self, startTime, endTime): # Track
"""Time values are absolute unix time.""" # Track
self.recording = None; self.startTime = startTime if startTime else None; self.endTime = endTime; self.uuid = uuid() # Track
    def time0(self) -> List[float]: # Track
"""Start and end track times. Times are relative to track's start time""" # Track
return [0, self.endTime - self.startTime] # Track
    def time0Rec(self) -> List[float]: # Track
"""Start and end track times. Times are relative to recording's start time""" # Track
return [self.startTime-self.recording.startTime if self.startTime else None, self.endTime-self.recording.startTime] # Track
    def timeUnix(self) -> List[float]: # Track
"""Start and end track times. Times are absolute unix times""" # Track
return [self.startTime, self.endTime] # Track
    def concurrent(self) -> List["Track"]: # Track
"""Grabs all tracks that are concurrent to this track""" # Track
return self.recording.sel(*self.time0Rec()) # Track
def _rec(self, recording): self.recording = recording; return self # inject dependency # Track
def _tooltip(self, ctx): return "" # Track
def _displayTimes(self): # shortcut func for displaying in __repr__ # Track
s = f"{self.startTime-self.recording.startTime:.2f}s" if self.startTime else None # Track
e = f"{self.endTime-self.recording.startTime:.2f}s"; return f"time ({s}->{e})" # Track
    def events(self) -> List[dict]: # Track
"""Reconstructs events from the Track's internal data, to be implemented by subclasses.""" # Track
return NotImplemented # Track
    def copy(self): # Track
"""Creates a clone of this Track, to be implemented by subclasses""" # Track
return NotImplemented # Track
    def move(self, deltaTime): # Track
"""Moves the entire track left or right, to be implemented by subclasses.
:param deltaTime: if negative, move left by this number of seconds, else move right""" # Track
self.startTime += deltaTime; self.endTime += deltaTime; self.recording._resetTimes(); self.recording._resetDis() # Track
class CharTrack(Track): # CharTrack
    def __init__(self, keyText:str, keyCode:int, mods:List[bool], times:List[float]): # CharTrack
"""Representing 1 key pressed and released.
:param keyText: text to display to user, like "Enter"
:param keyCode: event's "javaKeyCode"
:param mods: list of 3 booleans, whether ctrl, shift or alt is pressed""" # CharTrack
super().__init__(*times); self.keyText = keyText; self.keyCode = keyCode; self.mods = mods # CharTrack
    @staticmethod # CharTrack
def parse(events) -> List["CharTrack"]: # CharTrack
stacks = {} # keyCode -> obj # CharTrack
def process(e): # CharTrack
_type, keyText, keyCode, mods, timestamp = e # CharTrack
if _type == "keyPressed": # CharTrack
if keyCode in stacks and stacks[keyCode]: # CharTrack
a = stacks[keyCode]; stacks[keyCode] = e # CharTrack
return [a, [_type, keyText, keyCode, mods, timestamp - 0.001]] # CharTrack
#raise Exception("Strange case. Why would the same key be pressed twice without being released first") # CharTrack
stacks[keyCode] = e # CharTrack
if _type == "keyReleased": # CharTrack
a = stacks[keyCode] if keyCode in stacks and stacks[keyCode] else None # CharTrack
stacks[keyCode] = None; return [a, e] # CharTrack
def makeTrack(x, y): # CharTrack
if x is None: x = [0, y[1], y[2], y[3], None] # CharTrack
return CharTrack(x[1], x[2], x[3], [x[4], y[4]]) # CharTrack
return events | filt(op()["type"].startswith("key")) | filt(op()["type"] != "keyTyped") | apply(lambda x: [x["type"], x["keyText"], x["javaKeyCode"], [x["ctrl"], x["shift"], x["alt"]], x["timestamp"]/1000]) | apply(process) | filt(op()) | ~apply(makeTrack) | deref() # CharTrack
def _tooltip(self, ctx): return escapeHtml(self.__repr__()) # CharTrack
def __repr__(self): return f"<CharTrack {self._displayTimes()} keyText ({self.keyText})>" # CharTrack
    def events(self): # CharTrack
d = []; t1, t2 = self.timeUnix() # does not care about mods because the mods will have a separate CharTrack already, so we don't have to repeat # CharTrack
if t1: d.append({"type": "keyPressed", "keyText": self.keyText, "javaKeyCode": self.keyCode, "timestamp": int(t1*1000)}) # CharTrack
if t2: d.append({"type": "keyReleased", "keyText": self.keyText, "javaKeyCode": self.keyCode, "timestamp": int(t2*1000)}) # CharTrack
return d # CharTrack
    def copy(self): return CharTrack(self.keyText, self.keyCode, self.mods, self.timeUnix()) # CharTrack
    def move(self, deltaTime): # CharTrack
if self.startTime: self.startTime += deltaTime # CharTrack
self.endTime += deltaTime; self.recording._resetTimes() # CharTrack
def _ord2(x): # _ord2
y = x | apply(ord) | deref() # _ord2
x2y = [x, y] | toDict(False) # _ord2
y2x = [y, x] | toDict(False) # _ord2
return [x, y, x2y, y2x] # _ord2
_upper, _upperCs, _upperD1, _upperD2 = _ord2("ABCDEFGHIJKLMNOPQRSTUVWXYZ"); # _ord2
_lower, _lowerCs, _lowerD1, _lowerD2 = _ord2("abcdefghijklmnopqrstuvwxyz") # _ord2
_num, _numCs, _numD1, _numD2 = _ord2("1234567890") # _ord2
_puncLower, _puncLowerCs, _puncLowerD1, _puncLowerD2 = _ord2("[];',./`-=\\") # _ord2
_puncUpper, _puncUpperCs, _puncUpperD1, _puncUpperD2 = _ord2("{}:\"<>?~_+|") # _ord2
# maps from numbers 12345 to punctuation like !@#$% # _ord2
_numPunc, _numPuncCs, _numPuncD1, _numPuncD2 = _ord2("!@#$%^&*()") # _ord2
_numPuncMap1 = [_numPuncCs, _numCs] | toDict(False); _numPuncMap2 = [_numCs, _numPuncCs] | toDict(False) # _ord2
_punc, _puncCs, _puncD1, _puncD2 = _ord2(_puncLower + _puncUpper + _numPunc + " ") # _ord2
# maps from lower case punctuation like ;',./ into upper case like :"<>? # _ord2
_puncMap = [_puncLower, _puncUpper] | toDict(False); _puncMapCs = [_puncLowerCs, _puncUpperCs] | toDict(False) # _ord2
_puncMap2 = [_puncUpper, _puncLower] | toDict(False) # _ord2
def _inferText(code:int, mods) -> str: # _inferText
if mods[0] or mods[2]: return None # _inferText
shift = mods[1] # _inferText
if shift: # _inferText
if code in _upperCs: return _upperD2[code] # _inferText
if code in _lowerCs: return _lowerD2[code].upper() # _inferText
if code in _numCs: return _numPuncD2[_numPuncMap2[code]] # _inferText
if code in _puncLowerCs: return _puncUpperD2[_puncMapCs[code]] # _inferText
if code in _puncCs: return _puncD2[code] # _inferText
return None # _inferText
else: # _inferText
if code in _upperCs: return _upperD2[code].lower() # _inferText
if code in _lowerCs: return _lowerD2[code] # _inferText
if code in _numCs: return _numD2[code] # _inferText
if code in _puncCs: return _puncD2[code] # _inferText
return None # _inferText
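# Quick sanity check of `_inferText` (illustrative, derived from the tables above):
# javaKeyCode 65 is the "A" key, and ctrl/alt combos return None
def _exampleInferText(): # hypothetical helper, for illustration only
    assert _inferText(65, [False, True, False]) == "A" # shift held -> "A"
    assert _inferText(65, [False, False, False]) == "a" # no mods -> "a"
    assert _inferText(65, [True, False, False]) is None # ctrl combos aren't text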
def _isUpper(x:str) -> bool: return x in _upper or x in _puncUpper or x in _numPunc # _isUpper
def _canon(x:str) -> Union[int, str]: # returns canonical key to be pressed # _canon
if x in _num: return _numD1[x] # _canon
if x in _upper: return _upperD1[x] # _canon
if x in _lower: return _upperD1[x.upper()] # _canon
if x in _puncLower: return x # _canon
if x in _puncUpper: return _puncMap2[x] # _canon
if x in _numPunc: return _numPuncMap1[_numPuncD1[x]] # _canon
if x in _punc: return x # _canon
return None # _canon
def _textToKeys(text:str): # opposite of _inferText # _textToKeys
cap = False; d = []; sk = 16 # shift key # _textToKeys
for c in text: # _textToKeys
_cap = _isUpper(c) # _textToKeys
if _cap and not cap: d.append(["down", sk]); cap = True # change to upper # _textToKeys
elif not _cap and cap: d.append(["up", sk]); cap = False # change to lower # _textToKeys
d.append(["down", _canon(c)]); d.append(["up", _canon(c)]) # _textToKeys
if cap: d.append(["up", sk]) # _textToKeys
return d # _textToKeys
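# Sanity check of the keystroke decomposition above (illustrative): typing "Hi"
# wraps the "H" in shift presses (javaKeyCode 16), then types "i" plainly
def _exampleTextToKeys(): # hypothetical helper, for illustration only
    assert _textToKeys("Hi") == [["down", 16], ["down", 72], ["up", 72],
                                 ["up", 16], ["down", 73], ["up", 73]]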
def _getTextBlocks(charTracks:List["CharTrack"]): # groups consecutive CharTracks into potential text blocks # _getTextBlocks
es = charTracks | filt(op().startTime) | sortF(op().startTime) | apply(lambda x: [_inferText(x.keyCode, x.mods), x]) | aS(list) # _getTextBlocks
d = []; _d = []; inBlock = False # _getTextBlocks
for c, obj in es: # _getTextBlocks
if c is None and inBlock: d.append(_d); inBlock = False # ends a block # _getTextBlocks
elif c is not None and not inBlock: _d = []; inBlock = True # starts a new block # _getTextBlocks
if inBlock: _d.append([c, obj]) # _getTextBlocks
if inBlock: d.append(_d) # _getTextBlocks
return d | apply(transpose() | join("") + iden()) # _getTextBlocks
class WordTrack(Track): # WordTrack
    def __init__(self, text, times:List[float]): # WordTrack
"""Representing normal text input. This is not created from events
directly. Rather, it's created from scanning over CharTracks and merging them together""" # WordTrack
super().__init__(*times); self.text = text # WordTrack
def _tooltip(self, ctx): return escapeHtml(self.__repr__()) # WordTrack
def __repr__(self): return f"<WordTrack {self._displayTimes()} text ({self.text}) >" # WordTrack
    def events(self): # WordTrack
es = _textToKeys(self.text); d = []; ts = np.linspace(*self.timeUnix(), len(es)) # WordTrack
for t, (_type, code) in zip(ts, es): # WordTrack
_type = "keyPressed" if _type == "down" else "keyReleased"; t = int(t*1000) # WordTrack
if isinstance(code, str): d.append({"type": _type, "text": code, "timestamp": t}) # WordTrack
else: d.append({"type": _type, "javaKeyCode": code, "timestamp": t}) # WordTrack
return d # WordTrack
    def copy(self): return WordTrack(self.text, self.timeUnix()) # WordTrack
@k1.patch(Recording) # WordTrack
def formWords(self) -> Recording: # formWords
"""Tries to merge nearby CharTracks together that looks like the user
is trying to type something, if they make sense. Assuming the user types
"a", then "b", then "c". This should be able to detect the intent that
the user is trying to type "abc", and replace 3 CharTracks with a WordTrack.
Example::
# example recording, run in notebook cell to see interactive interface
r = k1ui.Recording.sample(); r
# run in another notebook cell and compare difference
r.formWords()""" # formWords
for word, charTracks in _getTextBlocks(self.sel(klass=CharTrack)): # formWords
if len(word) <= 0: continue # formWords
ts = charTracks | op().timeUnix().all() | joinStreams() | toMin() & toMax() | deref() # formWords
self.removeTracks(charTracks); self.addTracks(WordTrack(word, ts)) # formWords
self.removeTracks(self.sel(*ts | apply(op()-self.startTime), klass=CharTrack) | filt(op().keyCode == 16)) # removing shift CharTracks # formWords
return self # formWords
class ContourTrack(Track): # mouse movements # ContourTrack
    def __init__(self, coords): # ContourTrack
"""Representing mouse trajectory ("mouseMoved" event).
:param coords: numpy array with shape (#events, [x, y, unix time])""" # ContourTrack
super().__init__(*coords | cut(2) | toMin() & toMax()); self.coords = coords; self._cachedImg = None # ContourTrack
    @staticmethod # ContourTrack
def parse(events) -> List["ContourTrack"]: # ContourTrack
coords = events | filt(lambda x: x["type"] == "mouseMoved" or x["type"] == "mouseDragged") | apply(lambda x: [x["x"], x["y"], x["timestamp"]/1000]) | deref() | aS(np.array) # ContourTrack
return [] if coords | shape(0) == 0 else [ContourTrack(coords)] # ContourTrack
def _img(self): # ContourTrack
if self._cachedImg: return self._cachedImg # ContourTrack
x, y, t = self.coords | transpose(); c = mpl.cm.rainbow(t - t[0] | aS(lambda x: x/x[-1])); plt.scatter(x, y, None, c, ".") # ContourTrack
plt.colorbar(mpl.cm.ScalarMappable(norm=mpl.colors.Normalize(*self.time0Rec()), cmap=mpl.cm.rainbow)).ax.set_title("Time (s)") # ContourTrack
plt.title("ContourTrack"); plt.grid(True); plt.tight_layout(); self._cachedImg = plt.gcf() | toImg(); return self._cachedImg # ContourTrack
def __repr__(self): return f"<ContourTrack {self._displayTimes()} n ({self.coords.shape[0]})>" # ContourTrack
def _tooltip(self, ctx): # ContourTrack
return f"""<div><div style="margin-bottom:10px">{escapeHtml(self.__repr__())}</div>{self._imgHtml()}</div>""" # ContourTrack
def _imgHtml(self): return f"""<img src="data:image/png;base64,{self._img() | toBytes(dataType="png") | aS(base64.b64encode) | op().decode()}" alt="Mouse trajectory" />""" # ContourTrack
def _repr_html_(self): return f"""<!-- k1ui.ContourTrack --><div>{self._imgHtml()}</div>""" # ContourTrack
    def events(self): return self.coords | ~apply(lambda x, y, t: {"type": "mouseMoved", "x": x, "y": y, "timestamp": int(t*1000)}) | deref() # ContourTrack
    def copy(self): return ContourTrack(np.copy(self.coords)) # ContourTrack
    def move(self, deltaTime): self.coords[:,2] += deltaTime; super().move(deltaTime) # ContourTrack
class ClickTrack(Track): # mouse down, then up # ClickTrack
    def __init__(self, coords:np.ndarray, times:List[float]): # ClickTrack
"""Representing a mouse pressed and released event""" # ClickTrack
super().__init__(*times); self.coords = coords # coords = [[x1, y1], [x2, y2]] # ClickTrack
    @staticmethod # ClickTrack
def parse(events) -> List["ClickTrack"]: # ClickTrack
tracks = []; pressedEvents = defaultdict(lambda: None) # haha, get it? # ClickTrack
def process(e): # ClickTrack
_type, x, y, button, t = e # ClickTrack
pe = pressedEvents[button] # ClickTrack
if _type == "mousePressed": # ClickTrack
if pe: raise Exception("Strange case. Why would inRange be true when mouse has just been pressed?") # ClickTrack
pressedEvents[button] = e # ClickTrack
if _type == "mouseReleased": # ClickTrack
if pe: tracks.append(ClickTrack(np.array([pe[1:4], e[1:4]]), [pe[4], e[4]])); pressedEvents[button] = None # ClickTrack
else: warnings.warn("Strange case. Why would mouse be released right at the start? Not strange enough to warrant an exception though") # ClickTrack
events | filt(lambda x: x["type"] == "mousePressed" or x["type"] == "mouseReleased") | apply(lambda x: [x["type"], x["x"], x["y"], x["button"], x["timestamp"]/1000]) | apply(process) | deref() # ClickTrack
return tracks # ClickTrack
    def isClick(self, threshold=1): # ClickTrack
"""Whether this ClickTrack represents a single click.
:param threshold: if Manhattan distance between start and end is less than this amount, then declare it a single click""" # ClickTrack
return abs(self.coords[0] - self.coords[1]).sum() <= threshold # ClickTrack
def __repr__(self): return f"<ClickTrack {self._displayTimes()} coords ({self.coords[0]} -> {self.coords[1]})>" # ClickTrack
def _tooltip(self, ctx): return escapeHtml(f"{self}") # ClickTrack
    def events(self): # ClickTrack
xy1, xy2 = self.coords; t1, t2 = self.timeUnix() # ClickTrack
return [{"type": "mousePressed", "x": xy1[0], "y": xy1[1], "button": xy1[2], "timestamp": int(t1*1000)}, # ClickTrack
{"type": "mouseReleased", "x": xy2[0], "y": xy2[1], "button": xy2[2], "timestamp": int(t2*1000)}] # ClickTrack
    def copy(self): return ClickTrack(self.coords | deref(), self.timeUnix()) # ClickTrack
class WheelTrack(Track): # WheelTrack
    def __init__(self, coords:np.ndarray, times:List[float]): # WheelTrack
"""Representing mouse wheel moved event""" # WheelTrack
super().__init__(*times); self.coords = coords # WheelTrack
    @staticmethod # WheelTrack
def parse(events) -> List["WheelTrack"]: # WheelTrack
d = []; _d = []; lastTime = 0 # WheelTrack
for rot, t in events | filt(op()["type"] == "mouseWheelMoved") | apply(lambda x: [x["wheelRotation"], x["timestamp"]/1000]): # WheelTrack
if t > lastTime + 2: d.append(_d); _d = [] # WheelTrack
_d.append([rot, t]); lastTime = t # WheelTrack
d.append(_d); return d | filt(lambda x: len(x)) | apply(aS(np.array) & (cut(1) | rows(0, -1)) | ~aS(WheelTrack)) | aS(list) # WheelTrack
def __repr__(self): return f"<WheelTrack {self._displayTimes()} rotations (avg {self.coords[:,0].sum()}, {self.coords[:,0] | apply(lambda x: '+' if x > 0.5 else '0') | join('')})>" # WheelTrack
def _tooltip(self, ctx): return escapeHtml(f"{self}") # WheelTrack
    def events(self): # WheelTrack
rs = self.coords[:,0]; ts = np.linspace(*self.timeUnix(), self.coords.shape[0]) # WheelTrack
return [rs, ts] | transpose() | ~apply(lambda rot, t: {"type": "mouseWheelMoved", "wheelRotation": rot, "timestamp": int(t*1000)}) # WheelTrack
    def copy(self): return WheelTrack(self.coords, self.timeUnix()) # WheelTrack
class StreamTrack(Track): # StreamTrack
    def __init__(self, frames:np.ndarray, times:np.ndarray): # StreamTrack
"""Representing screenshots from the UDP stream""" # StreamTrack
super().__init__(times[0], times[-1]); self.frames = frames; self.times = times; self.aspect = self.frames.shape[2]/self.frames.shape[1] # StreamTrack
    @staticmethod # StreamTrack
def parse(events) -> List["StreamTrack"]: # StreamTrack
events = events | filt(op()["type"] == "stream") | aS(list) # StreamTrack
if len(events) == 0: return [] # StreamTrack
return [StreamTrack(*events | apply(lambda x: [x["frame"], x["timestamp"]/1000]) | transpose() | apply(np.array))] # StreamTrack
def __repr__(self): return f"<StreamTrack {self._displayTimes()} #frames ({self.frames.shape[0]}) resolution {self.frames.shape[1:3][::-1]}>" # StreamTrack
def _frames(self, n, f=iden()): return [self.frames, self.times] | transpose() | insertIdColumn(True, False) | f | aS(list) | aS(lambda x: x | batched(len(x)//n)) | item().all() # StreamTrack
def _carousel(self): return self._frames(36) | cut(0) | toImg().all() | batched(9) | plotImgs(3, self.aspect, 3, im=True).all() | aS(k1.viz.Carousel) # StreamTrack
def _tooltip(self, ctx): # StreamTrack
metaId = ctx.metaId; streamId = autoId(); f = filt(ctx.dis1<op()<ctx.dis2, 1) # StreamTrack
data = self._frames(40, f) | apply(toImg() | aS(k1.viz.HtmlImage, style="width:800px") | aS(lambda x: x._repr_html_()), 0) | deref() | aS(json.dumps) # StreamTrack
ctx.scriptTags[streamId] = f"""
data_{streamId} = {data};
meta_{metaId}.cbs[{streamId}] = (x) => {{
const stream_{streamId} = document.querySelector("#stream_{streamId}");
const streamText_{streamId} = document.querySelector("#streamText_{streamId}");
if (!stream_{streamId}) return;
const fT = x/800*{ctx.dis2-ctx.dis1}+{ctx.dis1}; // frame time
let minT = Infinity; let minIm = null; let minI = null
for (const [imE, t, i] of data_{streamId}) {{
const dT = Math.abs(fT-t);
if (dT < minT) {{ minIm = imE; minT = dT; minI = i }}
else break;
}}
stream_{streamId}.innerHTML = minIm;
streamText_{streamId}.innerHTML = "frame: " + minI;
}};""" # StreamTrack
return f"""<div>{escapeHtml(str(self))}
<div style="position:relative">
<div id="stream_{streamId}"></div>
<div id="streamText_{streamId}" style="position:absolute;top:8px;left:12px;padding:4px 8px;background-color:white;border-radius:12px"></div>
</div>
</div>""" # StreamTrack
def _repr_html_(self): return f"""<div>{escapeHtml(str(self))}<div>{self._carousel()._repr_html_()}</div></div>""" # StreamTrack
    def events(self): return [] # StreamTrack
    def copy(self): return StreamTrack(np.copy(self.frames), np.copy(self.times)) # StreamTrack
    def move(self, deltaTime): self.times += deltaTime; super().move(deltaTime) # StreamTrack
def createTrackss(rec:Recording): # createTrackss
dis1 = rec.dis1; dis2 = rec.dis2; delta = dis2-dis1 # nTrack for "new track" # createTrackss
def process(f=iden()): # createTrackss
trackss = [] # createTrackss
for nTrack in rec._tracks | f | apply(lambda x: [max(x.startTime or 0, dis1+delta*0.01), min(x.endTime, dis2-delta*0.01), x]) | filt(op()>dis1, 1) | filt(op()<dis2, 0) | deref(): # createTrackss
cTracks = None # "chosen track" # createTrackss
for eTracks in trackss: # "existing track" # createTrackss
if eTracks["tracks"][-1][1] < nTrack[0]: cTracks = eTracks; break # can fit # createTrackss
if cTracks: cTracks["tracks"].append(nTrack) # createTrackss
else: trackss.append({"tracks": [nTrack], "type": nTrack[2].__class__.__name__.split(".")[-1]}) # createTrackss
return trackss # createTrackss
trackss = [ # createTrackss
*process(instanceOf(CharTrack)), # createTrackss
*process(instanceOf(WordTrack)), # createTrackss
*process(instanceOf(ContourTrack)), # createTrackss
*process(instanceOf(ClickTrack)), # createTrackss
*process(instanceOf(WheelTrack)), # createTrackss
*process(instanceOf(StreamTrack)) # createTrackss
]; # createTrackss
return [trackss, rec] # createTrackss
autoId = k1.AutoIncrement(random.randint(0, int(1e9))) # createTrackss
def drawTrackss(obj) -> "html": # drawTrackss
h = settings.draw.trackHeight; pad = settings.draw.pad; trackss, rec = obj; sidebarW=120; # width # drawTrackss
infoId = autoId(); metaId = autoId(); timeId = autoId(); timeLId = autoId(); sketchId = autoId(); sketchLId = autoId() # drawTrackss
ctx = k1.Object.fromDict({"id2Tt": {}, "dis1": rec.dis1, "dis2": rec.dis2, "metaId": metaId, "scriptTags": {}}) # drawTrackss
# drawTrackss
children = enumerate(trackss) | permute(1, 0) | ~apply(drawTracks, ctx=ctx) | join("") # drawTrackss
trackNames = trackss | op()["type"].all() | insertIdColumn() | ~apply(lambda i, x: f"<div style='position:absolute;top:{pad+(pad+h)*i}px;left:12px;height:{h}px;text-align:center;line-height:{h}px'><div>{x}s</div></div>") | join("") # drawTrackss
st0 = rec.dis1 - rec.startTime; et0 = rec.dis2 - rec.startTime; ticks0 = k1.ticks(st0, et0) # 0-based # drawTrackss
ticksP = (ticks0+rec.startTime-rec.dis1)/(rec.dis2-rec.dis1)*800 # pixel scale # drawTrackss
ticks = [ticks0, ticksP] | transpose() | filt(op()>0, 1) | filt(op()<800, 1) | ~apply(lambda x, y: f"<div style='position:absolute;width:1px;height:10px;background-color:black;left:{y}px;bottom:4px'></div> <div style='position:absolute;left:{y-8}px;top:0px'>{x}</div>") | join("") # drawTrackss
sketchH = (pad+h)*len(trackss)+pad; extraScripts = "\n".join(ctx.scriptTags.values()) # drawTrackss
return f"""
<div style="display:flex;flex-direction:column;align-items:flex-start">
<div style="display:flex;flex-direction:row">
<div style="width:{sidebarW}px;padding-right:10px;display:flex;justify-content:center;align-items:center"><div>Time (s)</div></div>
<div id="time_{timeId}" style="background-color:red;height:{h}px;position:relative;height:34px">
{ticks}
<div id="timeL_{timeLId}" style="position:absolute;top:0px;background-color:white;border:1px solid black;border-radius:8px;padding:0px 8px"> </div>
</div>
</div>
<div style="display:flex;flex-direction:row">
<div style="width:{sidebarW}px;padding-right:10px;position:relative">{trackNames}</div>
<div id="sketch_{sketchId}" style="width:{800}px;height:{sketchH}px;background-color:grey;position:relative">
<div id="sketchL_{sketchLId}" style="position:absolute;width:1px;height:{sketchH}px;background-color:black;top:0px"></div>
{children}
</div>
</div>
<div id="info_{infoId}" style="min-height:30px;display:flex;flex-direction:column;justify-content:center;align-items:flex-start;padding:4px 12px"></div>
</div>
<script>
id2Tt = {ctx.id2Tt | aS(json.dumps)}
info_{infoId} = document.querySelector("#info_{infoId}");
time_{timeId} = document.querySelector("#time_{timeId}");
sketch_{sketchId} = document.querySelector("#sketch_{sketchId}");
sketchL_{sketchLId} = document.querySelector("#sketchL_{sketchLId}");
timeL_{timeLId} = document.querySelector("#timeL_{timeLId}");
meta_{metaId} = {{x: 0, y: 0, cbs: {{}}}};
for (const [k, v] of Object.entries(id2Tt)) {{
let elem = document.querySelector(`#track_${{k}}`);
elem.onmouseover = () => {{info_{infoId}.innerHTML = atob(v[0]);elem.style.backgroundColor = "red";}};
elem.onmouseout = () => {{info_{infoId}.innerHTML = ""; elem.style.backgroundColor = "white";}};
}}
sketch_{sketchId}.onmousemove = (event) => {{
const x = event.pageX-sketch_{sketchId}.getBoundingClientRect().x;
meta_{metaId}.x = x;
sketchL_{sketchLId}.style.left = x + "px";
timeL_{timeLId}.style.left = (x-timeL_{timeLId}.getBoundingClientRect().width/2) + "px";
timeL_{timeLId}.innerHTML = Number(x/800*{et0-st0}+{st0}).toFixed(2) + "s";
for (const cb of Object.values(meta_{metaId}.cbs)) cb(x);
}}
{extraScripts}
</script>""" # drawTrackss
def drawTracks(tracks, rowId, ctx) -> "html": return tracks["tracks"] | apply(drawTrack, rowId=rowId, ctx=ctx) | join("") # drawTracks
def drawTrack(track, rowId, ctx) -> "html": # drawTrack
h = settings.draw.trackHeight; pad = settings.draw.pad; st, et, obj = track # drawTrack
x1 = (st-ctx.dis1)/(ctx.dis2-ctx.dis1)*800; x2 = (et-ctx.dis1)/(ctx.dis2-ctx.dis1)*800 # drawTrack
y = rowId*(h+pad)+pad; w = x2-x1; trackId = autoId() # drawTrack
tooltip = obj._tooltip(ctx).encode() | aS(base64.b64encode) | op().decode() # drawTrack
ctx.id2Tt[trackId] = [tooltip, x1, x2, y] # drawTrack
return f"""<div id="track_{trackId}" style="top:{y}px;left:{x1}px;width:{w}px;height:{h}px;background-color:white;position:absolute"></div>""" # drawTrack
basePath = os.path.dirname(inspect.getabsfile(k1lib)) + os.sep + "k1ui" + os.sep # drawTrack
@k1.patch(Recording, static=True) # drawTrack
def sampleEvents() -> List[dict]: # sampleEvents
"""Grabs the built-in example events. Results will be really long,
so beware, as it can crash your notebook if you try to display it.""" # sampleEvents
mouseE, keyE = cat(f"{basePath}mouseKey.pth", False) | aS(dill.loads) # sampleEvents
deltaT = keyE()[0]["timestamp"] - mouseE()[0]["timestamp"] # sampleEvents
ev = [*mouseE() | apply(lambda x: {**x, "timestamp": x["timestamp"]+deltaT}), *keyE()] # sampleEvents
try: # local comp has the k1ui-screen file, but it will not be bundled with the library, because it's like 80MB! # sampleEvents
screenE = cat("screen.pth", False) | aS(dill.loads) # sampleEvents
deltaT = keyE()[0]["timestamp"] - screenE()[0]["timestamp"] # sampleEvents
return [*screenE() | apply(lambda x: {**x, "timestamp": x["timestamp"]+deltaT}), *ev] # sampleEvents
except: return ev # sampleEvents
@k1.patch(Recording, static=True) # sampleEvents
def sample() -> Recording: # sample
"""Creates a Recording from :meth:`sampleEvents`""" # sample
return Recording(Recording.sampleEvents()) # sample
@k1.patch(ContourTrack) # sample
def split(self, times:List[float]): # split
"""Splits this contour track by multiple timestamps relative
to recording's start time. Example::
r = k1ui.Recording.sample()
r.sel1(klass=k1ui.ContourTrack).split([5])""" # split
rec = self.recording; c = self.coords; i = 0; x = 0; y = 0; d = []; cps = np.array(times) + rec.startTime # split
while True: # split
if cps[i] > c[y,2]: y += 1 # split
else: # split
if y > x: d.append(c[x:y]) # split
x = y; i += 1 # split
if y >= len(c): d.append(c[x:y]); break # split
if i >= len(cps): d.append(c[x:]); break # split
rec.removeTracks(self) # split
rec.addTracks(d | apply(ContourTrack)) # split
@k1.patch(ContourTrack) # split
def splitClick(self, clickTracks:List["ClickTrack"]=None): # splitClick
"""Splits this contour track by click events. Essentially, the click
events chops this contour into multiple segments. Example::
r = k1ui.Recording.sample()
r.sel1(klass=k1ui.ContourTrack).splitClick()
:param clickTracks: if not specified, use all ClickTracks from the recording""" # splitClick
rec = self.recording; c = self.coords; i = 0; x = 0; y = 0; d = [] # splitClick
if clickTracks is None: clickTracks = rec.sel(*self.time0Rec()) | instanceOf(ClickTrack) # splitClick
self.split(clickTracks | ~filt(op().isClick(-1)) | op().timeUnix().all() | joinStreams() | sort(None) | apply(op()-rec.startTime) | deref()) # splitClick
@k1.patch(Recording) # splitClick
def addTime(self, t:float, duration:float) -> Recording: # addTime
"""Inserts a specific duration into a specific point in time.
More clearly, this transforms this::
# |-1--| |-2-|
# |---3---|
# ^ insert duration=3 here
Into this::
# |-1--| |-2-|
# |---3------|
Tracks that partly overlap with the insertion point will have their start/end times
modified, and potentially have their internal data shifted:
- Tracks where only the start and end times are modified: Char, Word, Click, Wheel
- Tracks where the internal data is also modified: Contour, Stream
:param t: where to insert the duration, relative to Recording's start time
:param duration: how long (in seconds) to insert?""" # addTime
at = self.sel(t,t); after = self.sel(t) # tracks at or after the specified time # addTime
unix = t + self.startTime # addTime
for track in at: after.remove(track) # addTime
for track in at: # addTime
track.endTime += duration # addTime
if isinstance(track, ContourTrack): # addTime
c = track.coords; idx = (c[:,2] > unix).argmax(); track._cachedImg = None # addTime
if c[idx,2] > unix: c[idx:,2] += duration # index is valid # addTime
if isinstance(track, StreamTrack): # addTime
c = track.times; idx = (c > unix).argmax() # addTime
if c[idx] > unix: c[idx:] += duration # index is valid # addTime
for track in after: # addTime
track.startTime += duration; track.endTime += duration # addTime
if isinstance(track, ContourTrack): track.coords[:,2] += duration # addTime
if isinstance(track, StreamTrack): track.times += duration # addTime
self.endTime += duration; self._resetDis(); return self # addTime
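# Sketch of `addTime` (illustrative): insert 2 seconds of empty time at the
# 3-second mark, pushing everything after that point to the right
def _exampleAddTime(): # hypothetical helper, for illustration only
    r = Recording.sample()
    return r.addTime(3, 2) # t is relative to the recording's start time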
@k1.patch(Recording) # addTime
def removeTime(self, t1:float, t2:float) -> Recording: # removeTime
"""Deletes time from t1 to t2 (relative to Recording's start time).
All tracks lying completely inside this range will be deleted. More
clearly, it transforms this::
# |-1--| |-2-| |-3-|
# |---4---| |-5-|
# ^ ^ delete between these carets
Into this::
# |-1--| |-3-|
# |-4-||5-|
Tracks that partly overlap with the range will have their start/end times
modified, and potentially have some of their internal data deleted:
- Tracks where only the start and end times are modified: Char, Word, Click, Wheel
- Tracks where the internal data is also modified: Contour, Stream""" # removeTime
duration = t2 - t1; t1U = t1 + self.startTime; t2U = t2 + self.startTime # removeTime
self.removeTracks(self.sel(t1, t2) | filt(op().startTime >= t1U) | filt(op().endTime < t2U)) # removing everything that's completely inside # removeTime
overlap = self.sel(t1, t2) | aS(list); after = self.sel(t2) | filt(op().startTime >= t2U) | aS(list) # removeTime
for track in overlap: # handling left overhang # removeTime
if isinstance(track, ContourTrack): # removeTime
c = track.coords; idx1 = (c[:,2] > t1U).argmax(); idx2 = (c[:,2] > t2U).argmax() # removeTime
if c[idx2,2] <= t2U: idx2 = len(c) # removeTime
a = c[:idx1]; b = c[idx2:]; b[:,2] -= duration # removeTime
track.coords = np.concatenate([a, b]); track._cachedImg = None # removeTime
if isinstance(track, StreamTrack): # removeTime
c = track.times; idx1 = (c > t1U).argmax(); idx2 = (c > t2U).argmax() # removeTime
if c[idx2] <= t2U: idx2 = len(c) # special case if idx2 is not valid # removeTime
track.times = np.concatenate([track.times[:idx1], track.times[idx2:]-duration]) # removeTime
track.frames = np.concatenate([track.frames[:idx1], track.frames[idx2:]]) # removeTime
track.endTime = max(t1U, track.endTime - duration); track.startTime = min(t1U, track.startTime) # removeTime
for track in after: # removeTime
if isinstance(track, ContourTrack): track.coords[:,2] -= duration # removeTime
if isinstance(track, StreamTrack): track.times -= duration # removeTime
track.startTime -= duration; track.endTime -= duration # removeTime
self._resetTimes(); self._resetDis(); return self # removeTime
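# Sketch of `removeTime` (illustrative): delete the 3s..5s range. Tracks lying
# completely inside it are removed, tracks after it shift left by 2 seconds
def _exampleRemoveTime(): # hypothetical helper, for illustration only
    return Recording.sample().removeTime(3, 5)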
def _move(cs, e1, e2): # _move
det = e1[0]*e2[1] - e1[1]*e2[0]; dot = e1@e2; angle = math.atan2(det, dot) # _move
s = math.sin(angle); c = math.cos(angle); rot = np.array([[c, -s], [s, c]]) # _move
scale = (e2**2).sum()**0.5/(e1**2).sum()**0.5; return (rot @ cs.T)*scale | transpose() # _move
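# What `_move` computes (illustrative): a rotation + uniform scale that maps
# direction e1 onto e2. Here a 90-degree rotation with 2x scale sends (1, 0)
# to (0, 2), so the single input point follows along
def _exampleMove(): # hypothetical helper, for illustration only
    res = np.array([*_move(np.array([[1.0, 0.0]]), np.array([1.0, 0.0]), np.array([0.0, 2.0]))])
    assert np.allclose(res, [[0.0, 2.0]])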
@k1.patch(ContourTrack) # _move
def movePoint(self, x, y, start=True): # movePoint
"""Move contour's start/end to another location, smoothly scaling all
intermediary points along.
:param start: if True, move the start point, else move the end point""" # movePoint
c = self.coords; e2 = np.array([x, y]) # movePoint
if start: s = c[-1,:2]; e1 = c[0,:2] - s # movePoint
else: s = c[0,:2]; e1 = c[-1,:2] - s # movePoint
e2 = e2 - s; c[:,:2] = _move(c[:,:2]-s, e1, e2)+s # movePoint
@k1.patch(Track) # movePoint
def nextTrack(self) -> Track: # nextTrack
"""Grabs the next track (ordered by start time) in the recording""" # nextTrack
return self.recording._tracks | filt(op().startTime) | filt(op().startTime > (self.startTime or 0)) | sortF(op().startTime) | item() # nextTrack
@k1.patch(Recording) # nextTrack
def refine(self, enabled:List[int]=[1,1,0]) -> Recording: # refine
"""Perform sensible default operations to refine the Recording.
This currently includes:
- (0) Splitting ContourTracks into multiple smaller tracks using click events
- (1) Forming words from nearby CharTracks
- (2) Removing open-close CharTracks. Basically, CharTracks that don't have a begin or end time
:param enabled: list of integers, whether to turn on or off certain features. 1 to turn on, 0 to turn off""" # refine
if enabled[0]: self.formWords() # refine
if enabled[1]: self.sel(klass=ContourTrack) | op().splitClick().all() | ignore() # refine
if enabled[2]: self.removeTracks(self.sel(klass=CharTrack) | ~filt(op().startTime)) # refine
return self # refine
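# Typical cleanup pass over a fresh recording (illustrative): form words and
# split contours, but keep begin-less CharTracks (enabled[2] is 0 by default)
def _exampleRefine(): # hypothetical helper, for illustration only
    return Recording.sample().refine()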
def convBlock(inC, outC, kernel=3, stride=2, padding=1): # convBlock
return torch.nn.Sequential(torch.nn.Conv2d(inC, outC, kernel, stride, padding), torch.nn.ReLU(), torch.nn.BatchNorm2d(outC)) # convBlock
if hasTorch: # convBlock
class skipBlock(torch.nn.Module): # convBlock
def __init__(self, inC): # convBlock
super().__init__(); self.conv1 = convBlock(inC, inC, stride=1) # convBlock
self.conv2 = convBlock(inC, inC*2) # convBlock
def forward(self, x): return ((x | self.conv1) + x) | self.conv2 # convBlock
class Net(torch.nn.Module): # convBlock
def __init__(self, skips:int=5): # convBlock
super().__init__() # convBlock
self.skips = torch.nn.Sequential(convBlock(3, 8), *[skipBlock(8*2**i) for i in range(skips)]) # convBlock
self.avgPool = torch.nn.AdaptiveAvgPool2d([1, 1]); self.lin1 = knn.LinBlock(8 * 2**skips, 50) # convBlock
self.lin2 = torch.nn.Linear(50, 10); self.softmax = torch.nn.Softmax(dim=1) # convBlock
self.distThreshold = torch.nn.Parameter(torch.tensor(-0.5)); self.sigmoid = torch.nn.Sigmoid() # convBlock
self.headOnly = True # convBlock
def forward(self, x): # convBlock
x = x | self.skips | self.avgPool | op().squeeze() | self.lin1 # convBlock
return x if self.headOnly else x | self.lin2 # convBlock
x = ((x[None] - x[:,None])**2).sum(dim=-1) # convBlock
x = (x + 1e-7)**0.5 + self.distThreshold | self.sigmoid # convBlock
return x # convBlock
def distNet() -> "torch.nn.Module": # distNet
"""Grabs a pretrained network that might be useful in distinguishing
between screens. Example::
net = k1ui.distNet()
net(torch.randn(16, 3, 192, 192)) # returns tensor of shape (16, 10)""" # distNet
net = Net(); net.load_state_dict(cat(f"{basePath}256.model.state_dict.pth", False) | aS(dill.loads)) # distNet
net.parameters() | op().requires_grad_(False).all() | ignore(); net.eval(); return net # distNet
def discardTransients(it, col=None, countThres=7, regular=False): # consistent for 7 consecutive frames, then output the results # discardTransients
lastRow = None; lastE = None # discardTransients
yielded = False; count = 0 # discardTransients
for row in it: # discardTransients
e = row[col] if col else row # discardTransients
if e == lastE: count += 1 # discardTransients
else: count = 0; lastE = e; lastRow = row; yielded = False # discardTransients
if count > countThres-2 and not yielded: yielded = True; yield lastRow # discardTransients
elif regular: yield None # discardTransients
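# How the debouncing above behaves (illustrative): a value is only emitted once
# it has stayed stable for `countThres` consecutive frames
def _exampleDiscardTransients(): # hypothetical helper, for illustration only
    assert [*discardTransients(iter([1]*7 + [2]*7))] == [1, 2]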
class Buffer: # Buffer
def __init__(self): self.l = deque() # Buffer
def append(self, x): self.l.append(x) # Buffer
def __next__(self): return self.l.popleft() # Buffer
if hasTorch and hasTv: # Buffer
np2Tensor = toImg() | aS(tf.Resize([192, 192])) | toTensor() # Buffer
class MLP(nn.Module): # Buffer
def __init__(self, nClasses, **kwargs): # Buffer
super().__init__(); self.l1 = knn.LinBlock(50, nClasses); self.l2 = nn.Linear(nClasses, nClasses) # Buffer
def forward(self, xb): return xb | self.l1 | self.l2 # Buffer
whatever = object() # Buffer
class TrainScreen: # TrainScreen
data: List[Tuple[int, str]] # TrainScreen
"""Core dataset of TrainScreen. Essentially just a list of (frameId, screen name)""" # TrainScreen
    def __init__(self, r:Recording): # TrainScreen
"""Creates a screen training system that will train a small neural network
to recognize different screens using a small amount of feedback from the user.
Overview of how it's supposed to work:
Setting up::
r = k1ui.Recording(await k1ui.record(30)) # record everything for 30 seconds, and creates a recording out of it
ts = k1ui.TrainScreen(r) # creates the TrainScreen object
r # run this in a cell to display the recording, including StreamTrack
ts.addRule("home", "settings", "home") # add expected screen transition dynamics (home -> settings -> home)
Training with user's feedback::
ts.registerFrames({"home": [100, 590, 4000, 4503], "settings": [1200, 2438]}) # label some frames of the recording. Network will train for ~6 seconds
next(ts) # display 20 images that confuses the network the most
ts.register({"home": [2, 6], "settings": [1, 16]}) # label some frames from the last line. Notice the frame numbers are much smaller and are <20
next(ts); ts.register({}); next(ts); ts.register({}) # repeat the last 2 lines for a few times (3-5 times is probably good enough for ~7 screens)
Evaluating the performance::
ts.graphs() # displays 2 graphs: network's prediction graph and the actual rule graph. Best way to judge performance
ts.l.Accuracy.plot() # actual accuracy metric while training. Network could have bad accuracy here while still able to construct a perfect graph, so don't rely much on this
Using the model::
ts.predict(torch.randn(2, 3, 192, 192) | k1ui.distNet()) # returns list of ints. Can use ts.idx2Name dict to convert to screen names
Saving the model::
ts | aS(dill.dumps) | file("ts.pth")
.. warning::
This won't actually save the associated recording, because recordings are
very heavy objects (several GB). It is expected that you manually manage
the lifecycle of the recording.""" # TrainScreen
self.r = r; self.data = []; # [(frame id, screen name)] # TrainScreen
self._aspect = self.frames | item() | op().shape[:2] | ~aS(lambda x, y: y/x) # TrainScreen
self._distNet = distNet(); self._rules = set(); self._trainParams = {"joinAlpha": 0, "epochs": 300} # TrainScreen
self._lastScreenName = None; self._screenDump = Buffer(); self._screenTransients = discardTransients(self._screenDump, regular=True) # TrainScreen
@property # TrainScreen
def _coldStart(self): return len(self.data) == 0 # whether there are any data at all to work with # TrainScreen
def _coldGuard(self): # TrainScreen
if self._coldStart: raise Exception("TrainScreen has not started yet. Run `next(ts)`, choose a few frames using `ts.register()` to access this functionality") # TrainScreen
def _learner(self): # TrainScreen
self._coldGuard(); l = k1.Learner(); l.data = self._dataF() # TrainScreen
l.model = MLP(len(self.name2Idx)) # TrainScreen
l.opt = optim.AdamW([l.model.parameters(), self._distNet.parameters()] | joinStreams(), lr=3e-3) # TrainScreen
l.cbs.add(Cbs.LossCrossEntropy()); l.css = "none" # TrainScreen
l.ConfusionMatrix.categories = deref()(self.name2Idx.items()) | sort(1) | cut(0) | deref() # TrainScreen
l.cbs.remove("AccuracyTop5", "AccF0"); return l # TrainScreen
    def train(self, restart=True): # TrainScreen
"""Trains the network for a while (300 epochs/6 seconds). Will be called
automatically when you register new frames to the system
:param restart: whether to restart the small network or not""" # TrainScreen
if restart: self.l = self._learner(); # TrainScreen
self.l.run(self._trainParams["epochs"]) # TrainScreen
    def trainParams(self, joinAlpha:float=None, epochs:int=None): # TrainScreen
"""Sets training parameters.
:param joinAlpha: (default 0) alpha used in joinStreamsRandom component for each
screen categories. Read more at :class:`~k1lib.cli.structural.joinStreamsRandom`
:param epochs: (default 300) number of epochs for each training session""" # TrainScreen
if joinAlpha is not None: self._trainParams["joinAlpha"] = joinAlpha # TrainScreen
if epochs is not None: self._trainParams["epochs"] = epochs # TrainScreen
@property # TrainScreen
def frames(self) -> np.ndarray: # TrainScreen
"""Grab the frames from the first :class:`StreamTrack` from the :class:`Recording`""" # TrainScreen
return self.r.sel1(klass=StreamTrack).frames # TrainScreen
@property # TrainScreen
@lru_cache # TrainScreen
def feats(self) -> List[np.ndarray]: # TrainScreen
"""Gets the feature array of shape (N, 10) by passing the frames
through :meth:`distNet`. This returns a list of arrays, not one giant
stacked array, for memory reasons""" # TrainScreen
self._coldGuard(); print("Converting all frames to features using `distNet`..."); a = k1.AutoIncrement() # TrainScreen
res = self.frames | tee(lambda x: f"{a()}/{len(self)}").crt() | np2Tensor.all() | batched(16, True) | apply(aS(list) | aS(torch.stack) | aS(self._distNet)) | joinStreams() | aS(list) # TrainScreen
print(); return res # TrainScreen
def __len__(self): return len(self.frames) # TrainScreen
def _randomConsidering(self): return range(len(self)) | splitW(1, 1, 1, 1, 1) | apply(randomize(None, 42) | head(4)) | joinStreams() | aS(list) # TrainScreen
def __next__(self) -> "PIL.Image.Image": # shows ~20 candidate frames for the user to label # TrainScreen
if self._coldStart: self._considering = self._randomConsidering() # TrainScreen
else: # mix candidates from 3 sources at a rough 2:1:3 ratio: rule-violating transitions (a), random frames (b), mid-boundary frames (c) # TrainScreen
a = self.transitionScreens(False) | randomize(None, 42) | cut(0) | aS(iter) # TrainScreen
b = self._randomConsidering() | aS(iter); c = self._midBoundaryConsidering() | aS(iter) # TrainScreen
self._considering = [a, a, b, c, c, c] | apply(wrapList() | insert(yieldT | repeat(), False) | joinStreams() | randomize()) | joinStreamsRandom() | head(20) | deref() # TrainScreen
return self._considering | lookup(self.frames) | insertIdColumn(begin=False) | plotImgs(5, self._aspect-0.2, im=True) # TrainScreen
def _refreshIdx(self): self.idx2Name, self.name2Idx = self.data | cut(1) | aS(set) | insertIdColumn() | toDict() & (permute(1, 0) | toDict()) | deref() # rebuild idx<->name mappings from the labeled data # TrainScreen
[docs] def register(self, d): # TrainScreen
"""Tells the object which images previously displayed by :meth:`TrainScreen.__next__`
associate with what screen name. Example::
next(ts) # displays the images out to a notebook cell
ts.register({"home": [3, 4, 7], "settings": [5, 19, 2], "monkeys": [15, 11], "guns": []})
This will also quickly (around 6 seconds) train a small neural network on all
available frames based on the new information you provided.
See also: :meth:`registerFrames`""" # TrainScreen
self.data = [self.data, deref()(d.items()) | apply(repeat(), 0) | transpose().all() | joinStreams() | permute(1, 0) | lookup(self._considering, 0)] | joinStreams() | sort(0) | unique(0) | deref() # TrainScreen
self._refreshIdx(); self.train() # TrainScreen
[docs] def registerFrames(self, data:Dict[str, List[int]]): # TrainScreen
"""Tells the object which frames should have which labels.
Example::
ts.registerFrames({"home": [328, 609], "settings": [12029], "monkeys": [1238]})
This differs from :meth:`register` in that the frame id here is the
absolute frame index in the recording, while in :meth:`register`,
it's the frame displayed by :meth:`TrainScreen.__next__`.""" # TrainScreen
self.data = [self.data, deref()(data.items()) | apply(repeat(), 0) | transpose().all() | joinStreams() | permute(1, 0)] | joinStreams() | sort(0) | unique(0) | deref(); self._refreshIdx(); self.train() # TrainScreen
[docs] def addRule(self, *screenNames:List[str]) -> "TrainScreen": # TrainScreen
"""Adds a screen transition rule. Let's say that the transition
dynamic looks like this:
.. code-block:: text
    home <---> settings <---> account
                  ^
                  |
                  v
              shortcuts
You can represent it like this::
ts.addRule("home", "settings", "account", "settings", "home")
ts.addRule("settings", "shortcuts", "settings")""" # TrainScreen
screenNames | window(2) | apply(tuple) | apply(self._rules.add) | ignore(); return self # TrainScreen
[docs] def transitionScreens(self, obeyRule:bool=whatever) -> List[Tuple[int, str]]: # TrainScreen
"""Get the list of screens (list of (frameId, screen name) tuple) that
the network deems to be transitions between screen states.
:param obeyRule: if not specified, then don't filter. If True, returns only screens that
are part of the specified rules; if False, only screens that violate them""" # TrainScreen
self._coldGuard() # TrainScreen
with torch.no_grad(): transitions = self.predict(self.feats) | insertIdColumn() | aS(discardTransients, 1) | window(2, True, None) | filt(~aS(lambda x, y: not y or x[1] != y[1])) | cut(0) | lookup(self.idx2Name, 1) | deref() # TrainScreen
if obeyRule is whatever: return transitions # TrainScreen
f = inSet(self._rules, 1) if obeyRule else ~inSet(self._rules, 1) # TrainScreen
return transitions | window(2) | apply(transpose() | iden() + aS(tuple)) | f | transpose().all() | joinStreams() | unique(0) | deref() # TrainScreen
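# Hedged example (hypothetical frame ids and screen names): # TrainScreen
#   ts.transitionScreens()      # -> [(0, "home"), (328, "settings"), (609, "home"), ...] # TrainScreen
#   ts.transitionScreens(True)  # only transitions allowed by rules added via ts.addRule() # TrainScreen
#   ts.transitionScreens(False) # only transitions that violate those rules # TrainScreen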
[docs] def newEvent(self, sess:WsSession, event:dict): # TrainScreen
if event["type"] == "stream": # TrainScreen
with torch.no_grad(): # TrainScreen
name = event["frame"] | np2Tensor | op().reshape(-1, 3, 192, 192) | aS(self._distNet) | op().view(1, -1)\
| self.l.model | op().argmax().item() | aS(lambda x: self.idx2Name[x]) # TrainScreen
sess.loop.create_task(sess.eventCb(sess, {"type": "screenName", "name": name})) # TrainScreen
if event["type"] == "screenName": # TrainScreen
self._screenDump.append(event["name"]); res = next(self._screenTransients) # TrainScreen
if res: # TrainScreen
sess.loop.create_task(sess.eventCb(sess, {"type": "screenTransition", "transition": (self._lastScreenName, res)})) # TrainScreen
self._lastScreenName = res # TrainScreen
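# Hedged wiring sketch (assumed usage): forward live "stream" events into `newEvent`, # TrainScreen
# and the session will receive "screenName" and debounced "screenTransition" events # TrainScreen
# back through the same eventCb: # TrainScreen
#   async def eventCb(sess, event): # TrainScreen
#       ts.newEvent(sess, event) # TrainScreen
#       if event["type"] == "screenTransition": print(event["transition"]) # TrainScreen
#   async def mainThreadCb(sess): sess.stream(300); await asyncio.sleep(60) # TrainScreen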
[docs] def predict(self, feats:"torch.Tensor") -> List[int]: # TrainScreen
"""Using the built-in network, tries to predict the screen name for a
bunch of features of shape (N, 10). Example::
r = ...; ts = k1ui.TrainScreen(r); next(ts)
ts.register({"bg": [9, 10, 11, 12, 17, 19], "docs": [5, 6, 7, 8, 0, 1, 4], "jupyter": [2, 3]})
# returns list of 2 integers
ts.predict(torch.randn(2, 3, 192, 192) | aS(k1ui.distNet()))""" # TrainScreen
self._coldGuard(); return feats | batched(128, True) | apply(aS(list) | aS(torch.stack) | aS(self.l.model) | op().argmax(1).numpy()) | joinStreams() # TrainScreen
[docs] def transitionGraph(self) -> "graphviz.graphs.Digraph": # TrainScreen
"""Gets a screen transition graph of the entire recording. See also: :meth:`graphs`""" # TrainScreen
g = k1.digraph(); self.transitionScreens() | cut(1) | window(2) | apply(tuple) | count() | cut(0, 1) | ~apply(lambda c, xy: g(*xy, label=f" {c}")) | ignore(); return g # TrainScreen
[docs] def ruleGraph(self) -> "graphviz.graphs.Digraph": # TrainScreen
"""Gets a screen transition graph based on the specified rules.
Rules are added using :meth:`addRule`. See also: :meth:`graphs`""" # TrainScreen
g = k1.digraph(); self._rules | ~apply(g) | ignore(); return g # TrainScreen
[docs] def graphs(self) -> viz.Carousel: # TrainScreen
"""Combines both graphs from :meth:`transitionGraph` and :meth:`ruleGraph`""" # TrainScreen
return [self.transitionGraph(), self.ruleGraph()] | toImg().all() | aS(viz.Carousel) # TrainScreen
[docs] def labeledData(self) -> viz.Carousel: # TrainScreen
"""Visualizes labeled data""" # TrainScreen
return self.data | groupBy(1) | apply(randomize(None) | head(5) | lookup(self.frames, 0)) | batched(5) | plotImgs(5, self._aspect-0.2, table=True, im=True).all() | aS(viz.Carousel) # TrainScreen
def __getstate__(self): d = dict(self.__dict__); del d["r"]; del d["_lastScreenName"]; del d["_screenDump"]; del d["_screenTransients"]; return d # TrainScreen
def __setstate__(self, d): # TrainScreen
self.__dict__.update(d); self._lastScreenName = None; self._screenDump = Buffer() # TrainScreen
self._screenTransients = discardTransients(self._screenDump, regular=True) # TrainScreen
[docs] def correctRatio(self): # TrainScreen
"""Ratio between the number of screens that is in a valid transition and
ones that isn't in a valid transition. Just a quick metric to see how well the
network is doing. The higher the number, the better it is""" # TrainScreen
return len(self.transitionScreens(True))/len(self.transitionScreens(False)) # TrainScreen
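# Hedged reading (hypothetical numbers): if 30 screens take part in rule-abiding # TrainScreen
# transitions and 10 don't, then: # TrainScreen
#   ts.correctRatio()  # -> 3.0, and it should climb as you label more frames # TrainScreen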
def fillIn(n, states): # expands sparse [(frameId, screenName)] labels into a dense per-frame label list # fillIn
iS = 0 # index of states # fillIn
state = None; nextI, nextS = states[iS] # fillIn
for i in range(n): # fillIn
if i >= nextI: # fillIn
iS += 1; state = nextS # fillIn
if iS < len(states): nextI, nextS = states[iS] # fillIn
yield [i, state] # fillIn
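# Worked example (hypothetical labels) of how fillIn densifies sparse labels: # fillIn
#   list(fillIn(5, [(0, "home"), (3, "settings")])) # fillIn
#   # -> [[0, 'home'], [1, 'home'], [2, 'home'], [3, 'settings'], [4, 'settings']] # fillIn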
def blocks(it): # groups consecutive labeled entries sharing a screen name into (firstId, lastId, screenName) tuples # blocks
lastIs = []; lastE = None # blocks
for i, e in it: # blocks
if e != lastE: # blocks
if lastE is not None: yield min(lastIs), max(lastIs), lastE # blocks
lastIs = []; lastE = e # blocks
lastIs.append(i) # blocks
yield min(lastIs), max(lastIs), lastE # blocks
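# Worked example (hypothetical labels): consecutive entries sharing a name collapse # blocks
# into (firstId, lastId, name) blocks: # blocks
#   list(blocks([(0, "a"), (1, "a"), (2, "b"), (3, "b"), (4, "a")])) # blocks
#   # -> [(0, 1, 'a'), (2, 3, 'b'), (4, 4, 'a')] # blocks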
@k1.patch(TrainScreen) # blocks
def _dataF(self, bs=64): # _dataF
self._coldGuard(); v1 = fillIn(len(self), self.data) | filt(op(), 1) # old version. A bit more liberal than v2, and will occasionally auto-label frames wrongly # _dataF
v2 = blocks(self.data) | ~apply(lambda x, y, z: [range(x, y+1), z | repeat()] | transpose()) | joinStreams() # _dataF
js = deref() | aS(lambda xs: xs | apply(repeatFrom() | randomize()) | joinStreamsRandom(self._trainParams["joinAlpha"], xs | apply(len) | deref())) # joinStreams # _dataF
return v2 | randomize(None) | groupBy(1) | filt(lambda x: len(x) > 1) | splitW().all() | transpose()\
| apply(js | lookup(self.feats, 0) | lookup(self.name2Idx, 1) | batched(bs)\
| apply(transpose() | (aS(list) | aS(torch.stack)) + toTensor(int)) # _dataF
) | stagger.tv(1024/bs) | aS(list) # _dataF
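# Hedged shape sketch (assumed, following the pipeline above): `_dataF` yields # _dataF
# staggered train/valid streams of (xb, yb) batches, where xb is a stacked feature # _dataF
# tensor of shape (bs, 10) and yb an integer label tensor of shape (bs,): # _dataF
#   xb, yb = ts._dataF() | item() | item()   # assumed: xb float tensor (64, 10), yb int tensor (64,) # _dataF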
def midBounds(it): # grabs data samples that are in between blocks, aka the really confusing cases around transitions, so that the user can guide the network effectively # midBounds
lastI = 0; lastE = None # midBounds
for i, e in it: # midBounds
if e != lastE: yield (i + lastI)//2, i-lastI, lastE; lastE = e # midBounds
lastI = i # midBounds
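# Worked example (hypothetical labels): yields (gapMidpoint, gapWidth, nameBeforeGap) # midBounds
# for each gap between differently-labeled frames, so that _midBoundaryConsidering # midBounds
# below can surface the midpoints of the widest, most ambiguous gaps first: # midBounds
#   list(midBounds([(0, "a"), (5, "a"), (9, "b"), (12, "b"), (20, "c")])) # midBounds
#   # -> [(0, 0, None), (7, 4, 'a'), (16, 8, 'b')] # midBounds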
@k1.patch(TrainScreen) # midBounds
def _midBoundaryConsidering(self): return midBounds(self.data) | ~head(1) | ~sort(1) | cut(0) # _midBoundaryConsidering