# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import logging, warnings, os, time, re, gc, json, k1lib, importlib, urllib.parse, math, base64, dill, inspect, threading, datetime, traceback, functools, collections, numpy as np, pprint
from typing import Any, List, Union, Tuple, Iterator, Dict
try: import torch; hasTorch = True
except: hasTorch = False
__all__ = ["_docsUrl", "mplLock", "dep", "depCli", "isNumeric",
"patch", "cache", "cron", "preload", "wrapMod", "wraps", "squeeze", "raiseEx",
"numDigits", "limitLines",
"limitChars", "showLog", "cleanDiv",
"beep", "beepOnAvailable", "dontWrap",
"debounce", "scaleSvg", "now", "pushNotification", "ticks", "digraph", "graph",
"encode", "decode", "hash", "resolve", "config", "modbusCrc", "parseTimeStr"]
_docsUrl = "https://k1lib.com"
mplLock = threading.Lock()
class Dependency: # Dependency
def __init__(self, s, alt:str=None, url:str=None): self.s = s; self.alt = alt; self.url = url; self.errorMsg = "" # Dependency
def __getattr__(self, attr): # Dependency
after = f"More information is available on {self.url}. " if self.url else f"" # Dependency
raise ImportError(f"Python package `{self.alt or self.s}` not found. Please install it. {after}After installing, if you're in a notebook environment, then restart the kernel. This is the error when importing it:\n\n{self.errorMsg}") # Dependency
def oldDep(s, alt:str=None, url:str=None): # oldDep
try: return importlib.import_module(s) # oldDep
except: d = Dependency(s, alt, url); d.errorMsg = traceback.format_exc(); return d # oldDep
[docs]
class dep: # dep
def __init__(self, s, alt=None, url=None): # dep
"""Imports a potentially unavailable package
Example::
graphviz = k1.dep("graphviz")
# executes as normal, if graphviz is available, else throws an error
g = graphviz.Digraph()
I don't imagine this would be useful for everyday use though.
This is mainly for writing this library, so that it can use
optional dependencies.
:param s: name of the package. Can be nested, like `matplotlib.pyplot`
:param alt: (optional) name of the package to display in error message if package is not found
:param url: (optional) url of the package's website, so that they know where to get official docs""" # dep
self._k_s = s; self._k_alt = alt; self._k_url = url; self._k_mod = None; self.loaded = False # dep
def _k_bootstrap(self): # dep
try: self.loaded = True; self._k_mod = importlib.import_module(self._k_s); return self._k_mod # dep
except: # dep
after = f"More information is available on {self._k_url}. " if self._k_url else f"" # dep
raise ImportError(f"Python package `{self._k_alt or self._k_s}` not found. Please install it. {after}After installing, if you're in a notebook environment, then restart the kernel. This is the error when importing it:\n\n{traceback.format_exc()}") # dep
def __getattr__(self, attr): # dep
"""This code is a little magical.
Say you're doing this::
np = k1.dep("numpy")
np # it's an object of class k1lib.dep
np.array([2, 3]) # instantiate a new array like usual. This is by running k1lib.__getattr__
np # now the old python object has turned into the actual numpy module, as if you import it using "import numpy as np"
It's not foolproof, but works in most cases and gives a nice little speed boost
Inspired from https://github.com/cart0113/pyjack""" # dep
if not self.loaded: # dep
replaceWith = self._k_bootstrap(); gc.collect(); arr = gc.get_referrers(self); iSelf = id(self) # dep
for x in arr: # dep
if isinstance(x, dict): # dep
for k,v in x.items(): # dep
if id(v) == iSelf: x[k] = replaceWith # dep
return getattr(self._k_mod, attr) # dep
dep.mpl = dep("matplotlib", url="https://matplotlib.org/"); dep.plt = dep("matplotlib.pyplot", url="https://matplotlib.org/"); dep.cm = dep("matplotlib.cm", url="https://matplotlib.org/") # dep
dep.pygments = dep("pygments", url="https://pygments.org/"); dep.torch = dep("torch", url="https://pytorch.org/") # dep
try: import pygments.formatters; import pygments.lexers # these are not loaded by default, so have to prime them # dep
except: pass # dep
dep.bs4 = dep("bs4", "beautifulsoup4", "https://www.crummy.com/software/BeautifulSoup/") # dep
dep.yaml = dep("yaml", "pyyaml", "https://pyyaml.org/"); dep.pd = dep("pandas", url="https://pandas.pydata.org/") # dep
dep.requests = dep("requests", url="https://requests.readthedocs.io/"); dep.graphviz = dep("graphviz", url="https://graphviz.readthedocs.io/") # dep
dep.PIL = dep("PIL", "Pillow", "https://pillow.readthedocs.io/"); dep.websockets = dep("websockets", url="https://websockets.readthedocs.io/") # dep
dep.cv2 = dep("cv2", "opencv-python", "https://opencv.org/") # dep
def depCli(prog="ls", insCmd=None, website=None): # depCli
"""Checks whether a particular command-line program is installed or not.
Example::
k1.depCli("vim")
This will throw an error if that program is not found
:param prog: program name
:param insCmd: (optional) installation command, could be "apt install vim"
:param website: (optional) website that the user can learn more about the program""" # depCli
if None | k1lib.cli.cmd(("where " if os.name == "nt" else "which ") + prog) | k1lib.cli.shape(0) == 0: # depCli
insS = f" by executing '{insCmd}'" if insCmd else "" # depCli
webS = f". Visit {website} to learn more about the program" if website else "" # depCli
raise Exception(f"Command line program '{prog}' not found in PATH. Please install it{insS}{webS}") # depCli
return True # depCli
[docs]
def isNumeric(x) -> bool: # isNumeric
"""Returns whether object is actually a number""" # isNumeric
return isinstance(x, (int, float, np.number)) # isNumeric
[docs]
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False): # patch
"""Patches a function to a class/object.
:param _class: object to patch function. Can also be a type
:param name: name of patched function, if different from current
:param docs: docs of patched function. Can be object with defined __doc__ attr
:param static: whether to wrap this inside :class:`staticmethod` or not
:return: modified function just before patching
Intended to be used like this::
class A:
def methA(self):
return "inside methA"
@k1lib.patch(A)
def methB(self):
return "inside methB"
a = A()
a.methB() # returns "inside methB"
You can do ``@property`` attributes like this::
class A: pass
@k1lib.patch(A, "propC")
@property
def propC(self): return self._propC
@k1lib.patch(A, "propC")
@propC.setter
def propC(self, value): self._propC = value
a = A(); a.propC = "abc"
a.propC # returns "abc"
The attribute name unfortunately has to be explicitly declared, as I can't
really find a way to extract the original name. You can also do static
methods like this::
class A: pass
@k1lib.patch(A, static=True)
def staticD(arg1): return arg1
A.staticD("def") # returns "def"
""" # patch
def inner(function): # patch
_docs = docs # patch
if _docs is not None and not isinstance(_docs, str): _docs = _docs.__doc__ # patch
_docs = _docs or function.__doc__ or _class.__doc__ # patch
_name = name or function.__qualname__.split(".")[-1] # patch
# patch
_function = staticmethod(function) if static else function # patch
_function.__doc__ = _docs; setattr(_class, _name, _function); return _function # patch
return inner # patch
def hashableAKw(args, kwargs): # hashableAKw
a = tuple((tuple(x) if isinstance(x, list) else x) for x in args) # can accept lists of hashable items! # hashableAKw
b = tuple(sorted(list(kwargs.items()), key=lambda x: x[0])); key = (*a, *b); return (*a, *b) # hashableAKw
_k1_cacheD = {} # Dict[name -> {docs, func, clearF, hits, misses}] # hashableAKw
_k1_cache_autoInc = [1] # hashableAKw
[docs]
class cache: # cache
[docs]
def __init__(self, maxsize=128, timeout=60, name=None, docs=None): # cache
"""Like functools.lru_cache, but with timeout period.
Example::
@k1.cache(timeout=10)
def slow_function(x): return x * 2
You can also quickly deploy a management plane to manage all caches::
app = flask.Flask(__name__)
k1.cache.flask(app)
app.run(host="0.0.0.0", port=80)
See more about this at :meth:`flask`
:param maxsize: max size of the cache
:param timeout: timeout in seconds""" # cache
if not name: name = f"_k1_cache_{_k1_cache_autoInc[0]}"; _k1_cache_autoInc[0] += 1 # cache
if "/" in name: raise Exception("Can't have forward slash in the name") # cache
self.maxsize = maxsize; self.timeout = timeout; self.cache = collections.OrderedDict(); self.name = name; self.docs = docs # cache
def __call__(self, func): # cache
if self.name in _k1_cacheD: raise Exception(f"Name '{self.name}' has appeared before. Please use a different name") # cache
_k1_cacheD[self.name] = meta = {"docs": self.docs, "func": func, "clearF": (lambda: self.cache.clear()), "hits": 0, "timeouts": 0, "misses": 0, "capacity": self.maxsize, "len": len(self.cache)} # cache
@functools.wraps(func) # cache
def wrapper(*args, _k1_cache_refresh=False, **kwargs): # _k1_cache_refresh: if True, returns the cached item, but then immediately fetches the new version. Not entirely sure where this is used in # cache
if self.maxsize == 0: return func(*args, **kwargs) # cache
key = hashableAKw(args, kwargs); now = time.time() # cache
if key in self.cache: # cache
result, timestamp = self.cache[key] # cache
if _k1_cache_refresh: result = func(*args, **kwargs); self.cache[key] = (result, now); return result # cache
if now - timestamp < self.timeout: self.cache.move_to_end(key); meta["hits"] += 1; return result # cache
else: del self.cache[key]; meta["timeouts"] += 1 # cache
else: meta["misses"] += 1 # cache
result = func(*args, **kwargs) # cache
if len(self.cache) >= self.maxsize: self.cache.popitem(last=False) # cache
self.cache[key] = (result, now); meta["len"] = len(self.cache); return result # cache
return wrapper # cache
[docs]
@staticmethod # cache
def allData(): # cache
"""Grabs ``Dict[name -> {docs, func, clearF}]`` of all caches""" # cache
return _k1_cacheD # cache
[docs]
@staticmethod # cache
def flask(app, **kwargs): # cache
"""Attaches a cache management plane to a flask app.
Example::
app = flask.Flask(__name__)
k1.cache.flask(app)
app.run(host="0.0.0.0", port=80)
Then, you can access the route "/k1/cache" to see an overview of all caches
:param app: flask app object
:param kwargs: extra random kwargs that you want to add to ``app.route()`` function""" # cache
k1 = k1lib; cli = k1.cli; viz = k1.viz # cache
@app.route("/k1/cache", **kwargs) # cache
def index(): ui3 = k1.cache.allData().items() | ~cli.apply(lambda k,v: [k, v["func"].__name__, inspect.getfile(v["func"]), v["hits"], v["misses"], v["timeouts"], round(v["hits"]/(v["hits"] + v["misses"] + v["timeouts"] + 1e-9)*100, 2), round(v["timeouts"]/(v["hits"] + v["misses"] + v["timeouts"] + 1e-9)*100, 2), v["len"], v["capacity"], round(v["len"]/(v["capacity"]+1e-9)*100, 2), f"<button onclick='clearCache(\"{k}\")'>Clear</button>", v["docs"]]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}") | viz.Table(["name", "func's name", "func's file name", "hits", "misses", "timeouts", "% hit", "% timeouts", "len", "capacity", "% capacity", "clear cache", "docs"], height=600, sortF=True)) | cli.op().interface() | cli.toHtml(); return f"""
<h1>Caches</h1><div style="overflow-x: auto">{ui3}</div><script>async function clearCache(x) {{ await fetch(`/k1/cache/api/${{x}}/clear`); alert("Cleared"); }}</script>""" # cache
@app.route("/k1/cache/api/<cache>/clear") # cache
def api_clear(cache): k1.cache.allData()[cache]["clearF"](); return "ok" # cache
_k1_cron_delay_data = {} # Dict[name -> {docs, delay, logs, lastRun}] # cache
_k1_cron_autoInc = [1] # cache
[docs]
class cron: # cron
stop = "_k1.cron.stopSignal" # cron
[docs]
def __init__(self, f=None, delay=None, kw=None, name=None, docs=None, daemon=False, delayedStart=0): # cron
"""Sets up a cron job in another thread, running the decorated
function in 2 modes:
Mode 1: whenever ``f`` goes from False to True. It's activated when ``delay`` is not specified. Example::
@k1.cron(lambda minute: minute == 0)
def f1(): # runs every hour
... # do some stuff
@k1.cron(lambda second: second % 5 == 0)
def f2(): # runs every 5 seconds
... # do some stuff
So, the first function will run every hour, and the second function will
run every 5 seconds. Pretty straightforward. The timing function ``f`` can
be as complicated as you want, but it can only accept the following parameters:
- year
- month: 1-12
- day: 1-31
- weekday: 0-6, 0 for Monday
- hour: 0-23
- minute: 0-59
- second: 0-59
Mode 2: every ``delay`` seconds. It's activated when ``delay`` is specified. Example::
@k1.cron(delay=10)
def f1(): # runs every 10 seconds
... # do some stuff
Theres a special sentinel ``k1.cron.stop`` that when returns, it terminates the cron
loop, for both modes. Example::
@k1.cron(delay=10)
def f1(): # only prints out "abc" once
print("abc")
return k1.cron.stop
You can also get a python object describing past runs, stdout and errors by doing ``k1.cron.allData``
You can also quickly deploy a management plane to manage all cron jobs::
app = flask.Flask(__name__)
k1.cron.flask(app)
app.run(host="0.0.0.0", port=80)
See more about this at :meth:`flask`
:param f: function to trigger wrapped function when this transitions from False to True
:param delay: if specified, just run the function repeatedly, delayed by this amount after running. Mutually exclusive with .f
:param kw: extra keyword arguments to pass to the function
:param name: optional name to show up in :meth:`allData`
:param docs: optional documentation to show up in :meth:`allData`
:param daemon: if True, makes the cron thread daemon
:param delayedStart: if specified, delay the start of the first function execution to after this number of seconds""" # cron
if not name: name = f"_k1_cron_{_k1_cron_autoInc[0]}"; _k1_cron_autoInc[0] += 1 # cron
if "/" in name: raise Exception("Can't have forward slash in the name") # cron
self.f = f; self.delay = delay; self.kw = kw; self.name = name; self.docs = docs; self.daemon = daemon; self.delayedStart = delayedStart # cron
[docs]
@staticmethod # cron
def allData(): # cron
"""Grabs ``Dict[name -> {docs, delay, logs}]`` for all delay crons""" # cron
return _k1_cron_delay_data # cron
def __call__(self, func): # cron
f = self.f; delay = self.delay; kw = self.kw; name = self.name; docs = self.docs; kw = kw or {} # cron
if self.name in _k1_cron_delay_data: raise Exception(f"Name '{self.name}' has appeared before. Please use a different name") # cron
_k1_cron_delay_data[name] = self.obj = {"docs": docs, "delay": delay, "logs": collections.deque([], 100), "lastRun": [0, ""], "delayedStart": self.delayedStart, "func": func} # cron
if delay: # cron
def startLoop(): # cron
if self.delayedStart > 0: time.sleep(self.delayedStart) # cron
while True: # cron
beginTime = time.time() # cron
try: # cron
_k1_cron_delay_data[name]["lastRun"] = [beginTime, "now"] # cron
res = func(**kw) # _k1_cron_delay_data[name]["logs"].append({"beginTime": beginTime, "duration": time.time() - beginTime, "success": True}) # don't log successful requests, cause that's too frequent # cron
if res == cron.stop: break # cron
except Exception as e: print(f"{type(e)}\ue004{e}"); print(traceback.format_exc()); _k1_cron_delay_data[name]["logs"].append({"beginTime": beginTime, "duration": time.time() - beginTime, "success": False, "exc": f"{type(e)}\ue004{e}", "tb": traceback.format_exc()}) # cron
time.sleep(delay) # cron
T = threading.Thread(target=startLoop) # cron
if self.daemon: T.setDaemon(True) # cron
T.start(); return func # cron
else: # cron
args = list(inspect.signature(f).parameters.keys()) # cron
s = {"year", "month", "day", "weekday", "hour", "minute", "second"} # cron
for arg in args: # cron
if arg not in s: raise Exception(f"Unknown argument {arg}. Only (year, month, day, weekday, hour, minute, seconds) are allowed") # cron
def startLoop(): # cron
last = False; this = last # cron
if self.delayedStart > 0: time.sleep(self.delayedStart) # cron
while True: # cron
a = datetime.datetime.now() # cron
now = {"year": a.year, "month": a.month, "day": a.day, "weekday": a.weekday(), "hour": a.hour, "minute": a.minute, "second": a.second} # cron
this = f(*[now[e] for e in args]) # cron
if not last and this: # cron
try: # cron
if func(**kw) == cron.stop: break # cron
except Exception as e: print(f"{type(e)}\ue004{e}"); print(traceback.format_exc()) # cron
last = this; time.sleep(0.5) # cron
T = threading.Thread(target=startLoop) # cron
if self.daemon: T.setDaemon(True) # cron
T.start(); return func # cron
[docs]
@staticmethod # cron
def flask(app, **kwargs): # cron
"""Attaches a cron management plane to a flask app.
Example::
app = flask.Flask(__name__)
k1.cron.flask(app)
app.run(host="0.0.0.0", port=80)
Then, you can access the route "/k1/cron" to see an overview of all cron jobs
:param app: flask app object
:param kwargs: extra random kwargs that you want to add to ``app.route()`` function""" # cron
k1 = k1lib; cli = k1.cli; viz = k1.viz # cron
@app.route("/k1/cron", **kwargs) # cron
def index(): # cron
ui1 = k1.cron.allData().items() | ~cli.apply(lambda k,v: [k, v["delay"], v["delayedStart"], v["lastRun"][1], v["func"].__name__, inspect.getfile(v["func"]), v["docs"]]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}") | viz.Table(["name", "delay", "delayedStart", "lastRun", "func's name", "func's file name", "docs"], height=600, sortF=True)) | cli.op().interface() | cli.toHtml() # cron
ui2 = k1.cron.allData().items() | ~cli.apply(lambda k,v: [k, f"<h2>{k}</h2><pre>{pprint.pformat(v)}</pre>"]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}", col=0) | cli.cut(1) | cli.join("") | viz.Scroll(600)) | cli.op().interface() | cli.toHtml() # cron
return f"""<h1>Crons</h1><div style="overflow-x: auto">{ui1}</div><div style="overflow-x: auto">{ui2}</div>""" # cron
@cron(delay=10, daemon=True, delayedStart=8, name="scanCron", docs="scans through all cron jobs (k1.cron) to calculate time string of how long ago it was executed") # cron
def scanCron(): # scans all crons and inject in seconds ago, things like that # scanCron
now = time.time() # scanCron
for key in _k1_cron_delay_data: # scanCron
beginTime = _k1_cron_delay_data[key]["lastRun"][0] # scanCron
_k1_cron_delay_data[key]["lastRun"] = [beginTime, k1lib.fmt.time(now - beginTime, metric=True) + " ago"] # scanCron
_k1_preload_autoInc = [1] # scanCron
[docs]
class preload: # preload
[docs]
def __init__(self, refreshTime=10, inactiveTime=60, maxsize=128, name=None, docs=None): # preload
"""Wraps a function and pre executes that function every once in a
while if the function is being used. Example::
@k1.preload(timeout=10)
def slow_func(x): time.sleep(0.1); return x * 2
slow_func(3) # takes 100ms
time.sleep(4); slow_func(3) # takes 2us
time.sleep(4); slow_func(3) # takes 2us
time.sleep(4); slow_func(3) # takes 2us
time.sleep(4); slow_func(3) # takes 2us
time.sleep(15); slow_func(3) # takes 100ms
Normally, you don't have to use this, use :class:`~k1lib.cache` instead. This is
for those scenarios where you can't afford to have any delays, as :class:`~k1lib.cache`
will sometime take quite a while to run, but most of the time it runs fast.
With this decorator, if you use the function frequent enough (less than ``inactiveTime``),
then you will never experience any kind of delays, no matter how infrequent. This is good
for generating plots that don't change for a long time, but requires fast server response
all the time.
:param refreshTime: how often to refresh the internal cache to make content fresh?
:param inactiveTime: how long before a function's argument considered inactive and will stop refreshing automatically
:param maxsize: maximum size of the internal cache. See also :class:`~k1lib.cache` """ # preload
if not name: name = f"_k1_preload_{_k1_preload_autoInc[0]}"; _k1_preload_autoInc[0] += 1 # preload
self.refreshTime = refreshTime; self.inactiveTime = inactiveTime; self.maxsize = maxsize # preload
self.cache = cache(maxsize, refreshTime*3, name=f"preload: {name or '(no name)'}", docs=docs); self.name = name; self.docs = docs # preload
def __call__(self, func): # preload
cachedFunc = self.cache(func); lastFetches = collections.defaultdict(lambda: [0, None]) # Dict[key -> [lastFetch, og args]] # preload
@cron(delay=self.refreshTime, name=f'preload: {(self.name or "(no name)")}', docs=self.docs) # preload
def watchdog(): # preload
for lastFetch, [args, kwargs] in list(lastFetches.values()): # preload
if time.time() - lastFetch < self.inactiveTime: cachedFunc(*args, **kwargs, _k1_cache_refresh=True) # preload
@functools.wraps(func) # preload
def wrapper(*args, **kwargs): key = hashableAKw(args, kwargs); lastFetches[key][0] = time.time(); lastFetches[key][1] = [args, kwargs]; return cachedFunc(*args, **kwargs) # preload
return wrapper # preload
[docs]
class wrapMod: # wrapMod
[docs]
def __init__(self, m, moduleName=None): # wrapMod
"""Wraps around a module, and only suggest symbols in __all__ list
defined inside the module. Example::
from . import randomModule
randomModule = wrapMod(randomModule)
:param m: the imported module
:param moduleName: optional new module name for elements (their __module__ attr)""" # wrapMod
self._init(m, moduleName) # wrapMod
def _init(self, m, moduleName): # wrapMod
if moduleName is not None: # wrapMod
for v in m.__dict__.values(): # wrapMod
v.__module__ = moduleName # wrapMod
self._wrapMod_moduleName = moduleName; self._wrapMod_m = m # wrapMod
self.__dict__.update(m.__dict__) # wrapMod
self._wrapMod_extraDirs = [] # wrapMod
def __dir__(self): return self._wrapMod_m.__all__ + self._wrapMod_extraDirs # wrapMod
def __str__(self): return str(self._wrapMod_m) # wrapMod
def __getstate__(self): return {"m": self._wrapMod_m, "name": self._wrapMod_moduleName} # wrapMod
def __setstate__(self, d): self._init(d["m"], d["name"]) # wrapMod
def __repr__(self): return str(self) # wrapMod
[docs]
def wraps(ogF): # wraps
"""Kinda like :meth:`functools.wraps`, but don't update __annotations__.""" # wraps
def inner(f): # wraps
f.__doc__ = ogF.__doc__ # wraps
f.__name__ = ogF.__name__ # wraps
f.__qualname__ = ogF.__qualname__ # wraps
f.__module__ = ogF.__module__ # wraps
return f # wraps
return inner # wraps
[docs]
def squeeze(_list:Union[list, tuple, Any], hard=False): # squeeze
"""If list only has 1 element, returns that element, else returns original
list.
:param hard: If True, then if list/tuple, filters out None, and takes the first
element out even if that list/tuple has more than 1 element""" # squeeze
if isinstance(_list, (tuple, list)): # squeeze
if hard: return [e for e in _list if e != None and e != ""][0] # squeeze
elif len(_list) == 1: return _list[0] # squeeze
if hasTorch and isinstance(_list, torch.Tensor): return _list.squeeze() # squeeze
return _list # squeeze
[docs]
def raiseEx(ex:Exception): # raiseEx
"""Raises a specific exception. May be useful in lambdas""" # raiseEx
raise ex # raiseEx
[docs]
def numDigits(num) -> int: # numDigits
"""Get the number of digits/characters of this number/object""" # numDigits
return len(f"{num}") # numDigits
[docs]
def limitLines(s:str, limit:int=10) -> str: # limitLines
"""If input string is too long, truncates it and adds ellipsis""" # limitLines
splits = s.split("\n") # limitLines
if len(splits) > limit: return "\n".join(splits[:limit]) + "\n....." # limitLines
else: return s # limitLines
[docs]
def limitChars(s:str, limit:int=50): # limitChars
"""If input string is too long, truncates to first `limit` characters of the first line""" # limitChars
if s is None: return "" # limitChars
s = f"{s}".split("\n")[0] # limitChars
return s[:limit-3] + "..." if len(s) > limit else s # limitChars
[docs]
def showLog(loggerName:str="", level:int=logging.DEBUG): # showLog
"""Prints out logs of a particular logger at a particular level""" # showLog
logger = logging.getLogger(loggerName); logger.setLevel(level) # showLog
sh = logging.StreamHandler(); sh.setLevel(logging.DEBUG); logger.addHandler(sh) # showLog
def cleanDiv(_list:List[float], total:int) -> List[int]: # cleanDiv
"""Cleanly divides :total: into int segments with weights specified by
:_list:. Looks like this: ``(_list / _list.sum()) * total``, but
everything is an ``int``""" # cleanDiv
_list = np.array(_list) # cleanDiv
_list = (_list*total/_list.sum()).astype(int) # cleanDiv
_list[-1] = total - _list[:-1].sum() # cleanDiv
return _list # cleanDiv
[docs]
def beep(seconds=0.3): # beep
"""Plays a beeping sound, may be useful as notification for long-running tasks""" # beep
try: import IPython; IPython.core.display.display_html(IPython.display.HTML(f"""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(Math.round(3.3333e3*{seconds})).join(123))).play();</script>""")); # beep
except: os.system("printf '\a'") # beep
def beepOnAvailable(url:str, timeout=5, **kwargs): # beepOnAvailable
"""Tries to connect with a url repeatedly, and if successful, plays
a beep sound""" # beepOnAvailable
import requests # beepOnAvailable
try: # beepOnAvailable
while True: # beepOnAvailable
time.sleep(1); successful = False # beepOnAvailable
try: # beepOnAvailable
if requests.get(url, timeout=timeout, **kwargs).ok: # beepOnAvailable
successful = True # beepOnAvailable
except: pass # beepOnAvailable
if successful: # beepOnAvailable
beep(); break # beepOnAvailable
except KeyboardInterrupt: print("Still not available") # beepOnAvailable
[docs]
def dontWrap(): # dontWrap
"""Don't wrap horizontally when in a notebook. Normally, if you're
displaying something long, like the output of ``print('a'*1000)`` in a notebook,
it will display it in multiple lines. This may be undesirable, so this solves
that by displaying some HTML with css styles so that the notebook doesn't wrap.""" # dontWrap
try: # dontWrap
from IPython.core.display import display, HTML # dontWrap
display(HTML("""<style>
div.jp-OutputArea-output pre {white-space: pre;}
div.output_area pre {white-space: pre;}
div.CodeMirror > div.highlight {overflow-y: auto;}
</style>""")) # dontWrap
except: pass#print("Can't run dontWrap()") # dontWrap
import asyncio, functools # dontWrap
from threading import Timer as ThreadingTimer # dontWrap
class AsyncTimer: # rename if want to use # AsyncTimer
def __init__(self, timeout, callback): # AsyncTimer
self._timeout = timeout; self._callback = callback # AsyncTimer
async def _job(self): # AsyncTimer
await asyncio.sleep(self._timeout); self._callback() # AsyncTimer
def start(self): self._task = asyncio.ensure_future(self._job()) # AsyncTimer
def cancel(self): self._task.cancel() # AsyncTimer
[docs]
def debounce(wait, threading=False): # debounce
"""Decorator that will postpone a function's execution until after
``wait`` seconds have elapsed since the last time it was invoked. Taken
from `ipywidgets <https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_.
Example::
import k1lib, time; value = 0
@k1lib.debounce(0.5, True)
def f(x): global value; value = x**2
f(2); time.sleep(0.3); f(3)
print(value) # prints "0"
time.sleep(0.7)
print(value) # prints "9"
:param wait: wait time in seconds
:param threading: if True, use multiple threads, else just use async stuff""" # debounce
Timer = ThreadingTimer if threading else AsyncTimer # debounce
def decorator(fn): # debounce
timer = None # debounce
def debounced(*args, **kwargs): # debounce
nonlocal timer # debounce
if timer is not None: timer.cancel() # debounce
timer = Timer(wait, lambda: fn(*args, **kwargs)) # debounce
timer.start() # debounce
functools.update_wrapper(debounced, fn); return debounced # debounce
return decorator # debounce
[docs]
def scaleSvg(svg:str, scale:float=None) -> str: # scaleSvg
"""Scales an svg xml string by some amount.""" # scaleSvg
if scale is None: scale = k1lib.settings.svgScale # scaleSvg
wS = w = re.findall("width=\"\\d*pt\"", svg)[0] # scaleSvg
hS = w = re.findall("height=\"\\d*pt\"", svg)[0] # scaleSvg
w = int(int(re.findall("\\d+", wS)[0])*scale) # scaleSvg
h = int(int(re.findall("\\d+", hS)[0])*scale) # scaleSvg
svg = re.sub(wS, f'width="{w}pt"', svg) # scaleSvg
svg = re.sub(hS, f'height="{h}pt"', svg) # scaleSvg
return svg # scaleSvg
[docs]
def now(): # now
"""Convenience function for returning a simple time
string, with timezone and whatnot.""" # now
import datetime; return datetime.datetime.now().astimezone().isoformat() # now
[docs]
def pushNotification(title="Some title", content="Some content", url="https://k1lib.com"): # pushNotification
"""Sends push notification to your device.
Setting things up:
- Download this app: https://play.google.com/store/apps/details?id=net.xdroid.pn
- Set the `settings.pushNotificationKey` key obtained from the app. Example key: `k-967fe9...`
- Alternatively, set the environment variable `k1lib_pushNotificationKey` instead
- Run the function as usual""" # pushNotification
import requests # pushNotification
key = k1lib.settings.pushNotificationKey # pushNotification
requests.get("http://xdroid.net/api/message?" + urllib.parse.urlencode({'k': key, 't': title, 'c': content, 'u': url})) # pushNotification
print("Pushed!") # pushNotification
tickCheckpoints = np.array([1, 2, 2.5, 5, 10])/10 # pushNotification
[docs]
def ticks(x:float, y:float, rounding:int=6): # ticks
"""Get tick locations in a plot that look reasonable.
Example::
ticks(-5, 40) # returns [-10.0, -5.0, 0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0]
ticks(0.05, 0.07) # returns [0.05, 0.0525, 0.055, 0.0575, 0.06, 0.0625, 0.065, 0.0675, 0.07, 0.0725]
ticks(-5, 5) # returns [-6.0, -5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
So essentially, whenever you try to plot something, you want both the x and y axis
to not have too many lines, and that the tick values snap to a nice number.
Normally you don't have to do this, as matplotlib does this automatically behind
the scenes, but sometimes you need to implement plotting again, in strange
situations, so this function comes in handy
:param x: start of interval
:param y: end of interval
:param rounding: internally, it rounds tick values to this number of
digits, to fix tiny float overflows that make numbers ugly. So
you can disable it if you're working with really small numbers""" # ticks
cli = k1lib.cli; ld = math.log10(y-x); scale = ld-ld%1 # log delta # ticks
sel = ((ld%1 - tickCheckpoints)**2).argmin() # selecting the best fit tick checkpoint # ticks
interval = 10**scale*tickCheckpoints[sel] # interval between ticks # ticks
seed = int(y/10**scale)*10**scale # seed tick # ticks
f1 = cli.applySerial(cli.op()-interval) | cli.breakIf(cli.op()<x-interval) # ticks
f2 = cli.applySerial(cli.op()+interval) | cli.breakIf(cli.op()>y+interval) # ticks
# finally, use the seed to expand in both directions to get all the ticks # ticks
return seed | f1 & f2 | cli.joinStreams() | cli.aS(set) | cli.sort(None) | cli.apply(round, ndigits=rounding) | cli.deref() | cli.aS(np.array) # ticks
try: # ticks
import graphviz # ticks
[docs]
def digraph(): # ticks
"""Convenience method for creating a new graphviz Digraph.
Example::
g = k1lib.graph()
g("a", "b", "c")
g # displays arrows from "a" to "b" and "a" to "c"
""" # ticks
return graphviz.Digraph(graph_attr={"rankdir":"TB"}) # ticks
[docs]
def graph(): # ticks
"""Convenience method for creating a new graphviz Graph. See also: :meth:`digraph`""" # ticks
return graphviz.Graph(graph_attr={"rankdir":"TB"}) # ticks
except ImportError: # ticks
digraph = graph = lambda: print("Module `graphviz` not found! Please install it first, something like `pip install graphviz`") # ticks
def encode(obj:object) -> str: # encode
"""Serialize random objects into bytes, then turn those bytes to
normal strings using base64. Example::
a = {"b": 3}
encode(a) # returns "gASVCgAAAAAAAAB9lIwBYpRLA3Mu"
decode(encode(a)) # returns {"b"}: 3
See also: :meth:`decode`
""" # encode
return base64.b64encode(dill.dumps(obj)).decode() # encode
def decode(s:str) -> object: # decode
"""Turns a string produced by :meth:`encode` back into a random object.""" # decode
return dill.loads(base64.b64decode(s.encode())) # decode
import hashlib # decode
def hash(msg:str) -> int: # hash
"""A universal hash function. Why not just use the builtin hash function?
Because it actually changes for different interpreter instances, it won't be
good for code that runs on multiple computers, so this is sort of like a
drop-in replacement. Although it does output an integer, don't rely on it having
the same numeric properties as a normal hash function.""" # hash
if not isinstance(msg, str): msg = f"{msg}" # hash
m = hashlib.sha256(); m.update(f"{msg}".encode()); return int.from_bytes(m.digest(), "big") # hash
hash(34) # hash
import traceback # hash
try: # hash
import asyncio, threading, time; from collections import deque # hash
_coroutineQueue = deque() # deque of (idx, coroutine) # hash
_coroutineAns = dict() # Dict[idx -> coroutine ans] # hash
_coroutineAutoIdx = 0 # hash
def _coroutineResolvingThread(): # hash
loop = asyncio.new_event_loop() # hash
while True: # hash
if len(_coroutineQueue) == 0: time.sleep(0.01) # hash
else: # hash
idx, coroutine = _coroutineQueue.popleft() # hash
# important to recover from exceptions # hash
try: ans = loop.run_until_complete(coroutine); _coroutineAns[idx] = {"type": "success", "ans": ans} # hash
except Exception as e: _coroutineAns[idx] = {"type": "failure", "e": f"{e}", "tb": traceback.format_exc()} # hash
threading.Thread(target=_coroutineResolvingThread, daemon=True).start() # hash
_resolve_err = None # hash
except Exception as e: _resolve_err = f"{e}"; _resolve_tb = traceback.format_exc() # hash
def resolve(coroutine): # resolve
"""Resolves coroutines without having to use await.
Example::
async def f(x):
await asyncio.sleep(1) # simulates heavy processing
return x + 4
k1.resolve(f(5))
This kinda feels just like ``asyncio.run(f(5))``, so why does this exist?
Here's the docstring of that method:
.. code-block:: text
This function cannot be called when another asyncio event loop is running in the same thread.
This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.
This has more limitations that I found annoying to deal with day-to-day. I want
a function that always work, no matter my setup. So, how this function work
internally is that it spins up a new (permanent, daemon) thread, creates a new
event loop in that thread, then whenever a coroutine comes in, it runs it in
that event loop, returns, then pass control back to whatever thread that called
:meth:`resolve`. I'm sure this can still be messed up in some way, but seems
more useful than the builtin method.
This is just meant as a quick and dirty way to force resolving coroutines. Use
this sparingly, as performance won't be as good as a proper async application.
If you find yourself using this way too often, then I'd suggest reviewing how
:mod:`asyncio` works""" # resolve
global _coroutineAutoIdx # resolve
if _resolve_err: raise Exception(f"k1lib.resolve() not available, encoutered this error while starting up: {_resolve_err}. Traceback:\n{_resolve_tb}") # resolve
idx = _coroutineAutoIdx; _coroutineAutoIdx += 1; _coroutineQueue.append([idx, coroutine]) # resolve
while idx not in _coroutineAns: time.sleep(0.01) # resolve
ans = _coroutineAns[idx]; del _coroutineAns[idx] # resolve
if ans["type"] == "success": return ans["ans"] # resolve
else: raise Exception(f"Exception occured while trying to k1lib.resolve(): {ans['e']}. Traceback:\n{ans['tb']}") # resolve
def config(s:str): # config
"""Convenience method to grab JSON config file from config.mlexps.com""" # config
return json.loads("".join(k1lib.cli.cat(f"https://config.mlexps.com/{s}"))) # config
[docs]
def modbusCrc(msg:list[int]) -> list[int]: # modbusCrc
"""Calculates Modbus CRC-16 checksum for a given message.
Example::
k1.modbusCrc([0x01, 0x03, 0x00, 0x1e, 0x00, 0x01]) # returns (228, 12), or (0xe4, 0x0c)
""" # modbusCrc
crc = 0xFFFF # modbusCrc
for byte in msg: # modbusCrc
crc ^= byte # modbusCrc
for _ in range(8): # modbusCrc
if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 # modbusCrc
else: crc >>= 1 # modbusCrc
return crc % 256, crc // 256 # modbusCrc
[docs]
def parseTimeStr(timeStr): # parseTimeStr
"""Parses time string and returns 2 unix timestamps.
Example::
now = time.time()
k1.parseTimeStr("3 minute") # returns [now - 3*60, now]
k1.parseTimeStr("3 min") # returns [now - 3*60, now]
k1.parseTimeStr("1 hour") # returns [now - 3600, now]
k1.parseTimeStr("2.3 hour") # returns [now - 3600*2.3, now]
You get the idea. This is useful in quickly parsing time series graphs between a
time period in the past and now. There are a bunch of period strings with their
corresponding multiplier::
1, second # second, sec
60, minute # minute, min
3600, hour # hour, hr
86400, day # day
7 * 86400, week # week, wk
30* 86400, month # month, mo
365.25*86400, year # year, yr
365.25*864000, decade # decade, dec
Fastest to access are the shortest period strings, like "sec", "min", etc. Use those if
you care about speed.
Unfortunately, this function can't parse composite time strings, like "1 hr 5 min"
:param timeStr: time string to parse""" # parseTimeStr
now = time.time() # parseTimeStr
if timeStr.endswith("sec"): return now-float(timeStr[:-3]), now # parseTimeStr
if timeStr.endswith("min"): return now-60 *float(timeStr[:-3]), now # parseTimeStr
if timeStr.endswith("hr"): return now-3600 *float(timeStr[:-2]), now # parseTimeStr
if timeStr.endswith("day"): return now-86400*float(timeStr[:-3]), now # parseTimeStr
if timeStr.endswith("wk"): return now-86400*7*float(timeStr[:-2]), now # parseTimeStr
if timeStr.endswith("mo"): return now-86400*30*float(timeStr[:-2]), now # parseTimeStr
if timeStr.endswith("yr"): return now-86400*365.25*float(timeStr[:-2]), now # parseTimeStr
if timeStr.endswith("dec"): return now-864000*365.25*float(timeStr[:-3]), now # parseTimeStr
# parseTimeStr
if timeStr.endswith("second"): return now-float(timeStr[:-6]), now # parseTimeStr
if timeStr.endswith("minute"): return now-60 *float(timeStr[:-6]), now # parseTimeStr
if timeStr.endswith("hour"): return now-3600 *float(timeStr[:-4]), now # parseTimeStr
if timeStr.endswith("week"): return now-86400*7*float(timeStr[:-4]), now # parseTimeStr
if timeStr.endswith("month"): return now-86400*30*float(timeStr[:-5]), now # parseTimeStr
if timeStr.endswith("year"): return now-86400*365.25*float(timeStr[:-4]), now # parseTimeStr
if timeStr.endswith("decade"): return now-864000*365.25*float(timeStr[:-6]), now # parseTimeStr
if timeStr == "lastHour": return now-3600, now # "last" + period is sort of a hidden mode, to ensure backwards compatible with other systems I have, but let's not expose it in the docs # parseTimeStr
if timeStr == "lastDay": return now-86400, now # parseTimeStr
if timeStr == "lastWeek": return now-86400*7, now # parseTimeStr
raise flask.FailureException(f"Don't understand '{timeStr}'. Has to be int followed by either 'hour' or 'day' or 'week'") # parseTimeStr