Source code for k1lib._basics

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import logging, warnings, os, time, re, gc, json, k1lib, importlib, urllib.parse, math, base64, dill, inspect, threading, datetime, traceback, functools, collections, numpy as np, pprint
from typing import Any, List, Union, Tuple, Iterator, Dict
try: import torch; hasTorch = True
except: hasTorch = False
__all__ = ["_docsUrl", "mplLock", "dep", "depCli", "isNumeric",
           "patch", "cache", "cron", "preload", "wrapMod", "wraps", "squeeze", "raiseEx",
           "numDigits", "limitLines",
           "limitChars", "showLog", "cleanDiv",
           "beep", "beepOnAvailable", "dontWrap",
           "debounce", "scaleSvg", "now", "pushNotification", "ticks", "digraph", "graph",
           "encode", "decode", "hash", "resolve", "config", "modbusCrc", "parseTimeStr"]
_docsUrl = "https://k1lib.com"
mplLock = threading.Lock()
class Dependency:                                                                # Dependency
    def __init__(self, s, alt:str=None, url:str=None): self.s = s; self.alt = alt; self.url = url; self.errorMsg = "" # Dependency
    def __getattr__(self, attr):                                                 # Dependency
        after = f"More information is available on {self.url}. " if self.url else f"" # Dependency
        raise ImportError(f"Python package `{self.alt or self.s}` not found. Please install it. {after}After installing, if you're in a notebook environment, then restart the kernel. This is the error when importing it:\n\n{self.errorMsg}") # Dependency
def oldDep(s, alt:str=None, url:str=None):                                       # oldDep
    try: return importlib.import_module(s)                                       # oldDep
    except: d = Dependency(s, alt, url); d.errorMsg = traceback.format_exc(); return d # oldDep
