# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import logging, warnings, os, time, re, json, k1lib, importlib, urllib.parse, math, base64, dill, inspect, threading, datetime, traceback
import numpy as np
from typing import Any, List, Union, Tuple, Iterator, Dict
from functools import partial
try: import torch; hasTorch = True
except: hasTorch = False
# Names exported by ``from <module> import *``. wrapMod.__dir__ in this file
# reads a module's ``__all__`` to decide which symbols to suggest.
__all__ = ["_docsUrl", "isNumeric",
"patch", "cron", "wrapMod", "wraps", "squeeze", "raiseEx",
"numDigits", "limitLines",
"limitChars", "showLog", "cleanDiv",
"beep", "beepOnAvailable", "dontWrap",
"debounce", "scaleSvg", "now", "pushNotification", "dep", "depCli", "ticks", "digraph", "graph",
"encode", "decode", "hash", "resolve", "config"]
# Base URL of the k1lib website.
# NOTE(review): presumably used elsewhere to build documentation links - confirm against callers
_docsUrl = "https://k1lib.com"
def isNumeric(x) -> bool:
    """Tells whether an object is a number.

    Builtin ``int`` and ``float`` instances (``bool`` included, as it
    subclasses ``int``) and numpy scalars deriving from ``np.number`` count
    as numbers. The builtin ``complex`` type and numeric strings do not.

    :param x: object to inspect
    :return: True if ``x`` is a number, False otherwise"""
    numericTypes = (int, float, np.number)
    return isinstance(x, numericTypes)
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False):
    """Decorator factory that attaches the decorated function to a class/object.

    :param _class: object to patch function. Can also be a type
    :param name: name of patched function, if different from current
    :param docs: docs of patched function. Can be object with defined __doc__ attr
    :param static: whether to wrap this inside :class:`staticmethod` or not
    :return: decorator that returns the attached object (the function itself,
        or its :class:`staticmethod` wrapper if ``static`` is True)

    Intended to be used like this::

        class A:
            def methA(self):
                return "inside methA"

        @k1lib.patch(A)
        def methB(self):
            return "inside methB"

        a = A()
        a.methB() # returns "inside methB"

    You can do ``@property`` attributes like this::

        class A: pass

        @k1lib.patch(A, "propC")
        @property
        def propC(self): return self._propC

        @k1lib.patch(A, "propC")
        @propC.setter
        def propC(self, value): self._propC = value

        a = A(); a.propC = "abc"
        a.propC # returns "abc"

    The attribute name has to be declared explicitly in that case, because
    the name is otherwise taken from the decorated object's ``__qualname__``.
    You can also do static methods like this::

        class A: pass

        @k1lib.patch(A, static=True)
        def staticD(arg1): return arg1

        A.staticD("def") # returns "def"
    """
    def attach(function):
        # docstring priority: explicit ``docs`` (string, or an object's __doc__),
        # then the function's own docstring, then the target's docstring
        if docs is None or isinstance(docs, str): resolvedDocs = docs
        else: resolvedDocs = docs.__doc__
        resolvedDocs = resolvedDocs or function.__doc__ or _class.__doc__
        # last dotted component of the qualified name is the bare function name
        attrName = name or function.__qualname__.rpartition(".")[2]
        patched = staticmethod(function) if static else function
        patched.__doc__ = resolvedDocs
        setattr(_class, attrName, patched)
        return patched
    return attach
