Source code for k1lib._basics

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import logging, warnings, os, time, re, json, k1lib, importlib, urllib.parse, math, base64, dill, inspect, threading, datetime, traceback
import numpy as np
from typing import Any, List, Union, Tuple, Iterator, Dict
from functools import partial
try: import torch; hasTorch = True
except: hasTorch = False
# Names exported when this module is star-imported (`from ... import *`).
__all__ = ["_docsUrl", "isNumeric",
           "patch", "cron", "wrapMod", "wraps", "squeeze", "raiseEx",
           "numDigits", "limitLines",
           "limitChars", "showLog", "cleanDiv",
           "beep", "beepOnAvailable", "dontWrap",
           "debounce", "scaleSvg", "now", "pushNotification", "dep", "depCli", "ticks", "digraph", "graph",
           "encode", "decode", "hash", "resolve", "config"]
# Presumably the base URL of the library's documentation site; nothing else in this section reads it.
_docsUrl = "https://k1lib.com"
def isNumeric(x) -> bool:
    """Tells whether ``x`` is a number.

    An object counts as numeric when it is an ``int``, a ``float`` or a numpy
    scalar deriving from ``np.number``. Note that ``bool`` is a subclass of
    ``int``, so booleans are reported as numeric too.

    :param x: any object
    :return: True if ``x`` is an instance of one of the types above"""
    numericTypes = (int, float, np.number)
    return isinstance(x, numericTypes)
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False):
    """Decorator that attaches the decorated function to a class or object.

    :param _class: object (or type) that receives the function
    :param name: attribute name to store the function under, if different from the function's own name
    :param docs: docstring for the patched attribute. Can be a string, or any object whose ``__doc__`` should be reused
    :param static: whether to wrap the function inside :class:`staticmethod` first
    :return: the object that was attached (the function itself, or its ``staticmethod`` wrapper)

    Intended to be used like this::

        class A:
            def methA(self): return "inside methA"

        @k1lib.patch(A)
        def methB(self): return "inside methB"

        a = A()
        a.methB() # returns "inside methB"

    ``@property`` attributes work too, but the attribute name has to be given
    explicitly, as a property object carries no ``__qualname__`` to derive it from::

        class A: pass

        @k1lib.patch(A, "propC")
        @property
        def propC(self): return self._propC

        @k1lib.patch(A, "propC")
        @propC.setter
        def propC(self, value): self._propC = value

        a = A(); a.propC = "abc"
        a.propC # returns "abc"

    Static methods::

        class A: pass

        @k1lib.patch(A, static=True)
        def staticD(arg1): return arg1

        A.staticD("def") # returns "def"
    """
    def inner(function):
        # explicit docs win; a non-string `docs` object lends its own __doc__
        docText = docs if (docs is None or isinstance(docs, str)) else docs.__doc__
        # otherwise fall back to the function's docstring, then to the class's
        if not docText: docText = function.__doc__ or _class.__doc__
        attrName = name if name else function.__qualname__.split(".")[-1]
        target = function
        if static: target = staticmethod(function)
        target.__doc__ = docText
        setattr(_class, attrName, target)
        return target
    return inner
def cron(f):
    """Sets up a cron job in another thread, running the decorated function
    whenever ``f`` goes from False to True. Example::

        @k1.cron(lambda minute: minute == 0)
        def f1(): # runs every hour
            ... # do some stuff

        @k1.cron(lambda second: second % 5 == 0)
        def f2(): # runs every 5 seconds
            ... # do some stuff

    The timing function ``f`` can be as complicated as you want, but its
    parameters must be named after (a subset of) these fields of the current
    local time:

    - year
    - month: 1-12
    - day: 1-31
    - weekday: 0-6, 0 for Monday
    - hour: 0-23
    - minute: 0-59
    - second: 0-59

    The time is sampled every 0.5 seconds. The polling thread is a regular
    (non-daemon) thread that loops forever, so it keeps the interpreter alive.

    :param f: timing predicate, see above
    :return: decorator that starts the polling thread and returns the decorated function unchanged
    :raises Exception: at decoration time, if ``f`` has a parameter outside the list above"""
    def inner(func):
        args = list(inspect.signature(f).parameters.keys())
        s = {"year", "month", "day", "weekday", "hour", "minute", "second"}
        for arg in args:
            # the message must list the accepted names verbatim ("second", not "seconds")
            if arg not in s: raise Exception(f"Unknown argument {arg}. Only (year, month, day, weekday, hour, minute, second) are allowed")
        def startLoop():
            last = False
            while True:
                a = datetime.datetime.now()
                now = {"year": a.year, "month": a.month, "day": a.day, "weekday": a.weekday(), "hour": a.hour, "minute": a.minute, "second": a.second}
                # feed `f` exactly the fields it asked for, in its own parameter order
                this = f(*[now[e] for e in args])
                if not last and this: func() # rising edge only
                last = this; time.sleep(0.5)
        threading.Thread(target=startLoop).start()
        return func
    return inner
