Source code for k1lib._baseClasses

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
.. module:: k1lib
"""
from typing import Callable, Iterator, Tuple, Union, Dict, Any, List
from k1lib import isNumeric; import k1lib, contextlib, warnings
import random, math, sys, io, os, numpy as np, functools
plt = k1lib.dep.plt
try: import torch; hasTorch = True
except: hasTorch = False
__all__ = ["Object", "Range", "Domain", "AutoIncrement", "Wrapper", "Every",
           "RunOnce", "MaxDepth", "MovingAvg", "Absorber",
           "Settings", "settings", "_settings", "UValue", "ConstantPad"]
class Object:
    """Convenience class that acts like :class:`~collections.defaultdict`. You can
    use it like a normal object::

        a = k1lib.Object()
        a.b = 3
        print(a.b) # outputs "3"

    ``__repr__()`` output is pretty nice too:

    .. code-block:: text

        <class '__main__.Object'>, with attrs:
        - b

    You can instantiate it from a dict::

        a = k1lib.Object.fromDict({"b": 3, "c": 4})
        print(a.c) # outputs "4"

    And you can specify a default value, just like defaultdict::

        a = k1lib.Object().withAutoDeclare(lambda: [])
        a.texts.extend(["factorio", "world of warcraft"])
        print(a.texts[0]) # outputs "factorio"

    .. warning::

        Default values only work with variables that don't start with an
        underscore "_".

    Treating it like defaultdict is okay too::

        a = k1lib.Object().withAutoDeclare(lambda: [])
        a["movies"].append("dune")
        print(a.movies[0]) # outputs "dune"
    """
    def __init__(self):
        # None means "no auto-declare"; `repr`, if set, overrides __repr__ output
        self._defaultValueGenerator = None; self.repr = None
    @staticmethod
    def fromDict(_dict:Dict[str, Any]):
        """Creates an object with attributes from a dictionary"""
        answer = Object(); answer.__dict__.update(_dict); return answer
    @property
    def state(self) -> dict:
        """Essentially ``__dict__``, but only outputs the fields you defined. If
        your framework intentionally set some attributes, those will be reported
        too, so beware"""
        answer = dict(self.__dict__); del answer["_defaultValueGenerator"]
        del answer["repr"]; return answer
    def withAutoDeclare(self, defaultValueGenerator):
        """Sets this Object up so that if a field doesn't exist, it will
        automatically create it with a default value."""
        self._defaultValueGenerator = defaultValueGenerator; return self
    def __getitem__(self, idx): return getattr(self, idx)
    def __setitem__(self, idx, value): setattr(self, idx, value)
    def __iter__(self): yield from self.state.values()
    def __contains__(self, item:str): return item in self.__dict__
    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails. Underscore names never
        # auto-declare, so internal machinery can't be silently created.
        if attr.startswith("_"): raise AttributeError()
        if attr == "getdoc": raise AttributeError("This param is used internally in module `IPython.core.oinspect`, so you kinda have to set it specifically yourself instead of relying on auto declare")
        if self._defaultValueGenerator is not None: # `is not None`, not `!= None`
            self.__dict__[attr] = self._defaultValueGenerator()
            return self.__dict__[attr]
        raise AttributeError
    def __delitem__(self, key): del self.__dict__[key]
    def withRepr(self, _repr:str):
        """Specify output of ``__repr__()``. Legacy code. You can just monkey
        patch it instead."""
        self.repr = _repr; return self
    def __repr__(self):
        _dict = "\n".join([f"- {k}" for k in self.state.keys()])
        return self.repr or f"{type(self)}, with attrs:\n{_dict}"
ninf = float("-inf"); inf = float("inf") # Object
class Range:
    """A range of numbers. It's just 2 numbers really: start and stop

    This is essentially a convenience class to provide a nice, clean abstraction
    and to eliminate errors. You can transform values::

        Range(10, 20).toUnit(13) # returns 0.3
        Range(10, 20).fromUnit(0.3) # returns 13
        Range(10, 20).toRange(Range(20, 10), 13) # returns 17

    You can also do random math operations on it::

        (Range(10, 20) * 2 + 3) == Range(23, 43) # returns True
        Range(10, 20) == ~Range(20, 10) # returns True"""
    def __init__(self, start=0, stop=None):
        """Creates a new Range.

        There are different ``__init__`` functions for many situations:

        - Range(2, 11.1): create range [2, 11.1]
        - Range(15.2): creates range [0, 15.2]
        - Range(Range(2, 3)): create range [2, 3]. This serves as sort of a catch-all
        - Range(slice(2, 5, 2)): creates range [2, 5]. Can also be a :class:`range`
        - Range(slice(2, -1), 10): creates range [2, 9]
        - Range([1, 2, 7, 5]): creates range [1, 5]. Can also be a tuple
        """
        if (isNumeric(start) and isNumeric(stop)):
            self.start, self.stop = start, stop
        elif isNumeric(start) and stop is None:
            self.start, self.stop = 0, start
        elif stop is None and isinstance(start, (range, slice, Range)):
            self.start, self.stop = start.start, start.stop
        elif isNumeric(stop) and isinstance(start, slice):
            # resolve negative/None slice endpoints against a virtual length `stop`
            r = range(stop)[start]; self.start, self.stop = r.start, r.stop
        elif isinstance(start, (list, tuple)):
            self.start, self.stop = start[0], start[-1]
        else: raise AttributeError(f"Don't understand {start} and {stop}")
        self.delta = self.stop - self.start
    def __getitem__(self, index):
        """0 for start, 1 for stop

        You can also pass in a :class:`slice` object, in which case, a range
        subset will be returned. Code kinda looks like this::

            range(start, stop)[index]"""
        if index == 0: return self.start
        if index == 1: return self.stop
        if type(index) == slice:
            return Range(range(self.start, self.stop)[index])
        raise Exception(f"Can't get index {index} of range [{self.start}, {self.stop}]")
    def fixOrder(self) -> "Range":
        """If start greater than stop, switch the 2, else do nothing"""
        if self.start > self.stop:
            self.start, self.stop = self.stop, self.start
        return self
    def _common(self, x, f:Callable[[float], float]):
        # Applies f to a number, elementwise over list/tuple, or to both endpoints
        # of a range/slice/Range (None endpoints default to 0 and 1 respectively)
        if isNumeric(x): return f(x)
        if isinstance(x, (list, tuple)):
            return [self._common(elem, f) for elem in x]
        if isinstance(x, (range, slice, Range)):
            return Range(self._common(x.start if x.start is not None else 0, f),
                         self._common(x.stop if x.stop is not None else 1, f))
        raise AttributeError(f"Doesn't understand {x}")
    def __iter__(self): yield self.start; yield self.stop
    def intIter(self, step:int=1) -> Iterator[int]:
        """Returns integers within this Range"""
        return range(int(self.start), int(self.stop), step)
    def toUnit(self, x):
        """Converts x from current range to [0, 1] range. Example::

            r = Range(2, 10)
            r.toUnit(5) # will return 0.375, as that is (5-2)/(10-2)

        You can actually pass in a lot in place of x::

            r = Range(0, 10)
            r.toUnit([5, 3, 6]) # will be [0.5, 0.3, 0.6]. Can also be a tuple
            r.toUnit(slice(5, 6)) # will be slice(0.5, 0.6). Can also be a range, or Range

        .. note::

            In the last case, if ``start`` is None, it gets defaulted to 0, and
            if ``end`` is None, it gets defaulted to 1
        """
        # degenerate range (delta == 0) maps everything to nan
        return self._common(x, lambda x: float("nan") if self.delta == 0 else (x - self.start) / self.delta)
    def fromUnit(self, x):
        """Converts x from [0, 1] range to this range. Example::

            r = Range(0, 10)
            r.fromUnit(0.3) # will return 3

        x can be a lot of things, see :meth:`toUnit` for more"""
        return self._common(x, lambda x: x * self.delta + self.start)
    def toRange(self, _range:"Range", x):
        """Converts x from current range to another range. Example::

            r = Range(0, 10)
            r.toRange(Range(0, 100), 6) # will return 60

        x can be a lot of things, see :meth:`toUnit` for more."""
        return self._common(x, lambda x: Range(_range).fromUnit(self.toUnit(x)))
    def fromRange(self, _range:"Range", x):
        """Reverse of :meth:`toRange`, effectively."""
        return _range.toRange(self, x)
    @property
    def range_(self):
        """Returns a :class:`range` object with start and stop values rounded off"""
        # +0.001 guards against float endpoints sitting just under an integer
        return range(math.floor(self.start+0.001), math.floor(self.stop+0.001))
    @property
    def slice_(self):
        """Returns a :class:`slice` object with start and stop values rounded off"""
        return slice(math.floor(self.start+0.001), math.floor(self.stop+0.001))
    @staticmethod
    def proportionalSlice(r1, r2, r1Slice:slice) -> Tuple["Range", "Range"]:
        """Slices r1 and r2 proportionally. Best to explain using an example.
        Let's say you have 2 arrays created from a time-dependent procedure like this::

            a = []; b = []
            for t in range(100):
                if t % 3 == 0: a.append(t)
                if t % 5 == 0: b.append(1 - t)
            len(a), len(b) # returns (34, 20)

        a and b are of different lengths, but you want to plot both from 30% mark
        to 50% mark (for a, it's elements 10 -> 17, for b it's 6 -> 10), as they
        are time-dependent. As you can probably tell, to get the indicies 10, 17,
        6, 10 is messy. So, you can do something like this instead::

            r1, r2 = Range.proportionalSlice(Range(len(a)), Range(len(b)), slice(10, 17))

        This will return the Ranges [10, 17] and [5.88, 10]

        Then, you can plot both of them side by side like this::

            fig, axes = plt.subplots(ncols=2)
            axes[0].plot(r1.range_, a[r1.slice_])
            axes[1].plot(r2.range_, b[r2.slice_])
        """
        r1, r2 = Range(r1), Range(r2)
        ar1 = r1[r1Slice]; ar2 = r1.toRange(r2, ar1)
        return ar1, ar2
    def bound(self, rs:Union[range, slice]) -> Union[range, slice]:
        """If input range|slice's stop and start is missing, then use this
        range's start and stop instead."""
        # `is None`, not truthiness: an explicit 0 endpoint must be preserved
        start = rs.start if rs.start is not None else self.start
        stop = rs.stop if rs.stop is not None else self.stop
        return type(rs)(start, stop)
    def copy(self): return Range(self.start, self.stop)
    def __str__(self): return f"[{self.start}, {self.stop}]"
    def __eq__(self, _range):
        # tolerant float comparison on both endpoints
        _range = Range(_range)
        return (_range.start == self.start or abs(_range.start - self.start) < 1e-9) and\
               (_range.stop == self.stop or abs(_range.stop - self.stop) < 1e-9)
    def __contains__(self, x:float): return x >= self.start and x < self.stop
    def __neg__(self): return Range(-self.start, -self.stop)
    def __invert__(self): return Range(self.stop, self.start)
    def __add__(self, num): return Range(self.start + num, self.stop + num)
    def __radd__(self, num): return self + num
    def __mul__(self, num): return Range(self.start * num, self.stop * num)
    def __rmul__(self, num): return self * num
    # fixed: used to `return num * (1/num)`, which is just the number 1, not a Range
    def __truediv__(self, num): return self * (1 / num)
    # fixed: used to raise a plain string, which itself is a TypeError at runtime
    def __rtruediv__(self, num): raise TypeError("Doesn't make sense to do this!")
    def __round__(self): return Range(round(self.start), round(self.stop))
    def __ceil__(self): return Range(math.ceil(self.start), math.ceil(self.stop))
    def __floor__(self): return Range(math.floor(self.start), math.floor(self.stop))
    def __repr__(self):
        return f"""A range of numbers: [{self.start}, {self.stop}]. Can do:
- r.toUnit(x): will convert x from range [{self.start}, {self.stop}] to [0, 1]
- r.fromUnit(x): will convert x from range [0, 1] to range [{self.start}, {self.stop}]
- r.toRange([a, b], x): will convert x from range [{self.start}, {self.stop}] to range [a, b]
- r[0], r[1], r.start, r.stop: get start and stop values of range

Note: for conversion methods, you can pass in"""
def yieldLowest(r1s:Iterator[Range], r2s:Iterator[Range]):
    """Given 2 :class:`Range` generators with lengths a and b, yield every object
    (a + b) so that :class:`Range`\\ s with smaller start point gets yielded first.
    Assumes that each generator:

    - Does not intersect with itself
    - Is sorted by start point already

    .. warning::

        This method will sometimes yield the same objects given by the Iterators.
        Make sure you copy each :class:`Range` if your use case requires"""
    r1s = iter(r1s); r2s = iter(r2s)
    # if either stream is empty, just drain the other one
    r1 = next(r1s, None)
    if r1 is None: yield from r2s; return
    r2 = next(r2s, None)
    if r2 is None: yield r1; yield from r1s; return
    # classic 2-way merge: alternately drain whichever stream currently holds
    # the smaller start point, flushing the remainder when one side runs out
    while True:
        while r1.start <= r2.start:
            yield r1
            r1 = next(r1s, None)
            if r1 is None: yield r2; yield from r2s; return
        while r2.start <= r1.start:
            yield r2
            r2 = next(r2s, None)
            if r2 is None: yield r1; yield from r1s; return