def cron(f):
    """Sets up a cron job in another thread, running the decorated
    function whenever ``f`` goes from False to True. Example::

        @k1.cron(lambda minute: minute == 0)
        def f1(): # runs every hour
            ... # do some stuff

        @k1.cron(lambda second: second % 5 == 0)
        def f2(): # runs every 5 seconds
            ... # do some stuff

    So, the first function will run every hour, and the second function will
    run every 5 seconds. The timing function ``f`` can be as complicated as
    you want, but it can only accept the following parameters:

    - year
    - month: 1-12
    - day: 1-31
    - weekday: 0-6, 0 for Monday
    - hour: 0-23
    - minute: 0-59
    - second: 0-59

    The current time is sampled roughly every 0.5 seconds. The polling thread
    is started without ``daemon=True`` and loops forever. There is no error
    handling inside the loop, so an exception raised by ``f`` or by the
    decorated function ends that thread.

    :param f: timing predicate; its parameter names select which time fields it receives
    :return: decorator that starts the polling thread and returns the decorated function unchanged
    :raises Exception: at decoration time, if ``f`` declares a parameter that is not in the list above"""
    allowed = ("year", "month", "day", "weekday", "hour", "minute", "second")
    def inner(func):
        args = list(inspect.signature(f).parameters.keys())
        for arg in args:
            # message is built from the same tuple used for validation, so the two can't drift apart
            if arg not in allowed: raise Exception(f"Unknown argument {arg}. Only ({', '.join(allowed)}) are allowed")
        def startLoop():
            last = False
            while True:
                a = datetime.datetime.now()
                now = {"year": a.year, "month": a.month, "day": a.day, "weekday": a.weekday(), "hour": a.hour, "minute": a.minute, "second": a.second}
                this = f(*[now[e] for e in args])
                # fire on the rising edge only, so a condition that stays True doesn't re-trigger
                if not last and this: func()
                last = this; time.sleep(0.5)
        threading.Thread(target=startLoop).start()
        return func
    return inner
class wrapMod:
    def __init__(self, m, moduleName=None):
        """Wraps around a module, and only suggest symbols in __all__ list
        defined inside the module. Example::

            from . import randomModule
            randomModule = wrapMod(randomModule)

        :param m: the imported module
        :param moduleName: optional new module name for elements (their __module__ attr)"""
        self._init(m, moduleName)
    def _init(self, m, moduleName):
        """Shared setup used by both ``__init__`` and ``__setstate__``.

        Optionally rewrites ``__module__`` on the module's values, then copies
        the module's namespace into this instance so attribute access works."""
        if moduleName is not None:
            # snapshot the values so that assigning attributes can't disturb the iteration
            for v in list(m.__dict__.values()):
                # Module namespaces also hold plain data (the ``__name__`` string, None,
                # numbers, ...) and builtin types, which reject attribute assignment.
                # Those are skipped instead of aborting the whole wrap.
                try: v.__module__ = moduleName
                except (AttributeError, TypeError): pass
        self._wrapMod_moduleName = moduleName; self._wrapMod_m = m
        self.__dict__.update(m.__dict__)
        self._wrapMod_extraDirs = []
    # ``list(...)`` so that a module declaring ``__all__`` as a tuple works too
    def __dir__(self): return list(self._wrapMod_m.__all__) + self._wrapMod_extraDirs
    def __str__(self): return str(self._wrapMod_m)
    # pickling stores only the wrapped module and the name, and re-runs _init() on load
    def __getstate__(self): return {"m": self._wrapMod_m, "name": self._wrapMod_moduleName}
    def __setstate__(self, d): self._init(d["m"], d["name"])
    def __repr__(self): return str(self)
def wraps(ogF):
    """Kinda like :meth:`functools.wraps`, but don't update __annotations__.

    Copies ``__doc__``, ``__name__``, ``__qualname__`` and ``__module__``
    from ``ogF`` onto the decorated function and nothing else.

    :param ogF: function whose metadata should be copied
    :return: decorator that returns the decorated function itself"""
    copiedAttrs = ("__doc__", "__name__", "__qualname__", "__module__")
    def inner(f):
        for attr in copiedAttrs: setattr(f, attr, getattr(ogF, attr))
        return f
    return inner