class wrapMod:
    def __init__(self, m, moduleName=None):
        """Wraps around a module, and only suggest symbols in __all__ list
        defined inside the module. Example::

            from . import randomModule
            randomModule = wrapMod(randomModule)

        :param m: the imported module
        :param moduleName: optional new module name for elements (their __module__ attr)"""
        self._init(m, moduleName)
    def _init(self, m, moduleName):
        """Shared setup used by both ``__init__`` and ``__setstate__``."""
        if moduleName is not None:
            for v in m.__dict__.values():
                # A module's dict always holds entries such as `__name__` (a str) or
                # `__doc__` (often None) that reject attribute assignment, and built-in
                # types raise TypeError. Relabel what can be relabelled, skip the rest.
                try: v.__module__ = moduleName
                except (AttributeError, TypeError): pass
        self._wrapMod_moduleName = moduleName; self._wrapMod_m = m
        # mirror every module attribute onto the wrapper so normal attribute access works
        self.__dict__.update(m.__dict__)
        self._wrapMod_extraDirs = []
    def __dir__(self):
        # list(...) so that a tuple-valued __all__ can still be concatenated
        return list(self._wrapMod_m.__all__) + self._wrapMod_extraDirs
    def __str__(self): return str(self._wrapMod_m)
    def __getstate__(self): return {"m": self._wrapMod_m, "name": self._wrapMod_moduleName}
    def __setstate__(self, d): self._init(d["m"], d["name"])
    def __repr__(self): return str(self)
def wraps(ogF):
    """Kinda like :meth:`functools.wraps`, but leaves ``__annotations__`` alone.

    Only ``__doc__``, ``__name__``, ``__qualname__`` and ``__module__`` are
    copied from ``ogF`` onto the decorated function.

    :param ogF: original function to copy the metadata from
    :return: decorator that returns the very same function object, relabelled"""
    copiedAttrs = ("__doc__", "__name__", "__qualname__", "__module__")
    def inner(f):
        for attr in copiedAttrs: setattr(f, attr, getattr(ogF, attr))
        return f
    return inner
def squeeze(_list:Union[list, tuple, Any], hard=False):
    """Unwraps single-element containers.

    - list/tuple with exactly 1 element: returns that element
    - list/tuple with ``hard=True``: drops elements equal to None or ``""`` and
      returns the first survivor, no matter how many elements there are
      (raises ``IndexError`` when nothing survives)
    - torch tensor (when torch is installed): returns ``tensor.squeeze()``
    - anything else: returned untouched

    :param hard: see above"""
    if isinstance(_list, (tuple, list)):
        if hard:
            survivors = [e for e in _list if e != None and e != ""]
            return survivors[0]
        if len(_list) == 1: return _list[0]
    if hasTorch and isinstance(_list, torch.Tensor): return _list.squeeze()
    return _list
def raiseEx(ex:Exception):
    """Raises the given exception.

    ``raise`` is a statement, so this is handy wherever only an expression is
    allowed, like inside a lambda.

    :param ex: exception instance (or class) to raise"""
    raise ex
def numDigits(num) -> int:
    """Returns how many characters the default string formatting of ``num``
    takes up. Signs and decimal points are counted as well."""
    rendered = f"{num}"
    return len(rendered)