def join(r1s:Iterator[Range], r2s:Iterator[Range]):
    """Joins 2 :class:`Range` generators, so that overlaps gets merged together.

    .. warning::

        This method will sometimes yield the same objects given by the Iterators.
        Make sure you copy each :class:`Range` if your use case requires"""
    # merge-sort both streams by start, then coalesce adjacent/overlapping ranges
    it = yieldLowest(r1s, r2s); r = next(it, None)
    if r is None: return
    while True:
        nr = next(it, None)  # nr = "next range"
        if nr is None: yield r; return
        if r.stop >= nr.start:
            # overlap: extend the accumulator (copy first, so callers' Ranges
            # aren't mutated) instead of yielding
            r = r.copy(); r.stop = max(r.stop, nr.stop)
        else: yield r; r = nr
def intersect(r1s:Iterator[Range], r2s:Iterator[Range]):
    """Intersects 2 :class:`Range` generators, so that it only returns overlaping regions"""
    r1s = iter(r1s); r2s = iter(r2s)
    r1 = next(r1s, None)
    if r1 is None: return
    r2 = next(r2s, None)
    if r2 is None: return
    while True:
        # NOTE(review): this `if True` guard looks vestigial — the overlap test
        # is the `a < b` check below
        if True: # doesn't intersect at all
            a = max(r1.start, r2.start)
            b = min(r1.stop, r2.stop)
            if a < b: yield Range(a, b)
        # advance whichever range ends first; the other may still overlap more
        if r1.stop > r2.stop: # loads next r2
            r2 = next(r2s, None)
            if r2 is None: return
        else: # loads next r1
            r1 = next(r1s, None)
            if r1 is None: return
def neg(rs:List[Range]):
    """Returns R - rs, where R is the set of real numbers."""
    rs = iter(rs); r = next(rs, None)
    if r is None: yield Range(ninf, inf); return
    if ninf < r.start: yield Range(ninf, r.start) # check -inf case
    # yield the gap between each consecutive pair of ranges
    while True:
        start = r.stop
        r = next(rs, None)
        if r is None:
            if start < inf: yield Range(start, inf)
            return
        yield Range(start, r.start)