def squeeze(_list:Union[list, tuple, Any], hard=False):
    """If list only has 1 element, returns that element, else returns original
    list. Torch tensors (when torch is available) are returned squeezed, and
    any other object is returned unchanged.

    :param _list: list, tuple, tensor or arbitrary object
    :param hard: If True, then if list/tuple, filters out None and empty strings,
        and takes the first element out even if that list/tuple has more than 1 element
    :return: the extracted element, the squeezed tensor, or the input itself
    :raises IndexError: if ``hard`` is True and nothing is left after filtering"""
    if isinstance(_list, (tuple, list)):
        if hard:
            # Identity/type checks rather than ``!=``: comparing an array-like element
            # (numpy array, tensor) against None or "" is elementwise and has no
            # single truth value, which used to raise here.
            kept = [e for e in _list if e is not None and not (isinstance(e, str) and e == "")]
            return kept[0]
        elif len(_list) == 1: return _list[0]
    if hasTorch and isinstance(_list, torch.Tensor): return _list.squeeze()
    return _list
def raiseEx(ex:Exception):
    """Raises the given exception.

    ``raise`` is a statement, so it can't appear where only an expression is
    allowed (a lambda body, a conditional expression). Calling this function
    there has the same effect.

    :param ex: exception to raise
    :raises ex: always"""
    raise ex
def numDigits(num) -> int:
    """Get the number of digits/characters of this number/object.

    The count is the length of the object's default string formatting, so
    signs and decimal points are included.

    :param num: number (or any object) to measure
    :return: length of the formatted text"""
    rendered = format(num)
    return len(rendered)
def limitLines(s:str, limit:int=10) -> str:
    """If input string has too many lines, keeps only the first ``limit``
    lines and appends a final line of 5 dots. Otherwise the string is
    returned untouched.

    :param s: text to shorten
    :param limit: maximum number of lines to keep
    :return: the possibly truncated text"""
    lines = s.split("\n")
    if len(lines) <= limit: return s
    kept = lines[:limit]
    return "\n".join(kept) + "\n....."
def limitChars(s:str, limit:int=50):
    """Shortens the first line of the input to at most ``limit`` characters.

    Only the text before the first line break is considered. If it is longer
    than ``limit``, it's cut to ``limit - 3`` characters and ``...`` is
    appended. Non-string inputs are formatted to text first.

    :param s: text (or any object) to shorten; None gives an empty string
    :param limit: maximum length of the result
    :return: the shortened first line"""
    if s is None: return ""
    firstLine = f"{s}".partition("\n")[0]
    if len(firstLine) <= limit: return firstLine
    return firstLine[:limit-3] + "..."
def showLog(loggerName:str="", level:int=logging.DEBUG):
    """Prints out logs of a particular logger at a particular level.

    Sets the logger's level and attaches a new stream handler to it. Every
    call attaches another handler.

    :param loggerName: name passed to :func:`logging.getLogger`; empty string is the root logger
    :param level: level to set on the logger"""
    target = logging.getLogger(loggerName)
    target.setLevel(level)
    # the handler itself lets everything through; filtering is done by the logger's level
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    target.addHandler(handler)
def cleanDiv(_list:List[float], total:int) -> List[int]:
    """Cleanly divides :total: into int segments with weights specified by
    :_list:. Looks like this: ``(_list / _list.sum()) * total``, but
    everything is an ``int``.

    Each share is truncated to an integer, then the last segment is set to
    whatever remains so that the segments add up to ``total`` exactly.

    :param _list: weights of the segments
    :param total: amount to split up
    :return: numpy integer array of segment sizes, same length as ``_list``"""
    weights = np.array(_list)
    shares = weights*total/weights.sum()
    segments = shares.astype(int)
    # the last segment absorbs everything lost to truncation
    segments[-1] = total - segments[:-1].sum()
    return segments
def beep(seconds=0.3):
    """Plays a beeping sound, may be useful as notification for long-running tasks.

    First tries to display a small HTML ``<script>`` snippet through IPython
    that plays a generated audio clip. If that fails for any reason (IPython
    not installed, display call failing), falls back to asking the shell to
    print the terminal bell character.

    :param seconds: length of the beep; only used by the IPython path"""
    try:
        # public IPython.display API rather than reaching into IPython.core.display
        from IPython.display import display_html, HTML
        display_html(HTML(f"""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(Math.round(3.3333e3*{seconds})).join(123))).play();</script>"""))
    # best-effort fallback, but Ctrl-C / SystemExit are no longer swallowed
    except Exception: os.system("printf '\a'")