def limitLines(s:str, limit:int=10) -> str:
    """Caps a string at ``limit`` lines.

    If ``s`` has more than ``limit`` lines, only the first ``limit`` lines are
    kept and an extra line containing ``.....`` is appended. Otherwise ``s`` is
    returned unchanged.

    :param s: input string, lines separated by newline characters
    :param limit: maximum number of lines to keep"""
    lines = s.split("\n")
    if len(lines) <= limit: return s
    return "\n".join(lines[:limit]) + "\n....."
def limitChars(s:str, limit:int=50):
    """Shortens the first line of ``s`` so that it fits in ``limit`` characters.

    Only the first line is considered. If it is longer than ``limit``, it is cut
    to ``limit-3`` characters and ``...`` is appended. ``None`` gives an empty
    string, and non-string objects are formatted to a string first.

    :param s: string (or any object) to shorten
    :param limit: maximum length of the result"""
    if s is None: return ""
    firstLine = f"{s}".split("\n")[0]
    if len(firstLine) > limit: return firstLine[:limit-3] + "..."
    return firstLine
def showLog(loggerName:str="", level:int=logging.DEBUG):
    """Makes a logger print its records.

    Sets the logger's level to ``level`` and attaches a fresh
    :class:`logging.StreamHandler` (handler level DEBUG) to it. Every call adds
    another handler, so calling this repeatedly for the same logger repeats its
    output.

    :param loggerName: name of the logger, empty string for the root logger
    :param level: level to set on the logger"""
    logger = logging.getLogger(loggerName)
    logger.setLevel(level)
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    logger.addHandler(handler)
def cleanDiv(_list:List[float], total:int) -> List[int]:
    """Splits ``total`` into integer segments proportional to the weights in
    ``_list``. Roughly ``(_list / _list.sum()) * total``, but every segment is
    an ``int`` and the segments add up to exactly ``total``.

    Each share is truncated to an integer, then the last segment is overwritten
    with whatever is left so that nothing is lost to truncation.

    :param _list: weights of the segments
    :param total: amount to distribute
    :return: numpy integer array, one entry per weight"""
    weights = np.array(_list)
    segments = (weights*total/weights.sum()).astype(int)
    segments[-1] = total - segments[:-1].sum()
    return segments
def beep(seconds=0.3):
    """Plays a beeping sound, may be useful as notification for long-running tasks.

    First tries to emit a ``<script>`` through IPython that plays a generated
    wav clip. If anything about that fails, it falls back to printing the
    terminal bell character through the shell.

    :param seconds: scales the length of the generated audio data"""
    try:
        import IPython
        IPython.core.display.display_html(IPython.display.HTML(f"""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(Math.round(3.3333e3*{seconds})).join(123))).play();</script>"""))
    # `Exception`, not a bare except: Ctrl-C / SystemExit must not be turned into a beep
    except Exception: os.system("printf '\a'")
def beepOnAvailable(url:str, timeout=5, **kwargs):
    """Tries to connect with a url repeatedly, and if successful, plays a beep sound.

    Polls once per second until a GET request comes back with an ok status,
    then beeps and returns. Press Ctrl-C to give up; "Still not available" is
    printed in that case.

    :param url: url to poll
    :param timeout: timeout of each request, in seconds
    :param kwargs: forwarded to ``requests.get``"""
    import requests
    try:
        while True:
            time.sleep(1)
            try: successful = bool(requests.get(url, timeout=timeout, **kwargs).ok)
            # Only swallow real errors (connection refused, timeouts, ...). A bare except
            # here would also eat a Ctrl-C that arrives mid-request, making the loop
            # impossible to interrupt except during the sleep.
            except Exception: successful = False
            if successful:
                beep(); break
    except KeyboardInterrupt: print("Still not available")