[docs] class dep: # dep def __init__(self, s, alt=None, url=None): # dep """Imports a potentially unavailable package Example:: graphviz = k1.dep("graphviz") # executes as normal, if graphviz is available, else throws an error g = graphviz.Digraph() I don't imagine this would be useful for everyday use though. This is mainly for writing this library, so that it can use optional dependencies. :param s: name of the package. Can be nested, like `matplotlib.pyplot` :param alt: (optional) name of the package to display in error message if package is not found :param url: (optional) url of the package's website, so that they know where to get official docs""" # dep self._k_s = s; self._k_alt = alt; self._k_url = url; self._k_mod = None; self.loaded = False # dep def _k_bootstrap(self): # dep try: self.loaded = True; self._k_mod = importlib.import_module(self._k_s); return self._k_mod # dep except: # dep after = f"More information is available on {self._k_url}. " if self._k_url else f"" # dep raise ImportError(f"Python package `{self._k_alt or self._k_s}` not found. Please install it. {after}After installing, if you're in a notebook environment, then restart the kernel. This is the error when importing it:\n\n{traceback.format_exc()}") # dep def __getattr__(self, attr): # dep """This code is a little magical. Say you're doing this:: np = k1.dep("numpy") np # it's an object of class k1lib.dep np.array([2, 3]) # instantiate a new array like usual. This is by running k1lib.__getattr__ np # now the old python object has turned into the actual numpy module, as if you import it using "import numpy as np" It's not foolproof, but works in most cases and gives a nice little speed boost Inspired from https://github.com/cart0113/pyjack""" # dep if not self.loaded: # dep replaceWith = self._k_bootstrap(); gc.collect(); arr = gc.get_referrers(self); iSelf = id(self) # dep for x in arr: # dep if isinstance(x, dict): # dep for k,v in x.items(): # dep if id(v) == iSelf: x[k] = replaceWith # dep return getattr(self._k_mod, attr) # dep
dep.mpl = dep("matplotlib", url="https://matplotlib.org/"); dep.plt = dep("matplotlib.pyplot", url="https://matplotlib.org/"); dep.cm = dep("matplotlib.cm", url="https://matplotlib.org/") # dep dep.pygments = dep("pygments", url="https://pygments.org/"); dep.torch = dep("torch", url="https://pytorch.org/") # dep try: import pygments.formatters; import pygments.lexers # these are not loaded by default, so have to prime them # dep except: pass # dep dep.bs4 = dep("bs4", "beautifulsoup4", "https://www.crummy.com/software/BeautifulSoup/") # dep dep.yaml = dep("yaml", "pyyaml", "https://pyyaml.org/"); dep.pd = dep("pandas", url="https://pandas.pydata.org/") # dep dep.requests = dep("requests", url="https://requests.readthedocs.io/"); dep.graphviz = dep("graphviz", url="https://graphviz.readthedocs.io/") # dep dep.PIL = dep("PIL", "Pillow", "https://pillow.readthedocs.io/"); dep.websockets = dep("websockets", url="https://websockets.readthedocs.io/") # dep dep.cv2 = dep("cv2", "opencv-python", "https://opencv.org/") # dep def depCli(prog="ls", insCmd=None, website=None): # depCli """Checks whether a particular command-line program is installed or not. Example:: k1.depCli("vim") This will throw an error if that program is not found :param prog: program name :param insCmd: (optional) installation command, could be "apt install vim" :param website: (optional) website that the user can learn more about the program""" # depCli if None | k1lib.cli.cmd(("where " if os.name == "nt" else "which ") + prog) | k1lib.cli.shape(0) == 0: # depCli insS = f" by executing '{insCmd}'" if insCmd else "" # depCli webS = f". Visit {website} to learn more about the program" if website else "" # depCli raise Exception(f"Command line program '{prog}' not found in PATH. Please install it{insS}{webS}") # depCli return True # depCli
[docs] def isNumeric(x) -> bool: # isNumeric """Returns whether object is actually a number""" # isNumeric return isinstance(x, (int, float, np.number)) # isNumeric
[docs] def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False): # patch """Patches a function to a class/object. :param _class: object to patch function. Can also be a type :param name: name of patched function, if different from current :param docs: docs of patched function. Can be object with defined __doc__ attr :param static: whether to wrap this inside :class:`staticmethod` or not :return: modified function just before patching Intended to be used like this:: class A: def methA(self): return "inside methA" @k1lib.patch(A) def methB(self): return "inside methB" a = A() a.methB() # returns "inside methB" You can do ``@property`` attributes like this:: class A: pass @k1lib.patch(A, "propC") @property def propC(self): return self._propC @k1lib.patch(A, "propC") @propC.setter def propC(self, value): self._propC = value a = A(); a.propC = "abc" a.propC # returns "abc" The attribute name unfortunately has to be explicitly declared, as I can't really find a way to extract the original name. You can also do static methods like this:: class A: pass @k1lib.patch(A, static=True) def staticD(arg1): return arg1 A.staticD("def") # returns "def" """ # patch def inner(function): # patch _docs = docs # patch if _docs is not None and not isinstance(_docs, str): _docs = _docs.__doc__ # patch _docs = _docs or function.__doc__ or _class.__doc__ # patch _name = name or function.__qualname__.split(".")[-1] # patch # patch _function = staticmethod(function) if static else function # patch _function.__doc__ = _docs; setattr(_class, _name, _function); return _function # patch return inner # patch
def hashableAKw(args, kwargs): # hashableAKw a = tuple((tuple(x) if isinstance(x, list) else x) for x in args) # can accept lists of hashable items! # hashableAKw b = tuple(sorted(list(kwargs.items()), key=lambda x: x[0])); key = (*a, *b); return (*a, *b) # hashableAKw _k1_cacheD = {} # Dict[name -> {docs, func, clearF, hits, misses}] # hashableAKw _k1_cache_autoInc = [1] # hashableAKw
[docs] class cache: # cache
[docs] def __init__(self, maxsize=128, timeout=60, name=None, docs=None): # cache """Like functools.lru_cache, but with timeout period. Example:: @k1.cache(timeout=10) def slow_function(x): return x * 2 You can also quickly deploy a management plane to manage all caches:: app = flask.Flask(__name__) k1.cache.flask(app) app.run(host="0.0.0.0", port=80) See more about this at :meth:`flask` :param maxsize: max size of the cache :param timeout: timeout in seconds""" # cache if not name: name = f"_k1_cache_{_k1_cache_autoInc[0]}"; _k1_cache_autoInc[0] += 1 # cache if "/" in name: raise Exception("Can't have forward slash in the name") # cache self.maxsize = maxsize; self.timeout = timeout; self.cache = collections.OrderedDict(); self.name = name; self.docs = docs # cache
def __call__(self, func): # cache if self.name in _k1_cacheD: raise Exception(f"Name '{self.name}' has appeared before. Please use a different name") # cache _k1_cacheD[self.name] = meta = {"docs": self.docs, "func": func, "clearF": (lambda: self.cache.clear()), "hits": 0, "timeouts": 0, "misses": 0, "capacity": self.maxsize, "len": len(self.cache)} # cache @functools.wraps(func) # cache def wrapper(*args, _k1_cache_refresh=False, **kwargs): # _k1_cache_refresh: if True, returns the cached item, but then immediately fetches the new version. Not entirely sure where this is used in # cache if self.maxsize == 0: return func(*args, **kwargs) # cache key = hashableAKw(args, kwargs); now = time.time() # cache if key in self.cache: # cache result, timestamp = self.cache[key] # cache if _k1_cache_refresh: result = func(*args, **kwargs); self.cache[key] = (result, now); return result # cache if now - timestamp < self.timeout: self.cache.move_to_end(key); meta["hits"] += 1; return result # cache else: del self.cache[key]; meta["timeouts"] += 1 # cache else: meta["misses"] += 1 # cache result = func(*args, **kwargs) # cache if len(self.cache) >= self.maxsize: self.cache.popitem(last=False) # cache self.cache[key] = (result, now); meta["len"] = len(self.cache); return result # cache return wrapper # cache
[docs] @staticmethod # cache def allData(): # cache """Grabs ``Dict[name -> {docs, func, clearF}]`` of all caches""" # cache return _k1_cacheD # cache
[docs] @staticmethod # cache def flask(app, **kwargs): # cache """Attaches a cache management plane to a flask app. Example:: app = flask.Flask(__name__) k1.cache.flask(app) app.run(host="0.0.0.0", port=80) Then, you can access the route "/k1/cache" to see an overview of all caches :param app: flask app object :param kwargs: extra random kwargs that you want to add to ``app.route()`` function""" # cache k1 = k1lib; cli = k1.cli; viz = k1.viz # cache @app.route("/k1/cache", **kwargs) # cache def index(): ui3 = k1.cache.allData().items() | ~cli.apply(lambda k,v: [k, v["func"].__name__, inspect.getfile(v["func"]), v["hits"], v["misses"], v["timeouts"], round(v["hits"]/(v["hits"] + v["misses"] + v["timeouts"] + 1e-9)*100, 2), round(v["timeouts"]/(v["hits"] + v["misses"] + v["timeouts"] + 1e-9)*100, 2), v["len"], v["capacity"], round(v["len"]/(v["capacity"]+1e-9)*100, 2), f"<button onclick='clearCache(\"{k}\")'>Clear</button>", v["docs"]]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}") | viz.Table(["name", "func's name", "func's file name", "hits", "misses", "timeouts", "% hit", "% timeouts", "len", "capacity", "% capacity", "clear cache", "docs"], height=600, sortF=True)) | cli.op().interface() | cli.toHtml(); return f""" <h1>Caches</h1><div style="overflow-x: auto">{ui3}</div><script>async function clearCache(x) {{ await fetch(`/k1/cache/api/${{x}}/clear`); alert("Cleared"); }}</script>""" # cache @app.route("/k1/cache/api/<cache>/clear") # cache def api_clear(cache): k1.cache.allData()[cache]["clearF"](); return "ok" # cache
_k1_cron_delay_data = {} # Dict[name -> {docs, delay, logs, lastRun}] # cache _k1_cron_autoInc = [1] # cache
[docs] class cron: # cron stop = "_k1.cron.stopSignal" # cron
[docs] def __init__(self, f=None, delay=None, kw=None, name=None, docs=None, daemon=False, delayedStart=0): # cron """Sets up a cron job in another thread, running the decorated function in 2 modes: Mode 1: whenever ``f`` goes from False to True. It's activated when ``delay`` is not specified. Example:: @k1.cron(lambda minute: minute == 0) def f1(): # runs every hour ... # do some stuff @k1.cron(lambda second: second % 5 == 0) def f2(): # runs every 5 seconds ... # do some stuff So, the first function will run every hour, and the second function will run every 5 seconds. Pretty straightforward. The timing function ``f`` can be as complicated as you want, but it can only accept the following parameters: - year - month: 1-12 - day: 1-31 - weekday: 0-6, 0 for Monday - hour: 0-23 - minute: 0-59 - second: 0-59 Mode 2: every ``delay`` seconds. It's activated when ``delay`` is specified. Example:: @k1.cron(delay=10) def f1(): # runs every 10 seconds ... # do some stuff Theres a special sentinel ``k1.cron.stop`` that when returns, it terminates the cron loop, for both modes. Example:: @k1.cron(delay=10) def f1(): # only prints out "abc" once print("abc") return k1.cron.stop You can also get a python object describing past runs, stdout and errors by doing ``k1.cron.allData`` You can also quickly deploy a management plane to manage all cron jobs:: app = flask.Flask(__name__) k1.cron.flask(app) app.run(host="0.0.0.0", port=80) See more about this at :meth:`flask` :param f: function to trigger wrapped function when this transitions from False to True :param delay: if specified, just run the function repeatedly, delayed by this amount after running. Mutually exclusive with .f :param kw: extra keyword arguments to pass to the function :param name: optional name to show up in :meth:`allData` :param docs: optional documentation to show up in :meth:`allData` :param daemon: if True, makes the cron thread daemon :param delayedStart: if specified, delay the start of the first function execution to after this number of seconds""" # cron if not name: name = f"_k1_cron_{_k1_cron_autoInc[0]}"; _k1_cron_autoInc[0] += 1 # cron if "/" in name: raise Exception("Can't have forward slash in the name") # cron self.f = f; self.delay = delay; self.kw = kw; self.name = name; self.docs = docs; self.daemon = daemon; self.delayedStart = delayedStart # cron
[docs] @staticmethod # cron def allData(): # cron """Grabs ``Dict[name -> {docs, delay, logs}]`` for all delay crons""" # cron return _k1_cron_delay_data # cron
def __call__(self, func): # cron f = self.f; delay = self.delay; kw = self.kw; name = self.name; docs = self.docs; kw = kw or {} # cron if self.name in _k1_cron_delay_data: raise Exception(f"Name '{self.name}' has appeared before. Please use a different name") # cron _k1_cron_delay_data[name] = self.obj = {"docs": docs, "delay": delay, "logs": collections.deque([], 100), "lastRun": [0, ""], "delayedStart": self.delayedStart, "func": func} # cron if delay: # cron def startLoop(): # cron if self.delayedStart > 0: time.sleep(self.delayedStart) # cron while True: # cron beginTime = time.time() # cron try: # cron _k1_cron_delay_data[name]["lastRun"] = [beginTime, "now"] # cron res = func(**kw) # _k1_cron_delay_data[name]["logs"].append({"beginTime": beginTime, "duration": time.time() - beginTime, "success": True}) # don't log successful requests, cause that's too frequent # cron if res == cron.stop: break # cron except Exception as e: print(f"{type(e)}\ue004{e}"); print(traceback.format_exc()); _k1_cron_delay_data[name]["logs"].append({"beginTime": beginTime, "duration": time.time() - beginTime, "success": False, "exc": f"{type(e)}\ue004{e}", "tb": traceback.format_exc()}) # cron time.sleep(delay) # cron T = threading.Thread(target=startLoop) # cron if self.daemon: T.setDaemon(True) # cron T.start(); return func # cron else: # cron args = list(inspect.signature(f).parameters.keys()) # cron s = {"year", "month", "day", "weekday", "hour", "minute", "second"} # cron for arg in args: # cron if arg not in s: raise Exception(f"Unknown argument {arg}. Only (year, month, day, weekday, hour, minute, seconds) are allowed") # cron def startLoop(): # cron last = False; this = last # cron if self.delayedStart > 0: time.sleep(self.delayedStart) # cron while True: # cron a = datetime.datetime.now() # cron now = {"year": a.year, "month": a.month, "day": a.day, "weekday": a.weekday(), "hour": a.hour, "minute": a.minute, "second": a.second} # cron this = f(*[now[e] for e in args]) # cron if not last and this: # cron try: # cron if func(**kw) == cron.stop: break # cron except Exception as e: print(f"{type(e)}\ue004{e}"); print(traceback.format_exc()) # cron last = this; time.sleep(0.5) # cron T = threading.Thread(target=startLoop) # cron if self.daemon: T.setDaemon(True) # cron T.start(); return func # cron
[docs] @staticmethod # cron def flask(app, **kwargs): # cron """Attaches a cron management plane to a flask app. Example:: app = flask.Flask(__name__) k1.cron.flask(app) app.run(host="0.0.0.0", port=80) Then, you can access the route "/k1/cron" to see an overview of all cron jobs :param app: flask app object :param kwargs: extra random kwargs that you want to add to ``app.route()`` function""" # cron k1 = k1lib; cli = k1.cli; viz = k1.viz # cron @app.route("/k1/cron", **kwargs) # cron def index(): # cron ui1 = k1.cron.allData().items() | ~cli.apply(lambda k,v: [k, v["delay"], v["delayedStart"], v["lastRun"][1], v["func"].__name__, inspect.getfile(v["func"]), v["docs"]]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}") | viz.Table(["name", "delay", "delayedStart", "lastRun", "func's name", "func's file name", "docs"], height=600, sortF=True)) | cli.op().interface() | cli.toHtml() # cron ui2 = k1.cron.allData().items() | ~cli.apply(lambda k,v: [k, f"<h2>{k}</h2><pre>{pprint.pformat(v)}</pre>"]) | cli.deref() | (cli.toJsFunc("term") | cli.grep("${term}", col=0) | cli.cut(1) | cli.join("") | viz.Scroll(600)) | cli.op().interface() | cli.toHtml() # cron return f"""<h1>Crons</h1><div style="overflow-x: auto">{ui1}</div><div style="overflow-x: auto">{ui2}</div>""" # cron
@cron(delay=10, daemon=True, delayedStart=8, name="scanCron", docs="scans through all cron jobs (k1.cron) to calculate time string of how long ago it was executed") # cron def scanCron(): # scans all crons and inject in seconds ago, things like that # scanCron now = time.time() # scanCron for key in _k1_cron_delay_data: # scanCron beginTime = _k1_cron_delay_data[key]["lastRun"][0] # scanCron _k1_cron_delay_data[key]["lastRun"] = [beginTime, k1lib.fmt.time(now - beginTime, metric=True) + " ago"] # scanCron _k1_preload_autoInc = [1] # scanCron
[docs] class preload: # preload
[docs] def __init__(self, refreshTime=10, inactiveTime=60, maxsize=128, name=None, docs=None): # preload """Wraps a function and pre executes that function every once in a while if the function is being used. Example:: @k1.preload(timeout=10) def slow_func(x): time.sleep(0.1); return x * 2 slow_func(3) # takes 100ms time.sleep(4); slow_func(3) # takes 2us time.sleep(4); slow_func(3) # takes 2us time.sleep(4); slow_func(3) # takes 2us time.sleep(4); slow_func(3) # takes 2us time.sleep(15); slow_func(3) # takes 100ms Normally, you don't have to use this, use :class:`~k1lib.cache` instead. This is for those scenarios where you can't afford to have any delays, as :class:`~k1lib.cache` will sometime take quite a while to run, but most of the time it runs fast. With this decorator, if you use the function frequent enough (less than ``inactiveTime``), then you will never experience any kind of delays, no matter how infrequent. This is good for generating plots that don't change for a long time, but requires fast server response all the time. :param refreshTime: how often to refresh the internal cache to make content fresh? :param inactiveTime: how long before a function's argument considered inactive and will stop refreshing automatically :param maxsize: maximum size of the internal cache. See also :class:`~k1lib.cache` """ # preload if not name: name = f"_k1_preload_{_k1_preload_autoInc[0]}"; _k1_preload_autoInc[0] += 1 # preload self.refreshTime = refreshTime; self.inactiveTime = inactiveTime; self.maxsize = maxsize # preload self.cache = cache(maxsize, refreshTime*3, name=f"preload: {name or '(no name)'}", docs=docs); self.name = name; self.docs = docs # preload
def __call__(self, func): # preload cachedFunc = self.cache(func); lastFetches = collections.defaultdict(lambda: [0, None]) # Dict[key -> [lastFetch, og args]] # preload @cron(delay=self.refreshTime, name=f'preload: {(self.name or "(no name)")}', docs=self.docs) # preload def watchdog(): # preload for lastFetch, [args, kwargs] in list(lastFetches.values()): # preload if time.time() - lastFetch < self.inactiveTime: cachedFunc(*args, **kwargs, _k1_cache_refresh=True) # preload @functools.wraps(func) # preload def wrapper(*args, **kwargs): key = hashableAKw(args, kwargs); lastFetches[key][0] = time.time(); lastFetches[key][1] = [args, kwargs]; return cachedFunc(*args, **kwargs) # preload return wrapper # preload
[docs] class wrapMod: # wrapMod
[docs] def __init__(self, m, moduleName=None): # wrapMod """Wraps around a module, and only suggest symbols in __all__ list defined inside the module. Example:: from . import randomModule randomModule = wrapMod(randomModule) :param m: the imported module :param moduleName: optional new module name for elements (their __module__ attr)""" # wrapMod self._init(m, moduleName) # wrapMod
def _init(self, m, moduleName): # wrapMod if moduleName is not None: # wrapMod for v in m.__dict__.values(): # wrapMod v.__module__ = moduleName # wrapMod self._wrapMod_moduleName = moduleName; self._wrapMod_m = m # wrapMod self.__dict__.update(m.__dict__) # wrapMod self._wrapMod_extraDirs = [] # wrapMod def __dir__(self): return self._wrapMod_m.__all__ + self._wrapMod_extraDirs # wrapMod def __str__(self): return str(self._wrapMod_m) # wrapMod def __getstate__(self): return {"m": self._wrapMod_m, "name": self._wrapMod_moduleName} # wrapMod def __setstate__(self, d): self._init(d["m"], d["name"]) # wrapMod def __repr__(self): return str(self) # wrapMod
[docs] def wraps(ogF): # wraps """Kinda like :meth:`functools.wraps`, but don't update __annotations__.""" # wraps def inner(f): # wraps f.__doc__ = ogF.__doc__ # wraps f.__name__ = ogF.__name__ # wraps f.__qualname__ = ogF.__qualname__ # wraps f.__module__ = ogF.__module__ # wraps return f # wraps return inner # wraps
[docs] def squeeze(_list:Union[list, tuple, Any], hard=False): # squeeze """If list only has 1 element, returns that element, else returns original list. :param hard: If True, then if list/tuple, filters out None, and takes the first element out even if that list/tuple has more than 1 element""" # squeeze if isinstance(_list, (tuple, list)): # squeeze if hard: return [e for e in _list if e != None and e != ""][0] # squeeze elif len(_list) == 1: return _list[0] # squeeze if hasTorch and isinstance(_list, torch.Tensor): return _list.squeeze() # squeeze return _list # squeeze
[docs] def raiseEx(ex:Exception): # raiseEx """Raises a specific exception. May be useful in lambdas""" # raiseEx raise ex # raiseEx
[docs] def numDigits(num) -> int: # numDigits """Get the number of digits/characters of this number/object""" # numDigits return len(f"{num}") # numDigits
[docs] def limitLines(s:str, limit:int=10) -> str: # limitLines """If input string is too long, truncates it and adds ellipsis""" # limitLines splits = s.split("\n") # limitLines if len(splits) > limit: return "\n".join(splits[:limit]) + "\n....." # limitLines else: return s # limitLines
[docs] def limitChars(s:str, limit:int=50): # limitChars """If input string is too long, truncates to first `limit` characters of the first line""" # limitChars if s is None: return "" # limitChars s = f"{s}".split("\n")[0] # limitChars return s[:limit-3] + "..." if len(s) > limit else s # limitChars
[docs] def showLog(loggerName:str="", level:int=logging.DEBUG): # showLog """Prints out logs of a particular logger at a particular level""" # showLog logger = logging.getLogger(loggerName); logger.setLevel(level) # showLog sh = logging.StreamHandler(); sh.setLevel(logging.DEBUG); logger.addHandler(sh) # showLog
def cleanDiv(_list:List[float], total:int) -> List[int]: # cleanDiv """Cleanly divides :total: into int segments with weights specified by :_list:. Looks like this: ``(_list / _list.sum()) * total``, but everything is an ``int``""" # cleanDiv _list = np.array(_list) # cleanDiv _list = (_list*total/_list.sum()).astype(int) # cleanDiv _list[-1] = total - _list[:-1].sum() # cleanDiv return _list # cleanDiv
[docs] def beep(seconds=0.3): # beep """Plays a beeping sound, may be useful as notification for long-running tasks""" # beep try: import IPython; IPython.core.display.display_html(IPython.display.HTML(f"""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(Math.round(3.3333e3*{seconds})).join(123))).play();</script>""")); # beep except: os.system("printf '\a'") # beep
def beepOnAvailable(url:str, timeout=5, **kwargs): # beepOnAvailable """Tries to connect with a url repeatedly, and if successful, plays a beep sound""" # beepOnAvailable import requests # beepOnAvailable try: # beepOnAvailable while True: # beepOnAvailable time.sleep(1); successful = False # beepOnAvailable try: # beepOnAvailable if requests.get(url, timeout=timeout, **kwargs).ok: # beepOnAvailable successful = True # beepOnAvailable except: pass # beepOnAvailable if successful: # beepOnAvailable beep(); break # beepOnAvailable except KeyboardInterrupt: print("Still not available") # beepOnAvailable
[docs] def dontWrap(): # dontWrap """Don't wrap horizontally when in a notebook. Normally, if you're displaying something long, like the output of ``print('a'*1000)`` in a notebook, it will display it in multiple lines. This may be undesirable, so this solves that by displaying some HTML with css styles so that the notebook doesn't wrap.""" # dontWrap try: # dontWrap from IPython.core.display import display, HTML # dontWrap display(HTML("""<style> div.jp-OutputArea-output pre {white-space: pre;} div.output_area pre {white-space: pre;} div.CodeMirror > div.highlight {overflow-y: auto;} </style>""")) # dontWrap except: pass#print("Can't run dontWrap()") # dontWrap
import asyncio, functools # dontWrap from threading import Timer as ThreadingTimer # dontWrap class AsyncTimer: # rename if want to use # AsyncTimer def __init__(self, timeout, callback): # AsyncTimer self._timeout = timeout; self._callback = callback # AsyncTimer async def _job(self): # AsyncTimer await asyncio.sleep(self._timeout); self._callback() # AsyncTimer def start(self): self._task = asyncio.ensure_future(self._job()) # AsyncTimer def cancel(self): self._task.cancel() # AsyncTimer
[docs] def debounce(wait, threading=False): # debounce """Decorator that will postpone a function's execution until after ``wait`` seconds have elapsed since the last time it was invoked. Taken from `ipywidgets <https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_. Example:: import k1lib, time; value = 0 @k1lib.debounce(0.5, True) def f(x): global value; value = x**2 f(2); time.sleep(0.3); f(3) print(value) # prints "0" time.sleep(0.7) print(value) # prints "9" :param wait: wait time in seconds :param threading: if True, use multiple threads, else just use async stuff""" # debounce Timer = ThreadingTimer if threading else AsyncTimer # debounce def decorator(fn): # debounce timer = None # debounce def debounced(*args, **kwargs): # debounce nonlocal timer # debounce if timer is not None: timer.cancel() # debounce timer = Timer(wait, lambda: fn(*args, **kwargs)) # debounce timer.start() # debounce functools.update_wrapper(debounced, fn); return debounced # debounce return decorator # debounce
[docs] def scaleSvg(svg:str, scale:float=None) -> str: # scaleSvg """Scales an svg xml string by some amount.""" # scaleSvg if scale is None: scale = k1lib.settings.svgScale # scaleSvg wS = w = re.findall("width=\"\\d*pt\"", svg)[0] # scaleSvg hS = w = re.findall("height=\"\\d*pt\"", svg)[0] # scaleSvg w = int(int(re.findall("\\d+", wS)[0])*scale) # scaleSvg h = int(int(re.findall("\\d+", hS)[0])*scale) # scaleSvg svg = re.sub(wS, f'width="{w}pt"', svg) # scaleSvg svg = re.sub(hS, f'height="{h}pt"', svg) # scaleSvg return svg # scaleSvg
[docs] def now(): # now """Convenience function for returning a simple time string, with timezone and whatnot.""" # now import datetime; return datetime.datetime.now().astimezone().isoformat() # now
[docs] def pushNotification(title="Some title", content="Some content", url="https://k1lib.com"): # pushNotification """Sends push notification to your device. Setting things up: - Download this app: https://play.google.com/store/apps/details?id=net.xdroid.pn - Set the `settings.pushNotificationKey` key obtained from the app. Example key: `k-967fe9...` - Alternatively, set the environment variable `k1lib_pushNotificationKey` instead - Run the function as usual""" # pushNotification import requests # pushNotification key = k1lib.settings.pushNotificationKey # pushNotification requests.get("http://xdroid.net/api/message?" + urllib.parse.urlencode({'k': key, 't': title, 'c': content, 'u': url})) # pushNotification print("Pushed!") # pushNotification
tickCheckpoints = np.array([1, 2, 2.5, 5, 10])/10 # pushNotification
[docs] def ticks(x:float, y:float, rounding:int=6): # ticks """Get tick locations in a plot that look reasonable. Example:: ticks(-5, 40) # returns [-10.0, -5.0, 0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0] ticks(0.05, 0.07) # returns [0.05, 0.0525, 0.055, 0.0575, 0.06, 0.0625, 0.065, 0.0675, 0.07, 0.0725] ticks(-5, 5) # returns [-6.0, -5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0] So essentially, whenever you try to plot something, you want both the x and y axis to not have too many lines, and that the tick values snap to a nice number. Normally you don't have to do this, as matplotlib does this automatically behind the scenes, but sometimes you need to implement plotting again, in strange situations, so this function comes in handy :param x: start of interval :param y: end of interval :param rounding: internally, it rounds tick values to this number of digits, to fix tiny float overflows that make numbers ugly. So you can disable it if you're working with really small numbers""" # ticks cli = k1lib.cli; ld = math.log10(y-x); scale = ld-ld%1 # log delta # ticks sel = ((ld%1 - tickCheckpoints)**2).argmin() # selecting the best fit tick checkpoint # ticks interval = 10**scale*tickCheckpoints[sel] # interval between ticks # ticks seed = int(y/10**scale)*10**scale # seed tick # ticks f1 = cli.applySerial(cli.op()-interval) | cli.breakIf(cli.op()<x-interval) # ticks f2 = cli.applySerial(cli.op()+interval) | cli.breakIf(cli.op()>y+interval) # ticks # finally, use the seed to expand in both directions to get all the ticks # ticks return seed | f1 & f2 | cli.joinStreams() | cli.aS(set) | cli.sort(None) | cli.apply(round, ndigits=rounding) | cli.deref() | cli.aS(np.array) # ticks
try: # ticks import graphviz # ticks
[docs] def digraph(): # ticks """Convenience method for creating a new graphviz Digraph. Example:: g = k1lib.graph() g("a", "b", "c") g # displays arrows from "a" to "b" and "a" to "c" """ # ticks return graphviz.Digraph(graph_attr={"rankdir":"TB"}) # ticks
[docs] def graph(): # ticks """Convenience method for creating a new graphviz Graph. See also: :meth:`digraph`""" # ticks return graphviz.Graph(graph_attr={"rankdir":"TB"}) # ticks
except ImportError: # ticks digraph = graph = lambda: print("Module `graphviz` not found! Please install it first, something like `pip install graphviz`") # ticks def encode(obj:object) -> str: # encode """Serialize random objects into bytes, then turn those bytes to normal strings using base64. Example:: a = {"b": 3} encode(a) # returns "gASVCgAAAAAAAAB9lIwBYpRLA3Mu" decode(encode(a)) # returns {"b"}: 3 See also: :meth:`decode` """ # encode return base64.b64encode(dill.dumps(obj)).decode() # encode def decode(s:str) -> object: # decode """Turns a string produced by :meth:`encode` back into a random object.""" # decode return dill.loads(base64.b64decode(s.encode())) # decode import hashlib # decode def hash(msg:str) -> int: # hash """A universal hash function. Why not just use the builtin hash function? Because it actually changes for different interpreter instances, it won't be good for code that runs on multiple computers, so this is sort of like a drop-in replacement. Although it does output an integer, don't rely on it having the same numeric properties as a normal hash function.""" # hash if not isinstance(msg, str): msg = f"{msg}" # hash m = hashlib.sha256(); m.update(f"{msg}".encode()); return int.from_bytes(m.digest(), "big") # hash hash(34) # hash import traceback # hash try: # hash import asyncio, threading, time; from collections import deque # hash _coroutineQueue = deque() # deque of (idx, coroutine) # hash _coroutineAns = dict() # Dict[idx -> coroutine ans] # hash _coroutineAutoIdx = 0 # hash def _coroutineResolvingThread(): # hash loop = asyncio.new_event_loop() # hash while True: # hash if len(_coroutineQueue) == 0: time.sleep(0.01) # hash else: # hash idx, coroutine = _coroutineQueue.popleft() # hash # important to recover from exceptions # hash try: ans = loop.run_until_complete(coroutine); _coroutineAns[idx] = {"type": "success", "ans": ans} # hash except Exception as e: _coroutineAns[idx] = {"type": "failure", "e": f"{e}", "tb": traceback.format_exc()} # hash threading.Thread(target=_coroutineResolvingThread, daemon=True).start() # hash _resolve_err = None # hash except Exception as e: _resolve_err = f"{e}"; _resolve_tb = traceback.format_exc() # hash def resolve(coroutine): # resolve """Resolves coroutines without having to use await. Example:: async def f(x): await asyncio.sleep(1) # simulates heavy processing return x + 4 k1.resolve(f(5)) This kinda feels just like ``asyncio.run(f(5))``, so why does this exist? Here's the docstring of that method: .. code-block:: text This function cannot be called when another asyncio event loop is running in the same thread. This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once. This has more limitations that I found annoying to deal with day-to-day. I want a function that always work, no matter my setup. So, how this function work internally is that it spins up a new (permanent, daemon) thread, creates a new event loop in that thread, then whenever a coroutine comes in, it runs it in that event loop, returns, then pass control back to whatever thread that called :meth:`resolve`. I'm sure this can still be messed up in some way, but seems more useful than the builtin method. This is just meant as a quick and dirty way to force resolving coroutines. Use this sparingly, as performance won't be as good as a proper async application. If you find yourself using this way too often, then I'd suggest reviewing how :mod:`asyncio` works""" # resolve global _coroutineAutoIdx # resolve if _resolve_err: raise Exception(f"k1lib.resolve() not available, encoutered this error while starting up: {_resolve_err}. Traceback:\n{_resolve_tb}") # resolve idx = _coroutineAutoIdx; _coroutineAutoIdx += 1; _coroutineQueue.append([idx, coroutine]) # resolve while idx not in _coroutineAns: time.sleep(0.01) # resolve ans = _coroutineAns[idx]; del _coroutineAns[idx] # resolve if ans["type"] == "success": return ans["ans"] # resolve else: raise Exception(f"Exception occured while trying to k1lib.resolve(): {ans['e']}. Traceback:\n{ans['tb']}") # resolve def config(s:str): # config """Convenience method to grab JSON config file from config.mlexps.com""" # config return json.loads("".join(k1lib.cli.cat(f"https://config.mlexps.com/{s}"))) # config
[docs] def modbusCrc(msg:list[int]) -> list[int]: # modbusCrc """Calculates Modbus CRC-16 checksum for a given message. Example:: k1.modbusCrc([0x01, 0x03, 0x00, 0x1e, 0x00, 0x01]) # returns (228, 12), or (0xe4, 0x0c) """ # modbusCrc crc = 0xFFFF # modbusCrc for byte in msg: # modbusCrc crc ^= byte # modbusCrc for _ in range(8): # modbusCrc if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 # modbusCrc else: crc >>= 1 # modbusCrc return crc % 256, crc // 256 # modbusCrc
[docs] def parseTimeStr(timeStr): # parseTimeStr """Parses time string and returns 2 unix timestamps. Example:: now = time.time() k1.parseTimeStr("3 minute") # returns [now - 3*60, now] k1.parseTimeStr("3 min") # returns [now - 3*60, now] k1.parseTimeStr("1 hour") # returns [now - 3600, now] k1.parseTimeStr("2.3 hour") # returns [now - 3600*2.3, now] You get the idea. This is useful in quickly parsing time series graphs between a time period in the past and now. There are a bunch of period strings with their corresponding multiplier:: 1, second # second, sec 60, minute # minute, min 3600, hour # hour, hr 86400, day # day 7 * 86400, week # week, wk 30* 86400, month # month, mo 365.25*86400, year # year, yr 365.25*864000, decade # decade, dec Fastest to access are the shortest period strings, like "sec", "min", etc. Use those if you care about speed. Unfortunately, this function can't parse composite time strings, like "1 hr 5 min" :param timeStr: time string to parse""" # parseTimeStr now = time.time() # parseTimeStr if timeStr.endswith("sec"): return now-float(timeStr[:-3]), now # parseTimeStr if timeStr.endswith("min"): return now-60 *float(timeStr[:-3]), now # parseTimeStr if timeStr.endswith("hr"): return now-3600 *float(timeStr[:-2]), now # parseTimeStr if timeStr.endswith("day"): return now-86400*float(timeStr[:-3]), now # parseTimeStr if timeStr.endswith("wk"): return now-86400*7*float(timeStr[:-2]), now # parseTimeStr if timeStr.endswith("mo"): return now-86400*30*float(timeStr[:-2]), now # parseTimeStr if timeStr.endswith("yr"): return now-86400*365.25*float(timeStr[:-2]), now # parseTimeStr if timeStr.endswith("dec"): return now-864000*365.25*float(timeStr[:-3]), now # parseTimeStr # parseTimeStr if timeStr.endswith("second"): return now-float(timeStr[:-6]), now # parseTimeStr if timeStr.endswith("minute"): return now-60 *float(timeStr[:-6]), now # parseTimeStr if timeStr.endswith("hour"): return now-3600 *float(timeStr[:-4]), now # parseTimeStr if timeStr.endswith("week"): return now-86400*7*float(timeStr[:-4]), now # parseTimeStr if timeStr.endswith("month"): return now-86400*30*float(timeStr[:-5]), now # parseTimeStr if timeStr.endswith("year"): return now-86400*365.25*float(timeStr[:-4]), now # parseTimeStr if timeStr.endswith("decade"): return now-864000*365.25*float(timeStr[:-6]), now # parseTimeStr if timeStr == "lastHour": return now-3600, now # "last" + period is sort of a hidden mode, to ensure backwards compatible with other systems I have, but let's not expose it in the docs # parseTimeStr if timeStr == "lastDay": return now-86400, now # parseTimeStr if timeStr == "lastWeek": return now-86400*7, now # parseTimeStr raise flask.FailureException(f"Don't understand '{timeStr}'. Has to be int followed by either 'hour' or 'day' or 'week'") # parseTimeStr