def beepOnAvailable(url:str, timeout=5, **kwargs):
    """Tries to connect with a url repeatedly, and if successful, plays
    a beep sound.

    Waits one second before each attempt. An attempt is successful when the
    response's ``ok`` flag is true; any request error counts as "not
    available yet". Pressing Ctrl-C stops the polling and prints
    "Still not available".

    :param url: address to poll
    :param timeout: timeout passed to ``requests.get`` for each attempt
    :param kwargs: extra keyword arguments forwarded to ``requests.get``"""
    import requests
    try:
        while True:
            time.sleep(1)
            # ``except Exception`` (not a bare except), so that a Ctrl-C during the
            # request reaches the KeyboardInterrupt handler below instead of being
            # treated as one more failed attempt
            try: successful = bool(requests.get(url, timeout=timeout, **kwargs).ok)
            except Exception: successful = False
            if successful:
                beep(); break
    except KeyboardInterrupt: print("Still not available")
def dontWrap():
    """Don't wrap horizontally when in a notebook. Normally, if you're
    displaying something long, like the output of ``print('a'*1000)`` in a notebook,
    it will display it in multiple lines. This may be undesirable, so this solves
    that by displaying some HTML with css styles so that the notebook doesn't wrap.

    Does nothing if IPython is unavailable or the display call fails."""
    try:
        # public import location; ``IPython.core.display.display`` is deprecated
        from IPython.display import display, HTML
        display(HTML("""<style>
div.jp-OutputArea-output pre {white-space: pre;}
div.output_area pre {white-space: pre;}
div.CodeMirror > div.highlight {overflow-y: auto;}
</style>"""))
    # purely cosmetic, so failures are ignored - but Ctrl-C / SystemExit still propagate
    except Exception: pass
import asyncio, functools # dontWrap
from threading import Timer as ThreadingTimer # dontWrap
class AsyncTimer: # rename if want to use
    """One-shot timer built on asyncio, with the same ``start()``/``cancel()``
    surface that :func:`debounce` uses on :class:`threading.Timer`.

    ``start()`` schedules the job through ``asyncio.ensure_future``, and
    ``cancel()`` cancels the scheduled task, so ``cancel()`` is only valid
    after ``start()``."""
    def __init__(self, timeout, callback):
        self._timeout = timeout
        self._callback = callback
    async def _job(self):
        # wait first, then fire the callback with no arguments
        await asyncio.sleep(self._timeout)
        self._callback()
    def start(self):
        self._task = asyncio.ensure_future(self._job())
    def cancel(self):
        self._task.cancel()
def debounce(wait, threading=False):
    """Decorator that will postpone a function's execution until after
    ``wait`` seconds have elapsed since the last time it was invoked. Taken
    from `ipywidgets <https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_.
    Example::

        import k1lib, time; value = 0
        @k1lib.debounce(0.5, True)
        def f(x): global value; value = x**2
        f(2); time.sleep(0.3); f(3)
        print(value) # prints "0"
        time.sleep(0.7)
        print(value) # prints "9"

    The debounced function returns None straight away. The wrapped function
    runs later with the arguments of the most recent call.

    :param wait: wait time in seconds
    :param threading: if True, use multiple threads, else just use async stuff"""
    Timer = ThreadingTimer if threading else AsyncTimer
    def decorator(fn):
        pending = [None] # single slot holding the timer of the latest call, if any
        def debounced(*args, **kwargs):
            previous = pending[0]
            # a newer call supersedes the one still waiting
            if previous is not None: previous.cancel()
            pending[0] = Timer(wait, functools.partial(fn, *args, **kwargs))
            pending[0].start()
        return functools.update_wrapper(debounced, fn)
    return decorator