class Domain:
    def __init__(self, *ranges, dontCheck:bool=False):
        """Creates a new domain — an array of :class:`Range` that represents what
        intervals on the real number line is chosen.

        :param ranges: each element is a :class:`Range`, although any format will
            be fine as this selects for that
        :param dontCheck: don't sanitize inputs, intended to boost perf internally only

        Some examples::

            inf = float("inf") # shorthand for infinity
            Domain([5, 7.5], [2, 3]) # represents "[2, 3) U [5, 7.5)"
            Domain([2, 3.2], [3, 8]) # represents "[2, 8)" as overlaps are merged
            -Domain([2, 3]) # represents "(-inf, 2) U [3, inf)", so essentially R - d
            -Domain([-inf, 3]) # represents "[3, inf)"
            Domain.fromInts(2, 3, 6) # represents "[2, 4) U [6, 7)"

        You can also do arithmetic on them, and check "in" operator::

            Domain([2, 3]) + Domain([4, 5]) # represents "[2, 3) U [4, 5)"
            Domain([2, 3]) + Domain([2.9, 5]) # represents "[2, 5)", also merges overlaps
            Domain([2, 3]) & Domain([2.5, 5]) # represents "[2.5, 3)"
            3 in Domain([2, 3]) # returns False
            2 in Domain([2, 3]) # returns True"""
        if dontCheck: self.ranges = list(ranges); return
        # normalize everything to ascending Range objects, sort by start point,
        # then merge any overlapping segments in one pass
        normalized = sorted(((r if isinstance(r, Range) else Range(r)).fixOrder() for r in ranges),
                            key=lambda r: r.start)
        self.ranges = list(join(normalized, []))
    @staticmethod
    def fromInts(*ints:List[int]):
        """Returns a new :class:`Domain` which has ranges [i, i+1] for each int given."""
        return Domain(*(Range(i, i+1) for i in ints))
    def copy(self): return Domain(*(r.copy() for r in self.ranges))
    def intIter(self, step:int=1, start:int=0):
        r"""Yields ints in all ranges of this domain. If first range's domain is
        :math:`(-\inf, a)`, then starts at the specified integer"""
        if not self.ranges: return
        for rng in self.ranges:
            cursor = int(start) if rng.start == -inf else int(rng.start)
            while cursor < rng.stop:
                yield cursor
                cursor += step
    def __neg__(self): return Domain(*neg(self.ranges), dontCheck=True)
    def __add__(self, domain):
        # join() may hand back the very objects we hold, hence the copies
        return Domain(*(r.copy() for r in join(self.ranges, domain.ranges)), dontCheck=True)
    def __sub__(self, domain): return self + (-domain)
    def __and__(self, domain): return Domain(*intersect(self.ranges, domain.ranges), dontCheck=True)
    def __eq__(self, domain): return self.ranges == domain.ranges
    def __str__(self): return f"Domain: {', '.join(str(r) for r in self.ranges)}"
    def __contains__(self, x): return any(x in r for r in self.ranges)
    def __repr__(self):
        rs = '\n'.join(f"- {r}" for r in self.ranges)
        return f"""Domain:\n{rs}\n\nCan:
- 3 in d: check whether a number is in this domain or not
- d1 + d2: joins 2 domains
- -d: excludes the domain from R
- d1 - d2: same as d1 + (-d2)
- d1 & d2: intersects 2 domains"""
puas = [[ord(c) for c in cs] for cs in [["\ue000", "\uf8ff"], ["\U000f0000", "\U000ffffd"], ["\U00100000", "\U0010fffd"]]] # Domain
class AutoIncrement:
    def __init__(self, initialValue:int=-1, n:float=float("inf"), prefix:str=None):
        """Creates a new AutoIncrement object. Every time the object is called
        it gets incremented by 1 automatically. Example::

            a = k1lib.AutoIncrement()
            a() # returns 0
            a() # returns 1
            a() # returns 2
            a.value # returns 2
            a.value # returns 2
            a() # returns 3

            a = AutoIncrement(n=3, prefix="cluster_")
            a() # returns "cluster_0"
            a() # returns "cluster_1"
            a() # returns "cluster_2"
            a() # returns "cluster_0"

        :param n: if specified, then will wrap around to 0 when hit this number
        :param prefix: if specified, will yield strings with specified prefix"""
        self.value = initialValue; self.n = n; self.prefix = prefix
    @staticmethod
    def random() -> "AutoIncrement":
        """Creates a new AutoIncrement object that has a random integer initial value"""
        # 10**9 (an int), not 1e9: random.randint() rejects float bounds on
        # modern Python versions
        return AutoIncrement(random.randint(0, 10**9))
    @property
    def value(self):
        """Get the value as-is, without auto incrementing it"""
        if self.prefix is None: return self._value
        return f"{self.prefix}{self._value}"
    @value.setter
    def value(self, value): self._value = value
    def __call__(self):
        """Increments internal counter, and return it."""
        self._value += 1
        if self._value >= self.n: self._value = 0 # wrap around
        return self.value
    @staticmethod
    def unicode_pua():
        """Returns a generator that generates unicode characters from within
        unicode's private use area (PUA). Example::

            a = k1.AutoIncrement.unicode_pua()
            a | head() | deref() # returns ['\\ue000', '\\ue001', '\\ue002', '\\ue003', '\\ue004', '\\ue005', '\\ue006', '\\ue007', '\\ue008', '\\ue009']
        """
        for pua in puas:
            for c in range(pua[0], pua[1]+1): yield chr(c)
class Wrapper:
    value:Any  # the wrapped value; retrieve it by calling the instance
    def __init__(self, value=None):
        """Creates a wrapper for some value and get it by calling it. Example::

            a = k1.Wrapper(list(range(int(1e7))))
            # returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
            a()[:10]

        This exists just so that Jupyter Lab's contextual help won't automatically
        display the (possibly humongous) value. Could be useful if you want to
        pass a value by reference everywhere like this::

            o = k1.Wrapper(None)
            def f(obj): obj.value = 3
            f(o)
            o() # returns 3

        You can also pipe into it like this::

            o = 3 | k1.Wrapper()
            o() # returns 3
        """
        self.value = value
    def __call__(self):
        return self.value
    def __ror__(self, it):
        # `x | Wrapper()` wraps x, for cli-style pipelines
        return Wrapper(it)
class Every:
    def __init__(self, n):
        """Returns True once every ``n`` calls. Example::

            e = k1lib.Every(4)
            e() # returns True
            e() # returns False
            e() # returns False
            e() # returns False
            e() # returns True"""
        self.n = n
        self.i = -1  # starts at -1 so the very first call lands on 0
    def __call__(self) -> bool:
        """Returns True or False based on internal count."""
        self.i += 1
        return self.value
    @property
    def value(self) -> bool:
        # True exactly when the call count is a multiple of n
        return self.i % self.n == 0
class RunOnce:
    def __init__(self):
        """Returns False the first time :meth:`done` is called and True ever
        after, until :meth:`revert` re-arms it. Example::

            r = k1lib.RunOnce()
            r.done() # returns False
            r.done() # returns True
            r.done() # returns True
            r.revert()
            r.done() # returns False
            r.done() # returns True
            r.done() # returns True

        May be useful in situations like::

            class A:
                def __init__(self):
                    self.ro = k1lib.RunOnce()
                def f(self, x):
                    if self.ro.done(): return 3 + x
                    return 5 + x

            a = A()
            a.f(4) # returns 9
            a.f(4) # returns 7"""
        self.value = False
    def done(self):
        """Whether this has been called once before."""
        seen, self.value = self.value, True
        return seen
    def __call__(self):
        """Alias of :meth:`done`."""
        return self.done()
    def revert(self):
        # re-arm the one-shot
        self.value = False
class MaxDepth:
    def __init__(self, maxDepth:int, depth:int=0):
        """Convenience utility to check for graph max depth. Example::

            def f(d):
                print(d.depth)
                if d: f(d.enter())

            # prints "0\\n1\\n2\\n3"
            f(k1lib.MaxDepth(3))

        Of course, this might look unpleasant to the end user, so this is more
        likely for internal tools."""
        self.maxDepth = maxDepth; self.depth = depth
    def enter(self) -> "MaxDepth":
        """Returns a fresh tracker one level deeper; this object is unchanged."""
        return MaxDepth(self.maxDepth, self.depth + 1)
    def __bool__(self):
        # truthy while there's still room to descend
        return self.depth < self.maxDepth
    def __call__(self):
        """Alias of :meth:`__bool__`."""
        return bool(self)