def dontWrap():
    """Don't wrap horizontally when in a notebook. Normally, if you're
    displaying something long, like the output of ``print('a'*1000)`` in a
    notebook, it will display it in multiple lines. This may be undesirable, so
    this solves that by displaying some HTML with css styles so that the
    notebook doesn't wrap.

    This is best-effort: if IPython is unavailable, nothing happens."""
    try:
        # public import location; `IPython.core.display.display` is deprecated, and because
        # failures are silenced below, a broken import would make this a silent no-op
        from IPython.display import display, HTML
        display(HTML("""<style> div.jp-OutputArea-output pre {white-space: pre;} div.output_area pre {white-space: pre;} div.CodeMirror > div.highlight {overflow-y: auto;} </style>"""))
    except Exception: pass # deliberately silent, this is purely cosmetic
import asyncio, functools
from threading import Timer as ThreadingTimer
class AsyncTimer:
    """asyncio counterpart of :class:`threading.Timer`, exposing the same
    ``start()``/``cancel()`` pair: runs ``callback`` once, ``timeout`` seconds
    after :meth:`start` is called."""
    def __init__(self, timeout, callback):
        """:param timeout: delay in seconds
        :param callback: zero-argument callable to invoke after the delay"""
        self._timeout = timeout
        self._callback = callback
    async def _job(self):
        """Sleeps for the configured delay, then fires the callback."""
        await asyncio.sleep(self._timeout)
        self._callback()
    def start(self):
        """Schedules the delayed call via ``asyncio.ensure_future``."""
        self._task = asyncio.ensure_future(self._job())
    def cancel(self):
        """Cancels the scheduled call. Only valid after :meth:`start`."""
        self._task.cancel()
def debounce(wait, threading=False):
    """Decorator that will postpone a function's execution until after
    ``wait`` seconds have elapsed since the last time it was invoked. Taken from
    `ipywidgets <https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_.
    Example::

        import k1lib, time; value = 0
        @k1lib.debounce(0.5, True)
        def f(x): global value; value = x**2
        f(2); time.sleep(0.3); f(3)
        print(value) # prints "0"
        time.sleep(0.7)
        print(value) # prints "9"

    The debounced function returns None right away; the wrapped function's own
    return value is discarded.

    :param wait: wait time in seconds
    :param threading: if True, use multiple threads, else just use async stuff"""
    if threading: Timer = ThreadingTimer
    else: Timer = AsyncTimer
    def decorator(fn):
        pending = None # timer of the most recent, not-yet-executed call
        @functools.wraps(fn)
        def debounced(*args, **kwargs):
            nonlocal pending
            # a newer call supersedes the one still waiting
            if pending is not None: pending.cancel()
            pending = Timer(wait, functools.partial(fn, *args, **kwargs))
            pending.start()
        return debounced
    return decorator
def scaleSvg(svg:str, scale:float=None) -> str:
    """Scales an svg xml string by some amount.

    Looks for the first ``width="<int>pt"`` and ``height="<int>pt"`` attributes,
    multiplies their values by ``scale`` (truncating to an integer) and rewrites
    every occurrence of those exact attribute strings.

    :param svg: svg document, with width and height given in whole points
    :param scale: scaling factor. Defaults to ``k1lib.settings.svgScale``
    :return: the rescaled svg string
    :raises IndexError: if no such width or height attribute is found"""
    if scale is None: scale = k1lib.settings.svgScale
    # raw strings: "\d" in a normal string literal is an invalid escape sequence
    wS = re.findall(r'width="\d*pt"', svg)[0]
    hS = re.findall(r'height="\d*pt"', svg)[0]
    w = int(int(re.findall(r"\d+", wS)[0])*scale)
    h = int(int(re.findall(r"\d+", hS)[0])*scale)
    # plain text replacement, the matched attribute is literal text and not a pattern
    svg = svg.replace(wS, f'width="{w}pt"')
    svg = svg.replace(hS, f'height="{h}pt"')
    return svg
import datetime # scaleSvg
def now():
    """Convenience function returning the current local time as an ISO 8601
    string, UTC offset included."""
    localTime = datetime.datetime.now().astimezone()
    return localTime.isoformat()