def scaleSvg(svg:str, scale:float=None) -> str:
    """Scales an svg xml string by some amount.

    Finds the first ``width="<int>pt"`` and ``height="<int>pt"`` attributes,
    multiplies their values by ``scale`` (truncating to int) and replaces
    every occurrence of those exact attribute texts in the string.

    :param svg: svg document as text, with integer width and height given in ``pt``
    :param scale: scaling factor. If None, uses ``k1lib.settings.svgScale``
    :return: svg text with the new width and height
    :raises IndexError: if no matching width or height attribute is found"""
    if scale is None: scale = k1lib.settings.svgScale
    # raw strings: ``\d`` inside a normal string literal is an invalid escape sequence
    wS = re.findall(r'width="\d*pt"', svg)[0]
    hS = re.findall(r'height="\d*pt"', svg)[0]
    w = int(int(re.findall(r"\d+", wS)[0])*scale)
    h = int(int(re.findall(r"\d+", hS)[0])*scale)
    # plain text replacement: the matched attribute is literal text, not a pattern
    svg = svg.replace(wS, f'width="{w}pt"')
    svg = svg.replace(hS, f'height="{h}pt"')
    return svg
import datetime # scaleSvg
def now():
    """Convenience function for returning the current local time as an
    ISO 8601 string, UTC offset included.

    :return: string as produced by ``datetime.isoformat()`` on a timezone-aware local time"""
    localTime = datetime.datetime.now().astimezone()
    return localTime.isoformat()
# bare call executed at import time; its result is discarded
# NOTE(review): presumably a leftover check from the source notebook
now()
def pushNotification(title="Some title", content="Some content", url="https://k1lib.com"):
    """Sends push notification to your device.

    Setting things up:

    - Download this app: https://play.google.com/store/apps/details?id=net.xdroid.pn
    - Set the `settings.pushNotificationKey` key obtained from the app. Example key: `k-967fe9...`
    - Alternatively, set the environment variable `k1lib_pushNotificationKey` instead
    - Run the function as usual

    Sends a single GET request carrying the key, title, content and url as
    query parameters, then prints "Pushed!". The response is not inspected.

    :param title: notification title
    :param content: notification body
    :param url: link attached to the notification"""
    import requests
    params = {'k': k1lib.settings.pushNotificationKey, 't': title, 'c': content, 'u': url}
    query = urllib.parse.urlencode(params)
    requests.get("http://xdroid.net/api/message?" + query)
    print("Pushed!")
class Dependency:
    """Placeholder returned by :func:`dep` when a package can't be imported.

    Creating it is harmless. Looking up any attribute that isn't set on the
    instance raises an :class:`ImportError` naming the package, so the failure
    surfaces only when the missing package is actually used."""
    def __init__(self, s, alt:str=None, url:str=None):
        self.s = s # import name of the package
        self.alt = alt # display name for the error message, if different
        self.url = url # website to point users to
        self.errorMsg = "" # details of the import failure, filled in by the creator
    def __getattr__(self, attr):
        # only reached for attributes not found the normal way
        packageName = self.alt or self.s
        if self.url: after = f"More information is available on {self.url}. "
        else: after = f""
        raise ImportError(f"Python package `{packageName}` not found. Please install it. {after}This is the error when importing it:\n\n{self.errorMsg}")
def dep(s, alt:str=None, url:str=None):
    """Imports a potentially unavailable package
    Example::

        graphviz = k1.dep("graphviz")
        # executes as normal, if graphviz is available, else throws an error
        g = graphviz.Digraph()

    I don't imagine this would be useful for everyday use though.
    This is mainly for writing this library, so that it can use
    optional dependencies.

    :param s: name of the package. Can be nested, like `matplotlib.pyplot`
    :param alt: (optional) name of the package to display in error message if package is not found
    :param url: (optional) url of the package's website, so that they know where to get official docs
    :return: the imported module, or a :class:`Dependency` placeholder that
        raises :class:`ImportError` on attribute access if the import failed"""
    try: return importlib.import_module(s)
    # Any ordinary failure while importing (not just ImportError) marks the package as
    # unavailable. Ctrl-C / SystemExit during a slow import are deliberately not
    # swallowed, so an interrupted import isn't mistaken for a missing package.
    except Exception:
        d = Dependency(s, alt, url); d.errorMsg = traceback.format_exc(); return d