class MovingAvg:
    def __init__(self, initV:float=0, alpha=0.9, debias=False):
        """Smoothes out sequential data using momentum. Example::

            a = k1lib.MovingAvg(5)
            a(3).value # returns 4.8, because 0.9*5 + 0.1*3 = 4.8
            a(3).value # returns 4.62

        There's also a cli at :class:`~k1lib.cli.conv.toMovingAvg` that does the
        exact same thing, but just more streamlined and cli-like. Both versions
        are kept as sometimes I do want a separate object with internal state.

        In normal mode the initial value keeps dragging on the average for a
        while; in debias mode its influence is divided out, so the average snaps
        to the first recorded value immediately.

        :param initV: initial value
        :param alpha: number in [0, 1]. Basically how much to keep old value?
        :param debias: whether to debias the initial value"""
        self.value = initV; self.alpha = alpha; self.debias = debias
        self.m = self.value; self.t = 0
    def __call__(self, value):
        """Updates the average with a new value"""
        self.m = self.m * self.alpha + value * (1 - self.alpha)
        if not self.debias:
            self.value = self.m
        else:
            # standard Adam-style bias correction of the running mean
            self.t += 1
            self.value = self.m / (1 - self.alpha**self.t)
        return self
    # arithmetic delegates straight to the current smoothed value
    def __add__(self, o): return self.value + o
    def __radd__(self, o): return o + self.value
    def __sub__(self, o): return self.value - o
    def __rsub__(self, o): return o - self.value
    def __mul__(self, o): return self.value * o
    def __rmul__(self, o): return o * self.value
    def __truediv__(self, o): return self.value / o
    def __rtruediv__(self, o): return o / self.value
    def __repr__(self):
        return f"Moving average: {self.value}, alpha: {self.alpha}"
sen = "_ab_sentinel" # attribute name that gates Absorber.__setattr__'s absorb-vs-store behavior
# Maps a dunder opcode name to a lambda that renders the equivalent Python source
# fragment. Used to JIT-compile an Absorber's recorded steps into one expression
# string (see Absorber.ab_fastFS). `x` is the expression-so-far, `o`/`idx` the operand.
jitOpcodes = {"__len__": lambda x: f"len({x})",
              "__neg__": lambda x: f"(-{x})",
              "__pos__": lambda x: f"(+{x})",
              "__abs__": lambda x: f"abs({x})",
              "__invert__": lambda x: f"(~{x})",
              "__getattr__": lambda x, idx: f"getattr({x},{idx})",
              "__getitem__": lambda x, idx: f"({x}[{idx}])",
              "__round__": lambda x, o: f"round({x}, {o})",
              "__add__": lambda x, o: f"({x}+{o})",
              "__radd__": lambda x, o: f"({o}+{x})",
              "__sub__": lambda x, o: f"({x}-{o})",
              "__rsub__": lambda x, o: f"({o}-{x})",
              "__mul__": lambda x, o: f"({x}*{o})",
              "__rmul__": lambda x, o: f"({o}*{x})",
              "__matmul__": lambda x, o: f"({x}@{o})",
              "__rmatmul__": lambda x, o: f"({o}@{x})",
              "__truediv__": lambda x, o: f"({x}/{o})",
              "__rtruediv__": lambda x, o: f"({o}/{x})",
              "__floordiv__": lambda x, o: f"({x}//{o})",
              "__rfloordiv__": lambda x, o: f"({o}//{x})",
              "__mod__": lambda x, o: f"({x}%{o})",
              "__rmod__": lambda x, o: f"({o}%{x})",
              "__pow__": lambda x, o: f"({x}**{o})",
              "__rpow__": lambda x, o: f"({o}**{x})",
              "__lshift__": lambda x, o: f"({x}<<{o})",
              "__rlshift__": lambda x, o: f"({o}<<{x})",
              "__rshift__": lambda x, o: f"({x}>>{o})",
              "__rrshift__": lambda x, o: f"({o}>>{x})",
              "__and__": lambda x, o: f"({x}&{o})",
              "__rand__": lambda x, o: f"({o}&{x})",
              "__xor__": lambda x, o: f"({x}^{o})",
              "__rxor__": lambda x, o: f"({o}^{x})",
              "__or__": lambda x, o: f"({x}|{o})",
              "__ror__": lambda x, o: f"({o}|{x})",
              "__lt__": lambda x, o: f"({x}<{o})",
              "__le__": lambda x, o: f"({x}<={o})",
              "__eq__": lambda x, o: f"({x}=={o})",
              "__ne__": lambda x, o: f"({x}!={o})",
              "__gt__": lambda x, o: f"({x}>{o})",
              "__ge__": lambda x, o: f"({x}>={o})",}
# Generates unique variable names for values captured by the JIT'd lambda; the
# random middle segment avoids collisions across separately compiled expressions
opcodeAuto = AutoIncrement(prefix=f"_op_{random.randint(100,999)}_var_")
# Comparison opcodes: two consecutive ones get fused into a single `a(x) and b(x)`
# step by Absorber._ab_steps_append
compareOps = {"__lt__", "__le__", "__eq__", "__ne__", "__gt__", "__ge__"}
[docs]class Absorber: # Absorber """Creates an object that absorbes every operation done on it. Could be useful in some scenarios:: ab = k1lib.Absorber() # absorbs all operations done on the object abs(ab[::3].sum(dim=1)) t = torch.randn(5, 3, 3) # returns transformed tensor of size [2, 3] ab.ab_operate(t) Another:: ab = Absorber() ab[2] = -50 # returns [0, 1, -50, 3, 4] ab.ab_operate(list(range(5))) Because this object absorbs every operation done on it, you have to be gentle with it, as any unplanned disturbances might throw your code off. Best to create a new one on the fly, and pass them immediately to functions, because if you're in a notebook environment like Jupyter, it might poke at variables. For extended code example that utilizes this, check over :class:`k1lib.cli.modifier.op` source code.""" # Absorber
[docs] def __init__(self, initDict:dict=dict()): # Absorber """Creates a new Absorber. :param initDict: initial variables to set, as setattr operation is normally absorbed""" # Absorber self._ab_sentinel = True # Absorber self._ab_steps = [] # Absorber self._ab_solidified = False # Absorber for k, v in initDict.items(): setattr(self, k, v) # Absorber self._ab_sentinel = False # Absorber
[docs] def ab_solidify(self): # Absorber """Use this to not absorb ``__call__`` operations anymore and makes it feel like a regular function (still absorbs other operations though):: f = op()**2 3 | f # returns 9, but may be you don't want to pipe it in f.op_solidify() f(3) # returns 9""" # Absorber self._ab_sentinel = True # Absorber self._ab_solidified = True # Absorber self._ab_sentinel = False # Absorber return self # Absorber
[docs] def ab_operate(self, x): # Absorber """Special method to actually operate on an object and get the result. Not absorbed. Example:: # returns 6 (op() * 2).ab_operate(3)""" # Absorber for desc, step in self._ab_steps: x = step(x) # Absorber return x # Absorber
[docs] def ab_fastFS(self) -> str: # Absorber s = self._ab_steps; l = len(s) # Absorber x = k1lib.cli.init._jsDAuto() # Absorber try: # jit compilation # Absorber ss = x; values = {} # Absorber for (opcode, *o), *_ in s: # Absorber if opcode == "__call__": # Absorber va = opcodeAuto(); vk = opcodeAuto() # Absorber values[va], values[vk] = o[0] # Absorber ss = f"({ss}(*{va}, **{vk}))" # Absorber elif len(o) > 0: # Absorber varname = opcodeAuto(); v = o[0] # Absorber if isinstance(v, (int, float)): # Absorber ss = jitOpcodes[opcode](ss, v) # Absorber else: # Absorber values[varname] = v # Absorber ss = jitOpcodes[opcode](ss, varname) # Absorber else: ss = jitOpcodes[opcode](ss) # Absorber return [f"lambda {x}: {ss}", values] # Absorber except Exception as e: pass # Absorber
[docs] def ab_fastF(self): # Absorber """Returns a function that operates on the input (just like :meth:`ab_operate`), but much faster, suitable for high performance tasks. Example:: f = (k1lib.Absorber() * 2).ab_fastF() # returns 6 f(3)""" # Absorber s = self._ab_steps; l = len(s) # Absorber res = self.ab_fastFS() # jit compilation, compressing multiple steps to a single simple expression # Absorber if res: fn, values = res; return eval(compile(fn, "", "eval"), values) # Absorber if l == 0: return lambda x: x # Absorber if l == 1: return s[0][1] # Absorber if l == 2: # Absorber a, b = s[0][1], s[1][1] # Absorber return lambda x: b(a(x)) # Absorber if l == 3: # Absorber a, b, c = s[0][1], s[1][1], s[2][1] # Absorber return lambda x: c(b(a(x))) # Absorber if l == 4: # Absorber a, b, c, d = s[0][1], s[1][1], s[2][1], s[3][1] # Absorber return lambda x: d(c(b(a(x)))) # Absorber if l == 5: # Absorber a, b, c, d, e = s[0][1], s[1][1], s[2][1], s[3][1], s[4][1] # Absorber return lambda x: e(d(c(b(a(x))))) # Absorber return self.ab_operate # Absorber
def _ab_steps_append(self, o):
    # Records one absorbed operation `o` = [[opcode, operand...], replayFn].
    # Two consecutive comparison ops (e.g. `3 < op() < 5`) are fused into a
    # single step that and-s both checks. No-op once solidified.
    if not self._ab_solidified:
        s = self._ab_steps
        if len(s) > 0 and s[-1][0][0] in compareOps and o[0][0] in compareOps:
            p = s.pop(); a = p[1]; b = o[1] # for 2 consecutive compare operations
            s.append([["compareOps"], lambda x: a(x) and b(x)])
        else: s.append(o)
    return self