now() # stray module-level call kept from the original; the result is discarded
def pushNotification(title="Some title", content="Some content", url="https://k1lib.com"):
    """Sends push notification to your device.

    Setting things up:

    - Download this app: https://play.google.com/store/apps/details?id=net.xdroid.pn
    - Set the `settings.pushNotificationKey` key obtained from the app. Example key: `k-967fe9...`
    - Alternatively, set the environment variable `k1lib_pushNotificationKey` instead
    - Run the function as usual

    The response of the request is not inspected: "Pushed!" is printed as soon
    as the request returns.

    :param title: title of the notification
    :param content: body of the notification
    :param url: url attached to the notification"""
    import requests
    key = k1lib.settings.pushNotificationKey
    query = urllib.parse.urlencode({'k': key, 't': title, 'c': content, 'u': url})
    requests.get("http://xdroid.net/api/message?" + query)
    print("Pushed!")
class Dependency:
    """Stand-in for a package that could not be imported. Creating it is
    harmless; the ``ImportError`` only surfaces once an attribute that is not
    set on the instance gets looked up."""
    def __init__(self, s, alt:str=None, url:str=None):
        """:param s: import name of the package
        :param alt: name to show in the error message instead of ``s``
        :param url: website of the package, mentioned in the error message"""
        self.s = s
        self.alt = alt
        self.url = url
        self.errorMsg = "" # traceback text of the failed import, filled in by the creator
    def __getattr__(self, attr):
        # only reached for attributes missing on the instance, i.e. attempts to use the package
        if self.url: after = f"More information is available on {self.url}. "
        else: after = ""
        raise ImportError(f"Python package `{self.alt or self.s}` not found. Please install it. {after}This is the error when importing it:\n\n{self.errorMsg}")
def dep(s, alt:str=None, url:str=None):
    """Imports a potentially unavailable package. Example::

        graphviz = k1.dep("graphviz")
        # executes as normal, if graphviz is available, else throws an error
        g = graphviz.Digraph()

    I don't imagine this would be useful for everyday use though. This is
    mainly for writing this library, so that it can use optional dependencies.

    :param s: name of the package. Can be nested, like `matplotlib.pyplot`
    :param alt: (optional) name of the package to display in error message if package is not found
    :param url: (optional) url of the package's website, so that they know where to get official docs
    :return: the imported module, or a placeholder object that raises ``ImportError`` on first use"""
    try: return importlib.import_module(s)
    # Importing can fail with more than ImportError (a broken package may raise anything),
    # hence `Exception`. A bare except would also trap Ctrl-C / SystemExit, though.
    except Exception:
        d = Dependency(s, alt, url); d.errorMsg = traceback.format_exc(); return d
# Optional third-party packages, resolved once at import time. Missing ones only raise when first used.
dep.mpl = dep("matplotlib", url="https://matplotlib.org/"); dep.plt = dep("matplotlib.pyplot", url="https://matplotlib.org/"); dep.cm = dep("matplotlib.cm", url="https://matplotlib.org/")
dep.pygments = dep("pygments", url="https://pygments.org/"); dep.torch = dep("torch", url="https://pytorch.org/")
dep.yaml = dep("yaml", "pyyaml", "https://pyyaml.org/"); dep.pd = dep("pandas", url="https://pandas.pydata.org/")
dep.requests = dep("requests", url="https://requests.readthedocs.io/"); dep.graphviz = dep("graphviz", url="https://graphviz.readthedocs.io/")
dep.PIL = dep("PIL", "Pillow", "https://pillow.readthedocs.io/"); dep.websockets = dep("websockets", url="https://websockets.readthedocs.io/")
def depCli(prog="ls", insCmd=None, website=None):
    """Checks whether a particular command-line program is installed or not.
    Example::

        k1.depCli("vim")

    This will throw an error if that program is not found

    :param prog: program name
    :param insCmd: (optional) installation command, could be "apt install vim"
    :param website: (optional) website that the user can learn more about the program
    :return: True if the program is found
    :raises Exception: if the program can't be located in PATH"""
    import shutil
    # shutil.which does the PATH lookup in-process on every platform, instead of
    # pasting `prog` into a "which ..."/"where ..." shell command line
    if shutil.which(prog) is None:
        insS = f" by executing '{insCmd}'" if insCmd else ""
        webS = f". Visit {website} to learn more about the program" if website else ""
        raise Exception(f"Command line program '{prog}' not found in PATH. Please install it{insS}{webS}")
    return True