# Optional third-party packages, each import attempted once when this module
# loads and stored as an attribute on the ``dep`` function itself.
dep.mpl = dep("matplotlib", url="https://matplotlib.org/")
dep.plt = dep("matplotlib.pyplot", url="https://matplotlib.org/")
dep.cm = dep("matplotlib.cm", url="https://matplotlib.org/")
dep.pygments = dep("pygments", url="https://pygments.org/")
dep.torch = dep("torch", url="https://pytorch.org/")
dep.yaml = dep("yaml", "pyyaml", "https://pyyaml.org/")
dep.pd = dep("pandas", url="https://pandas.pydata.org/")
dep.requests = dep("requests", url="https://requests.readthedocs.io/")
dep.graphviz = dep("graphviz", url="https://graphviz.readthedocs.io/")
dep.PIL = dep("PIL", "Pillow", "https://pillow.readthedocs.io/")
dep.websockets = dep("websockets", url="https://websockets.readthedocs.io/")
def depCli(prog="ls", insCmd=None, website=None):
    """Checks whether a particular command-line program is installed or not.
    Example::

        k1.depCli("vim")

    This will throw an error if that program is not found

    :param prog: program name
    :param insCmd: (optional) installation command, could be "apt install vim"
    :param website: (optional) website that the user can learn more about the program
    :return: True if the program was found
    :raises Exception: if the program can't be located on PATH"""
    # Standard library lookup instead of running ``which``/``where`` through a shell:
    # no subprocess, and ``prog`` is never interpolated into a command line.
    import shutil
    if shutil.which(prog) is None:
        insS = f" by executing '{insCmd}'" if insCmd else ""
        webS = f". Visit {website} to learn more about the program" if website else ""
        raise Exception(f"Command line program '{prog}' not found in PATH. Please install it{insS}{webS}")
    return True
# Candidate interval multipliers (0.1, 0.2, 0.25, 0.5, 1.0). ticks() picks one
# of them and multiplies it by a power of ten to get the tick spacing.
tickCheckpoints = np.array([1, 2, 2.5, 5, 10])/10
def ticks(x:float, y:float, rounding:int=6):
    """Get tick locations in a plot that look reasonable.
    Example::

        ticks(-5, 40) # returns [-10.0, -5.0, 0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0]
        ticks(0.05, 0.07) # returns [0.05, 0.0525, 0.055, 0.0575, 0.06, 0.0625, 0.065, 0.0675, 0.07, 0.0725]
        ticks(-5, 5) # returns [-6.0, -5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

    So essentially, whenever you try to plot something, you want both the x and y axis
    to not have too many lines, and that the tick values snap to a nice number.

    Normally you don't have to do this, as matplotlib does this automatically behind
    the scenes, but sometimes you need to implement plotting again, in strange
    situations, so this function comes in handy

    ``y`` has to be greater than ``x``, as the function takes ``log10(y-x)``.

    :param x: start of interval
    :param y: end of interval
    :param rounding: internally, it rounds tick values to this number of
        digits, to fix tiny floating point errors that make numbers ugly. So
        you can change it if you're working with really small numbers
    :return: tick values in ascending order. NOTE(review): the last pipeline
        stage applies ``np.array``, so this is presumably a numpy array rather
        than the plain lists shown in the examples - confirm"""
    # ld is log10 of the span; scale is ld rounded down to a whole number (ld minus its fractional part)
    cli = k1lib.cli; ld = math.log10(y-x); scale = ld-ld%1 # log delta
    # index of the checkpoint closest (squared distance) to the fractional part of ld
    sel = ((ld%1 - tickCheckpoints)**2).argmin() # selecting the best fit tick checkpoint
    interval = 10**scale*tickCheckpoints[sel] # interval between ticks
    # y truncated to a whole multiple of 10**scale
    seed = int(y/10**scale)*10**scale # seed tick
    # f1 steps downwards from the seed, f2 upwards; each stream stops once it is more
    # than one interval outside of [x, y]
    # NOTE(review): exact stop/inclusion behavior depends on k1lib.cli.applySerial/breakIf - confirm
    f1 = cli.applySerial(cli.op()-interval) | cli.breakIf(cli.op()<x-interval)
    f2 = cli.applySerial(cli.op()+interval) | cli.breakIf(cli.op()>y+interval)
    # finally, use the seed to expand in both directions to get all the ticks
    # (both streams are merged, deduplicated through a set, sorted, then rounded)
    return seed | f1 & f2 | cli.joinStreams() | cli.aS(set) | cli.sort(None) | cli.apply(round, ndigits=rounding) | cli.deref() | cli.aS(np.array)