def __getattr__(self, idx):
    # Absorbs attribute access. Underscore-prefixed names are blocked so the
    # absorber's own internals can't be captured by accident.
    if isinstance(idx, str) and idx.startswith("_"): raise AttributeError("Getting attributes starting with underscore is prohibited. If you're using `op`, consider using `aS(lambda x: x._field)` instead.")
    return self._ab_steps_append([["__getattr__", idx], lambda x: getattr(x, idx)]);
def __setattr__(self, k, v):
    """Only allows legit variable setting when '_ab_sentinel' is True. Absorbs operations if it's False."""
    if k == sen: self.__dict__[k] = v # `sen` is the sentinel key itself; always set directly
    else:
        if self.__dict__[sen]: self.__dict__[k] = v # sentinel on: real assignment
        else:
            def f(x): setattr(x, k, v); return x
            self._ab_steps_append([["__setattr__", [k, v]], f])
            # solidified absorbers also mirror the assignment locally
            if self._ab_solidified: self.__dict__[k] = v
    return self
def __getitem__(self, idx): return self._ab_steps_append([["__getitem__", idx], lambda x: x[idx]]);
def __setitem__(self, k, v):
    def f(x): x[k] = v; return x
    return self._ab_steps_append([["__setitem__", [k, v]], f]);