# Candidate tick spacings, as fractions of a decade: 0.1, 0.2, 0.25, 0.5, 1.0
tickCheckpoints = np.array([1, 2, 2.5, 5, 10])/10
def ticks(x:float, y:float, rounding:int=6):
    """Get tick locations in a plot that look reasonable. Example::

        ticks(-5, 40) # returns [-10.0, -5.0, 0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0]
        ticks(0.05, 0.07) # returns [0.05, 0.0525, 0.055, 0.0575, 0.06, 0.0625, 0.065, 0.0675, 0.07, 0.0725]
        ticks(-5, 5) # returns [-6.0, -5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

    So essentially, whenever you plot something, you want the axes to not have
    too many lines, and the tick values to snap to nice numbers. Normally
    matplotlib does this behind the scenes, but this comes in handy when you
    have to implement plotting yourself. The result is handed back as a numpy
    array.

    :param x: start of interval
    :param y: end of interval, has to be bigger than ``x`` (the log of ``y-x`` is taken)
    :param rounding: tick values are rounded to this number of digits, to hide tiny
        float errors that make numbers ugly. Raise it if you're working with really small numbers"""
    cli = k1lib.cli
    logDelta = math.log10(y-x)
    frac = logDelta%1 # fractional part of the log of the interval length
    scale = logDelta - frac # integer part: order of magnitude of the interval length
    sel = ((frac - tickCheckpoints)**2).argmin() # checkpoint closest to the fractional part
    interval = 10**scale*tickCheckpoints[sel] # distance between two neighboring ticks
    seed = int(y/10**scale)*10**scale # starting tick, a multiple of the order of magnitude
    # presumably: step away from the seed repeatedly, stopping one interval beyond each end -- TODO confirm against k1lib.cli docs
    downwards = cli.applySerial(cli.op()-interval) | cli.breakIf(cli.op()<x-interval)
    upwards = cli.applySerial(cli.op()+interval) | cli.breakIf(cli.op()>y+interval)
    # merge both directions, dedupe, sort, then round away float noise
    return seed | downwards & upwards | cli.joinStreams() | cli.aS(set) | cli.sort(None) | cli.apply(round, ndigits=rounding) | cli.deref() | cli.aS(np.array)
try:
    import graphviz
    def digraph():
        """Convenience method for creating a new graphviz Digraph.
        Example::

            g = k1lib.digraph()
            g("a", "b", "c")
            g # displays arrows from "a" to "b" and "a" to "c"

        See also: :meth:`graph`"""
        return graphviz.Digraph(graph_attr={"rankdir":"TB"})
    def graph():
        """Convenience method for creating a new graphviz Graph. See also: :meth:`digraph`"""
        return graphviz.Graph(graph_attr={"rankdir":"TB"})
except ImportError:
    # without graphviz, both helpers just print an installation hint and return None
    digraph = graph = lambda: print("Module `graphviz` not found! Please install it first, something like `pip install graphviz`")
def encode(obj:object) -> str:
    """Serialize random objects into bytes, then turn those bytes to normal
    strings using base64. Example::

        a = {"b": 3}
        encode(a) # returns "gASVCgAAAAAAAAB9lIwBYpRLA3Mu"
        decode(encode(a)) # returns {"b": 3}

    See also: :meth:`decode`"""
    return base64.b64encode(dill.dumps(obj)).decode()
def decode(s:str) -> object:
    """Turns a string produced by :meth:`encode` back into a random object.

    NOTE(review): this unpickles the data with dill, which can execute arbitrary
    code. Only feed it strings that come from a trusted source."""
    return dill.loads(base64.b64decode(s.encode()))
import hashlib
def hash(msg:str) -> int:
    """A universal hash function. Why not just use the builtin hash function?
    Because it actually changes for different interpreter instances, it won't
    be good for code that runs on multiple computers, so this is sort of like a
    drop-in replacement. Although it does output an integer, don't rely on it
    having the same numeric properties as a normal hash function.

    :param msg: string to hash. Other objects are formatted to a string first
    :return: the sha256 digest of the utf-8 encoded string, read as a big-endian integer"""
    if not isinstance(msg, str): msg = f"{msg}"
    m = hashlib.sha256(); m.update(msg.encode()); return int.from_bytes(m.digest(), "big")