try:
    import graphviz
    def digraph():
        """Convenience method for creating a new graphviz Digraph, laid out
        top to bottom (``rankdir`` is ``TB``).
        Example::

            g = k1lib.digraph()
            g("a", "b", "c")
            g # displays arrows from "a" to "b" and "a" to "c"

        :return: a new, empty ``graphviz.Digraph``"""
        attrs = {"rankdir":"TB"}
        return graphviz.Digraph(graph_attr=attrs)
    def graph():
        """Convenience method for creating a new graphviz Graph, laid out
        top to bottom (``rankdir`` is ``TB``). See also: :meth:`digraph`

        :return: a new, empty ``graphviz.Graph``"""
        attrs = {"rankdir":"TB"}
        return graphviz.Graph(graph_attr=attrs)
except ImportError:
    # without graphviz, both names become one stub that prints a hint and returns None
    digraph = graph = lambda: print("Module `graphviz` not found! Please install it first, something like `pip install graphviz`")
def encode(obj:object) -> str:
    """Serialize random objects into bytes, then turn those bytes to
    normal strings using base64. Example::

        a = {"b": 3}
        encode(a) # returns "gASVCgAAAAAAAAB9lIwBYpRLA3Mu"
        decode(encode(a)) # returns {"b": 3}

    See also: :meth:`decode`

    :param obj: object to serialize; has to be serializable by ``dill``
    :return: base64 text of the serialized object"""
    raw = dill.dumps(obj)
    return base64.b64encode(raw).decode()
def decode(s:str) -> object:
    """Turns a string produced by :meth:`encode` back into a random object.

    .. warning::

        The payload is deserialized with ``dill.loads``, which, like pickle,
        can execute arbitrary code while loading. Only decode strings that
        come from a trusted source.

    :param s: base64 text as returned by :meth:`encode`
    :return: the reconstructed object"""
    raw = base64.b64decode(s.encode())
    return dill.loads(raw)
import hashlib # decode
def hash(msg:str) -> int:
    """A universal hash function. Why not just use the builtin hash function?
    Because it actually changes for different interpreter instances, it won't be
    good for code that runs on multiple computers, so this is sort of like a
    drop-in replacement. Although it does output an integer, don't rely on it having
    the same numeric properties as a normal hash function.

    The value is the SHA-256 digest of the text's default encoding, read as a
    big-endian integer. Non-string inputs are formatted to text first, so
    ``hash(34)`` equals ``hash("34")``.

    :param msg: text (or any object) to hash
    :return: non-negative integer of up to 256 bits"""
    digest = hashlib.sha256(f"{msg}".encode()).digest()
    return int.from_bytes(digest, "big")