def __call__(self, *args, **kwargs): return self._ab_steps_append([["__call__", [args, kwargs]], lambda x: x(*args, **kwargs)]);
# NOTE(review): round(x, 0) returns a float while round(x) returns an int —
# the ndigits=0 default here means `round(ab)` replays as the float form; confirm intended
def __round__(self, ndigits=0): return self._ab_steps_append([["__round__", ndigits], lambda x: round(x, ndigits)]);
# --- absorbed binary/unary operators: each appends an [opcode, operand]
# record plus a closure replaying the operation on the eventual input ---
def __add__(self, o): return self._ab_steps_append([["__add__", o], lambda x: x+o ]);
def __radd__(self, o): return self._ab_steps_append([["__radd__", o], lambda x: o+x ]);
def __sub__(self, o): return self._ab_steps_append([["__sub__", o], lambda x: x-o ]);
def __rsub__(self, o): return self._ab_steps_append([["__rsub__", o], lambda x: o-x ]);
def __mul__(self, o): return self._ab_steps_append([["__mul__", o], lambda x: x*o ]);
def __rmul__(self, o): return self._ab_steps_append([["__rmul__", o], lambda x: o*x ]);
def __matmul__(self, o): return self._ab_steps_append([["__matmul__", o], lambda x: x@o ]);
def __rmatmul__(self, o): return self._ab_steps_append([["__rmatmul__", o], lambda x: o@x ]);
def __truediv__(self, o): return self._ab_steps_append([["__truediv__", o], lambda x: x/o ]);
def __rtruediv__(self, o): return self._ab_steps_append([["__rtruediv__", o], lambda x: o/x ]);
def __floordiv__(self, o): return self._ab_steps_append([["__floordiv__", o], lambda x: x//o]);
def __rfloordiv__(self, o): return self._ab_steps_append([["__rfloordiv__", o], lambda x: o//x]);
def __mod__(self, o): return self._ab_steps_append([["__mod__", o], lambda x: x%o ]);
def __rmod__(self, o): return self._ab_steps_append([["__rmod__", o], lambda x: o%x ]);
def __pow__(self, o): return self._ab_steps_append([["__pow__", o], lambda x: x**o]);
def __rpow__(self, o): return self._ab_steps_append([["__rpow__", o], lambda x: o**x]);
def __lshift__(self, o): return self._ab_steps_append([["__lshift__", o], lambda x: x<<o]);
def __rlshift__(self, o): return self._ab_steps_append([["__rlshift__", o], lambda x: o<<x]);
def __rshift__(self, o): return self._ab_steps_append([["__rshift__", o], lambda x: x>>o]);
def __rrshift__(self, o): return self._ab_steps_append([["__rrshift__", o], lambda x: o>>x]);
def __and__(self, o): return self._ab_steps_append([["__and__", o], lambda x: x&o ]);
def __rand__(self, o): return self._ab_steps_append([["__rand__", o], lambda x: o&x ]);
def __xor__(self, o): return self._ab_steps_append([["__xor__", o], lambda x: x^o ]);
def __rxor__(self, o): return self._ab_steps_append([["__rxor__", o], lambda x: o^x ]);
def __or__(self, o): return self._ab_steps_append([["__or__", o], lambda x: x|o ]);
def __ror__(self, o): return self._ab_steps_append([["__ror__", o], lambda x: o|x ]);
# comparisons: pairs of these get fused by _ab_steps_append (see above)
def __lt__(self, o): return self._ab_steps_append([["__lt__", o], lambda x: x<o ]);
def __le__(self, o): return self._ab_steps_append([["__le__", o], lambda x: x<=o]);
def __eq__(self, o): return self._ab_steps_append([["__eq__", o], lambda x: x==o]);
def __ne__(self, o): return self._ab_steps_append([["__ne__", o], lambda x: x!=o]);
def __gt__(self, o): return self._ab_steps_append([["__gt__", o], lambda x: x>o ]);
def __ge__(self, o): return self._ab_steps_append([["__ge__", o], lambda x: x>=o]);
def __neg__(self): return self._ab_steps_append([["__neg__"], lambda x: -x ]);
def __pos__(self): return self._ab_steps_append([["__pos__"], lambda x: +x ]);
def __abs__(self): return self._ab_steps_append([["__abs__"], lambda x: abs(x) ]);
def __invert__(self): return self._ab_steps_append([["__invert__"], lambda x: ~x ]);
def ab_int(self):
    """Replacement for ``int(ab)``, as that requires returning an actual :class:`int`."""
    return self._ab_steps_append([["__int__"], lambda x: int(x) ]);
# NOTE(review): self.int() is absorbed through __getattr__ (recording
# `.int()` attribute access + call), not a call to ab_int(); also __int__
# must return an actual int, so int(ab) will raise TypeError regardless.
# Possibly meant `self.ab_int()` — confirm against upstream
def __int__(self): return self.int()
def ab_float(self):
    """Replacement for ``float(ab)``, as that requires returning an actual :class:`float`."""
    return self._ab_steps_append([["__float__"], lambda x: float(x)]);
# NOTE(review): same concern as __int__ above — `self.float()` is absorbed,
# and __float__ must return an actual float
def __float__(self): return self.float()
def ab_str(self):
    """Replacement for ``str(ab)``, as that requires returning an actual :class:`str`."""
    return self._ab_steps_append([["__str__"], lambda x: str(x) ]);
def ab_len(self):
    """Replacement for ``len(ab)``, as that requires returning an actual :class:`int`."""
    return self._ab_steps_append([["__len__"], lambda x: len(x) ]);
def ab_contains(self, key):
    """Replacement for ``key in ab``, as that requires returning an actual :class:`int`."""
    return self._ab_steps_append([["__contains__", key], lambda x: key in x]);
# zero-width space, used as a column separator when pretty-printing Settings
# objects below; chosen because it's (mostly) guaranteed to never appear in
# the settings text itself
sep = "\u200b"
class Settings:
    def __init__(self, **kwargs):
        """Creates a new settings object. Basically fancy version of :class:`dict`. Example::

    s = k1lib.Settings(a=3, b="42")
    s.c = k1lib.Settings(d=8)

    s.a # returns 3
    s.b # returns "42"
    s.c.d # returns 8
    print(s) # prints nested settings nicely"""
        # while the sentinel is True, __setattr__ skips change callbacks
        self._setattr_sentinel = True
        for k, v in kwargs.items(): setattr(self, k, v)
        # per-setting metadata registries: docs text, change callbacks,
        # sensitive (redacted-on-display) flags, env var sources
        self._docs = dict(); self._cbs = dict(); self._sensitives = dict(); self._envs = dict()
        self._setattr_sentinel = False
    @contextlib.contextmanager
    def context(self, **kwargs):
        """Context manager to temporarily modify some settings. Applies to all sub-settings. Example::

    s = k1lib.Settings(a=3, b="42", c=k1lib.Settings(d=8))

    with s.context(a=4):
        s.c.d = 20
        s.a # returns 4
        s.c.d # returns 20
    s.a # returns 3
    s.c.d # returns 8"""
        oldValues = dict(self.__dict__); err = None # NOTE(review): `err` is never used
        for k in kwargs.keys():
            if k not in oldValues:
                raise RuntimeError(f"'{k}' settings not found!")
        try:
            with contextlib.ExitStack() as stack:
                # enter every sub-Settings' context too, so nested changes revert
                for _, sub in self._subSettings():
                    stack.enter_context(sub.context())
                for k, v in kwargs.items(): setattr(self, k, v)
                yield
        finally:
            # restore the full attribute snapshot (re-triggers callbacks via __setattr__)
            for k, v in oldValues.items(): setattr(self, k, v)
    def add(self, k:str, v:Any, docs:str="", cb:Callable[["Settings", Any], None]=None, sensitive:bool=False, env:str=None) -> "Settings":
        """Long way to add a variable. Advantage of this is that you can slip in extra
documentation for the variable. Example::

    s = k1lib.Settings()
    s.add("a", 3, "some docs")
    print(s) # displays the extra docs

You can also specify that a variable should load from the environment variables::

    s = k1lib.Settings()
    s.add("a", "/this/path/will:/be/overridden", env="PATH")
    s.a # will returns a string that might look like "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"

You can specify a transform function if you want to::

    s = k1lib.Settings()
    s.add("a", ["/this/path/will", "/be/overridden"], env=("PATH", lambda x: x.split(":")))
    s.a # returns a list that might look like ['/usr/local/sbin', '/usr/local/bin', '/usr/sbin', '/usr/bin', '/sbin', '/bin', '/usr/games', '/usr/local/games']

:param cb: callback that takes in (settings, new value) if any property changes
:param sensitive: if True, won't display the value when displaying the whole Settings object
:param env: if specified, will try to load up the value from environment variables if it's available"""
        if env is not None:
            # env may be "NAME" or ("NAME", transformFn)
            s, f = (env, lambda x: x) if isinstance(env, str) else env; res = os.environ.get(s); self._envs[k] = s
            if res is not None: v = f(res) # env var present -> overrides the given default
        setattr(self, k, v); self._docs[k] = docs; self._cbs[k] = cb
        self._sensitives[k] = sensitive; return self
    def _docsOf(self, k:str): return f"{self._docs[k]}" if k in self._docs else "" # docs column for display
    def _envsOf(self, k:str): return f"env: {self._envs[k]}" if k in self._envs else "" # env column for display
    def _subSettings(self) -> List[Tuple[str, "Settings"]]:
        # nested Settings children (public names only)
        return [(k, v) for k, v in self.__dict__.items() if isinstance(v, Settings) and not k.startswith("_")]
    def _simpleSettings(self) -> List[Tuple[str, Any]]:
        # plain (non-Settings) values (public names only)
        return [(k, v) for k, v in self.__dict__.items() if not isinstance(v, Settings) and not k.startswith("_")]
    def __setattr__(self, k, v):
        self.__dict__[k] = v
        # fire the registered change callback, unless the sentinel is on (see __init__)
        if k != "_setattr_sentinel" and not self._setattr_sentinel:
            if k in self._cbs and self._cbs[k] is not None: self._cbs[k](self, v)
    def __repr__(self):
        """``includeDocs`` mainly used internally when generating docs in sphinx."""
        ks = list(k for k in self.__dict__ if not k.startswith("_"))
        kSpace = max([1, *(ks | k1lib.cli.shape(0).all())]); s = "Settings:\n" # widest key name, for alignment
        for k, v in self._simpleSettings():
            s += f"- {k.ljust(kSpace)} = {k1lib.limitChars('<redacted>' if self._sensitives.get(k, False) else str(v), settings.displayCutoff)}{sep}{self._envsOf(k)}{sep}{self._docsOf(k)}\n"
        for k, v in self._subSettings():
            # render sub-settings recursively, indented under the parent entry
            sub = v.__repr__().split("\n")[1:-1] | k1lib.cli.tab(" ") | k1lib.cli.join("\n")
            s += f"- {k.ljust(kSpace)} = <Settings>{sep}{self._envsOf(k)}{sep}{self._docsOf(k)}\n" + sub + "\n"
        # columns are delimited by the zero-width `sep`, then aligned by pretty()
        return s.split("\n") | k1lib.cli.op().split(sep).all() | k1lib.cli.pretty(sep) | k1lib.cli.join("\n")
_settings = Settings().add("test", Settings().add("bio", True, "whether to test bioinformatics clis that involve strange command line tools like samtools and bwa")) # Settings _settings.add("packages", Settings(), "which package is available to use?") # Settings settings = Settings().add("displayCutoff", 50, "cutoff length when displaying a Settings object") # Settings settings.add("svgScale", 0.7, "default svg scales for clis that displays graphviz graphs") # Settings def _cb_wd(s, p): # _cb_wd if p != None: p = os.path.abspath(os.path.expanduser(p)); _oschdir(p) # _cb_wd s.__dict__["wd"] = p # _cb_wd def oschdir(path): settings.wd = path # oschdir _oschdir = os.chdir; os.chdir = oschdir; os.chdir.__doc__ = _oschdir.__doc__ # oschdir settings.add("wd", os.getcwd(), "default working directory, will get from `os.getcwd()`. Will update using `os.chdir()` automatically when changed", _cb_wd) # oschdir settings.add("cancelRun_newLine", True, "whether to add a new line character at the end of the cancel run/epoch/batch message") # oschdir or_patch = Settings()\ .add("numpy", True, "whether to patch numpy arrays")\ .add("dict", True, "whether to patch Python dict keys and items") # oschdir startup = Settings().add("init_ray", True, "whether to connect to ray's cluster accessible locally automatically")\ .add("import_optionals", True, "whether to try to import optional dependencies automatically or not. Set this to False if you want a faster load time, but with reduced functionalities")\ .add("or_patch", or_patch, "whether to patch __or__() method for several C-extension datatypes (numpy array, dict, etc). This would make cli operations with them a lot more pleasant, but might cause strange bugs. 
Haven't met them myself though") # oschdir settings.add("startup", startup, "these settings have to be applied like this: `import k1lib; k1lib.settings.startup.or_patch = False; from k1lib.imports import *` to ensure that the values are set") # oschdir settings.add("pushNotificationKey", os.getenv("k1lib_pushNotificationKey", None), "API key for `k1lib.pushNotification()`. See docs of that for more info") # oschdir settings.add("cred", Settings(), "general default credentials for other places in the system") # oschdir def sign(v): return 1 if v > 0 else -1 # sign def roundOff(a, b): # roundOff m = (a + b) / 2 # roundOff return m # roundOff dec = math.log10(abs(a-m)+1e-7) # decimal place # roundOff factor = 10**(sign(dec) * math.floor(abs(dec)+1e-7)+1) # roundOff return factor*round(m/factor) # roundOff def toPrecision(num, sig=1): # toPrecision try: # toPrecision if num == 0: return 0 # toPrecision s = sign(num); num = abs(num) # toPrecision fac = 10**(-math.floor(math.log10(num))+sig-1) # toPrecision return s*round(num*fac)/fac # toPrecision except: return num # toPrecision def niceUS(mean, std): # niceUS try: # niceUS if std < 1e-12: return mean, std # niceUS pres = 2 if std/10**math.floor(math.log10(std)) < 2 else 1 # niceUS std2 = toPrecision(std, pres) # niceUS fac = 10**(-math.floor(math.log10(std2))+pres-1) # niceUS return round(mean*fac)/fac, std2 # niceUS except: return mean, std # niceUS def removeOutliers(t, fraction=0.01): # removeOutliers b = int(len(t)*fraction/2) # removeOutliers return np.sort(t)[b:-b] # removeOutliers def _US(v): return [*v] if isinstance(v, UValue) else [v, 0] # _US if True: # used to be a hasTorch condition here, lazy to reindent # _US
[docs] @functools.total_ordering # _US class UValue: # _US _unit = np.random.randn(2, 5, 100000) # _US
[docs] def __init__(self, mean=0, std=1, N=None): # _US """Creates a new "uncertain value", which has a mean and a standard deviation. You can then do math operations on them as normal, and the propagation errors will be automatically calculated for you. Make sure to run the calculation multiple times as the mean and std values fluctuates by a little run-by-run. Example:: # returns UValue(mean=4.7117, std=3.4736) object abs(k1lib.UValue() * 5 + 3) You can also instantiate from an existing list/numpy array/pytorch tensor:: # returns UValue(mean=24.5, std=14.431) object k1lib.UValue.fromSeries(range(50)) You can also do arbitrary complex math operations:: # returns UValue(mean=0.5544, std=0.4871) (20 + k1lib.UValue()).f(np.sin) # same as above, but takes longer to run! (20 + k1lib.UValue()).f(math.sin) I suggest you to make your arbitrary function out of numpy's operations, as those are a fair bit faster than regular Python. If you have a list of :class:`UValue`, and want to plot them with error bars, then you can do something like this:: x = np.linspace(0, 6) y = list(np.sin(x)*10) | apply(k1lib.UValue) | toList() plt.errorbar(x, *(y | transpose())); There are several caveats however: .. note:: First is the problem of theoretically vs actually sample a distribution. Let's see an example:: # returns theoretical value UValue(mean=8000.0, std=1200.0) -> 8000.0 ± 1200.0 k1lib.UValue(20) ** 3 # prints out actual mean and std value of (8064.1030, 1204.3529) a = k1lib.UValue(20).sample() ** 3 print(a.mean(), a.std()) So far so good. However, let's create some uncertainty in "3":: # returns theoretical value UValue(mean=8000.0, std=23996.0) -> 10000.0 ± 20000.0 k1lib.UValue(20) ** k1lib.UValue(3) # prints out actual mean and std value of (815302.8750, 27068828.), but is very unstable and changes a lot a = k1lib.UValue(20).sample() ** k1lib.UValue(3).sample() print(a.mean(), a.std()) Woah, what happens here? 
The actual mean and std values are completely different from the theoretical values. This is mainly due to UValue(3) has some outlier values large enough to boost the result up multiple times. Even removing 1% of values on either end of the spectrum does not quite work. So, becareful to interpret these uncertainty values, and in some case the theoretical estimates from math are actually very unstable and will not be observed in real life. .. note:: Then there's the problem of each complex operation, say ``(v*2+3)/5`` will be done step by step, meaning ``a=v*2`` mean and std will be calculated first, then ignoring the calculated sample values and just go with the mean and std, sample a bunch of values from there and calculate ``a+3`` mean and std. Rinse and repeat. This means that these 2 statements may differ by a lot:: # prints out (0.15867302766786406, 0.12413313456900205) x = np.linspace(-3, 3, 1000); sq = (abs(x)-0.5)**2; y = sq*np.exp(-sq) print(y.mean(), y.std()) # returns UValue(mean=0.081577, std=0.32757) -> 0.1 ± 0.3 x = k1lib.UValue(0, 1); sq = (abs(x)-0.5)**2; y = sq*(-sq).f(np.exp) Why this weird function? It converts from a single nice hump into multiple complex humps. Anyway, this serves to demonstrate that the result from the ``calculate -> get mean, std -> sample from new distribution -> calculate`` process might be different from just calculating from start to end and then get the mean and std. .. note:: Lastly, you might have problems when using the same UValue multiple times in an expression:: a = UValue(10, 1) a * 2 # has mean 20, std 2 a + a # has mean 20, std 1.4 :param N: how many data points in this sample""" # _US if isinstance(mean, k1lib.settings.cli.arrayTypes): mean = mean.item() # _US if isinstance(std, k1lib.settings.cli.arrayTypes): std = std.item() # _US self.mean = mean; self.std = std; self.N = N # _US
@staticmethod # _US def _sample(mean, std, n=None, _class=0): # _US t = UValue._unit[_class, random.randint(0, 4)] # _US if n is not None: t = t[:n] # _US return t * std + mean # _US
    def sample(self, n=100, _class=0):
        """Gets a sample :class:`numpy.ndarray` representative of this uncertain
value. Example::

    # returns tensor([-5.1095, 3.3117, -2.5759, ..., -2.5810, -1.8131, 1.8339])
    (k1lib.UValue() * 5).sample()"""
        # `*self` unpacks to (mean, std) via __iter__
        return UValue._sample(*self, n, _class)
[docs] @staticmethod # _US def fromSeries(series, ddof=0): # _US """Creates a :class:`UValue` from a bunch of numbers :param series: can be a list of numbers, numpy array or PyTorch tensor :param unbiased: if True, Bessel’s correction will be used""" # _US if hasTorch and isinstance(series, torch.Tensor): series = series.numpy() # _US if not isinstance(series, np.ndarray): series = np.array(list(series)) # _US series = series * 1.0 # _US return UValue(series.mean(), np.std(series, ddof=ddof), len(series)) # _US
[docs] @staticmethod # _US def fromBounds(min_, max_): # _US """Creates a :class:`UValue` from min and max values. Example:: # returns UValue(mean=2.5, std=0.5) k1lib.UValue.fromBounds(2, 3)""" # _US mid = (min_ + max_)/2 # _US return k1lib.UValue(mid, abs(max_-mid)) # _US
def __iter__(self): yield self.mean; yield self.std # _US def _niceValue(self, v, _class=0): # _US if isinstance(v, UValue): return [UValue._sample(*v, None, _class), UValue._sample(*v, None, _class)] # _US return [UValue._sample(v, 0, None, _class), UValue._sample(v, 0, None, _class)] # _US def _postProcess(self, c1, c2): # _US if c1.hasNan() or c2.hasNan(): # _US warnings.warn("Calculations has NaN values. They will be replaced with 0, which can affect accuracy of mean and std calculations") # _US c1.clearNan(); c2.clearNan() # _US c1 = removeOutliers(c1); c2 = removeOutliers(c2); # _US return UValue(roundOff(c1.mean().item(), c2.mean().item()), roundOff(c1.std().item(), c2.std().item())) # _US @property # _US def exact(self): # _US """Whether this UValue is exact or not""" # _US return self.std == 0 # _US @staticmethod # _US def _isValueExact(v): # _US if isinstance(v, UValue): return v.exact # _US try: len(v); return False # _US except: return True # _US @staticmethod # _US def _value(v): # gets mean value # _US if isinstance(v, UValue): return v.mean # _US try: len(v); raise RuntimeError("Can't convert a series into an exact value") # _US except: return v # _US
[docs] def test(self, v): # _US """Returns how many sigma a particular value is.""" # _US return (v-self.mean)/self.std # _US
    def f(self, func):
        """Covered in :meth:`__init__` docs"""
        if self.exact: return UValue(func(self.mean), 0) # exact in -> exact out, no sampling needed
        f = func; a1, a2 = self._niceValue(self) # 2 independent sample series
        try: return self._postProcess(f(a1), f(a2)) # fast path: func handles arrays directly
        except:
            # func can't vectorize: apply element-wise on the first 10k points only
            f = lambda xs: np.array([func(x) for x in xs[:10000]])
            return self._postProcess(f(a1), f(a2))
[docs] def bounds(self): # _US """Returns (mean-std, mean+std)""" # _US return self.mean - self.std, self.mean + self.std # _US
    def _op2(self, func, a, b):
        # Stochastic binary operation: samples both operands (independent banks
        # via _class 0/1) and post-processes the two result series.
        if UValue._isValueExact(a) and UValue._isValueExact(b):
            return UValue(func(UValue._value(a), UValue._value(b)), 0) # both exact -> exact result
        f = func; a1, a2 = self._niceValue(a, 0); b1, b2 = self._niceValue(b, 1)
        try: return self._postProcess(f(a1, b1), f(a2, b2)) # fast path: func handles arrays directly
        except:
            # func can't vectorize: apply pairwise on the first 10k points only
            f = lambda xs, ys: np.array([func(x, y).item() for x, y in zip(xs[:10000], ys[:10000])])
            return self._postProcess(f(a1, b1), f(a2, b2))
[docs] @staticmethod # _US def combine(*values, samples=1000): # _US """Combines multiple UValues into 1. Example:: a = k1lib.UValue(5, 1) b = k1lib.UValue(7, 1) # both returns 6.0 ± 1.4 k1lib.UValue.combine(a, b) [a, b] | k1lib.UValue.combine() This will sample each UValue by default 1000 times, put them into a single series and get a UValue from that. Why not just take the average instead? Because the standard deviation will be less, and will not actually reflect the action of combining UValues together:: # returns 6.0 ± 0.7, which is narrower than expected (a + b) / 2""" # _US if len(values) == 0: return ~k1lib.cli.aS(UValue.combine) # _US n = len([0 for v in values if v.N == None]) # _US if 0 < n < len(values): raise Exception("Some UValues specified their dataset size (.N variable) and some don't, which makes it impossible to combine these series together. Either specify the dataset size (change .N value) to specific numbers for all of them, or set them all to None, in which case each series importance will be set to the same level") # _US means = [v.mean for v in values]; stds = [v.std for v in values] # _US Ns = [(v.N or 1) for v in values]; N = sum(Ns) # _US mean = sum(m*n for m,n in zip(means, Ns))/N # _US std = (sum(n*(s**2 + (m-mean)**2) for m,s,n in zip(means, stds, Ns))/N)**0.5 # _US return UValue(mean, std, N) # _US
    # --- analytic first-order (Gaussian) error propagation for arithmetic ---
    def __add__(self, v):
        m1, s1 = _US(self); m2, s2 = _US(v)
        return UValue(m1+m2, math.sqrt(s1**2 + s2**2)) # variances add for independent terms
        return self._op2(lambda a, b: a+b, v, self) # representative of how this would work stochastically (deliberately unreachable)
    def __radd__(self, v):
        m1, s1 = _US(self); m2, s2 = _US(v)
        return UValue(m1+m2, math.sqrt(s1**2 + s2**2))
    def __sub__(self, v):
        m1, s1 = _US(self); m2, s2 = _US(v)
        return UValue(m1-m2, math.sqrt(s1**2 + s2**2))
    def __rsub__(self, v):
        m1, s1 = _US(self); m2, s2 = _US(v)
        return UValue(m2-m1, math.sqrt(s1**2 + s2**2))
    def __mul__(self, v):
        m1, s1 = _US(self); m2, s2 = _US(v)
        return UValue(m1*m2, math.sqrt(m2**2*s1**2 + m1**2*s2**2))
    def __rmul__(self, v):
        m1, s1 = _US(self); m2, s2 = _US(v)
        return UValue(m1*m2, math.sqrt(m2**2*s1**2 + m1**2*s2**2))
    def __truediv__(self, v):
        m1, s1 = _US(self); m2, s2 = _US(v)
        return UValue(m1/m2, math.sqrt(1/m2**2*s1**2 + m1**2/m2**4*s2**2))
    def __rtruediv__(self, v):
        m1, s1 = _US(v); m2, s2 = _US(self)
        return UValue(m1/m2, math.sqrt(1/m2**2*s1**2 + m1**2/m2**4*s2**2))
    def __pow__(self, v):
        # NOTE(review): math.log(m1) raises for m1 <= 0 — presumably callers
        # only use positive bases; confirm
        m1, s1 = _US(self); m2, s2 = _US(v); m = m1**m2
        return UValue(m, math.sqrt((m2*m/m1)**2*s1**2 + (math.log(m1)*m)**2*s2**2))
    def __rpow__(self, v):
        m1, s1 = _US(v); m2, s2 = _US(self); m = m1**m2
        return UValue(m, math.sqrt((m2*m/m1)**2*s1**2 + (math.log(m1)*m)**2*s2**2))
    def __abs__(self): return self.f(lambda a: abs(a)) # can't convert to pure math that makes sense
    def __neg__(self): return 0 - self
    # comparisons act on the means only; total_ordering derives the rest.
    # NOTE(review): defining __eq__ makes UValue unhashable (no __hash__) — confirm intended
    def __lt__(self, v): return self.mean < v.mean if isinstance(v, UValue) else self.mean < v
    def __eq__(self, v): return self.mean == v.mean and self.std == v.std and self.N == v.N if isinstance(v, UValue) else self.mean == v
    def __float__(self): return self.mean
    def __int__(self): return int(self.mean)
    def __str__(self): mean, std = niceUS(self.mean, self.std); return f"{mean} ± {std}" # rounded for display
    def __repr__(self):
        mean, std = niceUS(self.mean, self.std)
        return f"UValue(mean={toPrecision(self.mean, 5)}, std={toPrecision(self.std, 5)}) -> {mean} ± {std}"
[docs] def plot(self, name=None): # _US """Quickly plots a histogram of the distribution. Possible to plot multiple histograms in 1 plot.""" # _US plt.hist(self.sample(None).numpy(), bins=100, alpha=0.7, label=name) # _US if name != None: plt.legend() # _US
if hasTorch:
    @k1lib.patch(torch.Tensor) # adds this as a method on torch.Tensor
    def clearNan(self, value:float=0.0) -> torch.Tensor:
        """Sets all nan values to a specified value. Example::

    a = torch.randn(3, 3) * float("nan")
    a.clearNan() # now full of zeros"""
        self[self != self] = value # nan != nan, so this mask selects exactly the NaNs
        return self
class ConstantPad:
    def __init__(self, left=False):
        """Adds constant amount of padding to strings. Example::

    p = k1.ConstantPad()
    p("123")     # returns "123"
    p("23")      # returns " 23"
    "12345" | p  # returns "12345", can pipe it in too, but is not strictly a cli tool
    p("123")     # returns "  123"

Basically, this is useful in situations when you're printing a table or status
bar and needs relatively constant width but you don't know what's the desired
width at the start. As you normally use a bunch of these in groups, there's a
convenience function for that too::

    p1, p2 = k1.ConstantPad.multi(2)

:param left: whether to align left or not"""
        self.left = left; self.length = 0 # length: widest string seen so far
    def __call__(self, s):
        # widen the pad to the longest string seen so far, then pad on the chosen side
        self.length = max(self.length, len(s))
        return s.ljust(self.length) if self.left else s.rjust(self.length)
    def __ror__(self, s): return self.__call__(s) # enables `"str" | pad` piping
    @staticmethod
    def multi(n, *args, **kwargs): return [ConstantPad(*args, **kwargs) for i in range(n)] # n independent pads, same config