hash(34) # stray module-level call kept from the original; the result is discarded
import traceback
try:
    import asyncio, threading, time; from collections import deque
    _coroutineQueue = deque() # deque of (idx, coroutine)
    _coroutineAns = dict() # Dict[idx -> coroutine ans]
    _coroutineAutoIdx = 0
    _coroutineIdxLock = threading.Lock() # hands out unique idx values to concurrent resolve() callers
    def _coroutineResolvingThread():
        """Worker loop: runs queued coroutines one at a time on a private event
        loop and publishes each outcome in ``_coroutineAns``."""
        loop = asyncio.new_event_loop()
        while True:
            if len(_coroutineQueue) == 0: time.sleep(0.01)
            else:
                idx, coroutine = _coroutineQueue.popleft()
                # Important to recover from exceptions. BaseException, because
                # asyncio.CancelledError is not an Exception subclass: letting it escape
                # would kill this thread and leave every later resolve() waiting forever.
                try: ans = loop.run_until_complete(coroutine); _coroutineAns[idx] = {"type": "success", "ans": ans}
                except BaseException as e: _coroutineAns[idx] = {"type": "failure", "e": f"{e}", "tb": traceback.format_exc()}
    threading.Thread(target=_coroutineResolvingThread, daemon=True).start()
    _resolve_err = None
except Exception as e: _resolve_err = f"{e}"; _resolve_tb = traceback.format_exc()
def resolve(coroutine):
    """Resolves coroutines without having to use await. Example::

        async def f(x):
            await asyncio.sleep(1) # simulates heavy processing
            return x + 4
        k1.resolve(f(5))

    This kinda feels just like ``asyncio.run(f(5))``, so why does this exist?
    Here's the docstring of that method:

    .. code-block:: text

        This function cannot be called when another asyncio event loop is
        running in the same thread. This function always creates a new event
        loop and closes it at the end. It should be used as a main entry point
        for asyncio programs, and should ideally only be called once.

    This has more limitations that I found annoying to deal with day-to-day. I
    want a function that always work, no matter my setup. So, how this function
    work internally is that it spins up a new (permanent, daemon) thread,
    creates a new event loop in that thread, then whenever a coroutine comes
    in, it runs it in that event loop, returns, then pass control back to
    whatever thread that called :meth:`resolve`. I'm sure this can still be
    messed up in some way, but seems more useful than the builtin method.

    This is just meant as a quick and dirty way to force resolving coroutines.
    Use this sparingly, as performance won't be as good as a proper async
    application. If you find yourself using this way too often, then I'd
    suggest reviewing how :mod:`asyncio` works

    :param coroutine: coroutine object to run to completion
    :return: whatever the coroutine returns
    :raises Exception: if the coroutine failed (message includes the original error
        and its traceback), or if the background thread could not be started"""
    global _coroutineAutoIdx
    if _resolve_err: raise Exception(f"k1lib.resolve() not available, encoutered this error while starting up: {_resolve_err}. Traceback:\n{_resolve_tb}")
    # read-then-increment is not atomic; without the lock two threads could share a ticket
    with _coroutineIdxLock: idx = _coroutineAutoIdx; _coroutineAutoIdx += 1
    _coroutineQueue.append([idx, coroutine])
    while idx not in _coroutineAns: time.sleep(0.01) # poll until the worker publishes our result
    ans = _coroutineAns[idx]; del _coroutineAns[idx]
    if ans["type"] == "success": return ans["ans"]
    else: raise Exception(f"Exception occured while trying to k1lib.resolve(): {ans['e']}. Traceback:\n{ans['tb']}")
def config(s:str):
    """Convenience method to grab JSON config file from config.mlexps.com

    :param s: path of the config file on that site
    :return: the parsed JSON document"""
    return json.loads("".join(k1lib.cli.cat(f"https://config.mlexps.com/{s}")))