# bare call executed at import time; its result is discarded
# NOTE(review): presumably a leftover check from the source notebook
hash(34)
import traceback # hash
# Background machinery for resolve(): a single daemon thread owns an event loop and
# runs queued coroutines one at a time. If setting it up fails, the error is kept in
# _resolve_err / _resolve_tb and reported when resolve() is called.
try:
    import asyncio, threading, time; from collections import deque
    _coroutineQueue = deque() # deque of (idx, coroutine)
    _coroutineAns = dict() # Dict[idx -> coroutine ans]
    _coroutineAutoIdx = 0
    _coroutineIdxLock = threading.Lock() # guards _coroutineAutoIdx, as resolve() can be called from any thread
    def _coroutineResolvingThread():
        """Worker loop: takes (idx, coroutine) pairs off the queue, runs each to
        completion on its own event loop and stores the outcome under ``idx``."""
        loop = asyncio.new_event_loop()
        while True:
            if len(_coroutineQueue) == 0: time.sleep(0.01)
            else:
                idx, coroutine = _coroutineQueue.popleft()
                # Important to recover from every failure: if this thread dies, all later
                # resolve() calls wait forever. BaseException rather than Exception, because
                # asyncio.CancelledError derives from BaseException on Python 3.8+.
                try: ans = loop.run_until_complete(coroutine); _coroutineAns[idx] = {"type": "success", "ans": ans}
                except BaseException as e: _coroutineAns[idx] = {"type": "failure", "e": f"{e}", "tb": traceback.format_exc()}
    threading.Thread(target=_coroutineResolvingThread, daemon=True).start()
    _resolve_err = None
except Exception as e: _resolve_err = f"{e}"; _resolve_tb = traceback.format_exc()
def resolve(coroutine):
    """Resolves coroutines without having to use await.
    Example::

        async def f(x):
            await asyncio.sleep(1) # simulates heavy processing
            return x + 4
        k1.resolve(f(5))

    This kinda feels just like ``asyncio.run(f(5))``, so why does this exist?
    Here's the docstring of that method:

    .. code-block:: text

        This function cannot be called when another asyncio event loop is running in the same thread.
        This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.

    This has more limitations that I found annoying to deal with day-to-day. I want
    a function that always work, no matter my setup. So, how this function work
    internally is that it spins up a new (permanent, daemon) thread, creates a new
    event loop in that thread, then whenever a coroutine comes in, it runs it in
    that event loop, returns, then pass control back to whatever thread that called
    :meth:`resolve`. I'm sure this can still be messed up in some way, but seems
    more useful than the builtin method.

    This is just meant as a quick and dirty way to force resolving coroutines. Use
    this sparingly, as performance won't be as good as a proper async application.
    If you find yourself using this way too often, then I'd suggest reviewing how
    :mod:`asyncio` works

    The calling thread blocks, polling every 10 ms, until the coroutine has
    finished. Coroutines are run one at a time.

    :param coroutine: coroutine object to run
    :return: whatever the coroutine returns
    :raises Exception: if the background thread could not be started, or if the
        coroutine failed or was cancelled. The message carries the original
        error text and traceback"""
    global _coroutineAutoIdx
    if _resolve_err: raise Exception(f"k1lib.resolve() not available, encoutered this error while starting up: {_resolve_err}. Traceback:\n{_resolve_tb}")
    # take a unique ticket number; two threads must never end up with the same one
    with _coroutineIdxLock: idx = _coroutineAutoIdx; _coroutineAutoIdx += 1
    _coroutineQueue.append([idx, coroutine])
    while idx not in _coroutineAns: time.sleep(0.01)
    ans = _coroutineAns.pop(idx)
    if ans["type"] == "success": return ans["ans"]
    else: raise Exception(f"Exception occured while trying to k1lib.resolve(): {ans['e']}. Traceback:\n{ans['tb']}")
def config(s:str):
    """Convenience method to grab JSON config file from config.mlexps.com

    :param s: path of the config file, appended to ``https://config.mlexps.com/``
    :return: the parsed JSON content"""
    url = f"https://config.mlexps.com/{s}"
    text = "".join(k1lib.cli.cat(url))
    return json.loads(text)